Shaun Xu

The Sheep-Pen of the Shaun


Shaun, the author of this blog is a semi-geek, clumsy developer, passionate speaker and incapable architect with about 10 years experience in .NET. He hopes to prove that software development is art rather than manufacturing. He's into cloud computing platform and technologies (Windows Azure, Aliyun) as well as WCF and ASP.NET MVC. Recently he's falling in love with JavaScript and Node.js.

Currently Shaun is working at IGT Technology Development (Beijing) Co., Ltd. as the architect responsible for product framework design and development.

We are almost done with everything about the WCF transport extension over the message bus, which allows our services to be scaled out by introducing more instances across machines and servers. We finished the structure of our transport extension and implemented the request-reply mode in the 2nd post, and the datagram and duplex modes in the 4th and 5th posts. As I said at the end of the 5th post, we can already use our transport extension. But there is still something left. Although these remaining features are not as major as the three MEP implementations, sometimes they are very useful. In this post I will cover the first of them: session.

 

Session in ASP.NET and WCF

If you have experience developing ASP.NET applications you should be very familiar with session. Session is a very important feature of the ASP.NET runtime, and all frameworks built on top of it can use it, such as ASP.NET WebForms, ASP.NET MVC and ASP.NET Dynamic Data.

In ASP.NET, when the client sends the first request to the server, a session ID is generated on the server side. By default, a dictionary item is created in the web application process (w3wp.exe) and associated with this session ID. Then the session ID is returned to the client and, by default, stored in a cookie. After that, every request the client sends to the server carries the session ID (by default in the cookie header), so that the server knows which client the request came from.

In this way, the ASP.NET runtime makes it possible to save some information per client on the server side. When we develop an ASP.NET website we can save and retrieve this information through the session state. The ASP.NET runtime finds our session in the dictionary in the server memory by the session ID it received.

What I mentioned above is based on the InProc session state mode. If we use the state server or SQL Server mode, the dictionary and the session object may not be located in the web server's memory.
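To make the InProc idea concrete, here is a minimal sketch of a session dictionary keyed by session ID. It is only a simplified model, not how ASP.NET is actually implemented; the class name InProcSessionStore and its members are hypothetical names I made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical, simplified model of an InProc session store.
// This is NOT the real ASP.NET implementation, only an illustration.
public class InProcSessionStore
{
    // session ID -> per-client key/value bag, held in the server process memory
    private readonly ConcurrentDictionary<string, Dictionary<string, object>> _sessions =
        new ConcurrentDictionary<string, Dictionary<string, object>>();

    // First request: the server generates the ID and hands it back to the client.
    public string CreateSession()
    {
        var sessionId = Guid.NewGuid().ToString("N");
        _sessions.TryAdd(sessionId, new Dictionary<string, object>());
        return sessionId;
    }

    // Later requests: the client sends the ID back and the server looks up its data.
    // Returns null when the ID is unknown.
    public Dictionary<string, object> GetSession(string sessionId)
    {
        Dictionary<string, object> data;
        return _sessions.TryGetValue(sessionId, out data) ? data : null;
    }
}
```

A caller would invoke CreateSession() once per client, send the returned ID to that client, and pass it to GetSession() on each later request. Data stored under one ID is not visible under another.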

image

So we can see that the ASP.NET session mainly focuses on the following:

  • An ASP.NET session is always initialized by the server.
  • An ASP.NET session solves how to keep the relationship between requests from the same client.
  • ASP.NET provides a general way to store the session data, but using it is not forced.

Next, let’s have a look at the WCF session. A WCF session is more general than an ASP.NET session. In WCF, a session just means a correlation of messages. WCF doesn’t care about how the identity is passed between the server and the client. It also doesn’t care about how the session data is stored at all. As the developers of the transport extension we need to figure out how to pass the session ID through the transport.

MSDN describes WCF sessions as follows:

  • They are explicitly initiated and terminated by the calling application.
  • Messages delivered during a session are processed in the order in which they are received.
  • Sessions correlate a group of messages into a conversation. The meaning of that correlation is an abstraction. For instance, one session-based channel may correlate messages based on a shared network connection while another session-based channel may correlate messages based on a shared tag in the message body. The features that can be derived from the session depend on the nature of the correlation.
  • There is no general data store associated with a WCF session.

In WCF, a session is established when the client creates the channel and is terminated once the client channel is closed. Based on the service contract definition, the client determines whether it needs a sessionful channel. If the transport supports sessions, it creates the session and associates a session ID with all messages until the channel is closed. Regarding how to store the data shared between the messages in a session, WCF doesn’t have any principle or guideline. By default, WCF ties the session to the service instance lifecycle, so that the developer can use fields of the service class to store the intermediate data. But this is not mandatory. In the following sections you will see what we do to make sessions work under our scaling-out architecture.

Apply the Default WCF Session Behavior

By default WCF combines the service instance mode and the session to make it easy for developers to use sessions. There are three session modes (the SessionMode enumeration) that can be specified in the service contract:

  • Required: This service requires a session.
  • Allowed: This service allows a session to be established, but it is not mandatory.
  • NotAllowed: This service doesn’t allow sessions.

And there are three service instance context modes, which are responsible for managing the service instance lifecycle:

  • PerCall: Each client request makes the service create a new instance to handle it.
  • PerSession: A service instance is created once a session is established. All requests in this session are handled by this instance.
  • Single: The service instance is created once the service is opened. All client requests are handled by this instance.

On MSDN there is a very good table that describes the relationship between the session mode, the service instance mode and the default WCF session behavior.

WCF uses a binding element to implement the default session behavior. This binding element checks the current channel type and, if it is a sessionful channel, creates some special channels on top of the underlying transport channels to handle the session. For example, if it finds that we are going to use a session over datagram, it will create the ReliableOutputSessionChannelOverRequest on top of our own OutputChannel. It takes responsibility for creating the session, maintaining the message order, etc.

To use this built-in session binding element we just add the ReliableSessionBindingElement to the binding element collection in our binding class. In the MessageBusTransportBinding class I added a field that holds the ReliableSessionBindingElement, and two constructors.

    public class MessageBusTransportBinding : Binding
    {
        private readonly MessageEncodingBindingElement _messageElement;
        private readonly MessageBusTransportBindingElement _transportElement;
        private readonly ReliableSessionBindingElement _sessionElement;

        public MessageBusTransportBinding(IBus bus)
            : this(bus, SessionfulMode.Distributed)
        {
        }

        public MessageBusTransportBinding(IBus bus, SessionfulMode sessionfulMode)
            : base()
        {
            _messageElement = new TextMessageEncodingBindingElement();
            _transportElement = new MessageBusTransportBindingElement(bus);
            if (sessionfulMode == SessionfulMode.Standard)
            {
                _sessionElement = new ReliableSessionBindingElement();
            }
        }

        ... ...

    }

And when building the binding elements we add it only if it was created.

    public override BindingElementCollection CreateBindingElements()
    {
        var elements = new BindingElementCollection();
        elements.Add(_messageElement);
        if (_sessionElement != null)
        {
            elements.Add(_sessionElement);
        }
        // the transport binding element must be the last one
        elements.Add(_transportElement);
        return elements.Clone();
    }

And, that’s all. Simple? OK, let’s test our sessionful transport extension. Update our service contract and the implementation class. Here I defined the service contract as requiring a session and the service instance mode as per session.

    [ServiceContract(Namespace = "http://wcf.shaunxu.me/", SessionMode = SessionMode.Required)]
    public interface ISampleService
    {
        [OperationContract(IsOneWay = true)]
        void Add(int value);

        [OperationContract(IsOneWay = false)]
        int GetResult();
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
    public class SampleService : ISampleService
    {
        private int _current;

        public void Add(int value)
        {
            _current += value;
        }

        public int GetResult()
        {
            return _current;
        }
    }

I added a helper method to establish a client proxy.

    static TChannel EstablishClientProxy<TChannel>(IBus bus, string address, SessionfulMode sessionfulMode)
    {
        var binding = new MessageBusTransportBinding(bus, sessionfulMode);
        var factory = new ChannelFactory<TChannel>(binding, address);
        factory.Opened += (sender, e) =>
        {
            Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
        };
        var proxy = factory.CreateChannel();
        return proxy;
    }

Then we host our service on a message bus and have two clients invoke it.

    static void Main(string[] args)
    {
        var bus = new InProcMessageBus();
        var address = "net.bus://localhost/sample";

        // establish the services
        var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Standard);

        // establish the client
        var client1 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Standard);
        var client2 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Standard);
        using (client1 as IDisposable)
        using (client2 as IDisposable)
        {
            client1.Add(1);
            client2.Add(4);

            client1.Add(3);
            client2.Add(5);

            client1.Add(2);
            client2.Add(6);

            var result1 = client1.GetResult();
            var result2 = client2.GetResult();
            Console.WriteLine("Client 1 Result: {0}", result1);
            Console.WriteLine("Client 2 Result: {0}", result2);
        }

        // close the service
        host1.Close();

        Console.ReadKey();
    }

The execution result is shown below. Since our service instance mode is per session, all requests in the same session use the same service instance. This is the reason we can use a field of the service class to store the intermediate data.

image

But if we add more service instances, the client invocations will fail. This is because, under the default ReliableSessionBindingElement, WCF needs to track the message sequence, and messages in one session must not be received by another service instance.

image

Hence, in order to support service scaling-out, we must implement our own sessionful channels.
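The sketch below does not reproduce the ReliableSession failure itself. It only illustrates, without any WCF types, why state kept in a field of the service instance cannot survive once messages of one session are picked up by different instances. The Accumulator and ScaleOutSketch names are hypothetical, and the round-robin dispatch is just an assumption standing in for competing receivers on a shared bus.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for a PerSession service instance that keeps state in a field.
public class Accumulator
{
    private int _current;
    public void Add(int value) { _current += value; }
    public int GetResult() { return _current; }
}

public static class ScaleOutSketch
{
    // Dispatches the values of ONE session across several instances in turn
    // (round-robin is an assumption) and returns what each instance accumulated.
    public static int[] Dispatch(int instanceCount, IEnumerable<int> values)
    {
        var instances = new Accumulator[instanceCount];
        for (var i = 0; i < instanceCount; i++)
        {
            instances[i] = new Accumulator();
        }

        var next = 0;
        foreach (var value in values)
        {
            instances[next].Add(value);
            next = (next + 1) % instanceCount;
        }

        var results = new int[instanceCount];
        for (var i = 0; i < instanceCount; i++)
        {
            results[i] = instances[i].GetResult();
        }
        return results;
    }
}
```

With one instance, ScaleOutSketch.Dispatch(1, new[] { 1, 3, 2 }) yields a single total of 6. With two instances the same values end up as 3 and 3, so neither instance holds the full result of the session.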

 

Message Bus with Session ID

Before we implement the sessionful channels, we need to send the current session ID between the service and the client through the message bus. So we need to amend the message structure.

    public class BusMessage
    {
        public string MessageID { get; private set; }
        public string SessionID { get; private set; }
        public string From { get; private set; }
        public string ReplyTo { get; private set; }
        public string Content { get; private set; }

        public BusMessage(string messageId, string sessionId, string fromChannelId, string replyToChannelId, string content)
        {
            MessageID = messageId;
            SessionID = sessionId;
            From = fromChannelId;
            ReplyTo = replyToChannelId;
            Content = content;
        }
    }

The IBus interface also changes so that we can send the session ID.

    public interface IBus : IDisposable
    {
        string SendRequest(string message, string sessionId, bool fromClient, string from, string to = null);

        void SendReply(string message, string sessionId, bool fromClient, string replyTo);

        BusMessage Receive(bool fromClient, string replyTo);
    }

And so do the underlying message entity and the in-process message bus implementation.

    public class InProcMessageEntity
    {
        public Guid ID { get; set; }
        public string SessionID { get; set; }
        public string Content { get; set; }
        public bool FromClient { get; set; }
        public string From { get; set; }
        public string To { get; set; }

        public InProcMessageEntity()
            : this(string.Empty, string.Empty, false, string.Empty, string.Empty)
        {
        }

        public InProcMessageEntity(string content, string sessionId, bool fromClient, string from, string to)
        {
            ID = Guid.NewGuid();
            SessionID = sessionId;
            Content = content;
            FromClient = fromClient;
            From = from;
            To = to;
        }
    }

    public class InProcMessageBus : IBus
    {
        private readonly ConcurrentDictionary<Guid, InProcMessageEntity> _queue;
        private readonly object _lock;

        public InProcMessageBus()
        {
            _queue = new ConcurrentDictionary<Guid, InProcMessageEntity>();
            _lock = new object();
        }

        public string SendRequest(string message, string sessionId, bool fromClient, string from, string to = null)
        {
            var entity = new InProcMessageEntity(message, sessionId, fromClient, from, to);
            _queue.TryAdd(entity.ID, entity);
            return entity.ID.ToString();
        }

        public void SendReply(string message, string sessionId, bool fromClient, string replyTo)
        {
            var entity = new InProcMessageEntity(message, sessionId, fromClient, null, replyTo);
            _queue.TryAdd(entity.ID, entity);
        }

        public BusMessage Receive(bool fromClient, string replyTo)
        {
            InProcMessageEntity e = null;
            while (true)
            {
                lock (_lock)
                {
                    var entity = _queue
                        .Where(kvp => kvp.Value.FromClient == fromClient && (kvp.Value.To == replyTo || string.IsNullOrWhiteSpace(kvp.Value.To)))
                        .FirstOrDefault();
                    if (entity.Key != Guid.Empty && entity.Value != null)
                    {
                        _queue.TryRemove(entity.Key, out e);
                    }
                }
                if (e == null)
                {
                    Thread.Sleep(100);
                }
                else
                {
                    return new BusMessage(e.ID.ToString(), e.SessionID, e.From, e.To, e.Content);
                }
            }
        }

        public void Dispose()
        {
        }
    }

As we changed the message bus interface, some existing code needs to be changed. You can download the final code at the end of this post.

 

Sessionful Channels: Request Reply Mode

In WCF, if we want to implement our own sessionful channels we need to implement the channel classes for each MEP. Each session channel has a property that returns the session object. WCF defines the session through the ISession interface.

    namespace System.ServiceModel.Channels
    {
        // Summary:
        //     Defines the interface to establish a shared context among parties that exchange
        //     messages by providing an ID for the communication session.
        public interface ISession
        {
            // Summary:
            //     Gets the ID that uniquely identifies the session.
            //
            // Returns:
            //     The ID that uniquely identifies the session.
            string Id { get; }
        }
    }

As you can see, the ISession interface only defines a property that returns the session ID. This means, as I said before, WCF doesn’t care about how the session data is stored or how the sessionful communication is established.

For the MEPs there are three sub-interfaces of ISession in WCF: IOutputSession, IInputSession and IDuplexSession. The first two are used in the sessionful datagram and request-reply modes, and IDuplexSession is used in the sessionful duplex mode.

    // Summary:
    //     Defines the interface for the session implemented on the sending side of
    //     a one-way communication between messaging endpoints.
    public interface IOutputSession : ISession
    {
    }

    // Summary:
    //     Defines the interface for the session implemented on the receiving side of
    //     a one-way communication between messaging endpoints.
    public interface IInputSession : ISession
    {
    }

    // Summary:
    //     Defines the interface for the session implemented on each side of a bi-directional
    //     communication between messaging endpoints.
    public interface IDuplexSession : IInputSession, IOutputSession, ISession
    {
        // Summary:
        //     Begins an asynchronous operation to terminate the outbound session.
        //
        // Parameters:
        //   callback:
        //     The System.AsyncCallback delegate.
        //
        //   state:
        //     An object that contains state information for this request.
        //
        // Returns:
        //     The System.IAsyncResult that references the asynchronous outbound session
        //     termination.
        IAsyncResult BeginCloseOutputSession(AsyncCallback callback, object state);
        //
        // Summary:
        //     Begins an asynchronous operation to terminate the outbound session with a
        //     specified timeout within which the operation must complete.
        //
        // Parameters:
        //   timeout:
        //     The System.TimeSpan that specifies the interval of time within which the
        //     operation must complete.
        //
        //   callback:
        //     The System.AsyncCallback delegate.
        //
        //   state:
        //     An object that contains state information for this request.
        //
        // Returns:
        //     The System.IAsyncResult that references the asynchronous outbound session
        //     termination.
        IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state);
        //
        // Summary:
        //     Terminates the outbound session that indicates that no more messages will
        //     be sent from this endpoint on the channel associated with the session.
        void CloseOutputSession();
        //
        // Summary:
        //     Terminates the outbound session that indicates that no more messages will
        //     be sent from this endpoint on the channel associated with the session within
        //     a specified interval of time.
        //
        // Parameters:
        //   timeout:
        //     The System.TimeSpan that specifies the interval of time within which the
        //     operation must complete.
        void CloseOutputSession(TimeSpan timeout);
        //
        // Summary:
        //     Completes an asynchronous operation to terminate the outbound session that
        //     indicates that no more messages will be sent from this endpoint on the channel
        //     associated with the session.
        //
        // Parameters:
        //   result:
        //     The System.IAsyncResult returned by a call to one of the Overload:System.ServiceModel.Channels.IDuplexSession.BeginCloseOutputSession
        //     methods.
        void EndCloseOutputSession(IAsyncResult result);
    }

So what we need to do is implement these sessions, the sessionful channels and the related channel factories and listeners.

To make our sample as simple as possible I implemented only the necessary member of these three session interfaces, which is the property that returns the session ID.

    internal class MessageBusOutputSession : IOutputSession
    {
        private string _id;

        public string Id
        {
            get
            {
                return _id;
            }
        }

        public MessageBusOutputSession(string id)
        {
            _id = id;
        }
    }

    internal class MessageBusInputSession : IInputSession
    {
        private string _id;

        public string Id
        {
            get
            {
                return _id;
            }
        }

        public MessageBusInputSession(string id)
        {
            _id = id;
        }
    }

    internal class MessageBusDuplexSession : IDuplexSession
    {
        private string _id;

        public MessageBusDuplexSession(string id)
        {
            _id = id;
        }

        public IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state)
        {
            throw new NotImplementedException();
        }

        public IAsyncResult BeginCloseOutputSession(AsyncCallback callback, object state)
        {
            throw new NotImplementedException();
        }

        public void CloseOutputSession(TimeSpan timeout)
        {
            throw new NotImplementedException();
        }

        public void CloseOutputSession()
        {
            throw new NotImplementedException();
        }

        public void EndCloseOutputSession(IAsyncResult result)
        {
            throw new NotImplementedException();
        }

        public string Id
        {
            get
            {
                return _id;
            }
        }
    }

And then, let’s implement the sessionful channels. The first is the request-reply mode, which consists of the request session channel and the reply session channel.

The request session channel inherits from the existing MessageBusRequestChannel class, so that we can leverage the existing message operations. It also implements the IRequestSessionChannel interface, which requires a property of type IOutputSession, which we implemented above. So the MessageBusRequestSessionChannel looks like this.

    public class MessageBusRequestSessionChannel : MessageBusRequestChannel, IRequestSessionChannel
    {
        private IOutputSession _session;

        public IOutputSession Session
        {
            get
            {
                return _session;
            }
        }

        public MessageBusRequestSessionChannel(
            BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
            EndpointAddress remoteAddress, Uri via,
            IBus bus)
            : base(bufferManager, encoder, parent, remoteAddress, via, bus)
        {
            _session = new MessageBusOutputSession((new UniqueId()).ToString());
        }
    }

In our implementation, before the client sends the request message it attaches the current session ID from its IOutputSession object. So the base class needs to give the subclass a chance to set the session ID before the request message is sent. A virtual method is a quick way to do this.

In the base class we add a virtual method that can update the session ID if needed, and then send the request message with this session ID.

    public Message Request(Message message, TimeSpan timeout)
    {
        ThrowIfDisposedOrNotOpen();
        lock (_aLock)
        {
            // unbox the message into string that will be sent into the bus
            var content = GetStringFromWcfMessage(message, _remoteAddress);
            // apply the session from the sub class if needed
            var sessionId = string.Empty;
            OnBeforeRequest(ref sessionId);
            // send the message into bus
            var busMsgId = _bus.SendRequest(content, sessionId, true, null);
            // waiting for the reply message arrive from the bus
            var replyMsg = _bus.Receive(false, busMsgId);
            if (string.IsNullOrWhiteSpace(replyMsg.Content))
            {
                // this means this is a one way channel acknowledge from server
                // we just return null and do nothing
                return null;
            }
            else
            {
                // box the message from the bus message content and return back
                var reply = GetWcfMessageFromString(replyMsg.Content);
                return reply;
            }
        }
    }

    protected virtual void OnBeforeRequest(ref string sessionId)
    {
    }

Then in the sessionful request channel we can override the virtual method and set the session ID.

    protected override void OnBeforeRequest(ref string sessionId)
    {
        sessionId = _session.Id;
    }
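The hook can also be looked at in isolation. The following is a minimal, WCF-free sketch of the same pattern, where the base class leaves the session ID empty and the subclass fills it in. The class names RequestChannelSketch and RequestSessionChannelSketch are hypothetical, and Request here just returns the session ID that would travel with the message instead of really sending anything.

```csharp
using System;

// Hypothetical, WCF-free sketch of the virtual hook; nothing is sent to a bus here.
public class RequestChannelSketch
{
    // Returns the session ID that would be attached to the outgoing message
    // (an empty string for a sessionless channel).
    public string Request(string content)
    {
        var sessionId = string.Empty;
        // give the subclass a chance to supply its session ID
        OnBeforeRequest(ref sessionId);
        return sessionId;
    }

    protected virtual void OnBeforeRequest(ref string sessionId)
    {
    }
}

public class RequestSessionChannelSketch : RequestChannelSketch
{
    // one session ID per channel, created together with the channel
    private readonly string _sessionId = Guid.NewGuid().ToString();

    public string SessionId
    {
        get { return _sessionId; }
    }

    protected override void OnBeforeRequest(ref string sessionId)
    {
        sessionId = _sessionId;
    }
}
```

Calling Request on a RequestChannelSketch gives back an empty string, while every call on the same RequestSessionChannelSketch gives back that channel's SessionId. The base class never needs to know whether a session exists.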

Similarly, on the server side we create the MessageBusReplySessionChannel class, which is based on MessageBusReplyChannel and implements the IReplySessionChannel interface. The server side needs to retrieve the session ID after it receives a message from the bus, so the base class needs a virtual method as well. The base class MessageBusReplyChannel changes like this.

    public MessageBusReplyChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
        EndpointAddress localAddress,
        IBus bus)
        : base(bufferManager, encoder, parent)
    {
        _localAddress = localAddress;
        _bus = bus;
        _aLock = new object();

        _tryReceiveRequestDelegate = (TimeSpan t, out RequestContext rc) =>
        {
            rc = null;
            // receive the request message from the bus
            var busMsg = _bus.Receive(true, null);
            // box the wcf message
            var message = GetWcfMessageFromString(busMsg.Content);
            // initialize the request context and return
            rc = new MessageBusRequestContext(message, this, _localAddress, _bus, busMsg.MessageID);
            OnAfterTryReceiveRequest(busMsg);
            return true;
        };
    }

    protected virtual void OnAfterTryReceiveRequest(BusMessage message)
    {
    }

And the sessionful channel MessageBusReplySessionChannel looks like this.

    public class MessageBusReplySessionChannel : MessageBusReplyChannel, IReplySessionChannel
    {
        private IInputSession _session;

        public IInputSession Session
        {
            get
            {
                return _session;
            }
        }

        public MessageBusReplySessionChannel(
            BufferManager bufferManager, MessageEncoderFactory encoderFactory, ChannelManagerBase parent,
            EndpointAddress localAddress,
            IBus bus)
            : base(bufferManager, encoderFactory, parent, localAddress, bus)
        {
        }

        protected override void OnAfterTryReceiveRequest(BusMessage message)
        {
            _session = new MessageBusInputSession(message.SessionID);
        }
    }

The sessionful reply channel needs an IInputSession, instead of the IOutputSession that we use on the request side.

Last, implement the related channel factory and channel listener, and modify the transport binding element so that it returns the sessionful channels.

    public class MessageBusRequestSessionChannelFactory : MessageBusChannelFactoryBase<IRequestSessionChannel>
    {
        public MessageBusRequestSessionChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
            : base(transportElement, context)
        {
        }

        protected override IRequestSessionChannel CreateChannel(
            BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
            MessageBusChannelFactoryBase<IRequestSessionChannel> parent,
            Uri via,
            IBus bus)
        {
            return new MessageBusRequestSessionChannel(bufferManager, encoder, parent, remoteAddress, via, bus);
        }
    }

    public class MessageBusReplySessionChannelListener : MessageBusChannelListenerBase<IReplySessionChannel>
    {
        public MessageBusReplySessionChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
            : base(transportElement, context)
        {
        }

        protected override IReplySessionChannel CreateChannel(
            BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
            MessageBusChannelListenerBase<IReplySessionChannel> parent,
            IBus bus)
        {
            return new MessageBusReplySessionChannel(bufferManager, encoder, parent, localAddress, bus);
        }
    }

public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IRequestChannel) ||
           typeof(TChannel) == typeof(IOutputChannel) ||
           typeof(TChannel) == typeof(IDuplexChannel) ||
           typeof(TChannel) == typeof(IRequestSessionChannel);
}

public override bool CanBuildChannelListener<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IReplyChannel) ||
           typeof(TChannel) == typeof(IInputChannel) ||
           typeof(TChannel) == typeof(IDuplexChannel) ||
           typeof(TChannel) == typeof(IReplySessionChannel);
}

public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
{
    if (context == null)
    {
        throw new ArgumentNullException("context");
    }
    if (!CanBuildChannelFactory<TChannel>(context))
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
    }

    if (typeof(TChannel) == typeof(IRequestChannel))
    {
        return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
    }
    else if (typeof(TChannel) == typeof(IOutputChannel))
    {
        return (IChannelFactory<TChannel>)(object)new MessageBusOutputChannelFactory(this, context);
    }
    else if (typeof(TChannel) == typeof(IDuplexChannel))
    {
        return (IChannelFactory<TChannel>)(object)new MessageBusDuplexChannelFactory(this, context);
    }
    else if (typeof(TChannel) == typeof(IRequestSessionChannel))
    {
        // the new sessionful request channel
        return (IChannelFactory<TChannel>)(object)new MessageBusRequestSessionChannelFactory(this, context);
    }
    else
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
    }
}

public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
{
    if (context == null)
    {
        throw new ArgumentNullException("context");
    }
    if (!CanBuildChannelListener<TChannel>(context))
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
    }

    if (typeof(TChannel) == typeof(IReplyChannel))
    {
        return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
    }
    else if (typeof(TChannel) == typeof(IInputChannel))
    {
        return (IChannelListener<TChannel>)(object)new MessageBusInputChannelListener(this, context);
    }
    else if (typeof(TChannel) == typeof(IDuplexChannel))
    {
        return (IChannelListener<TChannel>)(object)new MessageBusDuplexChannelListener(this, context);
    }
    else if (typeof(TChannel) == typeof(IReplySessionChannel))
    {
        // the new sessionful reply channel
        return (IChannelListener<TChannel>)(object)new MessageBusReplySessionChannelListener(this, context);
    }
    else
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
    }
}

Now we can test our sessionful request-reply channel. Because our transport extension supports multiple service instances running over the message bus, there is no restriction that the instance context mode must be PerSession or Single for a sessionful channel. But since a request may be processed by any service instance, we cannot use a field of the service class to store the session state values. Instead, we retrieve the current session ID and use it to get and set the intermediate value in a session store, which could be a distributed cache.

image

The image above demonstrates how the session ID is used in our new sessionful request-reply mode.

  • When the client channel is opened, a session ID is generated on the client side. Let’s say the session ID is ABCDE.
  • The client invokes the service method Add with the value it wants to add. The request is sent to the service with the session ID (ABCDE) attached.
  • The service receives this message and copies the session ID into its session channel’s session object, so that the service business logic can retrieve it through OperationContext.Current.SessionId.
  • The service business code uses this session ID to find the value in the session store, adds the new value and writes the result back to the session store.
  • The client sends the next two requests with the same session ID.
  • The client requests the result with the session ID attached.
  • The service uses the session ID to retrieve the value from the session store and replies with it.
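
The steps above can be sketched without any WCF types. This is only a simplified simulation, not the transport's actual code: SessionMessage, FakeServiceInstance and SessionFlowDemo are hypothetical names invented for illustration, a shared dictionary stands in for the session store, and a round-robin loop stands in for the message bus.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical message shape: every request carries the client's session ID.
public sealed class SessionMessage
{
    public SessionMessage(string sessionId, int value)
    {
        SessionId = sessionId;
        Value = value;
    }

    public string SessionId { get; private set; }
    public int Value { get; private set; }
}

// Stands in for one service instance. It keeps no state of its own;
// everything goes through the shared session store.
public sealed class FakeServiceInstance
{
    private readonly ConcurrentDictionary<string, int> _store;

    public FakeServiceInstance(ConcurrentDictionary<string, int> store)
    {
        _store = store;
    }

    // Add: find the running total by session ID, add the value, write it back.
    public void Add(SessionMessage message)
    {
        _store.AddOrUpdate(message.SessionId, message.Value,
            (id, current) => current + message.Value);
    }

    // GetResult: read the running total for the session, 0 if the session is unknown.
    public int GetResult(string sessionId)
    {
        int total;
        return _store.TryGetValue(sessionId, out total) ? total : 0;
    }
}

public static class SessionFlowDemo
{
    // Sends one Add request per value for a single session, spreading the
    // requests over two instances, then asks one instance for the result.
    public static int Run(string sessionId, params int[] values)
    {
        var store = new ConcurrentDictionary<string, int>();
        var instances = new[] { new FakeServiceInstance(store), new FakeServiceInstance(store) };

        for (var i = 0; i < values.Length; i++)
        {
            // Any instance may pick up the request.
            instances[i % instances.Length].Add(new SessionMessage(sessionId, values[i]));
        }
        return instances[values.Length % instances.Length].GetResult(sessionId);
    }
}
```

Calling SessionFlowDemo.Run("ABCDE", 1, 3, 2) returns 6 no matter which instance handles each request, because the total is keyed by the session ID rather than held by an instance.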

Below is an in-process session store for testing purposes.

public class InProcSessionStore
{
    #region Singleton

    private static readonly InProcSessionStore _instance;

    public static InProcSessionStore Current
    {
        get
        {
            return _instance;
        }
    }

    static InProcSessionStore()
    {
        _instance = new InProcSessionStore();
    }

    private InProcSessionStore()
    {
        _dic = new ConcurrentDictionary<string, object>();
    }

    #endregion

    // session state values, keyed by session ID
    private readonly ConcurrentDictionary<string, object> _dic;

    public void Set(string key, object value)
    {
        // add the value or overwrite the existing one
        _dic[key] = value;
    }

    public T Get<T>(string key)
    {
        // return default(T) when the key is missing or the stored value is not a T
        var value = default(T);
        object result;
        if (_dic.TryGetValue(key, out result) && result is T)
        {
            value = (T)result;
        }
        return value;
    }
}

And below is the new service contract and implementation class. Note that I set the instance context mode to PerCall, and I use OperationContext.Current.SessionId as the key to get and set the intermediate value.

[ServiceContract(Namespace = "http://wcf.shaunxu.me/", SessionMode = SessionMode.Required)]
public interface ISampleService
{
    [OperationContract(IsOneWay = true)]
    void Add(int value);

    [OperationContract(IsOneWay = false)]
    int GetResult();
}

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
public class SampleService : ISampleService
{
    public void Add(int value)
    {
        //_current += value;
        var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
        current += value;
        InProcSessionStore.Current.Set(OperationContext.Current.SessionId, current);
        Console.WriteLine("[{0}] SampleService.Add({1}), Current = {2}, SessionID = {3}", this.GetHashCode(), value, current, OperationContext.Current.SessionId);
    }

    public int GetResult()
    {
        var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
        Console.WriteLine("[{0}] SampleService.GetResult(), SessionID = {1}", this.GetHashCode(), OperationContext.Current.SessionId);
        return current;
    }
}

In the main function I created two service hosts and two clients. Each client should have its own session ID.

static void Main(string[] args)
{
    var bus = new InProcMessageBus();
    var address = "net.bus://localhost/sample";

    // establish the services
    var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);
    var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);

    // establish the clients
    var client1 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
    var client2 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
    using (client1 as IDisposable)
    using (client2 as IDisposable)
    {
        client1.Add(1);
        client2.Add(4);

        client1.Add(3);
        client2.Add(5);

        client1.Add(2);
        client2.Add(6);

        var result1 = client1.GetResult();
        var result2 = client2.GetResult();
        Console.WriteLine("Client 1 Result: {0}", result1);
        Console.WriteLine("Client 2 Result: {0}", result2);
    }

    // close the services
    host1.Close();
    host2.Close();

    Console.ReadKey();
}

The following result shows that, with the PerCall instance mode, each client request creates a new service instance, while the session ID stays the same for a given client. We can therefore use the session ID to get and set values in the session store.

image

 

Sessionful Channels: Datagram Mode and Duplex Mode

The sessionful datagram mode is similar to the sessionful request-reply mode, but it inherits from our original input and output channels. Since it is sessionful, we also need some extra work to make it fulfill the WCF session requirement.

At the beginning of this post I described that in a WCF session, “Messages delivered during a session are processed in the order in which they are received.” This means the server side cannot receive and process the second request until the first one is done. This was not a problem when we implemented the sessionful request-reply channel, since in request-reply mode the request channel must wait for the reply message. It is therefore impossible for the next request to be sent before the first reply arrives.

But the datagram mode does not have this restriction. The sender (output channel) returns without waiting for any reply from the server. We need to add some more steps to make sure the sessionful datagram mode follows the session requirement, which means the output channel must wait for the input channel’s reply or, I should say, the input channel’s acknowledgement.

The sessionful output channel inherits from our own MessageBusOutputChannel and implements the interface IOutputSessionChannel. To make it possible to rewrite the send logic, we mark the Send method as virtual in the base class so that the sessionful output channel can override it. We add a message ID to the output message so that the acknowledgement can be matched to it, and send the message as usual. Then we wait for the acknowledgement message. This ensures that the next operation is not fired on this client channel until the acknowledgement comes.

public override void Send(Message message, TimeSpan timeout)
{
    // make sure the message carries an ID; keep the existing one if it is already set
    if (message.Headers.MessageId == null)
    {
        message.Headers.MessageId = new System.Xml.UniqueId();
    }
    var messageId = message.Headers.MessageId;
    // send the message with the session ID
    var content = GetStringFromWcfMessage(message, RemoteAddress);
    _bus.SendRequest(content, _session.Id, true, ChannelID, null);
    // wait for the acknowledgement, which the server side addresses by message ID
    _bus.Receive(false, messageId.ToString());
}

On the server side, the input channel also needs to override the receive method so that it sends the acknowledgement message back.

public override bool EndTryReceive(IAsyncResult result, out Message message)
{
    var ret = base.EndTryReceive(result, out message);
    // read the message ID and send the acknowledgement back to the client;
    // skip this when nothing was received, for example on timeout
    if (ret && message != null && message.Headers.MessageId != null)
    {
        _bus.SendReply(string.Empty, _session.Id, false, message.Headers.MessageId.ToString());
    }
    return ret;
}
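
To see why the acknowledgement keeps messages in order even when several service instances listen on the same bus, here is a small simulation that uses no WCF types. It is only a sketch under my own assumptions: AckHandshakeDemo is a hypothetical name, two in-memory BlockingCollection queues stand in for the bus, and two receiver tasks stand in for two service hosts.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AckHandshakeDemo
{
    // Sends each payload as a one-way message but blocks until a receiver
    // acknowledges it. Returns the payloads in the order they were processed.
    public static List<string> Run(params string[] payloads)
    {
        // (message id, payload) pairs travelling from the sender to the receivers
        var requests = new BlockingCollection<KeyValuePair<string, string>>();
        // acknowledged message ids travelling back to the sender
        var acks = new BlockingCollection<string>();
        var processed = new ConcurrentQueue<string>();

        // Receiver: take a message, process it, then acknowledge it by message id.
        Action receive = () =>
        {
            foreach (var request in requests.GetConsumingEnumerable())
            {
                processed.Enqueue(request.Value);
                acks.Add(request.Key);
            }
        };
        // Two competing receivers, like two service hosts on the same address.
        var receivers = new[] { Task.Run(receive), Task.Run(receive) };

        // Sender: the next send does not start until the previous acknowledgement arrives.
        foreach (var payload in payloads)
        {
            var messageId = Guid.NewGuid().ToString();
            requests.Add(new KeyValuePair<string, string>(messageId, payload));
            var acknowledged = acks.Take();
            if (acknowledged != messageId)
            {
                throw new InvalidOperationException("Acknowledgement for an unexpected message.");
            }
        }

        requests.CompleteAdding();
        Task.WaitAll(receivers);
        return new List<string>(processed);
    }
}
```

Because the sender holds back each message until the previous one is acknowledged, AckHandshakeDemo.Run("a", "b", "c") yields the payloads in the order they were sent, regardless of which receiver picked up each message.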

I won’t describe the other modifications here, such as the channel factory, listener and transport binding element. The full code can be found at the end of this post. Let’s jump straight to trying out the sessionful datagram mode. The service contract and class look like this.

[ServiceContract(Namespace = "http://wcf.shaunxu.me/", SessionMode = SessionMode.Required)]
public interface ISampleService
{
    [OperationContract(IsOneWay = true)]
    void Add(int value);

    [OperationContract(IsOneWay = true)]
    void GetResult(string id);
}

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
public class SampleService : ISampleService
{
    public void Add(int value)
    {
        //_current += value;
        var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
        current += value;
        InProcSessionStore.Current.Set(OperationContext.Current.SessionId, current);
        Console.WriteLine("[{0}] SampleService.Add({1}), Current = {2}, SessionID = {3}", this.GetHashCode(), value, current, OperationContext.Current.SessionId);
    }

    public void GetResult(string id)
    {
        var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
        Console.WriteLine("[{0}] SampleService.GetResult(), SessionID = {1}", this.GetHashCode(), OperationContext.Current.SessionId);
        InProcSessionStore.Current.Set(id, current);
    }
}

The main function changes as well.

static void Main(string[] args)
{
    var bus = new InProcMessageBus();
    var address = "net.bus://localhost/sample";

    // establish the services
    var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);
    var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);

    // establish the clients
    var client1 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
    var client2 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
    using (client1 as IDisposable)
    using (client2 as IDisposable)
    {
        client1.Add(1);
        client2.Add(4);

        client1.Add(3);
        client2.Add(5);

        client1.Add(2);
        client2.Add(6);

        client1.GetResult("1");
        client2.GetResult("2");
        Console.WriteLine("Client 1 Result: {0}", InProcSessionStore.Current.Get<int>("1"));
        Console.WriteLine("Client 2 Result: {0}", InProcSessionStore.Current.Get<int>("2"));
    }

    // close the services
    host1.Close();
    host2.Close();

    Console.ReadKey();
}

Since the datagram mode cannot return a value from the server side, the GetResult method copies the current value into the session store under the given ID, so that the main function can read the result from there. The execution result looks like this. As you can see, the session ID was maintained during the whole message conversation.

image

The last one is the duplex channel. It needs no special modification, so the sessionful duplex channel looks like this.

public class MessageBusDuplexSessionChannel : MessageBusDuplexChannel, IDuplexSessionChannel
{
    private IDuplexSession _session;

    public IDuplexSession Session
    {
        get
        {
            return _session;
        }
    }

    public MessageBusDuplexSessionChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
        EndpointAddress remoteAddress, Uri via,
        IBus bus,
        bool isClient)
        : base(bufferManager, encoder, remoteAddress, parent, via, bus, isClient)
    {
        _session = new MessageBusDuplexSession((new UniqueId()).ToString());
    }

    protected override void OnAfterTryReceive(BusMessage message)
    {
        _session = new MessageBusDuplexSession(message.SessionID);
    }

    protected override void OnBeforeSend(ref string sessionId)
    {
        sessionId = _session.Id;
    }
}
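
The two overrides above rely on hooks that the non-session duplex channel calls around its send and receive logic. The base class is not shown in this post, so the following is only a guess at the pattern, written without WCF types: FakeDuplexChannelBase and FakeDuplexSessionChannel are hypothetical names, and plain strings replace the real BusMessage and IDuplexSession types.

```csharp
using System;

// Hypothetical stand-in for the non-session duplex channel. It exposes the
// two hooks that the sessionful subclass overrides.
public abstract class FakeDuplexChannelBase
{
    // Returns the session ID that would be stamped on an outgoing message.
    public string Send()
    {
        string sessionId = null;
        OnBeforeSend(ref sessionId);
        return sessionId;
    }

    // Simulates receiving a message that carries the given session ID.
    public void Receive(string incomingSessionId)
    {
        OnAfterTryReceive(incomingSessionId);
    }

    // By default the channel is sessionless: both hooks do nothing.
    protected virtual void OnBeforeSend(ref string sessionId) { }
    protected virtual void OnAfterTryReceive(string incomingSessionId) { }
}

public sealed class FakeDuplexSessionChannel : FakeDuplexChannelBase
{
    // Every channel starts with its own freshly generated session ID.
    private string _sessionId = Guid.NewGuid().ToString();

    public string SessionId
    {
        get { return _sessionId; }
    }

    protected override void OnAfterTryReceive(string incomingSessionId)
    {
        // adopt the session ID carried by the incoming message
        _sessionId = incomingSessionId;
    }

    protected override void OnBeforeSend(ref string sessionId)
    {
        // stamp outgoing messages with the current session ID
        sessionId = _sessionId;
    }
}
```

With two such channels, calling server.Receive(client.Send()) makes the server side adopt the client's session ID, and every later server.Send() carries that same ID back.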

 

Summary

In this post I described the basics of WCF sessions and how they differ from ASP.NET sessions. A WCF session is more general than an ASP.NET session. It does not care how the session ID is passed or how the session state data is handled. It only ensures that the messages within a session share the same session ID, at least on one side of the communication.

It is not mandatory that the session ID be the same on the server side and the client side. It only needs to stay the same within one side. Hence our transport extension may use one session ID on the server side and another on the client side within the same session.

Then we discussed what WCF itself does for sessions and how it leverages the session mode and the instance context mode to let developers use fields of the service class to store intermediate data. We also discussed why this is not suitable for our scaling-out requirement.

Then we implemented our own sessionful channels. Developers can use OperationContext.Current.SessionId as the key to get and set intermediate data in the session store, so that a service can scale out across instances.

At this point I can say that we have finished everything for the WCF transport extension. We have our own binding and transport binding element. We support all three WCF MEPs and the additional sessionful MEPs on top of our transport, with scalability across both server and client instances. But I have to note that the code I mentioned and attached at the end of each post is for research purposes only. Do NOT use it directly in production.

You will also notice that all our tests are based on the in-process message bus, which cannot be used in real production at all. In the next post I’m going to use some real message buses by implementing an IBus class for each of them.

 

The source code can be downloaded here.

 

Hope this helps,

Shaun

All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.
