Shaun Xu

The Sheep-Pen of the Shaun



Shaun, the author of this blog, is a semi-geek, clumsy developer, passionate speaker and incapable architect with about 10 years’ experience in .NET and JavaScript. He hopes to prove that software development is art rather than manufacturing. He's into cloud computing platforms and technologies (Windows Azure, Amazon and Aliyun), and right now Shaun is attracted by JavaScript (Angular.js and Node.js) and he likes it.

Shaun works at Worktile Inc. as the chief architect, responsible for the overall design and development of worktile, a web-based collaboration and task management tool, and lesschat, a real-time communication aggregation tool.



In our last post I demonstrated how to implement the datagram channel shape, and in the second and third posts I described the request-reply shape. In this post I will explain the last message exchange pattern (MEP) in WCF, duplex, which is the most complex one.


Basis of the Duplex Channel Shape (MEP)

The MSDN documentation says: “The duplex MEP allows an arbitrary number of messages to be sent by a client and received in any order. The duplex MEP is like a phone conversation, where each word being spoken is a message. Because both sides can send and receive in this MEP, the interface implemented by the client and service channels is IDuplexChannel.”

This is easy to understand if we have one server instance and one client instance. You can assume that in duplex mode the client and the server can call each other freely. When the server calls the client, it grabs the client contract and invokes it through a proxy class, which is very similar to what we did to call a service from a client through ChannelFactory<T>. In fact, in duplex mode the client also plays the role of a service: when the server invokes a client callback, the server acts as a client and the client acts as a server.


The figure above outlines how a duplex communication works. At the beginning the client sends a request to the server. While the server-side business logic is running, it invokes the client-side contract and gets the result. These server-to-client invocations are normally called “callbacks”. They can target the same client-side method or different ones. Finally the server finishes the business logic and sends the reply back.

This figure only describes the scenario in which every communication is request-reply, which means that both the client request and the server callbacks need a reply. In duplex mode we can also use datagram mode. The server can invoke a datagram callback on the client, and the client can invoke a datagram service method as well.

Note that WCF always uses the duplex channel to send and receive messages once a callback contract is defined on the service contract, even if the service implementation never invokes a callback.

From the explanation above you might get the impression that the duplex channel has a notion of a connection: when the client sends the first request it somehow establishes a “connection” with the server, and callbacks use this “connection” to reach the same client and to send requests and replies back and forth. The built-in WCF transports do use a connection to implement the duplex channel. For example, NetTcpBinding uses the TCP connection as the duplex connection. WSDualHttpBinding is a little more complicated: since HTTP request-reply keeps no lasting connection between the two sides, it implements duplex mode by adding a second HTTP address, exposed by the client, that the server uses for the callback communication.


But in our case we only have a message bus that serves multiple service instances and clients. We want our duplex channel to scale out as far as possible, but we also need to ensure that once a client's original request has been received by a server instance, all remaining messages of that exchange flow between those two. A server callback must not be handled by another client.



Duplex Channel

Now that we have clarified the basics of the duplex channel and our goal for the message bus and scaling-out case, let’s have a look at the WCF duplex channel itself.

Unlike the channel shapes we introduced before, the duplex channel is created and used on both the server and the client side. If you dig into the definition of IDuplexChannel you will find that it adds no members of its own; it simply inherits from IInputChannel and IOutputChannel. Once a duplex channel is established, whether by the server-side ChannelListener or the client-side ChannelFactory, it tries to receive messages through its BeginTryReceive method. And whether the message is a client request, a callback reply, a server callback request or a server reply, the duplex channel always sends it through its Send method. This makes our implementation a little more complicated.

You could have one duplex channel class for the server and another for the client. But I prefer to use one channel class for both sides, since the main procedure is the same.

On the server side, the channel needs to receive two kinds of messages.

  • Original client request. This kind of message can be received by any server instance that is listening on the same endpoint (message bus queue).
  • Callback reply. The server can only receive the reply to a callback that it requested itself.

Listening for the first kind of message is easy, but the second one is more annoying. It’s similar to what we did in request-reply mode, where the client should only receive the reply to a request it sent itself. But in request-reply mode the client sends the request message first and then waits for the reply, so we know the request message ID and can use it to pick the related reply message. In duplex mode, as I said before, the channel on both the client and the server side first tries to receive messages and only then sends messages if needed. That means we need some mechanism that lets the channel recognize its own replies BEFORE any request has been sent.


The restriction in duplex mode is that a channel must receive the reply to a request it sent itself, and cannot receive a reply to a request sent by another channel. Hence we just need a way to identify whether a message is related to a request sent from the same channel. Once we have figured out this key point, the solution is simple. If you remember, when I refactored the channel base class in the last post I added a property named ChannelID, which is a GUID assigned automatically when a channel is initialized. This is the identity we will use to check whether a message should be received by this channel.

  • When a duplex channel is created and begins to receive messages, it should try to receive any request message (client request or callback request), plus any reply message carrying a property that says it is only for this channel.
  • After the channel receives a request, it extracts the SOAP message ID and the ChannelID the message came from, and saves them into a dictionary.
  • When the channel sends a reply message, it takes the request message ID from the reply’s RelatesTo header, looks up the relevant ChannelID, and sends the message with this ChannelID to indicate that only that channel can receive it.
  • When the channel sends a callback request, it takes the original request message ID from OperationContext.Current.RequestContext.RequestMessage, looks up the relevant ChannelID, and sends the message with this ID to indicate that only that client can receive the callback request and perform the client-side logic.
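The bookkeeping described in the rules above can be sketched in isolation. This is only a minimal illustration, not the channel code from this series: the class name ReplyRouteTable and its methods are hypothetical, and plain strings stand in for the SOAP message ID and the ChannelID.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: remembers which channel each received request came from.
public sealed class ReplyRouteTable
{
    // request message ID -> ID of the channel that sent the request
    private readonly ConcurrentDictionary<string, string> _senderByMessageId =
        new ConcurrentDictionary<string, string>();

    // Called when a request (client request or callback request) is received.
    public void RecordRequest(string messageId, string fromChannelId)
    {
        _senderByMessageId[messageId] = fromChannelId;
    }

    // Called before sending a callback request: look up the original sender
    // but keep the entry, because the final reply still needs it.
    public bool TryGetCallbackTarget(string originalRequestId, out string channelId)
    {
        return _senderByMessageId.TryGetValue(originalRequestId, out channelId);
    }

    // Called before sending a reply: the request is finished afterwards,
    // so the entry is removed while it is looked up.
    public bool TryTakeReplyTarget(string relatesTo, out string channelId)
    {
        return _senderByMessageId.TryRemove(relatesTo, out channelId);
    }
}
```

For example, after RecordRequest("msg-1", "client-1"), a callback lookup for "msg-1" yields "client-1" and leaves the entry in place, while the reply lookup yields "client-1" once and then fails for any later attempt.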


The figure above demonstrates what happens in our solution during a duplex communication with one callback request.

  • The client channel and the server channel each have their own channel ID.
  • Client and server start to receive incoming messages. The client side only receives messages sent to its channel ID. The server side receives messages from any channel as well as messages sent only to it.
  • The client sends the request message with the channel ID it comes from, but with no target channel ID specified. This message can be received by any service instance channel.
  • A service channel receives this message and saves the message ID and the “from” channel ID.
  • The service channel starts trying to receive messages again.
  • After running some business code, the service sends the callback request message to the client. From OperationContext.Current.RequestContext it retrieves the original request message ID, finds the channel ID the request came from, and appends this channel ID to the callback request message, so that the message can only be received by that channel. It also appends its own channel ID.
  • Only this client receives the callback request, since the message has “To ChannelID = 1”. It saves the message ID and the “from” channel ID.
  • The client channel starts trying to receive messages again.
  • After running some business code, the client sends the callback reply message back to the service. From the reply’s RelatesTo header it retrieves the callback request message ID, finds the channel ID the callback request came from, and appends this channel ID to the callback reply message, so that the message can only be received by that channel.
  • Only this service receives the reply message, since its channel ID equals the “To ChannelID” in the reply message.
  • The service channel starts trying to receive messages again.
  • After finishing the business code, the service sends the reply message back to the client. From the reply’s RelatesTo header it retrieves the original request message ID, finds the channel ID the request came from, and appends this channel ID to the reply message, so that the message can only be received by that channel.
  • Only this client receives the reply message, since the message has “To ChannelID = 1”.
  • The client channel starts trying to receive messages again.
  • The duplex invocation is finished.
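The steps above can be replayed as a small scripted walk-through. This is only a sketch under simplifying assumptions: there is no real bus or WCF channel involved, the IDs "client-1", "server-A", "req-1" and "cb-1" are made up, and DuplexFlowSketch is a hypothetical name. Each channel's dictionary is reduced to a plain Dictionary because everything runs on one thread.

```csharp
using System;
using System.Collections.Generic;

public static class DuplexFlowSketch
{
    // Returns one log line per message hop, in the order the hops occur.
    public static List<string> Run()
    {
        const string client = "client-1";
        const string server = "server-A";
        var log = new List<string>();

        // Each channel keeps its own table: request message ID -> sender channel ID.
        var serverRoutes = new Dictionary<string, string>();
        var clientRoutes = new Dictionary<string, string>();

        // 1. Client request: no target, so any server channel may receive it.
        log.Add(Hop(client, null, "request req-1"));
        serverRoutes["req-1"] = client;

        // 2. Callback request: the server looks up who sent req-1 and keeps the entry.
        log.Add(Hop(server, serverRoutes["req-1"], "callback request cb-1"));
        clientRoutes["cb-1"] = server;

        // 3. Callback reply: the client resolves the sender of cb-1 and forgets it.
        log.Add(Hop(client, Take(clientRoutes, "cb-1"), "callback reply to cb-1"));

        // 4. Final reply: the server resolves the sender of req-1 and forgets it.
        log.Add(Hop(server, Take(serverRoutes, "req-1"), "reply to req-1"));

        return log;
    }

    // Looks up the target channel and removes the entry in one step.
    private static string Take(Dictionary<string, string> routes, string messageId)
    {
        var target = routes[messageId];
        routes.Remove(messageId);
        return target;
    }

    private static string Hop(string from, string to, string what)
    {
        return string.Format("{0} -> {1}: {2}", from, to ?? "(any server)", what);
    }
}
```

Calling DuplexFlowSketch.Run() produces four hops: the untargeted request, the callback request addressed to client-1, the callback reply addressed to server-A, and the final reply addressed to client-1.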


Implement the Duplex Channel, Channel Factory and Channel Listener

With the solution clarified, the implementation is straightforward. Since I’m going to use one duplex channel class for both server and client, I need a local variable to indicate whether it’s running on the server side or the client side. I also need to store the server-side address, plus a dictionary that keeps the relationship between message IDs and channel IDs.

Since our channel will be executed by multiple threads, we should use the ConcurrentDictionary from the .NET 4 System.Collections.Concurrent namespace to store the relationship between message IDs and channel IDs.

When implementing the receive-message delegate we pass the channel ID as a parameter, so that the channel receives messages that either have no “To Channel ID” specified or are addressed only to this channel.
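To make that receive filter concrete, here is a minimal in-memory sketch. It is not the IBus implementation used in this series: SketchBus, SketchBusMessage and TryReceive are hypothetical names, and the assumption is that a boolean flag tells the bus whether the caller may also take messages that have no target channel.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message envelope: To == null means "any server channel may take this".
public sealed class SketchBusMessage
{
    public string From { get; private set; }
    public string To { get; private set; }
    public string Content { get; private set; }

    public SketchBusMessage(string from, string to, string content)
    {
        From = from;
        To = to;
        Content = content;
    }
}

// Hypothetical in-memory bus that applies the channel ID filter on receive.
public sealed class SketchBus
{
    private readonly object _gate = new object();
    private readonly List<SketchBusMessage> _messages = new List<SketchBusMessage>();

    public void Send(SketchBusMessage message)
    {
        lock (_gate) { _messages.Add(message); }
    }

    // Returns the oldest message this channel is allowed to receive, or null.
    // acceptUnaddressed is true for server channels and false for client channels.
    public SketchBusMessage TryReceive(bool acceptUnaddressed, string channelId)
    {
        lock (_gate)
        {
            for (var i = 0; i < _messages.Count; i++)
            {
                var candidate = _messages[i];
                var addressedToMe = string.Equals(candidate.To, channelId, StringComparison.Ordinal);
                var openToAnyServer = acceptUnaddressed && candidate.To == null;
                if (addressedToMe || openToAnyServer)
                {
                    _messages.RemoveAt(i);
                    return candidate;
                }
            }
            return null;
        }
    }
}
```

With one untargeted request and one callback addressed to "client-1" on the bus, a client channel with a different ID receives nothing, "client-1" receives only the callback, and any server channel can pick up the request.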

After the channel receives a message, we check whether it has a message ID. If it does, the message is a request (client request or server callback request), and we save the message ID and the “From Channel ID” into the local dictionary, so that when sending the reply we can find which channel ID it should go to.

The class also contains some logic for the local address and remote address properties.

    private readonly IBus _bus;
    private readonly Uri _via;
    private readonly EndpointAddress _serverAddress;
    // maps the message ID of a received request to the channel ID it came from
    private readonly ConcurrentDictionary<UniqueId, string> _replyTos;
    private readonly bool _isClient;

    private delegate bool TryReceiveDelegate(TimeSpan timeout, out Message message);
    private readonly TryReceiveDelegate _tryReceiveDelegate;

    public MessageBusDuplexChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
        ChannelManagerBase parent, Uri via,
        IBus bus, bool isClient)
        : base(bufferManager, encoder, parent)
    {
        _serverAddress = remoteAddress;
        _via = via;
        _bus = bus;
        _isClient = isClient;
        _replyTos = new ConcurrentDictionary<UniqueId, string>();

        _tryReceiveDelegate = (TimeSpan timeout, out Message message) =>
        {
            message = null;
            try
            {
                // listen on the message bus based on the sticky mode:
                // channel: only receive messages addressed to this channel's id
                // scaling group: receive messages addressed to this channel's id or to the scaling group id of this channel
                var requestMessage = _bus.Receive(!_isClient, ChannelID);
                if (requestMessage != null)
                {
                    message = GetWcfMessageFromString(requestMessage.Content);
                    if (message.Headers.MessageId != null)
                    {
                        // a message with an id is a request: remember which channel sent it
                        _replyTos.AddOrUpdate(message.Headers.MessageId, requestMessage.From, (key, value) => requestMessage.From);
                    }
                }
            }
            catch (Exception ex)
            {
                throw new CommunicationException(ex.Message, ex);
            }
            return true;
        };
    }

    public EndpointAddress LocalAddress
    {
        get
        {
            if (_isClient)
            {
                return new EndpointAddress(EndpointAddress.AnonymousUri);
            }
            else
            {
                return _serverAddress;
            }
        }
    }

    public EndpointAddress RemoteAddress
    {
        get
        {
            if (_isClient)
            {
                return _serverAddress;
            }
            else
            {
                return new EndpointAddress(EndpointAddress.AnonymousUri);
            }
        }
    }

    public Uri Via
    {
        get
        {
            return _via;
        }
    }

    public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Message message;
        return _tryReceiveDelegate.BeginInvoke(timeout, out message, callback, state);
    }

    public bool EndTryReceive(IAsyncResult result, out Message message)
    {
        return _tryReceiveDelegate.EndInvoke(out message, result);
    }

Next, let’s implement the send procedure. The Send method is used for both request and reply messages, which can be a client-side request, a server-side callback request, a client-side callback reply or a server-side reply.

If the message has the RelatesTo header, it’s a reply message, and the value of RelatesTo is the message ID of the request. We then look up the channel ID in the dictionary and attach it to the message so that only that channel can receive this reply.

If there’s no RelatesTo header, it’s a request message. On the client side we don’t need to do anything except send the message onto the bus with our channel ID attached, so that whichever server picks it up knows which channel it came from. On the server side, it’s a callback request, which must be sent to the client channel that fired the original request. We can get the original request from the OperationContext, so we can use its message ID to find the original channel ID in our dictionary.

    public void Send(Message message, TimeSpan timeout)
    {
        if (message.Headers.RelatesTo != null)
        {
            // when RelatesTo is not null this is a response message, which must be sent to the requesting channel.
            // once it is sent the original request is finished, so we no longer need to keep its entry:
            // we remove the entry, retrieve the requesting channel's id and attach it to the bus message
            string replyTo;
            _replyTos.TryRemove(message.Headers.RelatesTo, out replyTo);
            if (!string.IsNullOrWhiteSpace(replyTo))
            {
                var content = GetStringFromWcfMessage(message, RemoteAddress);
                _bus.SendReply(content, _isClient, replyTo);
            }
            else
            {
                throw new CommunicationException(string.Format("Cannot find a valid ReplyTo for the message related to {0}.", message.Headers.RelatesTo));
            }
        }
        else
        {
            // on the server side, when performing a callback request we first retrieve the original request message id,
            // then find the related client channel id, so that we can send the callback request back to the same client channel.
            var sendTo = string.Empty;
            if (!_isClient &&
                OperationContext.Current != null &&
                OperationContext.Current.RequestContext != null &&
                OperationContext.Current.RequestContext.RequestMessage != null &&
                OperationContext.Current.RequestContext.RequestMessage.Headers.MessageId != null)
            {
                var requestMessageId = OperationContext.Current.RequestContext.RequestMessage.Headers.MessageId;
                _replyTos.TryGetValue(requestMessageId, out sendTo);
            }
            var content = GetStringFromWcfMessage(message, RemoteAddress);
            _bus.SendRequest(content, _isClient, ChannelID, sendTo);
        }
    }

To finalize the implementation, just create the channel factory and channel listener, and update the transport binding element so that it supports duplex mode.

    public class MessageBusDuplexChannelFactory : MessageBusChannelFactoryBase<IDuplexChannel>
    {
        public MessageBusDuplexChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
            : base(transportElement, context)
        {
        }

        protected override IDuplexChannel CreateChannel(
            BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
            MessageBusChannelFactoryBase<IDuplexChannel> parent,
            Uri via,
            IBus bus)
        {
            // isClient = true: channels created by the factory run on the client side
            return new MessageBusDuplexChannel(bufferManager, encoder, remoteAddress, parent, via, bus, true);
        }
    }

    public class MessageBusDuplexChannelListener : MessageBusChannelListenerBase<IDuplexChannel>
    {
        public MessageBusDuplexChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
            : base(transportElement, context)
        {
        }

        protected override IDuplexChannel CreateChannel(
            BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
            MessageBusChannelListenerBase<IDuplexChannel> parent,
            IBus bus)
        {
            // isClient = false: channels created by the listener run on the server side
            return new MessageBusDuplexChannel(bufferManager, encoder, localAddress, parent, null, bus, false);
        }
    }

    public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
    {
        return typeof(TChannel) == typeof(IRequestChannel) ||
               typeof(TChannel) == typeof(IOutputChannel) ||
               typeof(TChannel) == typeof(IDuplexChannel);
    }

    public override bool CanBuildChannelListener<TChannel>(BindingContext context)
    {
        return typeof(TChannel) == typeof(IReplyChannel) ||
               typeof(TChannel) == typeof(IInputChannel) ||
               typeof(TChannel) == typeof(IDuplexChannel);
    }

    public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }
        if (!CanBuildChannelFactory<TChannel>(context))
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
        }

        if (typeof(TChannel) == typeof(IRequestChannel))
        {
            return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
        }
        else if (typeof(TChannel) == typeof(IOutputChannel))
        {
            return (IChannelFactory<TChannel>)(object)new MessageBusOutputChannelFactory(this, context);
        }
        else if (typeof(TChannel) == typeof(IDuplexChannel))
        {
            return (IChannelFactory<TChannel>)(object)new MessageBusDuplexChannelFactory(this, context);
        }
        else
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
        }
    }

    public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }
        if (!CanBuildChannelListener<TChannel>(context))
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
        }

        if (typeof(TChannel) == typeof(IReplyChannel))
        {
            return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
        }
        else if (typeof(TChannel) == typeof(IInputChannel))
        {
            return (IChannelListener<TChannel>)(object)new MessageBusInputChannelListener(this, context);
        }
        else if (typeof(TChannel) == typeof(IDuplexChannel))
        {
            return (IChannelListener<TChannel>)(object)new MessageBusDuplexChannelListener(this, context);
        }
        else
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
        }
    }


Test Our Duplex Channel

Let’s update our test console application to verify that the duplex channel works. First we need to define and implement a service that has a duplex callback. The service contract and the client-side callback contract look like this.

    [ServiceContract(Namespace = "", CallbackContract = typeof(ISampleCallback))]
    public interface ISampleService
    {
        [OperationContract]
        string Reverse(string content);
    }

    [ServiceContract(Namespace = "")]
    public interface ISampleCallback
    {
        [OperationContract]
        string ToUpper(string content);

        [OperationContract]
        string AddSpaces(string content);
    }

The service has only one method, which reverses an input string. The client callback contract has two methods: one converts the input string to upper case, the other adds a space between the characters of the string. In the implementation, the service method invokes these two callback methods one after the other.

You will see that whenever a service or client method is invoked I print the hash code of the current instance together with the result, to demonstrate the duplex calling flow.

    [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class SampleService : ISampleService
    {
        public string Reverse(string content)
        {
            var callback = OperationContext.Current.GetCallbackChannel<ISampleCallback>();

            var result1 = new string(content.Reverse().ToArray());
            Console.WriteLine("Service {0}: Reverse {1} => {2}", OperationContext.Current.Host.GetHashCode(), content, result1);

            var result2 = callback.ToUpper(result1);
            Console.WriteLine("Service {0}: Callback.ToUpper {1} => {2}", OperationContext.Current.Host.GetHashCode(), result1, result2);
            var result3 = callback.AddSpaces(result2);
            Console.WriteLine("Service {0}: Callback.AddSpaces {1} => {2}", OperationContext.Current.Host.GetHashCode(), result2, result3);

            return result3;
        }
    }

    public class SampleCallback : ISampleCallback
    {
        public string ToUpper(string content)
        {
            var result = content.ToUpper();
            Console.WriteLine("Client {0}: ToUpper {1} => {2}", this.GetHashCode(), content, result);
            return result;
        }

        public string AddSpaces(string content)
        {
            var result = string.Join(" ", content.Select(c => new string(c, 1)));
            Console.WriteLine("Client {0}: AddSpaces {1} => {2}", this.GetHashCode(), content, result);
            return result;
        }
    }

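Make sure you add the ServiceBehavior attribute to the service implementation class and set the concurrency mode to Multiple or Reentrant, otherwise the application will fail at runtime. With the default ConcurrencyMode.Single the service instance is still busy executing Reverse when the callback reply comes back, so the two-way callback call would deadlock.

Add another helper method to make it easy to create a duplex channel factory and proxy.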

    static TChannel EstablishDuplexClientProxy<TChannel, TCallback>(IBus bus, string address) where TCallback : new()
    {
        var binding = new MessageBusTransportBinding(bus);
        var callbackInstance = new InstanceContext(new TCallback());
        var factory = new DuplexChannelFactory<TChannel>(callbackInstance, binding, address);
        factory.Opened += (sender, e) =>
            {
                Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
            };
        var proxy = factory.CreateChannel();
        return proxy;
    }

Finally, in the main function we create some service instances and client proxies, then let the user select a client to send a request to the services. In this scaling-out mode any of the services can pick up the request and run the service logic, but all client callbacks made within the service method should be received by the client that sent the request.

    static void Main(string[] args)
    {
        var bus = new InProcMessageBus();
        var address = "net.bus://localhost/sample";

        // establish the services
        var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
        var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
        var host3 = EstablishServiceHost<ISampleService, SampleService>(bus, address);

        // establish the clients
        var clients = new List<ISampleService>()
        {
            EstablishDuplexClientProxy<ISampleService, SampleCallback>(bus, address),
            EstablishDuplexClientProxy<ISampleService, SampleCallback>(bus, address),
            EstablishDuplexClientProxy<ISampleService, SampleCallback>(bus, address)
        };

        // invoke the service
        Console.WriteLine("Which client do you want to use? (1|2|3, 0 to exit)");
        var idx = int.Parse(Console.ReadLine()) - 1;
        while (idx >= 0 && idx < clients.Count)
        {
            var proxy = clients[idx];
            Console.WriteLine("Client ({0}): Say something...", proxy.GetHashCode());
            var content = Console.ReadLine();
            var result = proxy.Reverse(content);
            Console.WriteLine("Client ({0}): {1} => {2}", proxy.GetHashCode(), content, result);

            Console.WriteLine("Which client do you want to use? (1|2|3, 0 to exit)");
            idx = int.Parse(Console.ReadLine()) - 1;
        }

        // close every client proxy before exiting
        foreach (var cli in clients)
        {
            ((ICommunicationObject)cli).Close();
        }
    }

Let’s start the application and give it a try.


As you can see, the client first sent the request to the service (66622070), which fired the client callback twice; both callbacks were received by the same client callback instance (21647132). The second request was then picked up by another service (20876819), and its callbacks all went to the same client (6451435).



Duplex mode is the most complex of the three WCF MEPs. The duplex channel allows the service and the client to invoke each other freely and without limit once the channel has been established. In our case it becomes even more complex: in scaling-out mode the service might have more than one instance, so in fact we have an N:N channel shape.

In our solution we use the ChannelID as the identifier to enforce the two rules of duplex mode:

  • The reply message must be received by the channel that sent the related request message.
  • The callback request must be received by the client that fired the original request message.

The first rule is mandatory, but the second one is optional. It’s not strictly necessary that only the client that fired the duplex request receives the callback request from the server. Any client that implements the callback contract should be able to handle the callback request. You can try to implement this if you are interested.

We have now finished all the WCF MEPs in our transport extension, and I can say we have almost done everything. We can scale out our service instances on top of our message bus, which supports the datagram, request-reply and duplex channel modes.

In the next post I will describe how to handle sessions in our transport extension and look at how the WCF session differs from the ASP.NET session.


You can download the source code here.


Hope this helps,


All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.

