Shaun Xu

The Sheep-Pen of the Shaun


News

logo

Shaun, the author of this blog is a semi-geek, clumsy developer, passionate speaker and incapable architect with about 10 years experience in .NET. He hopes to prove that software development is art rather than manufacturing. He's into cloud computing platform and technologies (Windows Azure, Aliyun) as well as WCF and ASP.NET MVC. Recently he's falling in love with JavaScript and Node.js.

Currently Shaun is working at IGT Technology Development (Beijing) Co., Ltd. as the architect responsible for product framework design and development.

MVP


In the previous posts we talked about the transport extension that scaling-out our WCF services over a message bus transportation. What we have done is to use request reply MEP as a example, and implement the asynchronous methods that makes our transport support the normal WCF usage – ServiceHost and ChannelFactory on service hosting and client invoking.

Request reply MEP is the most common mode when we are using WCF, but there are two MEPs available in WCF as well: datagram and duplex. In this post I will demonstrate how to implement the datagram MEP in our solution, with some code refectoring.

 

Datagram MEP

Datagram MEP, also known as the one-way, request-forget or fire-forget MEP. Different from the request reply, in this mode the client will send the request to the server and no need to wait for the reply. This means, on one hand, the client will not be blocked by the server reply. On the other hand, the client will neither know if the server received the message, nor the service works properly. This MEP is widely used in scenario like logging, auditing, etc..

image

On the client side, the OutputChannel will send the request message to the underlying transportation, while on the server side, the InputChannel will receive them and dispatched by WCF to the service class and related method.

The OutputChannel just sends the message to the message bus, in our case, and will not wait for any response. It will return back immediately. And the InputChannel on the server will receive the message and process the business logic without sending back any reply.

Hence in order to implement the datagram MEP what we need to do is,

  • Client side, implement a ChannelFactory that returns an OutputChannel, and implement the OutputChannel which can send message to the bus.
  • Server side, implement a ChannelListener that returns an InputChannel, and implement the InputChannel which can receive message from the bus.

Since we had created a ChannelFactory and ChannelListener for the request-reply mode, and the channels works well with our message bus before, then we will do some code refactoring first.

 

ChannelBase, ChannelFactoryBase and ChannelListenerBase

We will have three base classes that covers some common procedures for channels, channel factories and channel listeners. The channel base class will just receive the buffer manager, encoder factory and provide them as a protected readonly properties. We will also have a property named ChannelID, which will be used in the duplex MEP in the next post.

In the request channel and reply channel we have a lot of code that convert the WCF message into string and vice versa, which using the buffer manager and encoder. Here we will create two methods for them so that in the future the channels can convert between string and message by just invoking them.

   1: public abstract class MessageBusChannelBase : ChannelBase
   2: {
   3:     private const int CST_MAXBUFFERSIZE = 64 * 1024;
   4:  
   5:     private readonly Guid _id;
   6:     private readonly BufferManager _bufferManager;
   7:     private readonly MessageEncoder _encoder;
   8:  
   9:     protected BufferManager BufferManager
  10:     {
  11:         get
  12:         {
  13:             return _bufferManager;
  14:         }
  15:     }
  16:  
  17:     protected MessageEncoder Encoder
  18:     {
  19:         get
  20:         {
  21:             return _encoder;
  22:         }
  23:     }
  24:  
  25:     public string ChannelID
  26:     {
  27:         get
  28:         {
  29:             return _id.ToString();
  30:         }
  31:     }
  32:  
  33:     protected MessageBusChannelBase(BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent)
  34:         : base(parent)
  35:     {
  36:         _id = Guid.NewGuid();
  37:  
  38:         _bufferManager = bufferManager;
  39:         _encoder = encoder.CreateSessionEncoder();
  40:     }
  41:  
  42:     internal Message GetWcfMessageFromString(string content)
  43:     {
  44:         var raw = Encoding.UTF8.GetBytes(content);
  45:         var data = _bufferManager.TakeBuffer(raw.Length);
  46:         Buffer.BlockCopy(raw, 0, data, 0, raw.Length);
  47:         var buffer = new ArraySegment<byte>(data, 0, raw.Length);
  48:         var message = _encoder.ReadMessage(buffer, _bufferManager);
  49:         return message;
  50:     }
  51:  
  52:     internal string GetStringFromWcfMessage(Message message, EndpointAddress to)
  53:     {
  54:         ArraySegment<byte> buffer;
  55:         string content;
  56:         using (message)
  57:         {
  58:             to.ApplyTo(message);
  59:             buffer = _encoder.WriteMessage(message, CST_MAXBUFFERSIZE, _bufferManager);
  60:         }
  61:         content = Encoding.UTF8.GetString(buffer.Array, buffer.Offset, buffer.Count);
  62:         _bufferManager.ReturnBuffer(buffer.Array);
  63:         return content;
  64:     }
  65:  
  66:     protected override void OnAbort()
  67:     {
  68:     }
  69:  
  70:     protected override void OnClose(TimeSpan timeout)
  71:     {
  72:     }
  73:  
  74:     protected override void OnOpen(TimeSpan timeout)
  75:     {
  76:     }
  77:  
  78:     #region Not Implemented
  79:  
  80:     protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
  81:     {
  82:         throw new NotImplementedException();
  83:     }
  84:  
  85:     protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
  86:     {
  87:         throw new NotImplementedException();
  88:     }
  89:  
  90:     protected override void OnEndClose(IAsyncResult result)
  91:     {
  92:         throw new NotImplementedException();
  93:     }
  94:  
  95:     protected override void OnEndOpen(IAsyncResult result)
  96:     {
  97:         throw new NotImplementedException();
  98:     }
  99:  
 100:     #endregion
 101: }

The main responsibility of the channel factory and channel listener is to create a related channel object. Each factories and listeners will has very similar logic, except which type of the channel it will create. So we can have two base classes to wrap almost all complexity and just let the sub classes to take the responsible to create the actual channel object.

   1: public abstract class MessageBusChannelFactoryBase<TChannel> : ChannelFactoryBase<TChannel> where TChannel : class, IChannel
   2: {
   3:     private readonly MessageBusTransportBindingElement _transportElement;
   4:     private readonly BufferManager _bufferManager;
   5:     private readonly MessageEncoderFactory _encoder;
   6:  
   7:     private readonly IBus _bus;
   8:  
   9:     public IBus Bus
  10:     {
  11:         get
  12:         {
  13:             return _bus;
  14:         }
  15:     }
  16:  
  17:     protected MessageBusTransportBindingElement TransportElement
  18:     {
  19:         get
  20:         {
  21:             return _transportElement;
  22:         }
  23:     }
  24:  
  25:     protected MessageBusChannelFactoryBase(MessageBusTransportBindingElement transportElement, BindingContext context)
  26:         : base(context.Binding)
  27:     {
  28:         _transportElement = transportElement;
  29:         _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
  30:         var encodingElement = context.Binding.Elements.Find<MessageEncodingBindingElement>();
  31:         if (encodingElement == null)
  32:         {
  33:             _encoder = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
  34:         }
  35:         else
  36:         {
  37:             _encoder = encodingElement.CreateMessageEncoderFactory();
  38:         }
  39:         _bus = transportElement.Bus;
  40:     }
  41:  
  42:     protected abstract TChannel CreateChannel(
  43:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
  44:         MessageBusChannelFactoryBase<TChannel> parent,
  45:         Uri via,
  46:         IBus bus);
  47:  
  48:     protected override void OnOpen(TimeSpan timeout)
  49:     {
  50:     }
  51:  
  52:     protected override void OnClosed()
  53:     {
  54:         base.OnClosed();
  55:  
  56:         _bufferManager.Clear();
  57:         _bus.Dispose();
  58:     }
  59:  
  60:     protected override TChannel OnCreateChannel(System.ServiceModel.EndpointAddress address, Uri via)
  61:     {
  62:         return CreateChannel(_bufferManager, _encoder, address, this, via, _bus);
  63:     }
  64:  
  65:     protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
  66:     {
  67:         throw new NotImplementedException();
  68:     }
  69:  
  70:     protected override void OnEndOpen(IAsyncResult result)
  71:     {
  72:         throw new NotImplementedException();
  73:     }
  74: }
   1: public abstract class MessageBusChannelListenerBase<TChannel> : ChannelListenerBase<TChannel> where TChannel : class, IChannel
   2: {
   3:     private readonly MessageBusTransportBindingElement _transportElement;
   4:  
   5:     private readonly BufferManager _bufferManager;
   6:     private readonly MessageEncoderFactory _encoder;
   7:     private readonly string _scheme;
   8:     private readonly Uri _uri;
   9:  
  10:     private readonly IBus _bus;
  11:  
  12:     private readonly InputQueue<TChannel> _channelQueue;
  13:     private readonly object _currentChannelLock;
  14:  
  15:     private TChannel _currentChannel;
  16:  
  17:     public IBus Bus
  18:     {
  19:         get
  20:         {
  21:             return _bus;
  22:         }
  23:     }
  24:  
  25:     public override Uri Uri
  26:     {
  27:         get
  28:         {
  29:             return _uri;
  30:         }
  31:     }
  32:  
  33:     public string Scheme
  34:     {
  35:         get
  36:         {
  37:             return _scheme;
  38:         }
  39:     }
  40:  
  41:     protected MessageBusTransportBindingElement TransportElement
  42:     {
  43:         get
  44:         {
  45:             return _transportElement;
  46:         }
  47:     }
  48:  
  49:     protected MessageBusChannelListenerBase(MessageBusTransportBindingElement transportElement, BindingContext context)
  50:         : base(context.Binding)
  51:     {
  52:         _transportElement = transportElement;
  53:         _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
  54:         var encodingElement = context.Binding.Elements.Find<MessageEncodingBindingElement>();
  55:         if (encodingElement == null)
  56:         {
  57:             _encoder = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
  58:         }
  59:         else
  60:         {
  61:             _encoder = encodingElement.CreateMessageEncoderFactory();
  62:         }
  63:         _scheme = transportElement.Scheme;
  64:         _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
  65:  
  66:         _bus = transportElement.Bus;
  67:  
  68:         _channelQueue = new InputQueue<TChannel>();
  69:         _currentChannelLock = new object();
  70:         _currentChannel = null;
  71:     }
  72:  
  73:     protected override void OnAbort()
  74:     {
  75:         try
  76:         {
  77:             lock (ThisLock)
  78:             {
  79:                 _channelQueue.Close();
  80:             }
  81:         }
  82:         catch { }
  83:     }
  84:  
  85:     protected override void OnClose(TimeSpan timeout)
  86:     {
  87:         try
  88:         {
  89:             lock (ThisLock)
  90:             {
  91:                 _channelQueue.Close();
  92:             }
  93:         }
  94:         catch { }
  95:     }
  96:  
  97:     protected override void OnClosed()
  98:     {
  99:         base.OnClosed();
 100:  
 101:         try
 102:         {
 103:             _bufferManager.Clear();
 104:             _bus.Dispose();
 105:         }
 106:         catch { }
 107:     }
 108:  
 109:     private void EnsureChannelAvailable()
 110:     {
 111:         TChannel newChannel = null;
 112:         bool channelCreated = false;
 113:  
 114:         if ((newChannel = _currentChannel) == null)
 115:         {
 116:             lock (_currentChannelLock)
 117:             {
 118:                 if ((newChannel = _currentChannel) == null)
 119:                 {
 120:                     newChannel = CreateChannel(_bufferManager, _encoder, new EndpointAddress(_uri), this, _bus);
 121:                     newChannel.Closed += new EventHandler(OnChannelClosed);
 122:                     _currentChannel = newChannel;
 123:                     channelCreated = true;
 124:                 }
 125:             }
 126:         }
 127:  
 128:         if (channelCreated)
 129:         {
 130:             _channelQueue.EnqueueAndDispatch(newChannel);
 131:         }
 132:     }
 133:  
 134:     protected abstract TChannel CreateChannel(
 135:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
 136:         MessageBusChannelListenerBase<TChannel> parent,
 137:         IBus bus);
 138:  
 139:     private void OnChannelClosed(object sender, EventArgs e)
 140:     {
 141:         var channel = sender as TChannel;
 142:         lock (_currentChannelLock)
 143:         {
 144:             if (channel == _currentChannel)
 145:             {
 146:                 _currentChannel = null;
 147:             }
 148:         }
 149:     }
 150:  
 151:     protected override TChannel OnAcceptChannel(TimeSpan timeout)
 152:     {
 153:         if (!IsDisposed)
 154:         {
 155:             EnsureChannelAvailable();
 156:         }
 157:  
 158:         TChannel channel = null;
 159:         if (_channelQueue.Dequeue(timeout, out channel))
 160:         {
 161:             return channel;
 162:         }
 163:         else
 164:         {
 165:             throw new TimeoutException();
 166:         }
 167:     }
 168:  
 169:     protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
 170:     {
 171:         if (!IsDisposed)
 172:         {
 173:             EnsureChannelAvailable();
 174:         }
 175:  
 176:         return _channelQueue.BeginDequeue(timeout, callback, state);
 177:     }
 178:  
 179:     protected override TChannel OnEndAcceptChannel(IAsyncResult result)
 180:     {
 181:         TChannel channel;
 182:         if (_channelQueue.EndDequeue(result, out channel))
 183:         {
 184:             return channel;
 185:         }
 186:         else
 187:         {
 188:             throw new TimeoutException();
 189:         }
 190:     }
 191:  
 192:     protected override void OnOpen(TimeSpan timeout)
 193:     {
 194:     }
 195:  
 196:     protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
 197:     {
 198:         throw new NotImplementedException();
 199:     }
 200:  
 201:     protected override bool OnEndWaitForChannel(IAsyncResult result)
 202:     {
 203:         throw new NotImplementedException();
 204:     }
 205:  
 206:     protected override bool OnWaitForChannel(TimeSpan timeout)
 207:     {
 208:         throw new NotImplementedException();
 209:     }
 210:  
 211:     protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
 212:     {
 213:         throw new NotImplementedException();
 214:     }
 215:  
 216:     protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
 217:     {
 218:         throw new NotImplementedException();
 219:     }
 220:  
 221:     protected override void OnEndClose(IAsyncResult result)
 222:     {
 223:         throw new NotImplementedException();
 224:     }
 225:  
 226:     protected override void OnEndOpen(IAsyncResult result)
 227:     {
 228:         throw new NotImplementedException();
 229:     }
 230: }

As you can see, the ChannelFactoryBase and the ChannelListenerBase I just expose an abstract method which is CreateChannel, that gives a chance to the sub class to instant the actual channel object. After these modification our request and reply part (that had been done in the previous post) could be turned simpler and clearer.

The RequestChannelFactory will be like this.

   1: public class MessageBusRequestChannelFactory : MessageBusChannelFactoryBase<IRequestChannel>
   2: {
   3:     public MessageBusRequestChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IRequestChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress, 
  10:         MessageBusChannelFactoryBase<IRequestChannel> parent, 
  11:         Uri via, 
  12:         IBus bus)
  13:     {
  14:         return new MessageBusRequestChannel(bufferManager, encoder, parent, remoteAddress, via, bus);
  15:     }
  16: }

And this is the ReplyChannelListener.

   1: public class MessageBusReplyChannelListener : MessageBusChannelListenerBase<IReplyChannel>
   2: {
   3:     public MessageBusReplyChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IReplyChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress, 
  10:         MessageBusChannelListenerBase<IReplyChannel> parent, 
  11:         IBus bus)
  12:     {
  13:         return new MessageBusReplyChannel(bufferManager, encoder, localAddress, parent, bus);
  14:     }
  15: }

And below is the RequestChannel and ReplyChannel. I inherited them from the MessageBusChannelBase class and utilized the helper methods to convert between string and message.

   1: public class MessageBusRequestChannel : MessageBusChannelBase, IRequestChannel
   2: {
   3:     private readonly IBus _bus;
   4:     private readonly Uri _via;
   5:     private readonly EndpointAddress _remoteAddress;
   6:     private readonly object _aLock;
   7:  
   8:     public MessageBusRequestChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
  10:         EndpointAddress remoteAddress, Uri via, IBus bus)
  11:         : base(bufferManager, encoder, parent)
  12:     {
  13:         _via = via;
  14:         _remoteAddress = remoteAddress;
  15:         _bus = bus;
  16:         _aLock = new object();
  17:     }
  18:  
  19:     public Uri Via
  20:     {
  21:         get
  22:         {
  23:             return _via;
  24:         }
  25:     }
  26:  
  27:     public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  28:     {
  29:         throw new NotImplementedException();
  30:     }
  31:  
  32:     public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
  33:     {
  34:         throw new NotImplementedException();
  35:     }
  36:  
  37:     public Message EndRequest(IAsyncResult result)
  38:     {
  39:         throw new NotImplementedException();
  40:     }
  41:  
  42:     public System.ServiceModel.EndpointAddress RemoteAddress
  43:     {
  44:         get 
  45:         {
  46:             return _remoteAddress;
  47:         }
  48:     }
  49:  
  50:     public Message Request(Message message, TimeSpan timeout)
  51:     {
  52:         ThrowIfDisposedOrNotOpen();
  53:         lock (_aLock)
  54:         {
  55:             // unbox the message into string that will be sent into the bus
  56:             var content = GetStringFromWcfMessage(message,_remoteAddress);
  57:             // send the message into bus
  58:             var busMsgId = _bus.SendRequest(content, true, null);
  59:             // waiting for the reply message arrive from the bus
  60:             var replyMsg = _bus.Receive(false, busMsgId);
  61:             // box the message from the bus message content and return back
  62:             var reply = GetWcfMessageFromString(replyMsg.Content);
  63:             return reply;
  64:         }
  65:     }
  66:  
  67:     public Message Request(Message message)
  68:     {
  69:         return Request(message, DefaultSendTimeout);
  70:     }
  71: }
   1: public class MessageBusReplyChannel : MessageBusChannelBase, IReplyChannel
   2: {
   3:     private readonly EndpointAddress _localAddress;
   4:     private readonly object _aLock;
   5:  
   6:     private readonly IBus _bus;
   7:  
   8:     private delegate bool TryReceiveRequestDelegate(TimeSpan timeout, out RequestContext context);
   9:     private TryReceiveRequestDelegate _tryReceiveRequestDelegate;
  10:  
  11:     public MessageBusReplyChannel(
  12:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
  13:         ChannelManagerBase parent,
  14:         IBus bus)
  15:         : base(bufferManager, encoder, parent)
  16:     {
  17:         _localAddress = localAddress;
  18:         _bus = bus;
  19:         _aLock = new object();
  20:  
  21:         _tryReceiveRequestDelegate = (TimeSpan t, out RequestContext rc) =>
  22:         {
  23:             rc = null;
  24:             // receive the request message from the bus
  25:             var busMsg = _bus.Receive(true, null);
  26:             // box the wcf message
  27:             var message = GetWcfMessageFromString(busMsg.Content);
  28:             // initialize the request context and return
  29:             rc = new MessageBusRequestContext(message, this, _localAddress, _bus, busMsg.MessageID);
  30:             return true;
  31:         };
  32:     }
  33:  
  34:     public System.ServiceModel.EndpointAddress LocalAddress
  35:     {
  36:         get
  37:         {
  38:             return _localAddress;
  39:         }
  40:     }
  41:  
  42:     public bool WaitForRequest(TimeSpan timeout)
  43:     {
  44:         return true;
  45:     }
  46:  
  47:     public RequestContext ReceiveRequest()
  48:     {
  49:         return ReceiveRequest(DefaultReceiveTimeout);
  50:     }
  51:  
  52:     public RequestContext ReceiveRequest(TimeSpan timeout)
  53:     {
  54:         ThrowIfDisposedOrNotOpen();
  55:         lock (_aLock)
  56:         {
  57:             // receive the request message from the bus
  58:             var busMsg = _bus.Receive(true, null);
  59:             // box the wcf message
  60:             var message = GetWcfMessageFromString(busMsg.Content);
  61:             // initialize the request context and return
  62:             return new MessageBusRequestContext(message, this, _localAddress, _bus, busMsg.MessageID);
  63:         }
  64:     }
  65:  
  66:     public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
  67:     {
  68:         RequestContext context;
  69:         return _tryReceiveRequestDelegate.BeginInvoke(timeout, out context, callback, state);
  70:     }
  71:  
  72:     public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext context)
  73:     {
  74:         var ret = _tryReceiveRequestDelegate.EndInvoke(out context, result);
  75:         return ret;
  76:     }
  77:  
  78:     public IAsyncResult BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
  79:     {
  80:         throw new NotImplementedException();
  81:     }
  82:  
  83:     public IAsyncResult BeginReceiveRequest(AsyncCallback callback, object state)
  84:     {
  85:         throw new NotImplementedException();
  86:     }
  87:  
  88:     public IAsyncResult BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state)
  89:     {
  90:         throw new NotImplementedException();
  91:     }
  92:  
  93:     public RequestContext EndReceiveRequest(IAsyncResult result)
  94:     {
  95:         throw new NotImplementedException();
  96:     }
  97:  
  98:     public bool EndWaitForRequest(IAsyncResult result)
  99:     {
 100:         throw new NotImplementedException();
 101:     }
 102:  
 103:     public bool TryReceiveRequest(TimeSpan timeout, out RequestContext context)
 104:     {
 105:         throw new NotImplementedException();
 106:     }
 107: }

Also a little bit changes could be made in the RequestContext as well, which utilizes the message convert methods as well.

   1: public class MessageBusRequestContext : RequestContext
   2: {
   3:     private bool _aborted;
   4:     private readonly Message _message;
   5:     private readonly MessageBusReplyChannel _parent;
   6:     private readonly EndpointAddress _address;
   7:     private readonly object _aLock;
   8:     private readonly string _busMessageId;
   9:     private readonly IBus _bus;
  10:  
  11:     private CommunicationState _state;
  12:  
  13:     public MessageBusRequestContext(
  14:         Message message, MessageBusReplyChannel parent,
  15:         EndpointAddress address,
  16:         IBus bus,
  17:         string relatedTo)
  18:     {
  19:         _aborted = false;
  20:         _parent = parent;
  21:         _message = message;
  22:         _address = address;
  23:         _busMessageId = relatedTo;
  24:         _bus = bus;
  25:  
  26:         _aLock = new object();
  27:         _state = CommunicationState.Opened;
  28:     }
  29:  
  30:     public override void Abort()
  31:     {
  32:         lock (_aLock)
  33:         {
  34:             if (_aborted)
  35:             {
  36:                 return;
  37:             }
  38:             _aborted = true;
  39:             _state = CommunicationState.Faulted;
  40:         }
  41:     }
  42:  
  43:     public override IAsyncResult BeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  44:     {
  45:         throw new NotImplementedException();
  46:     }
  47:  
  48:     public override IAsyncResult BeginReply(Message message, AsyncCallback callback, object state)
  49:     {
  50:         throw new NotImplementedException();
  51:     }
  52:  
  53:     public override void Close(TimeSpan timeout)
  54:     {
  55:         lock (_aLock)
  56:         {
  57:             _state = CommunicationState.Closed;
  58:         }
  59:     }
  60:  
  61:     public override void Close()
  62:     {
  63:         Close(TimeSpan.MaxValue);
  64:     }
  65:  
  66:     public override void EndReply(IAsyncResult result)
  67:     {
  68:         throw new NotImplementedException();
  69:     }
  70:  
  71:     public override void Reply(Message message, TimeSpan timeout)
  72:     {
  73:         // unbox the reply message to string
  74:         var content = _parent.GetStringFromWcfMessage(message, _address);
  75:         // send the reply into bus
  76:         _bus.SendReply(content, false, _busMessageId);
  77:     }
  78:  
  79:     public override void Reply(Message message)
  80:     {
  81:         Reply(message, TimeSpan.MaxValue);
  82:     }
  83:  
  84:     public override Message RequestMessage
  85:     {
  86:         get
  87:         {
  88:             return _message;
  89:         }
  90:     }
  91: }

 

Channel Factory, Listener and Channels for Datagram

After the refactoring it would be easier to create the channel factory, listener and channels for datagram MEP. First we will create the client side OutputChannelFactory, which just need to implement the CreateChannel from its base class (ChannelFactoryBase) and instant an IOutputChannel object.

   1: public class MessageBusOutputChannelFactory : MessageBusChannelFactoryBase<IOutputChannel>
   2: {
   3:     public MessageBusOutputChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IOutputChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
  10:         MessageBusChannelFactoryBase<IOutputChannel> parent,
  11:         Uri via,
  12:         IBus bus)
  13:     {
  14:         return new MessageBusOutputChannel(bufferManager, encoder, parent, remoteAddress, via, bus);
  15:     }
  16: }

The MessageBusOutputChannel class we are going to create will be inherited from the MessageBusChannelBase as well, and will implement the IOutputChannel interface. There’s only one important method we should implement which is Send. This method will be invoked when the client send the message to the server to do something. You will notice that this method doesn’t have any return value, which means, as the datagram MEP defined, the client should not care about whether the service will process its message.

Since we have had the convert method in the channel base class, we can simply convert the message into string and pass it to the underlying message bus.

   1: public virtual void Send(Message message, TimeSpan timeout)
   2: {
   3:     var content = GetStringFromWcfMessage(message, RemoteAddress);
   4:     _bus.SendRequest(content, true, ChannelID, null);
   5: }

The full code of the output channel is

   1: public class MessageBusOutputChannel : MessageBusChannelBase, IOutputChannel
   2: {
   3:     private readonly IBus _bus;
   4:     private readonly Uri _via;
   5:     private readonly EndpointAddress _remoteAddress;
   6:  
   7:     public MessageBusOutputChannel(
   8:         BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
   9:         EndpointAddress remoteAddress,
  10:         Uri via, IBus bus)
  11:         : base(bufferManager, encoder, parent)
  12:     {
  13:         _bus = bus;
  14:         _via = via;
  15:         _remoteAddress = remoteAddress;
  16:     }
  17:  
  18:     public Uri Via
  19:     {
  20:         get
  21:         {
  22:             return _via;
  23:         }
  24:     }
  25:  
  26:     public EndpointAddress RemoteAddress
  27:     {
  28:         get
  29:         {
  30:             return _remoteAddress;
  31:         }
  32:     }
  33:  
  34:     public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  35:     {
  36:         throw new NotImplementedException();
  37:     }
  38:  
  39:     public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
  40:     {
  41:         throw new NotImplementedException();
  42:     }
  43:  
  44:     public void EndSend(IAsyncResult result)
  45:     {
  46:         throw new NotImplementedException();
  47:     }
  48:  
  49:     public virtual void Send(Message message, TimeSpan timeout)
  50:     {
  51:         var content = GetStringFromWcfMessage(message, RemoteAddress);
  52:         _bus.SendRequest(content, true, ChannelID, null);
  53:     }
  54:  
  55:     public void Send(Message message)
  56:     {
  57:         Send(message, DefaultSendTimeout);
  58:     }
  59: }

On the server side, similarly we need to create the ChannelListener from the base class and just need to implement the code to create the relevant InputChannel instance.

   1: public class MessageBusInputChannelListener : MessageBusChannelListenerBase<IInputChannel>
   2: {
   3:     public MessageBusInputChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IInputChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
  10:         MessageBusChannelListenerBase<IInputChannel> parent, 
  11:         IBus bus)
  12:     {
  13:         return new MessageBusInputChannel(bufferManager, encoder, parent, localAddress, bus);
  14:     }
  15: }

The InputChannel need to receive the message sent from the client (InputChannel), by its BeginTryReceive, EndTryReceive, BeginReceive and EndReceive methods. We will also create two delegates for them and use the asynchronous invoke methods.

   1: public class MessageBusInputChannel : MessageBusChannelBase, IInputChannel
   2: {
   3:     private readonly IBus _bus;
   4:     private readonly EndpointAddress _localAddress;
   5:     private readonly object _aLock;
   6:  
   7:     private delegate bool TryReceiveDelegate(TimeSpan timeout, out Message message);
   8:     private TryReceiveDelegate _tryReceiveDelegate;
   9:  
  10:     private delegate Message ReceiveDelegate(TimeSpan timeout);
  11:     private ReceiveDelegate _receiveDelegate;
  12:  
  13:     public MessageBusInputChannel(
  14:         BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
  15:         EndpointAddress localAddress,
  16:         IBus bus)
  17:         : base(bufferManager, encoder, parent)
  18:     {
  19:         _localAddress = localAddress;
  20:         _bus = bus;
  21:         _aLock = new object();
  22:  
  23:         _tryReceiveDelegate = (TimeSpan timeout, out Message message) =>
  24:         {
  25:             message = null;
  26:             try
  27:             {
  28:                 var requestMessage = _bus.Receive(true, null);
  29:                 if (requestMessage != null)
  30:                 {
  31:                     message = GetWcfMessageFromString(requestMessage.Content);
  32:                     OnTryReceive(requestMessage);
  33:                 }
  34:             }
  35:             catch (Exception ex)
  36:             {
  37:                 throw new CommunicationException(ex.Message, ex);
  38:             }
  39:             return true;
  40:         };
  41:  
  42:         _receiveDelegate = (TimeSpan timeout) =>
  43:         {
  44:             var requestMessage = _bus.Receive(false, ChannelID);
  45:             return GetWcfMessageFromString(requestMessage.Content);
  46:         };
  47:     }
  48:  
  49:     public System.ServiceModel.EndpointAddress LocalAddress
  50:     {
  51:         get
  52:         {
  53:             return _localAddress;
  54:         }
  55:     }
  56:  
  57:     public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
  58:     {
  59:         return _receiveDelegate.BeginInvoke(timeout, callback, state);
  60:     }
  61:  
  62:     public IAsyncResult BeginReceive(AsyncCallback callback, object state)
  63:     {
  64:         return BeginReceive(DefaultReceiveTimeout, callback, state);
  65:     }
  66:  
  67:     public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
  68:     {
  69:         Message message;
  70:         return _tryReceiveDelegate.BeginInvoke(timeout, out message, callback, state);
  71:     }
  72:  
  73:     public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
  74:     {
  75:         throw new NotImplementedException();
  76:     }
  77:  
  78:     public Message EndReceive(IAsyncResult result)
  79:     {
  80:         return _receiveDelegate.EndInvoke(result);
  81:     }
  82:  
  83:     public virtual bool EndTryReceive(IAsyncResult result, out Message message)
  84:     {
  85:         var ret = _tryReceiveDelegate.EndInvoke(out message, result);
  86:         return ret;
  87:     }
  88:  
  89:     protected virtual void OnTryReceive(BusMessage message)
  90:     {
  91:     }
  92:  
  93:     public bool EndWaitForMessage(IAsyncResult result)
  94:     {
  95:         throw new NotImplementedException();
  96:     }
  97:  
  98:     public Message Receive(TimeSpan timeout)
  99:     {
 100:         throw new NotImplementedException();
 101:     }
 102:  
 103:     public Message Receive()
 104:     {
 105:         throw new NotImplementedException();
 106:     }
 107:  
 108:     public bool TryReceive(TimeSpan timeout, out Message message)
 109:     {
 110:         throw new NotImplementedException();
 111:     }
 112:  
 113:     public bool WaitForMessage(TimeSpan timeout)
 114:     {
 115:         throw new NotImplementedException();
 116:     }
 117: }

You might notice that we defined a virtual method named OnTryRecevie and invoked it after the delegate got a valid incoming message, but without doing anything. This method will be used in the future in session implement. You can ignore it for now.

Last thing we need to do is to update the transport binding element, which will make our transport support the InputChannel and OutputChannel, and return the related ChannelFactory and ChannelListener.

   1: public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
   2: {
   3:     return typeof(TChannel) == typeof(IRequestChannel) ||
   4:            typeof(TChannel) == typeof(IOutputChannel);
   5: }
   6:  
   7: public override bool CanBuildChannelListener<TChannel>(BindingContext context)
   8: {
   9:     return typeof(TChannel) == typeof(IReplyChannel) ||
  10:            typeof(TChannel) == typeof(IInputChannel);
  11: }
  12:  
  13: public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
  14: {
  15:     if (context == null)
  16:     {
  17:         throw new ArgumentNullException("context");
  18:     }
  19:     if (!CanBuildChannelFactory<TChannel>(context))
  20:     {
  21:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
  22:     }
  23:  
  24:     if (typeof(TChannel) == typeof(IRequestChannel))
  25:     {
  26:         return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
  27:     }
  28:     else if (typeof(TChannel) == typeof(IOutputChannel))
  29:     {
  30:         return (IChannelFactory<TChannel>)(object)new MessageBusOutputChannelFactory(this, context);
  31:     }
  32:     else
  33:     {
  34:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
  35:     }
  36:  
  37: }
  38:  
  39: public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
  40: {
  41:     if (context == null)
  42:     {
  43:         throw new ArgumentNullException("context");
  44:     }
  45:     if (!CanBuildChannelListener<TChannel>(context))
  46:     {
  47:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
  48:     }
  49:  
  50:     if (typeof(TChannel) == typeof(IReplyChannel))
  51:     {
  52:         return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
  53:     }
  54:     else if (typeof(TChannel) == typeof(IInputChannel))
  55:     {
  56:         return (IChannelListener<TChannel>)(object)new MessageBusInputChannelListener(this, context);
  57:     }
  58:     else
  59:     {
  60:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
  61:     }
  62: }

Now let’s test our datagram operation. Open the test console project and create a new service contract with one method which has the IsOneWay = true defined.

   1: [ServiceContract(Namespace = "http://wcf.shaunxu.me/")]
   2: public interface ISampleService
   3: {
   4:     [OperationContract(IsOneWay = true)]
   5:     void Ping();
   6: }
   7:  
   8: public class SampleService : ISampleService
   9: {
  10:     public void Ping()
  11:     {
  12:         Console.WriteLine("Service {0}: PONG!", OperationContext.Current.Host.GetHashCode());
  13:     }
  14: }

And update the console main function as well.

   1: static void Main(string[] args)
   2: {
   3:     var bus = new InProcMessageBus();
   4:     var address = "net.bus://localhost/sample";
   5:  
   6:     // establish the services
   7:     var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
   8:     var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
   9:     var host3 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
  10:  
  11:     // establish the client
  12:     var cliBinding = new MessageBusTransportBinding(bus);
  13:     var factory = new ChannelFactory<ISampleService>(cliBinding, address);
  14:     factory.Opened += (sender, e) =>
  15:         {
  16:             Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
  17:         };
  18:     var proxy = factory.CreateChannel();
  19:  
  20:     // invoke the service
  21:     using (proxy as IDisposable)
  22:     {
  23:         Console.WriteLine("Press 'p' to ping the service, otherwise to exit.");
  24:         var content = Console.ReadLine();
  25:         while (string.Compare(content, "p", true) == 0)
  26:         {
  27:             proxy.Ping();
  28:             content = Console.ReadLine();
  29:         }
  30:     }
  31: }

Again, I’m using the in process message bus so the multi-instances service will be hosted on threads instead of processes and machines. When we invoked the service many times we can see the datagram message send across to the three services.

image

 

IsOneWay = ture Always Means Input-Output Channel?

Let’s modified our test service contract and service implementation a little bit, and to see what happen. Let’s add another method in the contract with the IsOneWay = false, as well as the implementation class.

   1: [ServiceContract(Namespace = "http://wcf.shaunxu.me/")]
   2: public interface ISampleService
   3: {
   4:     [OperationContract(IsOneWay = true)]
   5:     void Ping();
   6:  
   7:     [OperationContract(IsOneWay = false)]
   8:     string Reverse(string content);
   9: }
  10:  
  11: public class SampleService : ISampleService
  12: {
  13:     public void Ping()
  14:     {
  15:         Console.WriteLine("Service {0}: PONG!", OperationContext.Current.Host.GetHashCode());
  16:     }
  17:  
  18:     public string Reverse(string content)
  19:     {
  20:         var result = new string(content.Reverse().ToArray());
  21:         Console.WriteLine("Service {0}: Reverse {1} => {2}", OperationContext.Current.Host.GetHashCode(), content, result);
  22:         return result;
  23:     }
  24: }

And then F5 our console application and try to invoke the Ping method. Now we found that the client sent the request, but the service method was not invoked and our client was dead.

image

More interesting, if we debugged the code we will find that when we invoked the Ping service method which is IsOneWay = true, the WCF tried to create a RequestChannel and ReplyChannel to us.

image

Why I invoked a service method with IsOneWay = true, but the WCF gave me a request and reply Channel? Well the channel the WCF chose is NOT based on the method you are invoking, it is based on the contract definition. The channel selection logic is complex but to be simple, WCF will select the channels which just satisfied with the requirement of your service contract. Kenny Wolf have a nice post that roughly summarized this logic.

In our case, we have a method marked IsOneWay = true and another marked IsOneWay = false. So the WCF will chose the request reply channels even though we are using the method that IsOneWay = true. The problem is that, when using request reply channel handles the datagram message, the reply message from WCF will always be null. This will cause the exception in our RequestContext.Reply method, since we need to send the reply message back to the client but now it’s null.

image

Thanks Jiang Jinnan, who helped me a lot to figure this problem out and the solution suggestion. For more information about this WCF MVP you can visit his blog (in Chinese).

When we knew the root caution, the solution would be very simple. In the RequestContext.Reply method we will check if the message is null. If yes, that means this is a datagram channel shape and we need to send a blank message back to the client to acknowledge and make sure the process is running.

   1: public override void Reply(Message message, TimeSpan timeout)
   2: {
   3:     if (message == null)
   4:     {
   5:         // this means this is a one way message
   6:         // we just need to send a blank message back to the client to acknowledge it
   7:         _bus.SendReply(string.Empty, false, _busMessageId);
   8:     }
   9:     else
  10:     {
  11:         // unbox the reply message to string
  12:         var content = _parent.GetStringFromWcfMessage(message, _address);
  13:         // send the reply into bus
  14:         _bus.SendReply(content, false, _busMessageId);
  15:     }
  16: }

And in the RequestChannel.Request we also need to check whether the reply message content is blank. If yes we will do nothing since we know that this is a datagram channel acknowledge message from the server side.

   1: public Message Request(Message message, TimeSpan timeout)
   2: {
   3:     ThrowIfDisposedOrNotOpen();
   4:     lock (_aLock)
   5:     {
   6:         // unbox the message into string that will be sent into the bus
   7:         var content = GetStringFromWcfMessage(message,_remoteAddress);
   8:         // send the message into bus
   9:         var busMsgId = _bus.SendRequest(content, true, null);
  10:         // waiting for the reply message arrive from the bus
  11:         var replyMsg = _bus.Receive(false, busMsgId);
  12:         if (string.IsNullOrWhiteSpace(replyMsg.Content))
  13:         {
  14:             // this means this is a one way channel acknowledge from server
  15:             // we just return null and do nothing
  16:             return null;
  17:         }
  18:         else
  19:         {
  20:             // box the message from the bus message content and return back
  21:             var reply = GetWcfMessageFromString(replyMsg.Content);
  22:             return reply;
  23:         }
  24:     }
  25: }

Now let’s execute our test console you can see the IsOneWay = ture method was invoked correctly.

image

 

Summary

In this post I implemented the datagram MEP channel factory, listener and the related input output channels. I also explained a potential problem if we have IsOneWay = true methods and IsOneWay = false methods mixed in one service contract, and how to solve it.

We have finished two MEPs in our transport extension, request reply and datagram. And both of them supports the service side scaling-out, as you can see our request can be handled by multiple services, based on the pulling approach.

In the next post I will demonstrate the most complexity and difficulty part, the duplex mode. I will implement the duplex channels over our message bus infrastructure with the server side scalability and sticky to the same client instance as well.

 

You can download the source code here.

 

Hope this helps,

Shaun

All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.

Comments

No comments posted yet.
Post A Comment
Title:
Name:
Email:
Comment:
Verification: