Shaun Xu

The Sheep-Pen of the Shaun


News

logo

Shaun, the author of this blog is a semi-geek, clumsy developer, passionate speaker and incapable architect with about 10 years experience in .NET. He hopes to prove that software development is art rather than manufacturing. He's into cloud computing platform and technologies (Windows Azure, Aliyun) as well as WCF and ASP.NET MVC. Recently he's falling in love with JavaScript and Node.js.

Currently Shaun is working at IGT Technology Development (Beijing) Co., Ltd. as the architect responsible for product framework design and development.

MVP


In the previous post I demonstrated how to implement a very basic transport extension over an in memory message bus that supports request reply MEP. At the end of that post I created a console application and establish the service and client directly through the channel listener (on the server side) and the channel factory (on the client side). But this is not the WCF usage that we are familiar with. If you have been using the WCF for a while the common pattern is to create a ServiceHost on the server side and a ChannelFactory<T> on the client side. Then we will using the ChannelFactory<T>.CreateChannel to get an instance of the service contract interface so we can invoke the service remotely as if it’s locally.

Using the “Add Service Reference” in Visual Studio or the “svcutil” is the same way as using the ChannelFactory<T> on the client side. In fact the Visual Studio will call the “svcutil” and generate a client proxy to us, which is similar as the service contract one but will some connection functions.

Based on the transport extension we had finished so far, we can using the ServiceHost and ChannelFactory at once. But since we just implemented the synchronized methods, for example the ReceiveRequest. We must indicate to WCF infrastructure  to use the synchronized methods.

   1: <endpointBehaviors>
   2:   <behavior name="">
   3:     <synchronousReceive />
   4:   </behavior>
   5: </endpointBehaviors>

Now I will describe how to amend our transport extension to support the asynchronous mode channel creation and message operation so that we can use the ServiceHost and ClientFactory<T>, which is the way that we are familiar with. Again, this is not mandatory for transport extension since it works well even though only support the synchronous mode. But in order to maximize the performance and usability, this is very important.

 

Client: ChannelFactory and RequestChannel

On the client side the transport binding element will create the channel factory and it will initialize a request channel object in its CreateChannel method. All of them are invoked synchronously and there’s no related asynchronous methods available. This means in WCF, the client operation will always be invoked synchronously.

Although we can generate a client proxy class from the “svcutil” with the asynchronously methods, this will only affect on the calling thread. The channel factory and channel will still be created and invoked synchronously in its thread.

image

It’s the same on the client side channel, the RequestChannel in our case. It only support the synchronously method to send the request message. So on the client side we don’t need to modify anything.

 

Server: ChannelListener

On the server side it would be a little bit complex. In WCF one service could be able to handle multiple requests at the same time by using the different threads. This means on the server side besides the synchronous methods we must implement the asynchronous ones to support the concurrency mode.

As we know the ChannelListener will create a server side channel when any request came to. In our current code we just initialized a new ReplyChannel instance and returned in the OnAcceptChannel method. But if we are going to use the ServiceHost class to establish the service we must implement its asynchronous method pair, which are OnBeginAcceptChannel and OnEndAcceptChannel. One of the implementation is very simple, we can create a new delegate that point to the OnAcceptChannel method we had done before, and use the asynchronous invoke methods to implement the OnBeginAcceptChannel and the OnEndAcceptChannel. We will define a delegation and assign the OnAcceptChannel to it in the class constructor, and use the BeginInvoke method in the OnBeginAcceptChannel while EndInvoke in the OnEndAcceptChannel method.

   1: public class MessageBusReplyChannelListener : ChannelListenerBase<IReplyChannel>
   2: {
   3:     ... ...
   4:  
   5:     private delegate IReplyChannel AcceptChannelDelegate(TimeSpan timeout);
   6:     private AcceptChannelDelegate _acceptChannelDelegate;
   7:  
   8:     ... ... 
   9:  
  10:     public MessageBusReplyChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
  11:         : base(context.Binding)
  12:     {
  13:         _encoderFactory = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
  14:         _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
  15:         _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
  16:         _bus = transportElement.Bus;
  17:  
  18:         _acceptChannelDelegate = OnAcceptChannel;
  19:     }
  20:  
  21:     ... ...
  22:  
  23:     protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
  24:     {
  25:         return _acceptChannelDelegate.BeginInvoke(timeout, callback, state);
  26:     }
  27:  
  28:     protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
  29:     {
  30:         return _acceptChannelDelegate.EndInvoke(result);
  31:     }
  32:  
  33:     ... ...
  34: }

This is OK and works when using the ServiceHost, and we will see that we are going to use the same way when amend the ReplyChannel part. But this is not the best way of doing the ChannelListener. It’s because if we are doing like this all incoming request will cause the service to try to create a new ReplyChannel, regardless if the server have the enough computing and memory resource available. In WCF we can define the throughput on a server so all client request will be queued on the server, and the server will create the channels based on its availability. Microsoft gives us a very good class to implement this queue with the throughput controlling, as well as the asynchronous enqueue, dequeue and dispatch methods, which the name is InputQueue<T>.

InputQueue<T> was not provided within the BCL of the .NET Framework, although it’s very useful. But there are many ways to get it. One is to use some decompile tool, such as the ILSpy, to get its source code from the System.ServiceModel.dll. It’s an internal class inside the System.ServiceModel.Chanels namespace.

Alternatively you can download the WCF samples and after installed, you can find this class in the folder \WCF\Extensibility\Transport\Udp\CS\UdpTransport\InputQueue.cs. Below this is source code of this class I’m going to use in our example.

   1: // ItemDequeuedCallback is called as an item is dequeued from the InputQueue.  The 
   2: // InputQueue lock is not held during the callback.  However, the user code is
   3: // not notified of the item being available until the callback returns.  If you
   4: // are not sure if the callback blocks for a long time, then first call 
   5: // IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
   6: delegate void ItemDequeuedCallback();
   7:  
   8: /// <summary>
   9: /// Handles asynchronous interactions between producers and consumers. 
  10: /// Producers can dispatch available data to the input queue, 
  11: /// where it is dispatched to a waiting consumer or stored until a
  12: /// consumer becomes available. Consumers can synchronously or asynchronously
  13: /// request data from the queue, which is returned when data becomes
  14: /// available.
  15: /// </summary>
  16: /// <typeparam name="T">The concrete type of the consumer objects that are waiting for data.</typeparam>
  17: internal class InputQueue<T> : IDisposable where T : class
  18: {
  19:     //Stores items that are waiting to be accessed.
  20:     private ItemQueue itemQueue;
  21:  
  22:     //Each IQueueReader represents some consumer that is waiting for
  23:     //items to appear in the queue. The readerQueue stores them
  24:     //in an ordered list so consumers get serviced in a FIFO manner.
  25:     private Queue<IQueueReader> readerQueue;
  26:  
  27:     //Each IQueueWaiter represents some waiter that is waiting for
  28:     //items to appear in the queue.  When any item appears, all
  29:     //waiters are signaled.
  30:     private List<IQueueWaiter> waiterList;
  31:  
  32:     private static WaitCallback onInvokeDequeuedCallback;
  33:     private static WaitCallback onDispatchCallback;
  34:     private static WaitCallback completeOutstandingReadersCallback;
  35:     private static WaitCallback completeWaitersFalseCallback;
  36:     private static WaitCallback completeWaitersTrueCallback;
  37:  
  38:     //Represents the current state of the InputQueue.
  39:     //as it transitions through its lifecycle.
  40:     QueueState queueState;
  41:     enum QueueState
  42:     {
  43:         Open,
  44:         Shutdown,
  45:         Closed
  46:     }
  47:  
  48:     public InputQueue()
  49:     {
  50:         this.itemQueue = new ItemQueue();
  51:         this.readerQueue = new Queue<IQueueReader>();
  52:         this.waiterList = new List<IQueueWaiter>();
  53:         this.queueState = QueueState.Open;
  54:     }
  55:  
  56:     public int PendingCount
  57:     {
  58:         get
  59:         {
  60:             lock (ThisLock)
  61:             {
  62:                 return itemQueue.ItemCount;
  63:             }
  64:         }
  65:     }
  66:  
  67:     object ThisLock
  68:     {
  69:         get { return itemQueue; }
  70:     }
  71:  
  72:     public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
  73:     {
  74:         Item item = default(Item);
  75:  
  76:         lock (ThisLock)
  77:         {
  78:             if (queueState == QueueState.Open)
  79:             {
  80:                 if (itemQueue.HasAvailableItem)
  81:                 {
  82:                     item = itemQueue.DequeueAvailableItem();
  83:                 }
  84:                 else
  85:                 {
  86:                     AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
  87:                     readerQueue.Enqueue(reader);
  88:                     return reader;
  89:                 }
  90:             }
  91:             else if (queueState == QueueState.Shutdown)
  92:             {
  93:                 if (itemQueue.HasAvailableItem)
  94:                 {
  95:                     item = itemQueue.DequeueAvailableItem();
  96:                 }
  97:                 else if (itemQueue.HasAnyItem)
  98:                 {
  99:                     AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
 100:                     readerQueue.Enqueue(reader);
 101:                     return reader;
 102:                 }
 103:             }
 104:         }
 105:  
 106:         InvokeDequeuedCallback(item.DequeuedCallback);
 107:         return new TypedCompletedAsyncResult<T>(item.GetValue(), callback, state);
 108:     }
 109:  
 110:     public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
 111:     {
 112:         lock (ThisLock)
 113:         {
 114:             if (queueState == QueueState.Open)
 115:             {
 116:                 if (!itemQueue.HasAvailableItem)
 117:                 {
 118:                     AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
 119:                     waiterList.Add(waiter);
 120:                     return waiter;
 121:                 }
 122:             }
 123:             else if (queueState == QueueState.Shutdown)
 124:             {
 125:                 if (!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
 126:                 {
 127:                     AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
 128:                     waiterList.Add(waiter);
 129:                     return waiter;
 130:                 }
 131:             }
 132:         }
 133:  
 134:         return new TypedCompletedAsyncResult<bool>(true, callback, state);
 135:     }
 136:  
 137:     static void CompleteOutstandingReadersCallback(object state)
 138:     {
 139:         IQueueReader[] outstandingReaders = (IQueueReader[])state;
 140:  
 141:         for (int i = 0; i < outstandingReaders.Length; i++)
 142:         {
 143:             outstandingReaders[i].Set(default(Item));
 144:         }
 145:     }
 146:  
 147:     static void CompleteWaitersFalseCallback(object state)
 148:     {
 149:         CompleteWaiters(false, (IQueueWaiter[])state);
 150:     }
 151:  
 152:     static void CompleteWaitersTrueCallback(object state)
 153:     {
 154:         CompleteWaiters(true, (IQueueWaiter[])state);
 155:     }
 156:  
 157:     static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
 158:     {
 159:         for (int i = 0; i < waiters.Length; i++)
 160:         {
 161:             waiters[i].Set(itemAvailable);
 162:         }
 163:     }
 164:  
 165:     static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
 166:     {
 167:         if (itemAvailable)
 168:         {
 169:             if (completeWaitersTrueCallback == null)
 170:                 completeWaitersTrueCallback = new WaitCallback(CompleteWaitersTrueCallback);
 171:  
 172:             ThreadPool.QueueUserWorkItem(completeWaitersTrueCallback, waiters);
 173:         }
 174:         else
 175:         {
 176:             if (completeWaitersFalseCallback == null)
 177:                 completeWaitersFalseCallback = new WaitCallback(CompleteWaitersFalseCallback);
 178:  
 179:             ThreadPool.QueueUserWorkItem(completeWaitersFalseCallback, waiters);
 180:         }
 181:     }
 182:  
 183:     void GetWaiters(out IQueueWaiter[] waiters)
 184:     {
 185:         if (waiterList.Count > 0)
 186:         {
 187:             waiters = waiterList.ToArray();
 188:             waiterList.Clear();
 189:         }
 190:         else
 191:         {
 192:             waiters = null;
 193:         }
 194:     }
 195:  
 196:     public void Close()
 197:     {
 198:         ((IDisposable)this).Dispose();
 199:     }
 200:  
 201:     public void Shutdown()
 202:     {
 203:         IQueueReader[] outstandingReaders = null;
 204:         lock (ThisLock)
 205:         {
 206:             if (queueState == QueueState.Shutdown)
 207:                 return;
 208:  
 209:             if (queueState == QueueState.Closed)
 210:                 return;
 211:  
 212:             this.queueState = QueueState.Shutdown;
 213:  
 214:             if (readerQueue.Count > 0 && this.itemQueue.ItemCount == 0)
 215:             {
 216:                 outstandingReaders = new IQueueReader[readerQueue.Count];
 217:                 readerQueue.CopyTo(outstandingReaders, 0);
 218:                 readerQueue.Clear();
 219:             }
 220:         }
 221:  
 222:         if (outstandingReaders != null)
 223:         {
 224:             for (int i = 0; i < outstandingReaders.Length; i++)
 225:             {
 226:                 outstandingReaders[i].Set(new Item((Exception)null, null));
 227:             }
 228:         }
 229:     }
 230:  
 231:     public T Dequeue(TimeSpan timeout)
 232:     {
 233:         T value;
 234:  
 235:         if (!this.Dequeue(timeout, out value))
 236:         {
 237:             throw new TimeoutException(string.Format("Dequeue timed out in {0}.", timeout));
 238:         }
 239:  
 240:         return value;
 241:     }
 242:  
 243:     public bool Dequeue(TimeSpan timeout, out T value)
 244:     {
 245:         WaitQueueReader reader = null;
 246:         Item item = new Item();
 247:  
 248:         lock (ThisLock)
 249:         {
 250:             if (queueState == QueueState.Open)
 251:             {
 252:                 if (itemQueue.HasAvailableItem)
 253:                 {
 254:                     item = itemQueue.DequeueAvailableItem();
 255:                 }
 256:                 else
 257:                 {
 258:                     reader = new WaitQueueReader(this);
 259:                     readerQueue.Enqueue(reader);
 260:                 }
 261:             }
 262:             else if (queueState == QueueState.Shutdown)
 263:             {
 264:                 if (itemQueue.HasAvailableItem)
 265:                 {
 266:                     item = itemQueue.DequeueAvailableItem();
 267:                 }
 268:                 else if (itemQueue.HasAnyItem)
 269:                 {
 270:                     reader = new WaitQueueReader(this);
 271:                     readerQueue.Enqueue(reader);
 272:                 }
 273:                 else
 274:                 {
 275:                     value = default(T);
 276:                     return true;
 277:                 }
 278:             }
 279:             else // queueState == QueueState.Closed
 280:             {
 281:                 value = default(T);
 282:                 return true;
 283:             }
 284:         }
 285:  
 286:         if (reader != null)
 287:         {
 288:             return reader.Wait(timeout, out value);
 289:         }
 290:         else
 291:         {
 292:             InvokeDequeuedCallback(item.DequeuedCallback);
 293:             value = item.GetValue();
 294:             return true;
 295:         }
 296:     }
 297:  
 298:     public void Dispose()
 299:     {
 300:         Dispose(true);
 301:  
 302:         GC.SuppressFinalize(this);
 303:     }
 304:  
 305:     protected void Dispose(bool disposing)
 306:     {
 307:         if (disposing)
 308:         {
 309:             bool dispose = false;
 310:  
 311:             lock (ThisLock)
 312:             {
 313:                 if (queueState != QueueState.Closed)
 314:                 {
 315:                     queueState = QueueState.Closed;
 316:                     dispose = true;
 317:                 }
 318:             }
 319:  
 320:             if (dispose)
 321:             {
 322:                 while (readerQueue.Count > 0)
 323:                 {
 324:                     IQueueReader reader = readerQueue.Dequeue();
 325:                     reader.Set(default(Item));
 326:                 }
 327:  
 328:                 while (itemQueue.HasAnyItem)
 329:                 {
 330:                     Item item = itemQueue.DequeueAnyItem();
 331:                     item.Dispose();
 332:                     InvokeDequeuedCallback(item.DequeuedCallback);
 333:                 }
 334:             }
 335:         }
 336:     }
 337:  
 338:     public void Dispatch()
 339:     {
 340:         IQueueReader reader = null;
 341:         Item item = new Item();
 342:         IQueueReader[] outstandingReaders = null;
 343:         IQueueWaiter[] waiters = null;
 344:         bool itemAvailable = true;
 345:  
 346:         lock (ThisLock)
 347:         {
 348:             itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
 349:             this.GetWaiters(out waiters);
 350:  
 351:             if (queueState != QueueState.Closed)
 352:             {
 353:                 itemQueue.MakePendingItemAvailable();
 354:  
 355:                 if (readerQueue.Count > 0)
 356:                 {
 357:                     item = itemQueue.DequeueAvailableItem();
 358:                     reader = readerQueue.Dequeue();
 359:  
 360:                     if (queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
 361:                     {
 362:                         outstandingReaders = new IQueueReader[readerQueue.Count];
 363:                         readerQueue.CopyTo(outstandingReaders, 0);
 364:                         readerQueue.Clear();
 365:  
 366:                         itemAvailable = false;
 367:                     }
 368:                 }
 369:             }
 370:         }
 371:  
 372:         if (outstandingReaders != null)
 373:         {
 374:             if (completeOutstandingReadersCallback == null)
 375:                 completeOutstandingReadersCallback = new WaitCallback(CompleteOutstandingReadersCallback);
 376:  
 377:             ThreadPool.QueueUserWorkItem(completeOutstandingReadersCallback, outstandingReaders);
 378:         }
 379:  
 380:         if (waiters != null)
 381:         {
 382:             CompleteWaitersLater(itemAvailable, waiters);
 383:         }
 384:  
 385:         if (reader != null)
 386:         {
 387:             InvokeDequeuedCallback(item.DequeuedCallback);
 388:             reader.Set(item);
 389:         }
 390:     }
 391:  
 392:     //Ends an asynchronous Dequeue operation.
 393:     public T EndDequeue(IAsyncResult result)
 394:     {
 395:         T value;
 396:  
 397:         if (!this.EndDequeue(result, out value))
 398:         {
 399:             throw new TimeoutException("Asynchronous Dequeue operation timed out.");
 400:         }
 401:  
 402:         return value;
 403:     }
 404:  
 405:     public bool EndDequeue(IAsyncResult result, out T value)
 406:     {
 407:         TypedCompletedAsyncResult<T> typedResult = result as TypedCompletedAsyncResult<T>;
 408:  
 409:         if (typedResult != null)
 410:         {
 411:             value = TypedCompletedAsyncResult<T>.End(result);
 412:             return true;
 413:         }
 414:  
 415:         return AsyncQueueReader.End(result, out value);
 416:     }
 417:  
 418:     public bool EndWaitForItem(IAsyncResult result)
 419:     {
 420:         TypedCompletedAsyncResult<bool> typedResult = result as TypedCompletedAsyncResult<bool>;
 421:         if (typedResult != null)
 422:         {
 423:             return TypedCompletedAsyncResult<bool>.End(result);
 424:         }
 425:  
 426:         return AsyncQueueWaiter.End(result);
 427:     }
 428:  
 429:     public void EnqueueAndDispatch(T item)
 430:     {
 431:         EnqueueAndDispatch(item, null);
 432:     }
 433:  
 434:     public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback)
 435:     {
 436:         EnqueueAndDispatch(item, dequeuedCallback, true);
 437:     }
 438:  
 439:     public void EnqueueAndDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
 440:     {
 441:         Debug.Assert(exception != null, "exception parameter should not be null");
 442:         EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread);
 443:     }
 444:  
 445:     public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
 446:     {
 447:         Debug.Assert(item != null, "item parameter should not be null");
 448:         EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread);
 449:     }
 450:  
 451:     void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
 452:     {
 453:         bool disposeItem = false;
 454:         IQueueReader reader = null;
 455:         bool dispatchLater = false;
 456:         IQueueWaiter[] waiters = null;
 457:         bool itemAvailable = true;
 458:  
 459:         lock (ThisLock)
 460:         {
 461:             itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
 462:             this.GetWaiters(out waiters);
 463:  
 464:             if (queueState == QueueState.Open)
 465:             {
 466:                 if (canDispatchOnThisThread)
 467:                 {
 468:                     if (readerQueue.Count == 0)
 469:                     {
 470:                         itemQueue.EnqueueAvailableItem(item);
 471:                     }
 472:                     else
 473:                     {
 474:                         reader = readerQueue.Dequeue();
 475:                     }
 476:                 }
 477:                 else
 478:                 {
 479:                     if (readerQueue.Count == 0)
 480:                     {
 481:                         itemQueue.EnqueueAvailableItem(item);
 482:                     }
 483:                     else
 484:                     {
 485:                         itemQueue.EnqueuePendingItem(item);
 486:                         dispatchLater = true;
 487:                     }
 488:                 }
 489:             }
 490:             else // queueState == QueueState.Closed || queueState == QueueState.Shutdown
 491:             {
 492:                 disposeItem = true;
 493:             }
 494:         }
 495:  
 496:         if (waiters != null)
 497:         {
 498:             if (canDispatchOnThisThread)
 499:             {
 500:                 CompleteWaiters(itemAvailable, waiters);
 501:             }
 502:             else
 503:             {
 504:                 CompleteWaitersLater(itemAvailable, waiters);
 505:             }
 506:         }
 507:  
 508:         if (reader != null)
 509:         {
 510:             InvokeDequeuedCallback(item.DequeuedCallback);
 511:             reader.Set(item);
 512:         }
 513:  
 514:         if (dispatchLater)
 515:         {
 516:             if (onDispatchCallback == null)
 517:             {
 518:                 onDispatchCallback = new WaitCallback(OnDispatchCallback);
 519:             }
 520:  
 521:             ThreadPool.QueueUserWorkItem(onDispatchCallback, this);
 522:         }
 523:         else if (disposeItem)
 524:         {
 525:             InvokeDequeuedCallback(item.DequeuedCallback);
 526:             item.Dispose();
 527:         }
 528:     }
 529:  
 530:     public bool EnqueueWithoutDispatch(T item, ItemDequeuedCallback dequeuedCallback)
 531:     {
 532:         Debug.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
 533:         return EnqueueWithoutDispatch(new Item(item, dequeuedCallback));
 534:     }
 535:  
 536:     public bool EnqueueWithoutDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback)
 537:     {
 538:         Debug.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
 539:         return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
 540:     }
 541:  
 542:     // This does not block, however, Dispatch() must be called later if this function
 543:     // returns true.
 544:     bool EnqueueWithoutDispatch(Item item)
 545:     {
 546:         lock (ThisLock)
 547:         {
 548:             // Open
 549:             if (queueState != QueueState.Closed && queueState != QueueState.Shutdown)
 550:             {
 551:                 if (readerQueue.Count == 0)
 552:                 {
 553:                     itemQueue.EnqueueAvailableItem(item);
 554:                     return false;
 555:                 }
 556:                 else
 557:                 {
 558:                     itemQueue.EnqueuePendingItem(item);
 559:                     return true;
 560:                 }
 561:             }
 562:         }
 563:  
 564:         item.Dispose();
 565:         InvokeDequeuedCallbackLater(item.DequeuedCallback);
 566:         return false;
 567:     }
 568:  
 569:     static void OnDispatchCallback(object state)
 570:     {
 571:         ((InputQueue<T>)state).Dispatch();
 572:     }
 573:  
 574:     static void InvokeDequeuedCallbackLater(ItemDequeuedCallback dequeuedCallback)
 575:     {
 576:         if (dequeuedCallback != null)
 577:         {
 578:             if (onInvokeDequeuedCallback == null)
 579:             {
 580:                 onInvokeDequeuedCallback = OnInvokeDequeuedCallback;
 581:             }
 582:  
 583:             ThreadPool.QueueUserWorkItem(onInvokeDequeuedCallback, dequeuedCallback);
 584:         }
 585:     }
 586:  
 587:     static void InvokeDequeuedCallback(ItemDequeuedCallback dequeuedCallback)
 588:     {
 589:         if (dequeuedCallback != null)
 590:         {
 591:             dequeuedCallback();
 592:         }
 593:     }
 594:  
 595:     static void OnInvokeDequeuedCallback(object state)
 596:     {
 597:         ItemDequeuedCallback dequeuedCallback = (ItemDequeuedCallback)state;
 598:         dequeuedCallback();
 599:     }
 600:  
 601:     bool RemoveReader(IQueueReader reader)
 602:     {
 603:         lock (ThisLock)
 604:         {
 605:             if (queueState == QueueState.Open || queueState == QueueState.Shutdown)
 606:             {
 607:                 bool removed = false;
 608:  
 609:                 for (int i = readerQueue.Count; i > 0; i--)
 610:                 {
 611:                     IQueueReader temp = readerQueue.Dequeue();
 612:                     if (Object.ReferenceEquals(temp, reader))
 613:                     {
 614:                         removed = true;
 615:                     }
 616:                     else
 617:                     {
 618:                         readerQueue.Enqueue(temp);
 619:                     }
 620:                 }
 621:  
 622:                 return removed;
 623:             }
 624:         }
 625:  
 626:         return false;
 627:     }
 628:  
 629:     public bool WaitForItem(TimeSpan timeout)
 630:     {
 631:         WaitQueueWaiter waiter = null;
 632:         bool itemAvailable = false;
 633:  
 634:         lock (ThisLock)
 635:         {
 636:             if (queueState == QueueState.Open)
 637:             {
 638:                 if (itemQueue.HasAvailableItem)
 639:                 {
 640:                     itemAvailable = true;
 641:                 }
 642:                 else
 643:                 {
 644:                     waiter = new WaitQueueWaiter();
 645:                     waiterList.Add(waiter);
 646:                 }
 647:             }
 648:             else if (queueState == QueueState.Shutdown)
 649:             {
 650:                 if (itemQueue.HasAvailableItem)
 651:                 {
 652:                     itemAvailable = true;
 653:                 }
 654:                 else if (itemQueue.HasAnyItem)
 655:                 {
 656:                     waiter = new WaitQueueWaiter();
 657:                     waiterList.Add(waiter);
 658:                 }
 659:                 else
 660:                 {
 661:                     return false;
 662:                 }
 663:             }
 664:             else // queueState == QueueState.Closed
 665:             {
 666:                 return true;
 667:             }
 668:         }
 669:  
 670:         if (waiter != null)
 671:         {
 672:             return waiter.Wait(timeout);
 673:         }
 674:         else
 675:         {
 676:             return itemAvailable;
 677:         }
 678:     }
 679:  
 680:     interface IQueueReader
 681:     {
 682:         void Set(Item item);
 683:     }
 684:  
 685:     interface IQueueWaiter
 686:     {
 687:         void Set(bool itemAvailable);
 688:     }
 689:  
 690:     class WaitQueueReader : IQueueReader
 691:     {
 692:         Exception exception;
 693:         InputQueue<T> inputQueue;
 694:         T item;
 695:         ManualResetEvent waitEvent;
 696:         object thisLock = new object();
 697:  
 698:         public WaitQueueReader(InputQueue<T> inputQueue)
 699:         {
 700:             this.inputQueue = inputQueue;
 701:             waitEvent = new ManualResetEvent(false);
 702:         }
 703:  
 704:         object ThisLock
 705:         {
 706:             get
 707:             {
 708:                 return this.thisLock;
 709:             }
 710:         }
 711:  
 712:         public void Set(Item item)
 713:         {
 714:             lock (ThisLock)
 715:             {
 716:                 Debug.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
 717:                 Debug.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
 718:  
 719:                 this.exception = item.Exception;
 720:                 this.item = item.Value;
 721:                 waitEvent.Set();
 722:             }
 723:         }
 724:  
 725:         public bool Wait(TimeSpan timeout, out T value)
 726:         {
 727:             bool isSafeToClose = false;
 728:             try
 729:             {
 730:                 if (timeout == TimeSpan.MaxValue)
 731:                 {
 732:                     waitEvent.WaitOne();
 733:                 }
 734:                 else if (!waitEvent.WaitOne(timeout, false))
 735:                 {
 736:                     if (this.inputQueue.RemoveReader(this))
 737:                     {
 738:                         value = default(T);
 739:                         isSafeToClose = true;
 740:                         return false;
 741:                     }
 742:                     else
 743:                     {
 744:                         waitEvent.WaitOne();
 745:                     }
 746:                 }
 747:  
 748:                 isSafeToClose = true;
 749:             }
 750:             finally
 751:             {
 752:                 if (isSafeToClose)
 753:                 {
 754:                     waitEvent.Close();
 755:                 }
 756:             }
 757:  
 758:             value = item;
 759:             return true;
 760:         }
 761:     }
 762:  
 763:     class AsyncQueueReader : AsyncResult, IQueueReader
 764:     {
 765:         static TimerCallback timerCallback = new TimerCallback(AsyncQueueReader.TimerCallback);
 766:  
 767:         bool expired;
 768:         InputQueue<T> inputQueue;
 769:         T item;
 770:         Timer timer;
 771:  
 772:         public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
 773:             : base(callback, state)
 774:         {
 775:             this.inputQueue = inputQueue;
 776:             if (timeout != TimeSpan.MaxValue)
 777:             {
 778:                 this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
 779:             }
 780:         }
 781:  
 782:         public static bool End(IAsyncResult result, out T value)
 783:         {
 784:             AsyncQueueReader readerResult = AsyncResult.End<AsyncQueueReader>(result);
 785:  
 786:             if (readerResult.expired)
 787:             {
 788:                 value = default(T);
 789:                 return false;
 790:             }
 791:             else
 792:             {
 793:                 value = readerResult.item;
 794:                 return true;
 795:             }
 796:         }
 797:  
 798:         static void TimerCallback(object state)
 799:         {
 800:             AsyncQueueReader thisPtr = (AsyncQueueReader)state;
 801:             if (thisPtr.inputQueue.RemoveReader(thisPtr))
 802:             {
 803:                 thisPtr.expired = true;
 804:                 thisPtr.Complete(false);
 805:             }
 806:         }
 807:  
 808:         public void Set(Item item)
 809:         {
 810:             this.item = item.Value;
 811:             if (this.timer != null)
 812:             {
 813:                 this.timer.Change(-1, -1);
 814:             }
 815:             Complete(false, item.Exception);
 816:         }
 817:     }
 818:  
 819:     struct Item
 820:     {
 821:         T value;
 822:         Exception exception;
 823:         ItemDequeuedCallback dequeuedCallback;
 824:  
 825:         public Item(T value, ItemDequeuedCallback dequeuedCallback)
 826:             : this(value, null, dequeuedCallback)
 827:         {
 828:         }
 829:  
 830:         public Item(Exception exception, ItemDequeuedCallback dequeuedCallback)
 831:             : this(null, exception, dequeuedCallback)
 832:         {
 833:         }
 834:  
 835:         Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback)
 836:         {
 837:             this.value = value;
 838:             this.exception = exception;
 839:             this.dequeuedCallback = dequeuedCallback;
 840:         }
 841:  
 842:         public Exception Exception
 843:         {
 844:             get { return this.exception; }
 845:         }
 846:  
 847:         public T Value
 848:         {
 849:             get { return value; }
 850:         }
 851:  
 852:         public ItemDequeuedCallback DequeuedCallback
 853:         {
 854:             get { return dequeuedCallback; }
 855:         }
 856:  
 857:         public void Dispose()
 858:         {
 859:             if (value != null)
 860:             {
 861:                 if (value is IDisposable)
 862:                 {
 863:                     ((IDisposable)value).Dispose();
 864:                 }
 865:                 else if (value is ICommunicationObject)
 866:                 {
 867:                     ((ICommunicationObject)value).Abort();
 868:                 }
 869:             }
 870:         }
 871:  
 872:         public T GetValue()
 873:         {
 874:             if (this.exception != null)
 875:             {
 876:                 throw this.exception;
 877:             }
 878:  
 879:             return this.value;
 880:         }
 881:     }
 882:  
 883:     class WaitQueueWaiter : IQueueWaiter
 884:     {
 885:         bool itemAvailable;
 886:         ManualResetEvent waitEvent;
 887:         object thisLock = new object();
 888:  
 889:         public WaitQueueWaiter()
 890:         {
 891:             waitEvent = new ManualResetEvent(false);
 892:         }
 893:  
 894:         object ThisLock
 895:         {
 896:             get
 897:             {
 898:                 return this.thisLock;
 899:             }
 900:         }
 901:  
 902:         public void Set(bool itemAvailable)
 903:         {
 904:             lock (ThisLock)
 905:             {
 906:                 this.itemAvailable = itemAvailable;
 907:                 waitEvent.Set();
 908:             }
 909:         }
 910:  
 911:         public bool Wait(TimeSpan timeout)
 912:         {
 913:             if (timeout == TimeSpan.MaxValue)
 914:             {
 915:                 waitEvent.WaitOne();
 916:             }
 917:             else if (!waitEvent.WaitOne(timeout, false))
 918:             {
 919:                 return false;
 920:             }
 921:  
 922:             return this.itemAvailable;
 923:         }
 924:     }
 925:  
 926:     class AsyncQueueWaiter : AsyncResult, IQueueWaiter
 927:     {
 928:         static TimerCallback timerCallback = new TimerCallback(AsyncQueueWaiter.TimerCallback);
 929:         Timer timer;
 930:         bool itemAvailable;
 931:         object thisLock = new object();
 932:  
 933:         public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state)
 934:             : base(callback, state)
 935:         {
 936:             if (timeout != TimeSpan.MaxValue)
 937:             {
 938:                 this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
 939:             }
 940:         }
 941:  
 942:         object ThisLock
 943:         {
 944:             get
 945:             {
 946:                 return this.thisLock;
 947:             }
 948:         }
 949:  
 950:         public static bool End(IAsyncResult result)
 951:         {
 952:             AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
 953:             return waiterResult.itemAvailable;
 954:         }
 955:  
 956:         static void TimerCallback(object state)
 957:         {
 958:             AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;
 959:             thisPtr.Complete(false);
 960:         }
 961:  
 962:         public void Set(bool itemAvailable)
 963:         {
 964:             bool timely;
 965:  
 966:             lock (ThisLock)
 967:             {
 968:                 timely = (this.timer == null) || this.timer.Change(-1, -1);
 969:                 this.itemAvailable = itemAvailable;
 970:             }
 971:  
 972:             if (timely)
 973:             {
 974:                 Complete(false);
 975:             }
 976:         }
 977:     }
 978:  
 979:     class ItemQueue
 980:     {
 981:         Item[] items;
 982:         int head;
 983:         int pendingCount;
 984:         int totalCount;
 985:  
 986:         public ItemQueue()
 987:         {
 988:             items = new Item[1];
 989:         }
 990:  
 991:         public Item DequeueAvailableItem()
 992:         {
 993:             if (totalCount == pendingCount)
 994:             {
 995:                 Debug.Assert(false, "ItemQueue does not contain any available items");
 996:                 throw new Exception("Internal Error");
 997:             }
 998:             return DequeueItemCore();
 999:         }
1000:  
1001:         public Item DequeueAnyItem()
1002:         {
1003:             if (pendingCount == totalCount)
1004:                 pendingCount--;
1005:             return DequeueItemCore();
1006:         }
1007:  
1008:         void EnqueueItemCore(Item item)
1009:         {
1010:             if (totalCount == items.Length)
1011:             {
1012:                 Item[] newItems = new Item[items.Length * 2];
1013:                 for (int i = 0; i < totalCount; i++)
1014:                     newItems[i] = items[(head + i) % items.Length];
1015:                 head = 0;
1016:                 items = newItems;
1017:             }
1018:             int tail = (head + totalCount) % items.Length;
1019:             items[tail] = item;
1020:             totalCount++;
1021:         }
1022:  
1023:         Item DequeueItemCore()
1024:         {
1025:             if (totalCount == 0)
1026:             {
1027:                 Debug.Assert(false, "ItemQueue does not contain any items");
1028:                 throw new Exception("Internal Error");
1029:             }
1030:             Item item = items[head];
1031:             items[head] = new Item();
1032:             totalCount--;
1033:             head = (head + 1) % items.Length;
1034:             return item;
1035:         }
1036:  
1037:         public void EnqueuePendingItem(Item item)
1038:         {
1039:             EnqueueItemCore(item);
1040:             pendingCount++;
1041:         }
1042:  
1043:         public void EnqueueAvailableItem(Item item)
1044:         {
1045:             EnqueueItemCore(item);
1046:         }
1047:  
1048:         public void MakePendingItemAvailable()
1049:         {
1050:             if (pendingCount == 0)
1051:             {
1052:                 Debug.Assert(false, "ItemQueue does not contain any pending items");
1053:                 throw new Exception("Internal Error");
1054:             }
1055:             pendingCount--;
1056:         }
1057:  
1058:         public bool HasAvailableItem
1059:         {
1060:             get { return totalCount > pendingCount; }
1061:         }
1062:  
1063:         public bool HasAnyItem
1064:         {
1065:             get { return totalCount > 0; }
1066:         }
1067:  
1068:         public int ItemCount
1069:         {
1070:             get { return totalCount; }
1071:         }
1072:     }
1073: }

And in order to cooperate with this InputQueue<T> we also need a asynchronous result class, which will be inherited from the IAsyncResult interface.

   1: /// <summary>
   2: /// A generic base class for IAsyncResult implementations
   3: /// that wraps a ManualResetEvent.
   4: /// </summary>
   5: abstract class AsyncResult : IAsyncResult
   6: {
   7:     AsyncCallback callback;
   8:     object state;
   9:     bool completedSynchronously;
  10:     bool endCalled;
  11:     Exception exception;
  12:     bool isCompleted;
  13:     ManualResetEvent manualResetEvent;
  14:     object thisLock;
  15:  
  16:     protected AsyncResult(AsyncCallback callback, object state)
  17:     {
  18:         this.callback = callback;
  19:         this.state = state;
  20:         this.thisLock = new object();
  21:     }
  22:  
  23:     public object AsyncState
  24:     {
  25:         get
  26:         {
  27:             return state;
  28:         }
  29:     }
  30:  
  31:     public WaitHandle AsyncWaitHandle
  32:     {
  33:         get
  34:         {
  35:             if (manualResetEvent != null)
  36:             {
  37:                 return manualResetEvent;
  38:             }
  39:  
  40:             lock (ThisLock)
  41:             {
  42:                 if (manualResetEvent == null)
  43:                 {
  44:                     manualResetEvent = new ManualResetEvent(isCompleted);
  45:                 }
  46:             }
  47:  
  48:             return manualResetEvent;
  49:         }
  50:     }
  51:  
  52:     public bool CompletedSynchronously
  53:     {
  54:         get
  55:         {
  56:             return completedSynchronously;
  57:         }
  58:     }
  59:  
  60:     public bool IsCompleted
  61:     {
  62:         get
  63:         {
  64:             return isCompleted;
  65:         }
  66:     }
  67:  
  68:     object ThisLock
  69:     {
  70:         get
  71:         {
  72:             return this.thisLock;
  73:         }
  74:     }
  75:  
  76:     // Call this version of complete when your asynchronous operation is complete.  This will update the state
  77:     // of the operation and notify the callback.
  78:     protected void Complete(bool completedSynchronously)
  79:     {
  80:         if (isCompleted)
  81:         {
  82:             // It is a bug to call Complete twice.
  83:             throw new InvalidOperationException("Cannot call Complete twice");
  84:         }
  85:  
  86:         this.completedSynchronously = completedSynchronously;
  87:  
  88:         if (completedSynchronously)
  89:         {
  90:             // If we completedSynchronously, then there is no chance that the manualResetEvent was created so
  91:             // we do not need to worry about a race condition.
  92:             Debug.Assert(this.manualResetEvent == null, "No ManualResetEvent should be created for a synchronous AsyncResult.");
  93:             this.isCompleted = true;
  94:         }
  95:         else
  96:         {
  97:             lock (ThisLock)
  98:             {
  99:                 this.isCompleted = true;
 100:                 if (this.manualResetEvent != null)
 101:                 {
 102:                     this.manualResetEvent.Set();
 103:                 }
 104:             }
 105:         }
 106:  
 107:         // If the callback throws, there is a bug in the callback implementation
 108:         if (callback != null)
 109:         {
 110:             callback(this);
 111:         }
 112:     }
 113:  
 114:     // Call this version of complete if you raise an exception during processing.  In addition to notifying
 115:     // the callback, it will capture the exception and store it to be thrown during AsyncResult.End.
 116:     protected void Complete(bool completedSynchronously, Exception exception)
 117:     {
 118:         this.exception = exception;
 119:         Complete(completedSynchronously);
 120:     }
 121:  
 122:     // End should be called when the End function for the asynchronous operation is complete.  It
 123:     // ensures the asynchronous operation is complete, and does some common validation.
 124:     protected static TAsyncResult End<TAsyncResult>(IAsyncResult result)
 125:         where TAsyncResult : AsyncResult
 126:     {
 127:         if (result == null)
 128:         {
 129:             throw new ArgumentNullException("result");
 130:         }
 131:  
 132:         TAsyncResult asyncResult = result as TAsyncResult;
 133:  
 134:         if (asyncResult == null)
 135:         {
 136:             throw new ArgumentException("Invalid async result.", "result");
 137:         }
 138:  
 139:         if (asyncResult.endCalled)
 140:         {
 141:             throw new InvalidOperationException("Async object already ended.");
 142:         }
 143:  
 144:         asyncResult.endCalled = true;
 145:  
 146:         if (!asyncResult.isCompleted)
 147:         {
 148:             asyncResult.AsyncWaitHandle.WaitOne();
 149:         }
 150:  
 151:         if (asyncResult.manualResetEvent != null)
 152:         {
 153:             asyncResult.manualResetEvent.Close();
 154:         }
 155:  
 156:         if (asyncResult.exception != null)
 157:         {
 158:             throw asyncResult.exception;
 159:         }
 160:  
 161:         return asyncResult;
 162:     }
 163: }
 164:  
 165: //An AsyncResult that completes as soon as it is instantiated.
 166: class CompletedAsyncResult : AsyncResult
 167: {
 168:     public CompletedAsyncResult(AsyncCallback callback, object state)
 169:         : base(callback, state)
 170:     {
 171:         Complete(true);
 172:     }
 173:  
 174:     public static void End(IAsyncResult result)
 175:     {
 176:         AsyncResult.End<CompletedAsyncResult>(result);
 177:     }
 178: }
 179:  
 180: //A strongly typed AsyncResult
 181: abstract class TypedAsyncResult<T> : AsyncResult
 182: {
 183:     T data;
 184:  
 185:     protected TypedAsyncResult(AsyncCallback callback, object state)
 186:         : base(callback, state)
 187:     {
 188:     }
 189:  
 190:     public T Data
 191:     {
 192:         get { return data; }
 193:     }
 194:  
 195:     protected void Complete(T data, bool completedSynchronously)
 196:     {
 197:         this.data = data;
 198:         Complete(completedSynchronously);
 199:     }
 200:  
 201:     public static T End(IAsyncResult result)
 202:     {
 203:         TypedAsyncResult<T> typedResult = AsyncResult.End<TypedAsyncResult<T>>(result);
 204:         return typedResult.Data;
 205:     }
 206: }
 207:  
 208: //A strongly typed AsyncResult that completes as soon as it is instantiated.
 209: class TypedCompletedAsyncResult<T> : TypedAsyncResult<T>
 210: {
 211:     public TypedCompletedAsyncResult(T data, AsyncCallback callback, object state)
 212:         : base(callback, state)
 213:     {
 214:         Complete(data, true);
 215:     }
 216:  
 217:     public new static T End(IAsyncResult result)
 218:     {
 219:         TypedCompletedAsyncResult<T> completedResult = result as TypedCompletedAsyncResult<T>;
 220:         if (completedResult == null)
 221:         {
 222:             throw new ArgumentException("Invalid async result.", "result");
 223:         }
 224:  
 225:         return TypedAsyncResult<T>.End(completedResult);
 226:     }
 227: }

I’m not going to explain more deeply into the InputQueue<T> and the AsyncResult classes. If you want to know more about it there’s a very good blog post to be reference.

Then with this new scaffold let’s amend our ChannelListener. First we will define a local variant of the InputQueue<T>, where the type parameter would be the IReplyChannel. This means we will queue all request channel in this class. We also need a local variant for currently using channel and an object for locking.

   1: private readonly InputQueue<IReplyChannel> _channelQueue;
   2: private readonly object _currentChannelLock;
   3: private IReplyChannel _currentChannel;

And initialize them in the constructor.

   1: public MessageBusReplyChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
   2:     : base(context.Binding)
   3: {
   4:     _encoderFactory = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
   5:     _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
   6:     _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
   7:     _bus = transportElement.Bus;
   8:  
   9:     _channelQueue = new InputQueue<IReplyChannel>();
  10:     _currentChannelLock = new object();
  11:     _currentChannel = null;
  12: }

Since the InputQueue<T> need to execute some procedures when terminate we have to implement the OnAbort, OnClose and OnClosed virtual methods to close the InputQueue<T>.

   1: protected override void OnAbort()
   2: {
   3:     try
   4:     {
   5:         lock (ThisLock)
   6:         {
   7:             _channelQueue.Close();
   8:         }
   9:     }
  10:     catch { }
  11: }
  12:  
  13: protected override void OnClose(TimeSpan timeout)
  14: {
  15:     try
  16:     {
  17:         lock (ThisLock)
  18:         {
  19:             _channelQueue.Close();
  20:         }
  21:     }
  22:     catch { }
  23: }
  24:  
  25: protected override void OnClosed()
  26: {
  27:     base.OnClosed();
  28:  
  29:     try
  30:     {
  31:         _bufferManager.Clear();
  32:         _bus.Dispose();
  33:     }
  34:     catch { }
  35: }

And then we will create an internal method that ensure the current channel is available. It will check if the local variant _currentChannel is null. If yes then it will create a new channel from another private method called CreateChannel, which we will implement in the next step, and assign the OnChannelClosed method to its Closed event, and then enqueued into the InputQueue<IRequestChannel>. If not, which means there’s a channel object available, we will not do anything here.

   1: private void EnsureChannelAvailable()
   2: {
   3:     IReplyChannel newChannel = null;
   4:     bool channelCreated = false;
   5:  
   6:     if ((newChannel = _currentChannel) == null)
   7:     {
   8:         lock (_currentChannelLock)
   9:         {
  10:             if ((newChannel = _currentChannel) == null)
  11:             {
  12:                 newChannel = CreateChannel(_bufferManager, _encoder, new EndpointAddress(_uri), this, _bus);
  13:                 newChannel.Closed += new EventHandler(OnChannelClosed);
  14:                 _currentChannel = newChannel;
  15:                 channelCreated = true;
  16:             }
  17:         }
  18:     }
  19:  
  20:     if (channelCreated)
  21:     {
  22:         _channelQueue.EnqueueAndDispatch(newChannel);
  23:     }
  24: }
  25:  
  26: private IReplyChannel CreateChannel(
  27:     BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress, MessageBusReplyChannelListener parent, IBus bus)
  28: {
  29:     return new MessageBusReplyChannel(_bufferManager, _encoder, localAddress, parent, _bus);
  30: }
  31:  
  32: private void OnChannelClosed(object sender, EventArgs e)
  33: {
  34:     var channel = sender as IReplyChannel;
  35:     lock (_currentChannelLock)
  36:     {
  37:         if (channel == _currentChannel)
  38:         {
  39:             _currentChannel = null;
  40:         }
  41:     }
  42: }

Now we can use them to finish the asynchronous BeginOnAcceptChannel, EndOnAcceptChannel, and we will also amend the synchronous OnAcceptChannel as well.

   1: protected override IReplyChannel OnAcceptChannel(TimeSpan timeout)
   2: {
   3:     if (!IsDisposed)
   4:     {
   5:         EnsureChannelAvailable();
   6:     }
   7:  
   8:     IReplyChannel channel = null;
   9:     if (_channelQueue.Dequeue(timeout, out channel))
  10:     {
  11:         return channel;
  12:     }
  13:     else
  14:     {
  15:         throw new TimeoutException();
  16:     }
  17: }
  18:  
  19: protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
  20: {
  21:     if (!IsDisposed)
  22:     {
  23:         EnsureChannelAvailable();
  24:     }
  25:  
  26:     return _channelQueue.BeginDequeue(timeout, callback, state);
  27: }
  28:  
  29: protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
  30: {
  31:     IReplyChannel channel;
  32:     if (_channelQueue.EndDequeue(result, out channel))
  33:     {
  34:         return channel;
  35:     }
  36:     else
  37:     {
  38:         throw new TimeoutException();
  39:     }
  40: }

 

Server: ReplyChannel and RequestContext

The ReplyChannel created from the ChannelListener also need to support the asynchronous functions in order to make it work with the ServiceHost. In this case we will just use the asynchronous delegate invoke. Since we have had the synchronous ReceiveRequest method, what we need to do is to create a delegate from it, and invoke its Begin and End invoke method in the BeginTryRecevieRequest and the EndTryReceiveRequest methods.

   1: private delegate bool TryReceiveRequestDelegate(TimeSpan timeout, out RequestContext context);
   2: private TryReceiveRequestDelegate _tryReceiveRequestDelegate;

Initialize this delegate based on what we did in the ReceiveRequest method.

   1: public MessageBusReplyChannel(
   2:     BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress address,
   3:     MessageBusReplyChannelListener parent,
   4:     IBus bus)
   5:     : base(parent)
   6: {
   7:     _bufferManager = bufferManager;
   8:     _encoder = encoder.CreateSessionEncoder();
   9:  
  10:     _localAddress = address;
  11:     _bus = bus;
  12:     _aLock = new object();
  13:  
  14:     _tryReceiveRequestDelegate = (TimeSpan t, out RequestContext rc) =>
  15:     {
  16:         rc = null;
  17:         // receive the request message from the bus
  18:         var busMsg = _bus.Receive(true, null);
  19:         // box the wcf message
  20:         var raw = Encoding.UTF8.GetBytes(busMsg.Content);
  21:         var data = _bufferManager.TakeBuffer(raw.Length);
  22:         Buffer.BlockCopy(raw, 0, data, 0, raw.Length);
  23:         var buffer = new ArraySegment<byte>(data, 0, raw.Length);
  24:         var message = _encoder.ReadMessage(buffer, _bufferManager);
  25:         // initialize the request context and return
  26:         rc = new MessageBusRequestContext(message, this, _bufferManager, _encoder, _localAddress, _bus, busMsg.MessageID);
  27:         return true;
  28:     };
  29: }

And invoke it asynchronously in the BeginTryRecevieRequest and the EndTryReceiveRequest methods.

   1: public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
   2: {
   3:     RequestContext context;
   4:     return _tryReceiveRequestDelegate.BeginInvoke(timeout, out context, callback, state);
   5: }
   6:  
   7: public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext context)
   8: {
   9:     var ret = _tryReceiveRequestDelegate.EndInvoke(out context, result);
  10:     return ret;
  11: }

The ReplyChannel had been modified finished and we don’t need to change anything in the RequestContext. Then let’s have a try.

 

Use the ServiceHost and ChannelFactory Mode

Now we will use the way that you are familiar with to test our transport extension. There is no channel listener, no channel in our test code, and the service business logic will not be mixed within the service hosting code. Let’s have a look.

First we will create a service contract interface and a service class which implement it. Again, we will use the string reverse as the sample.

   1: [ServiceContract(Namespace="http://wcf.shaunxu.me/")]
   2: public interface ISampleService
   3: {
   4:     [OperationContract]
   5:     string Reverse(string content);
   6: }
   7:  
   8: public class SampleService : ISampleService
   9: {
  10:     public string Reverse(string content)
  11:     {
  12:         return new string(content.Reverse().ToArray());
  13:     }
  14: }

And in the main function we will create and open the service by using the ServiceHost. But since we didn’t implement any configuration extension so for now we have to use the code to set the binding and address.

   1: static void Main(string[] args)
   2: {
   3:     var bus = new InProcMessageBus();
   4:     var address = "net.bus://localhost/sample";
   5:  
   6:     // establish the service
   7:     var host = new ServiceHost(typeof(SampleService));
   8:     var svcBinding = new MessageBusTransportBinding(bus);
   9:     host.AddServiceEndpoint(typeof(ISampleService), svcBinding, address);
  10:     host.Opened += (sender, e) =>
  11:         {
  12:             Console.WriteLine("Service opened at {0}", host.Description.Endpoints[0].ListenUri);
  13:         };
  14:     host.Open();
  15: }

And we will use the ChannelFactory<TChannel> to create a client side proxy.

   1: static void Main(string[] args)
   2: {
   3:     var bus = new InProcMessageBus();
   4:     var address = "net.bus://localhost/sample";
   5:  
   6:     // establish the services
   7:     ... ...
   8:  
   9:     // establish the client
  10:     var cliBinding = new MessageBusTransportBinding(bus);
  11:     var factory = new ChannelFactory<ISampleService>(cliBinding, address);
  12:     factory.Opened += (sender, e) =>
  13:         {
  14:             Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
  15:         };
  16:     var proxy = factory.CreateChannel();
  17: }

Then finally invoke the service by the proxy the ChannelFactory generate to us.

   1: static void Main(string[] args)
   2: {
   3:     var bus = new InProcMessageBus();
   4:     var address = "net.bus://localhost/sample";
   5:  
   6:     // establish the service
   7:     ... ...
   8:  
   9:     // establish the client
  10:     ... ...
  11:  
  12:     // invoke the service
  13:     using (proxy as IDisposable)
  14:     {
  15:         Console.WriteLine("Say something...");
  16:         var content = Console.ReadLine();
  17:         while (!string.IsNullOrEmpty(content))
  18:         {
  19:             var result = proxy.Reverse(content);
  20:             Console.WriteLine("RESULT: {0} => {1}", content, result);
  21:             Console.WriteLine("Say something...");
  22:             content = Console.ReadLine();
  23:         }
  24:     }
  25: }

And this is the screenshot when I was running this code on my machine.

image

But don’t forget our goal, scaling-out on the server instances. Similar as what we did before, we will create more than one ServiceHost instance that listening on the same endpoint, which can handle the client request at the same time, to simulate the cross process or cross machine scaling-out scenario.

   1: class Program
   2: {
   3:     static void Main(string[] args)
   4:     {
   5:         var bus = new InProcMessageBus();
   6:         var address = "net.bus://localhost/sample";
   7:  
   8:         // establish the services
   9:         var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
  10:         var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
  11:         var host3 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
  12:  
  13:         // establish the client
  14:         var cliBinding = new MessageBusTransportBinding(bus);
  15:         var factory = new ChannelFactory<ISampleService>(cliBinding, address);
  16:         factory.Opened += (sender, e) =>
  17:             {
  18:                 Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
  19:             };
  20:         var proxy = factory.CreateChannel();
  21:  
  22:         // invoke the service
  23:         using (proxy as IDisposable)
  24:         {
  25:             Console.WriteLine("Say something...");
  26:             var content = Console.ReadLine();
  27:             while (!string.IsNullOrEmpty(content))
  28:             {
  29:                 var result = proxy.Reverse(content);
  30:                 Console.WriteLine("RESULT: {0} => {1}", content, result);
  31:                 Console.WriteLine("Say something...");
  32:                 content = Console.ReadLine();
  33:             }
  34:         }
  35:     }
  36:  
  37:     static ServiceHost EstablishServiceHost<TChannel, TService>(IBus bus, string address)
  38:     {
  39:         var host = new ServiceHost(typeof(TService));
  40:         var binding = new MessageBusTransportBinding(bus);
  41:         host.AddServiceEndpoint(typeof(TChannel), binding, address);
  42:         host.Opened += (sender, e) =>
  43:         {
  44:             Console.WriteLine("Service ({0}) opened at {1}", host.GetHashCode(), host.Description.Endpoints[0].ListenUri);
  45:         };
  46:         host.Open();
  47:         return host;
  48:     }
  49: }

As you can see I created three hosts and one client. In order to display which service host is handing the request I add one line code inside the service implementation class that display the current service host’s hash code.

   1: public class SampleService : ISampleService
   2: {
   3:     public string Reverse(string content)
   4:     {
   5:         Console.WriteLine("SERVICE: Invoked on the service host instance {0}.", OperationContext.Current.Host.GetHashCode());
   6:         return new string(content.Reverse().ToArray());
   7:     }
   8: }

So let’s have a look on what’s going on.

image

As you can see I invoked the service method four times and the first three of them was handled by the three different service host, which means scaling-out to my service instances. And the fourth one was handled by the first service host instance.

 

Summary

In this post I described how to amend our extension so that we can use it in a better way for service definition, implementation, hosting and client invoking. Once we implemented the relevant asynchronous methods in the ChannelListener and ReplyChannel we can use the ServiceContract interface to define the service contract, and use the ServiceHost to let WCF create the service class instance, open the service. On the client side we can use the ChannelFactory<TChannel> to get the proxy class of our ServiceContract interface. All of them are what we had been doing for a long time and familiar with.

 

Till now we had finished the basis of the transport extension, even though we had just implemented the request reply MEP. But as what I said in the first post, all other MEPs can be implemented by introducing the more ChannelListener, ChannelFactory and Channel.

But things always not as simple as we expected. In the next post I will show you how to implement the datagram MEP, with some code refactoring, with some bug fix.

 

You can download the source code of this post here.

 

Hope this helps,

Shaun

All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.

Comments

No comments posted yet.
Post A Comment
Title:
Name:
Email:
Comment:
Verification: