CloudCasts Blog

Webcasts in the Cloud
posts - 130 , comments - 71 , trackbacks - 120

My Links

News

Tag Cloud

Article Categories

Archives

Post Categories

Image Galleries

Bloggers Guides

Windows Azure Service Bus Resequencer

Introduction

I’ll be presenting a session at Sweden Windows Azure Group (SWAG) on Monday, as well as presenting on the Windows Azure Service Bus at various other events. I thought it would be fun to look at implementing some of the Enterprise Integration Patterns using the brokered messaging capabilities. I’ll use this article in the next release of Windows Azure Service Bus Developer Guide, and it will probably end up as a “challenge lab” for my Service Bus course.

As a long-time BizTalk developer I have seen many scenarios where the order of messages sent to a destination system needs is critical to business operations. Updating orders is a good example, if the first update to an order is sent after the second update then data may well be corrupted. Many BizTalk adapters, such as the file and FTP adapters have the potential to shuffle the sequence of messages as they enter BizTalk server, and message channels must be developed to resequence these messages.

The Enterprise Integration Patterns website provides a description of the Resequencer pattern here.

 

There are also scenarios where the sequence of messages can get mixed up when working with Service Bus brokered messaging. When sending a stream of messages using the asynchronous send method there is a good chance that the messages will be placed on the queue out of sequence. The following scenario will use this as an example, and a resequencer will be implemented to restore the order of the messages.

Resequencer Scenario

The scenario used for the resequencer implementation is the transfer of a photo over a Service Bus queue. The photo is broken down into 64 tiles (8 x 8), with each tile being sent in a separate message. When the tiles are received from the queue they are reassembled to form the original image.

I’ve have used this scenario previously when demoing resequencer patterns in BizTalk Server after seeing Shy Cohen use a similar scenario to demo reliable messaging in WCF. It’s great to use for presentations, and the use of an image makes it easy to see when messages are out of sequence.

The application that sends and receives the messages is built using Windows Presentation Foundation (WPF), with a basic user interface to show the original image, and the reassembled image after it has been sent and received on the queue. The application has the option to send messages using the synchronous or asynchronous send methods.

A screenshot of the application after sending the messages synchronously is shown below.

clip_image002

The received image has been assembled correctly from the sequence of message tiles, indicating that there was not a disturbance in the order of the messages. The sending of the messages, however, was not optimal. As they were sent synchronously, with the send operation on one having to complete before the next message can be sent, it took almost 7 seconds to send all 64 messages, at about 9.5 messages per second.

Sending messages asynchronously will provide much better throughput for the sending application. The results of this are shown below.

clip_image004

Using asynchronous send the 64 messages were sent in under half a second, at a rate of over 150 messages per second. In this test, sending messages asynchronously provides better than 15 times the throughput. Sending the messages asynchronously, however, has affected the order in which the messages were received. While most of the messages are in order, the first four messages containing the first half of the book title were the last four messages to be received.

In some scenarios the order of messages is not important, we are only concerned with throughput and reliability, but in this scenario it affects the display of the image. For these kinds of scenarios we need to implement a resequencer.

Resequencer Implementation

In this scenario the resequencer will be implemented as in intermediary service between the source system and the target system. I am using the Image Transfer WPF application to act as the source and target system, but the principle is the same. 

image

The pseudo-code for a possible resequencer is shown below.

while (true)

{

    ReceiveMessage;

    if (message is in sequence)

    {

        ForwardMessage;

        Forward any stored in sequence messages;

    }

    else

    {

        Store message;

    }

}       

 

The design decisions we have to make when implementing a resequencer are as follows:

·       How is the sequence of messages determined?

·       How should the message store be implemented?

Determining the Sequence of Messages

In order for a resequencer to work it must have a means of determining the correct sequence of messages. The BrokeredMessage class provides a SequenceNumber property, which is set by the queue or topic when the message is enqueued, starting with 1 for the first message enqueued.

In some scenarios the messages could have been enqueued in the correct order, and then the sequence changed by a multi-threaded receive. In those scenarios the SequenceNumber could be used to resequencer the messages.

In this scenario the messages are enqueued out of sequence by the multi-threaded asynchronous sending operations. This means that the SequenceNumber property of the dequeued messages will not reflect correct order of the messages. This means that the message sender will have to  provide some means of identifying the sequence of each message.

The following code shows the code used to create messages from the encoded image streams and assign an incrementing send session ID value to the Label property of the message header. The message is then sent synchronously or asynchronously depending on the selection made by the user.

for (int y = 0; y < ImageTiles; y++)

{

    for (int x = 0; x < ImageTiles; x++)

    {

        MemoryStream ms = new MemoryStream();

 

        // Use a delegate as we are accessing UI elements in a different thread.

        SimpleDelegate getImageCrop = delegate()

        {

            // Create a cropped bitmap of the image tile.

            CroppedBitmap cb = null;

            cb = new CroppedBitmap(FileBitmapImage, new Int32Rect

                (x * blockWidth, y * blockHeight, blockWidth, blockHeight));

 

            // Encode the bitmap to the memory stream.

            PngBitmapEncoder encoder = new PngBitmapEncoder();

            encoder.Frames.Add(BitmapFrame.Create(cb));

            encoder.Save(ms);

        };

        this.Dispatcher.Invoke(DispatcherPriority.Send, getImageCrop);

 

        // Create a brokered message using the stream.

        ms.Seek(0, SeekOrigin.Begin);

        BrokeredMessage blockMsg = new BrokeredMessage(ms, true);

 

        // Set the send sequence ID to the message lable.

        blockMsg.Label = sendSequenceId.ToString();

 

        // Send the message using either sync or async.

        if (SendAsync == true)

        {

            queueClient.BeginSend(blockMsg, OnSendComplete,

                new Tuple<QueueClient, string>(queueClient, blockMsg.MessageId));

        }

        else

        {

            queueClient.Send(blockMsg);

        }

 

        // Increment the send sequence ID

        sendSequenceId++;

 

        // Update the progress bar.

        SimpleDelegate updateBar = delegate()

        {

            prgTransfer.Value++;

        };

        this.Dispatcher.BeginInvoke(DispatcherPriority.Send, updateBar);

    }

}

 

 

Delegates have been used here as the code is running on a background worker thread and needs to access the UI elements in the WPF application.

Storing Messages

Any messages that are received out of sequence will need to be stored, and then forwarded to the target system once the previous message in the sequence has been received and forwarded. The message deferral functionality available when receiving messages from queues and subscriptions provides a nice way to store messages that are out of sequence. Provided the messages are received using the peek-lock receive mode the messages can be deferred, and then received again by specifying the appropriate SessionId in the receive method.

Implementing the Resequencer Loop

The resequencer is implemented as a separate WPF application. It receives messages from an inbound queue, resequences them, and sends them, and then sends them on an outbound queue. The code for the main loop of the resequencer is shown below. The receiveSequenceId variable is used to keep track of the sequence of received messages. If the sendSequenceId, which is retrieved from the Label property of the message matches the receiveSequenceId the message is cloned and forwarded and receiveSequenceId is incremented.

When a message has been forwarded, the next message in the sequence may have been received earlier and been deferred. The deferred messages are checked using the dictionary to see if the next message is present. If so it is received, copied, and forwarded, and the process repeated.

If the receiveSequenceId does not match sendSequenceId then the message is out of sequence. When this happens the message is deferred, and the SequenceNumber of the message added to a dictionary with the sendSequenceId used as a key. The SequenceNumber is required to receive the deferred message.

// Initialize the receive sequence ID.

long receiveSequenceId = 1;

 

while (true)

{

    BrokeredMessage msg = inbloudQueueClient.Receive(TimeSpan.FromSeconds(3));

    if (msg != null)

    {

                   

        long sendSequenceId = long.Parse(msg.Label);

 

        // Is the message in sequence?

        if (sendSequenceId == receiveSequenceId)

        {

            // Clone the message and forward it.

            Debug.WriteLine("Forwarding: " + sendSequenceId);

            BrokeredMessage outMsg = CloneBrokeredMessage(msg);

            outboundQueueClient.Send(outMsg);

            msg.Complete();

 

            // Increment the receive sequence ID.

            receiveSequenceId++;

 

            // Check for deferred messages in sequence.

            while (true)

            {

                if (deferredMessageSequenceNumbers.ContainsKey(receiveSequenceId))

                {

                    Console.WriteLine("Sending deferred message: " + receiveSequenceId);

 

                    // Receive the deferred message from the queue using the sequence ID

                    // retrieved from the dictionary.

                    long deferredMessageSequenceNumber =

                        deferredMessageSequenceNumbers[receiveSequenceId];

                    BrokeredMessage msgDeferred =

                        inbloudQueueClient.Receive(deferredMessageSequenceNumber);

                               

                    // Clone the deferred message and send it.

                    BrokeredMessage outMsgDeferred = CloneBrokeredMessage(msgDeferred);

                    outboundQueueClient.Send(outMsgDeferred);

                    msgDeferred.Complete();

                    receiveSequenceId++;

                }

                else

                {

                    // The next message in the sequence is not deferred.

                    break;

                }

 

            }

        }

        else

        {

            // Add the message sequence ID to the dictionary using the send sequence ID

            // then defer the message. We will need the sequence id to receive it.

            deferredMessageSequenceNumbers.Add(sendSequenceId, msg.SequenceNumber);

            msg.Defer();

        }

    }

}

 

 

Testing the Implementation

In order to test the resequencer the image transfer application will send messages to the inbound queue, and receive them from the outbound queue. The results of testing with the application sending messages asynchronously is shown below.

clip_image008

 

When the application was tested with 16 tiles (4 x 4) with tracing added the forwarding and deferring of the messages can clearly be seen.

Deferring: 3

Deferring: 4

Deferring: 5

Deferring: 6

Deferring: 7

Deferring: 8

Deferring: 9

Deferring: 10

Deferring: 11

Deferring: 12

Deferring: 13

Deferring: 14

Deferring: 15

Deferring: 16

Forwarding: 1

Forwarding: 2

Sending deferred message: 3

Sending deferred message: 4

Sending deferred message: 5

Sending deferred message: 6

Sending deferred message: 7

Sending deferred message: 8

Sending deferred message: 9

Sending deferred message: 10

Sending deferred message: 11

Sending deferred message: 12

Sending deferred message: 13

Sending deferred message: 14

Sending deferred message: 15

Sending deferred message: 16

 

Issues with Cloning Messages

When receiving messages from one messaging entity and forwarding them to another the following code should not be used.

// Receive a message from the inbound queue.

BrokeredMessage msg = inbloudQueueClient.Receive(TimeSpan.FromSeconds(3));

 

// Forward the message to the outbound queue.

outboundQueueClient.Send(outMsg);

 

 

It will result in an InvalidOperationException being thrown with the message “A received message cannot be directly sent to another entity. Construct a new message object instead.”.

The resequencer uses a quick and dirty message clone method, the code for this is shown below.

private BrokeredMessage CloneBrokeredMessage (BrokeredMessage msg)

{

    Stream stream = msg.GetBody<Stream>();          

    BrokeredMessage clonedMsg = new BrokeredMessage(stream, true);

    clonedMsg.Label = msg.Label;

    return clonedMsg;

}

 

The code seems to work fine in this scenario, but care must be taken to ensure that the appropriate message properties are copied from the header of the source message to that of the destination message.

 

Alternative Resequencer Implementations

The implementation in the previous section has been developed to demonstrate the principles of a resequencer in a presentation. For this reason it is hosted in a WPF application, and no error handling code has been added. In a real world scenario the resequencer would be either hosted in a service, or alternatively in the target system. Using a worker role in Windows Azure would allow for a cloud-based solution, but the hourly costs may make this prohibitive.

Handling Errors

As well as the standard error handling on sending and receiving messages a resequencer should also handle a scenario when one of the messages in the sequence is missing. If this happens in my scenario all subsequent messages will be deferred, and the system will never recover. There are a number of ways that this could be handled better.

One option would be to set a threshold of a specific number of deferred messages or a specific time interval that would indicate that the missing message is probably lost. When this threshold is reached, an error or warning could be raised, and the sequence could be resumed. This could either be by an administrative action, or automatically. In either case, if the missing message does eventually arrive at the resequencer it can be dead-lettered and another error or warning raised.

Storing State of Deferred Messages

One of the disadvantages of the message deferral design is that the resequencer needs to hold the state of the SequenceNumber values for the deferred messages. If this is lost there is no way to receive these messages from the queue. In my demo scenario I use an in-memory dictionary for this. In a real-world implementation the resequencer should store the SequenceNumber values in a durable store.

Using SessionId for Send Sequence Id

An alternative to using message deferral to store messages the resequencer could be implemented using message sessions. Each session would contain one message, and the SessionId would be set to the sending sequence id. The resequencer (or the receiving application) could then use a session receiver and receive the message from the session by incrementing the value of the session it is listening for to receive the messages in order. The disadvantage of this design is that it would not be possible to use sessions for another purpose in the implementation.

 

Print | posted on Friday, March 23, 2012 10:47 AM | Filed Under [ BizTalk Azure WCF/WF VS2010 AppFabic Windows Azure Service Bus ]

Feedback

Gravatar

# re: Windows Azure Service Bus Resequencer

Good sample, Alan. And fresh idea to show the sequence result as a picture.
4/1/2012 8:56 PM | Leonid Ganelin
Post A Comment
Title:
Name:
Email:
Comment:
Verification:
 
 

Powered by: