CloudCasts Blog

Webcasts in the Cloud

Monday, May 21, 2012

Sweden Windows Azure Group Meeting - Migrating Applications to Windows Azure & Sharding And Scaling with RavenDB, with Shay Friedman & Oren Eini

Wednesday, May 23, 6:00 PM, Stockholm

Migrating Applications to Windows Azure – Shay Friedman

Windows Azure is the next big thing for server-side applications, and one of its major use cases is hosting existing .NET applications. However, Windows Azure is not your regular playground and some preparations are necessary. In this session, Shay Friedman will take you through different ways to make sure your application is ready to move to the cloud. In addition, Friedman will explain how you can estimate the cost of running your web application in the cloud.

Sharding And Scaling with RavenDB – Oren Eini

From the get-go, RavenDB was designed with sharding in mind. But sharding has always been a complex topic, and it scared people off. Following the same principles that guide us in the rest of RavenDB's design, we have taken sharding to the next level and made it easier to work with, performant and self-optimizing.

Come to this talk with Ayende Rahien to discover RavenDB sharding, discuss scaling scenarios and see how we can use RavenDB in high-traffic scenarios.

Shay Friedman

Shay Friedman is a Visual C#/IronRuby MVP and the author of IronRuby Unleashed. With more than 10 years of experience in the software industry, Friedman now works at CodeValue, a company he co-founded, where he creates products for developers, consults, and conducts courses around the world on web development and dynamic languages. You can visit his blog at http://IronShay.com.

Oren Eini (Ayende Rahien)

Oren Eini has over 15 years of experience in the development world with a strong focus on the Microsoft and .NET ecosystem, and has been a Microsoft Most Valuable Professional since 2007. An internationally known presenter, Oren has spoken at conferences such as DevTeach, JAOO, QCon, Oredev, NDC,...

Posted On Monday, May 21, 2012 9:20 AM | Feedback (0) | Filed Under [ Azure ]

Monday, May 07, 2012

Weekly Cloud Newsround 2012-18

Filtering the informative, insightful and quirky from the fire hose of cloud-based hype.

Mike Pearl from PWC Looks At Cloud Strategy Through The Lens Of Value in a guest post for Forbes, Michelle Boisvert discusses the Real and perceived security threats of cloud computing, and Greenpeace is still mad about ‘dirty cloudy computing’.

CTV News shares its opinions on Which cloud storage solution is best for your data?, whilst V3 discusses the security and performance issues that may slow the adoption of cloud storage. Addressing the security issues, the Washington Post looks at ways to keep the cloud secure.

On the Azure front, there is a nice story about how MediaValet are Thriving on Microsoft's Cloud Platform, and Mary-Jo writes about How Twilio went from handshake to Microsoft Azure partner in a month-plus.

Posted On Monday, May 07, 2012 8:21 AM | Feedback (0) |

Saturday, April 28, 2012

Clemens Vasters @ Sweden Windows Azure Group Stockholm

Clemens Vasters was kind enough to round off his visit to Norway and Sweden by presenting a session on Hybrid Applications for the Sweden Windows Azure User Group (SWAG) in Stockholm. After presenting two sessions at TechDays he ditched Power Point and gave a two hour improvised presentation that spread across three whiteboards.

He covered a number of scenarios where the Windows Azure Service Bus could be used to enhance the reach of existing on-premise systems, and also how internet enabled devices can be hooked up to the service bus to provide control and reporting functionality.

Big thanks to Clemens for finding time in his busy schedule to present for us.

If you would like to sign up to the Sweden Windows Azure Group in Stockholm, you can do that here: http://swagmembership.eventbrite.com/

If you are in the south of Sweden or in Copenhagen, Magnus Mårtensson, aka noopman, is kicking off a branch of SWAG in Malmö. The premiere event will be at foocafe on Thursday 24th May. The signup for that event is here: http://foocafe.org/#event-cloud-core-values-from-the-source-with-magnus-martensson-and-glenn-block

Posted On Saturday, April 28, 2012 2:37 PM | Feedback (0) |

Friday, April 27, 2012

Weekly Cloud Newsround 2012-17

Filtering the informative, insightful and quirky from the fire hose of cloud-based hype.

Google’s “Google Drive” has been leading the news this week, and has sparked some debate about data storage “in the cloud”. The LA Times has a nice comparison of the different storage options available, and also asks “Who owns your stuff in the Google Drive cloud?”

Two more stories on the data storage side: Microsoft will bundle CommVault Simpana with Windows Azure: “The new bundle, unveiled Monday, provides access to up to 62 TBs of cloud storage capacity along with CommVault's Simpana 9 Express data protection capabilities for prices starting at about $50,000”. Also, Aidmatrix Expands to the Microsoft Windows Azure Cloud to Enhance Its Disaster Response Platform.

Big data seems to be big business; there is a nice overview of big data here. As for big data transfer, academics get a break with Microsoft waiving Azure bandwidth fees for research universities.

On the security side if you are working with Azure the new book from Packt “Microsoft Windows Identity Foundation Cookbook” should go on your shopping list.

I can’t decide if this is fun or serious, but the “Home Security System Using Kinect, Azure, Windows Phone and Windows 8” makes good use of emerging technologies. I wonder if they have a “Caught red-handed!” achievement…

Posted On Friday, April 27, 2012 8:49 AM | Feedback (1) |

Friday, April 20, 2012

Stockholm, Friday 27th April: Hybrid Applications with Clemens Vasters + Win a Hot Air Balloon Ride!

The Sweden Windows Azure Group (SWAG) will hold a session on “Hybrid Applications – Building Solutions that span On-Premises Assets and the Cloud” with Clemens Vasters, principal technical lead for the Windows Azure Service Bus. It should be a great session with a lot of inside information from the guy behind the technology.

Microsoft Sweden has been kind enough to give us three “Windows Azure Hot Air Balloon” ride tickets, which we were tempted to use ourselves, but will raffle to three lucky attendees at the meeting. A great way to “move to the cloud…”.

Date: Friday 27th April, 17:30 – 20:30
Location: knowit Stockholm: Klarabergsgatan 60 4tr

Register here: http://swag10.eventbrite.com/

See you there.

Posted On Friday, April 20, 2012 10:55 AM | Feedback (0) |

Weekly Cloud Newsround 2012-16

Filtering the informative, insightful and quirky from the fire hose of cloud-based hype.

The announcement of the Windows Azure hosted media platform has been leading the Azure news stories this week, with articles by Mary-Jo, streamingmedia.com and V3. I’m keen to get into testing the preview of this; it would be a great technology to use for the webcasts on my CloudCasts site.

Hovhannes Avoyan continues his Windows Azure Overview series with part 4 looking at Security. The links to the four articles are:

Windows Azure Overview

Windows Azure Overview Part 2: Pros and Cons

Windows Azure Overview Part 3: All About Azure Pricing

Windows Azure Overview Part 4: Security

HPC Wire has an interesting article, “Cycle Computing Creates 50,000-Core Supercomputer Through Amazon”. The ability to provision massive compute power for short periods of time is a great use of cloud computing.

Cloud Times has an interesting article on “The Rising Value of Cloud Computing”, whilst Greenpeace question the environmental friendliness of the cloud, naming Apple and Amazon as the main offenders. Apple defend their green credentials in the Guardian, with a nice video of their new data center, hoping to provide 10% of their energy through solar power.

If you are “not ready to move to the cloud”, you could consider moving to the beach to take a look at Crab Computing. Seems interesting, but I have reservations about high availability when there is a spring tide…

Posted On Friday, April 20, 2012 9:14 AM | Feedback (0) |

Monday, April 16, 2012

Windows Azure Service Bus Scatter-Gather Implementation

One of the more challenging enterprise integration patterns that developers may wish to implement is the Scatter-Gather pattern. In this article I will show the basic implementation of a scatter-gather pattern using the topic-subscription model of the Windows Azure Service Bus. I’ll be using the implementation in demos and as a lab in my training courses, and the pattern will also be included in the next release of my free e-book, the “Windows Azure Service Bus Developer Guide”.

The Scatter-Gather pattern answers the following scenario.

How do you maintain the overall message flow when a message needs to be sent to multiple recipients, each of which may send a reply?

[Figure: Scatter-Gather pattern diagram]

Use a Scatter-Gather that broadcasts a message to multiple recipients and re-aggregates the responses back into a single message.

The Enterprise Integration Patterns website provides a description of the Scatter-Gather pattern here.

 

The scatter-gather pattern uses a composite of the publish-subscribe channel pattern and the aggregator pattern. The publish-subscribe channel is used to broadcast messages to a number of receivers, and the aggregator is used to gather the response messages and aggregate them together to form a single message.
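To make the composite concrete before the Service Bus comes into play, here is a minimal in-process sketch, not the implementation used in this article. It assumes each recipient can be modelled as a plain delegate; the PollResult and InProcessScatterGather names are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical result type: the single aggregated message.
public class PollResult
{
    public string QuestionText { get; set; }
    public int YesVotes { get; set; }
    public int NoVotes { get; set; }
}

public static class InProcessScatterGather
{
    // Scatter: hand the same question to every voter.
    // Gather: fold the individual replies into one PollResult.
    public static PollResult Ask(
        string questionText, IEnumerable<Func<string, bool>> voters)
    {
        List<bool> replies = voters.Select(voter => voter(questionText)).ToList();
        return new PollResult
        {
            QuestionText = questionText,
            YesVotes = replies.Count(isYes => isYes),
            NoVotes = replies.Count(isYes => !isYes)
        };
    }
}
```

In the real pattern the broadcast and the replies travel over messaging channels and the replies arrive at different times, which is what the topic, subscriptions and queue in the rest of the article provide.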

Scatter-Gather Scenario

The scenario for this scatter-gather implementation is an application that allows users to answer questions in a poll-based voting scenario. A poll manager application will be used to broadcast questions to users; the users will use a voting application that receives and displays the questions and sends the votes back to the poll manager. The poll manager application will receive the users’ votes and aggregate them together to display the results. The scenario should be able to scale to support a large number of users.

 

Scatter-Gather Implementation

The diagram below shows the overall architecture for the scatter-gather implementation.

[Figure: architecture of the scatter-gather implementation]

Messaging Entities

Looking at the scatter-gather pattern diagram it can be seen that the topic-subscription architecture is well suited for broadcasting a message to a number of subscribers. The poll manager application can send the question messages to a topic, and each voting application can receive the question message on its own subscription. The static limit of 2,000 subscriptions per topic in the current release means that up to 2,000 voting applications can receive question messages and take part in voting.

The vote messages can then be sent to the poll manager application using a queue. The voting applications will send their vote messages to the queue, and the poll manager will receive and process the vote messages.

The questions topic and answer queue are created using the Windows Azure Developer Portal. Each instance of the voting application will create its own subscription in the questions topic when it starts, allowing the question messages to be broadcast to all subscribing voting applications.

Data Contracts

Two simple data contracts will be used to serialize the questions and votes as brokered messages. The code for these is shown below.

 

[DataContract]
public class Question
{
    [DataMember]
    public string QuestionText { get; set; }
}

To keep the implementation of the voting functionality simple and focus on the pattern implementation, the users can only vote yes or no to the questions.

 

[DataContract]
public class Vote
{
    [DataMember]
    public string QuestionText { get; set; }

    [DataMember]
    public bool IsYes { get; set; }
}
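As a rough illustration of what happens to these contracts on the wire, the sketch below round-trips a Vote through DataContractSerializer by hand. My understanding is that the BrokeredMessage constructor that takes an object uses DataContractSerializer with a binary XML writer; the sketch uses the default text XML writer instead, so treat it as a stand-in rather than the exact wire format. The VoteSerialization helper is hypothetical.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

[DataContract]
public class Vote
{
    [DataMember]
    public string QuestionText { get; set; }

    [DataMember]
    public bool IsYes { get; set; }
}

// Hypothetical helper, for illustration only.
public static class VoteSerialization
{
    // Serialize a Vote to bytes with DataContractSerializer.
    public static byte[] ToBytes(Vote vote)
    {
        var serializer = new DataContractSerializer(typeof(Vote));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, vote);
            return stream.ToArray();
        }
    }

    // Rebuild the Vote from the serialized bytes.
    public static Vote FromBytes(byte[] bytes)
    {
        var serializer = new DataContractSerializer(typeof(Vote));
        using (var stream = new MemoryStream(bytes))
        {
            return (Vote)serializer.ReadObject(stream);
        }
    }
}
```

Only members marked with DataMember take part in the round trip, which is why both properties carry the attribute.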

 

 

Poll Manager Application

The poll manager application has been implemented as a simple WPF application; the user interface is shown below.

[Screenshot: poll manager user interface]

A question can be entered in the text box, and sent to the topic by clicking the Add button. The topic and subscriptions used for broadcasting the messages are shown in a TreeView control. The questions that have been broadcast and the resulting votes are shown in a ListView control.

When the application is started, any existing subscriptions are cleared from the topic. Clients are then created for the questions topic and votes queue, along with background workers for receiving and processing the vote messages and for updating the display of subscriptions.

 

public MainWindow()
{
    InitializeComponent();

    // Create a new results list and data bind it.
    Results = new ObservableCollection<Result>();
    lsvResults.ItemsSource = Results;

    // Create a token provider with the relevant credentials.
    TokenProvider credentials =
        TokenProvider.CreateSharedSecretTokenProvider
        (AccountDetails.Name, AccountDetails.Key);

    // Create a URI for the service bus.
    Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
        ("sb", AccountDetails.Namespace, string.Empty);

    // Clear out any old subscriptions.
    NamespaceManager = new NamespaceManager(serviceBusUri, credentials);
    IEnumerable<SubscriptionDescription> subs =
        NamespaceManager.GetSubscriptions(AccountDetails.ScatterGatherTopic);
    foreach (SubscriptionDescription sub in subs)
    {
        NamespaceManager.DeleteSubscription(sub.TopicPath, sub.Name);
    }

    // Create the MessagingFactory.
    MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

    // Create the topic and queue clients.
    ScatterGatherTopicClient =
        factory.CreateTopicClient(AccountDetails.ScatterGatherTopic);
    ScatterGatherQueueClient =
        factory.CreateQueueClient(AccountDetails.ScatterGatherQueue);

    // Start the background worker threads.
    VotesBackgroundWorker = new BackgroundWorker();
    VotesBackgroundWorker.DoWork += new DoWorkEventHandler(ReceiveMessages);
    VotesBackgroundWorker.RunWorkerAsync();

    SubscriptionsBackgroundWorker = new BackgroundWorker();
    SubscriptionsBackgroundWorker.DoWork += new DoWorkEventHandler(UpdateSubscriptions);
    SubscriptionsBackgroundWorker.RunWorkerAsync();
}

When the poll manager user enters a question in the text box and clicks the Add button, a question message is created and sent to the topic. This message will be broadcast to all the subscribing voting applications. An instance of the Result class is also created to keep track of the votes cast; this is then added to an observable collection named Results, which is data-bound to the ListView control.

 

private void btnAddQuestion_Click(object sender, RoutedEventArgs e)
{
    // Create a new result for recording votes.
    Result result = new Result()
    {
        Question = txtQuestion.Text
    };
    Results.Add(result);

    // Send the question to the topic.
    Question question = new Question()
    {
        QuestionText = result.Question
    };
    BrokeredMessage msg = new BrokeredMessage(question);
    ScatterGatherTopicClient.Send(msg);

    txtQuestion.Text = "";
}

The Result class is implemented as follows.

 

public class Result : INotifyPropertyChanged
{
    public string Question { get; set; }

    private int m_YesVotes;
    private int m_NoVotes;

    public event PropertyChangedEventHandler PropertyChanged;

    public int YesVotes
    {
        get { return m_YesVotes; }
        set
        {
            m_YesVotes = value;
            NotifyPropertyChanged("YesVotes");
        }
    }

    public int NoVotes
    {
        get { return m_NoVotes; }
        set
        {
            m_NoVotes = value;
            NotifyPropertyChanged("NoVotes");
        }
    }

    private void NotifyPropertyChanged(string prop)
    {
        if (PropertyChanged != null)
        {
            PropertyChanged(this, new PropertyChangedEventArgs(prop));
        }
    }
}

The INotifyPropertyChanged interface is implemented so that changes to the number of yes and no votes will be updated in the ListView control.

Receiving the vote messages from the voting applications is done asynchronously, using a background worker thread.

 

// This runs on a background worker.
private void ReceiveMessages(object sender, DoWorkEventArgs e)
{
    while (true)
    {
        // Receive a vote message from the queue.
        BrokeredMessage msg = ScatterGatherQueueClient.Receive();
        if (msg != null)
        {
            // Deserialize the message.
            Vote vote = msg.GetBody<Vote>();

            // Update the results.
            foreach (Result result in Results)
            {
                if (result.Question.Equals(vote.QuestionText))
                {
                    if (vote.IsYes)
                    {
                        result.YesVotes++;
                    }
                    else
                    {
                        result.NoVotes++;
                    }
                    break;
                }
            }

            // Mark the message as complete.
            msg.Complete();
        }
    }
}

When a vote message is received, the result that matches the vote question is updated with the vote from the user. The message is then marked as complete.
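The gather step boils down to keeping a running tally per question. The sketch below isolates that logic from WPF and the Service Bus so it can be exercised on its own; VoteTally is a hypothetical class, not part of the sample application. Unlike the loop above, which ignores votes for questions that are not in the Results collection, this version creates an entry on first sight, and it takes a lock because votes may be recorded from a background thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, for illustration only.
public class VoteTally
{
    private readonly object m_Lock = new object();

    // Per question: index 0 holds yes votes, index 1 holds no votes.
    private readonly Dictionary<string, int[]> m_Counts =
        new Dictionary<string, int[]>();

    // Record one vote against the question it answers.
    public void Record(string questionText, bool isYes)
    {
        lock (m_Lock)
        {
            int[] counts;
            if (!m_Counts.TryGetValue(questionText, out counts))
            {
                counts = new int[2];
                m_Counts[questionText] = counts;
            }
            counts[isYes ? 0 : 1]++;
        }
    }

    public int YesVotes(string questionText) { return Read(questionText, 0); }

    public int NoVotes(string questionText) { return Read(questionText, 1); }

    // Returns 0 for a question that has received no votes yet.
    private int Read(string questionText, int index)
    {
        lock (m_Lock)
        {
            int[] counts;
            return m_Counts.TryGetValue(questionText, out counts) ? counts[index] : 0;
        }
    }
}
```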

A second background thread is used to update the display of subscriptions in the TreeView, with a dispatcher used to update the user interface.

// This runs on a background worker.
private void UpdateSubscriptions(object sender, DoWorkEventArgs e)
{
    while (true)
    {
        // Get a list of subscriptions.
        IEnumerable<SubscriptionDescription> subscriptions =
            NamespaceManager.GetSubscriptions(AccountDetails.ScatterGatherTopic);

        // Update the user interface.
        SimpleDelegate setQuestion = delegate()
        {
            trvSubscriptions.Items.Clear();
            TreeViewItem topicItem = new TreeViewItem()
            {
                Header = AccountDetails.ScatterGatherTopic
            };

            foreach (SubscriptionDescription subscription in subscriptions)
            {
                TreeViewItem subscriptionItem = new TreeViewItem()
                {
                    Header = subscription.Name
                };
                topicItem.Items.Add(subscriptionItem);
            }
            trvSubscriptions.Items.Add(topicItem);

            topicItem.ExpandSubtree();
        };
        this.Dispatcher.BeginInvoke(DispatcherPriority.Send, setQuestion);

        Thread.Sleep(3000);
    }
}

Voting Application

The voting application is implemented as another WPF application. This one is more basic, and allows the user to vote “Yes” or “No” for the questions sent by the poll manager application. The user interface for that application is shown below.

[Screenshot: voting application user interface]

When an instance of the voting application is created it will create a subscription in the questions topic using a GUID as the subscription name. The application can then receive copies of every question message that is sent to the topic.

Clients for the new subscription and the votes queue are created, along with a background worker to receive the question messages. The voting application is set to receiving mode, meaning it is ready to receive a question message from the subscription.

 

public MainWindow()
{
    InitializeComponent();

    // Set the mode to receiving.
    IsReceiving = true;

    // Create a token provider with the relevant credentials.
    TokenProvider credentials =
        TokenProvider.CreateSharedSecretTokenProvider
        (AccountDetails.Name, AccountDetails.Key);

    // Create a URI for the service bus.
    Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
        ("sb", AccountDetails.Namespace, string.Empty);

    // Create the MessagingFactory.
    MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

    // Create a subscription for this instance.
    NamespaceManager mgr = new NamespaceManager(serviceBusUri, credentials);
    string subscriptionName = Guid.NewGuid().ToString();
    mgr.CreateSubscription(AccountDetails.ScatterGatherTopic, subscriptionName);

    // Create the subscription and queue clients.
    ScatterGatherSubscriptionClient = factory.CreateSubscriptionClient
        (AccountDetails.ScatterGatherTopic, subscriptionName);
    ScatterGatherQueueClient =
        factory.CreateQueueClient(AccountDetails.ScatterGatherQueue);

    // Start the background worker thread.
    BackgroundWorker = new BackgroundWorker();
    BackgroundWorker.DoWork += new DoWorkEventHandler(ReceiveMessages);
    BackgroundWorker.RunWorkerAsync();
}

I took the inspiration for creating the subscriptions in the voting application from the chat application that uses topics and subscriptions blogged by Ovais Akhter here.

The method that receives the question messages runs on a background thread. If the application is in receive mode, a question message will be received from the subscription, the question will be displayed in the user interface, the voting buttons enabled, and IsReceiving set to false to prevent more questions from being received before the current one is answered.

 

// This runs on a background worker.
private void ReceiveMessages(object sender, DoWorkEventArgs e)
{
    while (true)
    {
        if (IsReceiving)
        {
            // Receive a question message from the subscription.
            BrokeredMessage msg = ScatterGatherSubscriptionClient.Receive();
            if (msg != null)
            {
                // Deserialize the message.
                Question question = msg.GetBody<Question>();

                // Update the user interface.
                SimpleDelegate setQuestion = delegate()
                {
                    lblQuestion.Content = question.QuestionText;
                    btnYes.IsEnabled = true;
                    btnNo.IsEnabled = true;
                };
                this.Dispatcher.BeginInvoke(DispatcherPriority.Send, setQuestion);
                IsReceiving = false;

                // Mark the message as complete.
                msg.Complete();
            }
        }
        else
        {
            Thread.Sleep(1000);
        }
    }
}

When the user clicks on the Yes or No button, the btnVote_Click method is called. This will create a new Vote data contract with the appropriate question and answer and send the message to the poll manager application using the votes queue. The user voting buttons are then disabled, the question text cleared, and the IsReceiving flag set to true to allow a new message to be received.

 

private void btnVote_Click(object sender, RoutedEventArgs e)
{
    // Create a new vote.
    Vote vote = new Vote()
    {
        QuestionText = (string)lblQuestion.Content,
        IsYes = ((sender as Button).Content as string).Equals("Yes")
    };

    // Send the vote message.
    BrokeredMessage msg = new BrokeredMessage(vote);
    ScatterGatherQueueClient.Send(msg);

    // Update the user interface.
    lblQuestion.Content = "";
    btnYes.IsEnabled = false;
    btnNo.IsEnabled = false;
    IsReceiving = true;
}

Testing the Application

In order to test the application, an instance of the poll manager application is started; the user interface is shown below.

[Screenshot: poll manager with no subscriptions]

As no instances of the voting application have been created there are no subscriptions present in the topic. When an instance of the voting application is created the subscription will be displayed in the poll manager.

[Screenshots: a voting application instance and its subscription displayed in the poll manager]

Now that a voting application is subscribing, a question can be sent from the poll manager application. When the message is sent to the topic, the voting application will receive the message and display the question.

[Screenshots: the question sent from the poll manager and displayed in the voting application]

The voter can then answer the question by clicking on the appropriate button. The results of the vote are updated in the poll manager application.

[Screenshot: vote results in the poll manager]

When two more instances of the voting application are created, the poll manager will display the new subscriptions. More questions can then be broadcast to the voting applications.

[Screenshot: poll manager showing the new subscriptions]

As the question messages are queued up in the subscription for each voting application, the users can answer the questions in their own time. The vote messages will be received by the poll manager application and aggregated to display the results. The screenshots of the applications part way through voting are shown below.

[Screenshots: the poll manager and voting applications part way through voting]

The messages for each voting application are queued up in sequence on the voting application subscriptions, allowing the questions to be answered at different speeds by the voters.

Posted On Monday, April 16, 2012 9:53 AM | Feedback (0) |

Saturday, April 14, 2012

Weekly Cloud Roundup 2012-15

Filtering the informative, insightful and quirky from the fire hose of cloud-based hype.

Irving Wladawsky-Berger provides some great insight into The Complex Transition to the Cloud, sharing his views on the slow adoption of cloud computing in organizations. “…a prediction by the research firm Gartner that while cloud computing will continue to grow at almost 20 percent a year, it will account for less than 5 percent of total IT spending in 2015.” With a more positive mindset, Balaji Viswanathan highlights 7 Salient Trends and Directions in Cloud Computing that could be shaping the industry over the next few years.

Cloud computing also looks to save energy: “A small business with 100 users that moved the Microsoft applications to the cloud could cut energy use and carbon emissions by 90%. Large organizations with 10,000 users saw a 30% reduction.” More on that story here.

The expansion of Windows Azure has been in the news with the announcement of “East US” and “West US” datacenters; this was covered by Visual Studio Magazine and Mary-Jo, and according to thenextweb.com Microsoft are also building a $112 million data center in Wyoming.

The cloud price war is still in full swing with Joe Panettieri discussing the pricing of Windows Azure and Office 365 and asking How Low Can It Go?

Posted On Saturday, April 14, 2012 10:15 AM | Feedback (0) |

Tuesday, April 10, 2012

Windows Azure Service Bus Splitter and Aggregator

This article will cover basic implementations of the Splitter and Aggregator patterns using the Windows Azure Service Bus. The content will be included in the next release of the “Windows Azure Service Bus Developer Guide”, along with some other patterns I am working on.

I’ve taken the pattern descriptions from the book “Enterprise Integration Patterns” by Gregor Hohpe. I bought a copy of the book in 2004, and recently dusted it off when I started to look at implementing the patterns on the Windows Azure Service Bus. Gregor also presented a session in 2011, “Enterprise Integration Patterns: Past, Present and Future”, which is well worth a look.

I’ll be covering more patterns in the coming weeks; I’m currently working on Wire-Tap and Scatter-Gather. There will no doubt be a section on implementing these patterns in my “SOA, Connectivity and Integration using the Windows Azure Service Bus” course.

There are a number of scenarios where a message needs to be divided into a number of sub messages, and also where a number of sub messages need to be combined to form one message. The splitter and aggregator patterns provide a definition of how this can be achieved. This section will focus on the implementation of basic splitter and aggregator patterns using the Windows Azure Service Bus direct programming model.

In BizTalk Server, receive pipelines are typically used to implement the splitter pattern, with sequential convoy orchestrations often used to aggregate messages. In the current release of the Service Bus, there is no functionality in the direct programming model that implements these patterns, so it is up to the developer to implement them in the applications that send and receive messages.

Splitter

A message splitter takes a message and splits it into a number of sub messages. As there are different scenarios for how a message can be split into sub messages, message splitters are implemented using different algorithms.

The Enterprise Integration Patterns book describes the splitter pattern as follows:

How can we process a message if it contains multiple elements, each of which may have to be processed in a different way?

[Figure: Splitter pattern diagram]

Use a Splitter to break out the composite message into a series of individual messages, each containing data related to one item.

The Enterprise Integration Patterns website provides a description of the Splitter pattern here.

In some scenarios a batch message could be split into the sub messages that are contained in the batch. The splitting of a message could be based on the message type of sub-message, or the trading partner that the sub message is to be sent to.
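As a small illustration of the trading-partner case, the sketch below splits an in-memory batch into one sub-batch per partner using LINQ. The OrderLine and BatchSplitter types are hypothetical and exist only for this example; a real splitter would then wrap each sub-batch in its own message.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical batch item, for illustration only.
public class OrderLine
{
    public string TradingPartner { get; set; }
    public string Item { get; set; }
}

public static class BatchSplitter
{
    // Split one batch into one sub-batch per trading partner,
    // keeping the original order of lines within each sub-batch.
    public static Dictionary<string, List<OrderLine>> SplitByPartner(
        IEnumerable<OrderLine> batch)
    {
        return batch
            .GroupBy(line => line.TradingPartner)
            .ToDictionary(group => group.Key, group => group.ToList());
    }
}
```

Splitting on message type instead would only change the key selector passed to GroupBy.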

Aggregator

An aggregator takes a stream of related messages and combines them to form one message.

The Enterprise Integration Patterns book describes the aggregator pattern as follows:

How do we combine the results of individual, but related messages so that they can be processed as a whole?

[Figure: Aggregator pattern diagram]

Use a stateful filter, an Aggregator, to collect and store individual messages until a complete set of related messages has been received. Then, the Aggregator publishes a single message distilled from the individual messages.

The Enterprise Integration Patterns website provides a description of the Aggregator pattern here.

A common example of the need for an aggregator is in scenarios where a stream of messages needs to be combined into a daily batch to be sent to a legacy line-of-business application. The BizTalk Server EDI functionality provides support for batching messages in this way using a sequential convoy orchestration.
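The stateful part of the pattern can be sketched in a few lines. The example below is an in-memory illustration only, under the assumption that the sender tells the aggregator how many parts to expect; InMemoryAggregator is a hypothetical class, and completion could equally be decided by a timeout or an end-of-batch marker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical aggregator, for illustration only.
public class InMemoryAggregator
{
    // Parts received so far, keyed by correlation id.
    private readonly Dictionary<string, List<string>> m_Pending =
        new Dictionary<string, List<string>>();

    // Store one part. Returns the combined message once expectedCount
    // parts have arrived for the correlation id; otherwise returns null.
    public string Add(string correlationId, string part, int expectedCount)
    {
        List<string> parts;
        if (!m_Pending.TryGetValue(correlationId, out parts))
        {
            parts = new List<string>();
            m_Pending[correlationId] = parts;
        }
        parts.Add(part);

        if (parts.Count < expectedCount)
        {
            return null;
        }

        // The set is complete: release the state and publish one message.
        m_Pending.Remove(correlationId);
        return string.Join(Environment.NewLine, parts);
    }
}
```

Parts for different correlation ids can be interleaved freely, since each id has its own pending list.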

Scenario

The scenario for this implementation of the splitter and aggregator patterns is the sending and receiving of large messages using a Service Bus queue. In the current release, the Windows Azure Service Bus supports a maximum message size of 256 KB, with a maximum header size of 64 KB. This leaves a safe maximum body size of 192 KB.
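The arithmetic behind the 192 KB figure, and the number of sub messages a given body needs, can be written out as below. The limits are the ones quoted above; the MessageSizeLimits class and its member names are hypothetical.

```csharp
using System;

// Hypothetical helper, for illustration only.
public static class MessageSizeLimits
{
    public const int MaxMessageSize = 256 * 1024;                   // 262,144 bytes
    public const int MaxHeaderSize = 64 * 1024;                     // 65,536 bytes
    public const int SafeBodySize = MaxMessageSize - MaxHeaderSize; // 196,608 bytes (192 KB)

    // Ceiling division: the number of sub messages needed
    // to carry a body of the given size in bytes.
    public static long SubMessageCount(long bodySize)
    {
        return (bodySize + SafeBodySize - 1) / SafeBodySize;
    }
}
```

A 1 MB body (1,048,576 bytes), for example, needs six sub messages: five full ones and a final one carrying the remaining 65,536 bytes.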

The BrokeredMessage class will support messages larger than 256 KB; in fact the Size property is of type long, implying that very large messages may be supported at some point in the future. The 256 KB size restriction is set in the service bus components that are deployed in the Windows Azure data centers.

One of the ways of working around this size restriction is to split large messages into a sequence of smaller sub messages in the sending application, send them via a queue, and then reassemble them in the receiving application. This scenario will be used to demonstrate the pattern implementations.

Implementation

The splitter and aggregator will be used to provide functionality to send and receive large messages over the Windows Azure Service Bus. In order to make the implementations generic and reusable they will be implemented as a class library. The splitter will be implemented in the LargeMessageSender class and the aggregator in the LargeMessageReceiver class. A class diagram showing the two classes is shown below.

[Figure: class diagram of the LargeMessageSender and LargeMessageReceiver classes]

Implementing the Splitter

The splitter will take a large brokered message and split it into a sequence of smaller sub-messages that can be transmitted over the service bus messaging entities. The LargeMessageSender class provides a Send method that takes a large brokered message as a parameter. The implementation of the class is shown below; console output has been added to provide details of the splitting operation.

public class LargeMessageSender
{
    // Maximum body size of each sub message: the 256 KB limit less 64 KB for headers.
    private static int SubMessageBodySize = 192 * 1024;
    private QueueClient m_QueueClient;

    public LargeMessageSender(QueueClient queueClient)
    {
        m_QueueClient = queueClient;
    }

    public void Send(BrokeredMessage message)
    {
        // Calculate the number of sub messages required.
        long messageBodySize = message.Size;
        int nrSubMessages = (int)(messageBodySize / SubMessageBodySize);
        if (messageBodySize % SubMessageBodySize != 0)
        {
            nrSubMessages++;
        }

        // Create a unique session Id.
        string sessionId = Guid.NewGuid().ToString();
        Console.WriteLine("Message session Id: " + sessionId);
        Console.Write("Sending {0} sub-messages", nrSubMessages);

        Stream bodyStream = message.GetBody<Stream>();
        for (long streamOffset = 0; streamOffset < messageBodySize;
            streamOffset += SubMessageBodySize)
        {
            // Get the stream chunk from the large message.
            long arraySize = (messageBodySize - streamOffset) > SubMessageBodySize
                ? SubMessageBodySize : messageBodySize - streamOffset;
            byte[] subMessageBytes = new byte[arraySize];

            // Stream.Read can return fewer bytes than requested, so keep
            // reading until the chunk is full or the stream ends.
            int bytesRead = 0;
            while (bytesRead < arraySize)
            {
                int result = bodyStream.Read(
                    subMessageBytes, bytesRead, (int)arraySize - bytesRead);
                if (result == 0)
                {
                    break;
                }
                bytesRead += result;
            }
            MemoryStream subMessageStream =
                new MemoryStream(subMessageBytes, 0, bytesRead);

            // Create a new message.
            BrokeredMessage subMessage = new BrokeredMessage(subMessageStream, true);
            subMessage.SessionId = sessionId;

            // Send the message.
            m_QueueClient.Send(subMessage);
            Console.Write(".");
        }
        Console.WriteLine("Done!");
    }
}

The LargeMessageSender class is initialized with a QueueClient that is created by the sending application. When the large message is sent, the number of sub messages is calculated based on the size of the body of the large message. A unique session Id is created to allow the sub messages to be sent as a message session; this session Id will be used for correlation in the aggregator. A for loop is then used to create the sequence of sub messages by creating chunks of data from the stream of the large message. The sub messages are then sent to the queue using the QueueClient.
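The chunking step can be tried without a Service Bus connection. The following is a minimal sketch, not the code from the class library: StreamSplitter, CountChunks, Split and SplitDemo are hypothetical names, and a MemoryStream stands in for the body of the large message.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper; it only mirrors the chunking logic described above.
public static class StreamSplitter
{
    // Number of chunks needed for a body of the given size (ceiling division).
    public static int CountChunks(long bodySize, int chunkSize)
    {
        return (int)((bodySize + chunkSize - 1) / chunkSize);
    }

    // Reads the stream from its current position and yields chunks of at
    // most chunkSize bytes; only the last chunk can be shorter.
    public static IEnumerable<byte[]> Split(Stream body, int chunkSize)
    {
        byte[] buffer = new byte[chunkSize];
        while (true)
        {
            // Stream.Read may return fewer bytes than requested, so keep
            // reading until the buffer is full or the stream ends.
            int filled = 0;
            while (filled < chunkSize)
            {
                int read = body.Read(buffer, filled, chunkSize - filled);
                if (read == 0) break;
                filled += read;
            }
            if (filled == 0) yield break;

            byte[] chunk = new byte[filled];
            Array.Copy(buffer, chunk, filled);
            yield return chunk;

            if (filled < chunkSize) yield break;
        }
    }
}

public static class SplitDemo
{
    public static void Main()
    {
        const int chunkSize = 192 * 1024;

        // Body size taken from the test later in this article.
        Console.WriteLine(StreamSplitter.CountChunks(9929365, chunkSize));

        // Split a 500,000 byte in-memory body and print each chunk length.
        MemoryStream body = new MemoryStream(new byte[500000]);
        foreach (byte[] chunk in StreamSplitter.Split(body, chunkSize))
        {
            Console.WriteLine(chunk.Length);
        }
    }
}
```

For a body of 9,929,365 bytes and a chunk size of 196,608 bytes, fifty full chunks cover 9,830,400 bytes and the remaining 98,965 bytes need a 51st chunk, which is what CountChunks returns.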

As sessions are used to correlate the messages, the queue used for message exchange must be created with the RequiresSession property set to true.

Implementing the Aggregator

The aggregator will receive the sub messages in the message session that was created by the splitter, and combine them to form a single, large message. The aggregator is implemented in the LargeMessageReceiver class, with a Receive method that returns a BrokeredMessage. The implementation of the class is shown below; console output has been added to provide details of the aggregation operation.

 


public class LargeMessageReceiver
{
    private QueueClient m_QueueClient;

    public LargeMessageReceiver(QueueClient queueClient)
    {
        m_QueueClient = queueClient;
    }

    public BrokeredMessage Receive()
    {
        // Create a memory stream to store the large message body.
        MemoryStream largeMessageStream = new MemoryStream();

        // Accept a message session from the queue.
        MessageSession session = m_QueueClient.AcceptMessageSession();
        Console.WriteLine("Message session Id: " + session.SessionId);
        Console.Write("Receiving sub messages");

        while (true)
        {
            // Receive a sub message.
            BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5));

            if (subMessage != null)
            {
                // Copy the sub message body to the large message stream.
                Stream subMessageStream = subMessage.GetBody<Stream>();
                subMessageStream.CopyTo(largeMessageStream);

                // Mark the message as complete.
                subMessage.Complete();
                Console.Write(".");
            }
            else
            {
                // No further sub messages arrived within the timeout;
                // this is our completeness criterion.
                Console.WriteLine("Done!");
                break;
            }
        }

        // Create an aggregated message from the large message stream.
        BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true);
        return largeMessage;
    }
}

 

The LargeMessageReceiver is initialized using a QueueClient that is created by the receiving application. The Receive method creates a memory stream that will be used to aggregate the large message body. The AcceptMessageSession method on the QueueClient is then called, which will wait for the first message in a message session to become available on the queue. As AcceptMessageSession can throw a timeout exception if no message is available on the queue after 60 seconds, a real-world implementation should handle this accordingly.

Once the message session has been accepted, the sub messages in the session are received, and their message body streams are copied to the memory stream. Once all the messages have been received, the memory stream is used to create a large message, which is then returned to the receiving application.
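The stream handling in the aggregator can also be sketched without a queue. In the example below StreamAggregator and AggregateDemo are hypothetical names, and the sub message bodies are plain in-memory streams. Unlike the Receive method above, this sketch rewinds the combined stream before returning it, so the caller can read it from the start.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper; it only mirrors the stream copying described above.
public static class StreamAggregator
{
    // Appends each sub message body, in the order given, to one stream.
    public static MemoryStream Aggregate(IEnumerable<Stream> subMessageBodies)
    {
        MemoryStream combined = new MemoryStream();
        foreach (Stream body in subMessageBodies)
        {
            body.CopyTo(combined);
        }

        // Rewind so the caller reads from the beginning.
        combined.Seek(0, SeekOrigin.Begin);
        return combined;
    }
}

public static class AggregateDemo
{
    public static void Main()
    {
        List<Stream> bodies = new List<Stream>
        {
            new MemoryStream(new byte[] { 1, 2 }),
            new MemoryStream(new byte[] { 3 }),
            new MemoryStream(new byte[] { 4, 5 })
        };

        MemoryStream combined = StreamAggregator.Aggregate(bodies);
        Console.WriteLine("Combined length: " + combined.Length);
        Console.WriteLine("Bytes: " + string.Join(",", combined.ToArray()));
    }
}
```

The order of the bodies matters here, which is why the sub messages need to arrive in the sequence in which they were sent.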

Testing the Implementation

The splitter and aggregator are tested by creating a message sender and message receiver application. The payload for the large message will be one of the webcast video files from http://www.cloudcasts.net/. The file size is 9,697 KB, well over the 256 KB threshold imposed by the Service Bus.

As the splitter and aggregator are implemented in a separate class library, the code used in the sender and receiver console applications is fairly basic. The implementation of the main method of the sending application is shown below.

 

static void Main(string[] args)
{
    // Create a token provider with the relevant credentials.
    TokenProvider credentials =
        TokenProvider.CreateSharedSecretTokenProvider
        (AccountDetails.Name, AccountDetails.Key);

    // Create a URI for the service bus.
    Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
        ("sb", AccountDetails.Namespace, string.Empty);

    // Create the MessagingFactory.
    MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

    // Use the MessagingFactory to create a queue client.
    QueueClient queueClient = factory.CreateQueueClient(AccountDetails.QueueName);

    // Open the input file.
    FileStream fileStream = new FileStream(AccountDetails.TestFile, FileMode.Open);

    // Create a BrokeredMessage for the file.
    BrokeredMessage largeMessage = new BrokeredMessage(fileStream, true);

    Console.WriteLine("Sending: " + AccountDetails.TestFile);
    Console.WriteLine("Message body size: " + largeMessage.Size);
    Console.WriteLine();

    // Send the message with a LargeMessageSender.
    LargeMessageSender sender = new LargeMessageSender(queueClient);
    sender.Send(largeMessage);

    // Close the messaging factory.
    factory.Close();
}

The implementation of the main method of the receiving application is shown below.

static void Main(string[] args)
{
    // Create a token provider with the relevant credentials.
    TokenProvider credentials =
        TokenProvider.CreateSharedSecretTokenProvider
        (AccountDetails.Name, AccountDetails.Key);

    // Create a URI for the service bus.
    Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
        ("sb", AccountDetails.Namespace, string.Empty);

    // Create the MessagingFactory.
    MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

    // Use the MessagingFactory to create a queue client.
    QueueClient queueClient = factory.CreateQueueClient(AccountDetails.QueueName);

    // Create a LargeMessageReceiver and receive the message.
    LargeMessageReceiver receiver = new LargeMessageReceiver(queueClient);
    BrokeredMessage largeMessage = receiver.Receive();

    Console.WriteLine("Received message");
    Console.WriteLine("Message body size: " + largeMessage.Size);

    string testFile = AccountDetails.TestFile.Replace(@"\In\", @"\Out\");
    Console.WriteLine("Saving file: " + testFile);

    // Save the message body as a file.
    Stream largeMessageStream = largeMessage.GetBody<Stream>();
    largeMessageStream.Seek(0, SeekOrigin.Begin);
    FileStream fileOut = new FileStream(testFile, FileMode.Create);
    largeMessageStream.CopyTo(fileOut);
    fileOut.Close();

    Console.WriteLine("Done!");
}

In order to test the application, the sending application is executed, which will use the LargeMessageSender class to split the message and place it on the queue. The output of the sender console is shown below.

image

The console shows that the body size of the large message was 9,929,365 bytes, and the message was sent as a sequence of 51 sub messages.

When the receiving application is executed the results are shown below.

image

The console application shows that the aggregator has received the 51 messages from the message sequence that was created in the sending application. The messages have been aggregated to form a message with a body of 9,929,365 bytes, which is the same as the original large message. The message body is then saved as a file.

Improvements to the Implementation

The splitter and aggregator patterns in this implementation were created in order to show the usage of the patterns in a demo, which they do quite well. When implementing these patterns in a real-world scenario there are a number of improvements that could be made to the design.

Copying Message Header Properties

When sending a large message using these classes, it would be great if the message header properties in the message that was received were copied from the message that was sent. The sending application may well add information to the message context that will be required in the receiving application.

When the sub messages are created in the splitter, the header properties in the first message could be set to the values in the original large message. The aggregator could then use the values from this first sub message to set the properties in the message header of the large message during the aggregation process.
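As a rough sketch of that idea, the copy itself is a simple dictionary operation. The example assumes the header properties are available as a string-keyed dictionary of objects, which is how I understand the Properties collection on BrokeredMessage to be exposed. HeaderProperties, HeaderCopyDemo and the property names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; plain dictionaries stand in for message headers.
public static class HeaderProperties
{
    // Copies every property from the source header to the target header,
    // overwriting any value that already exists under the same key.
    public static void Copy(
        IDictionary<string, object> source, IDictionary<string, object> target)
    {
        foreach (KeyValuePair<string, object> property in source)
        {
            target[property.Key] = property.Value;
        }
    }
}

public static class HeaderCopyDemo
{
    public static void Main()
    {
        // Properties set by the sending application (illustrative values).
        Dictionary<string, object> original = new Dictionary<string, object>
        {
            { "FileName", "webcast.wmv" },
            { "Priority", 2 }
        };

        // Splitter: copy to the first sub message.
        Dictionary<string, object> firstSubMessage = new Dictionary<string, object>();
        HeaderProperties.Copy(original, firstSubMessage);

        // Aggregator: copy from the first sub message to the large message.
        Dictionary<string, object> aggregated = new Dictionary<string, object>();
        HeaderProperties.Copy(firstSubMessage, aggregated);

        Console.WriteLine(aggregated["FileName"]);
        Console.WriteLine(aggregated["Priority"]);
    }
}
```

The header properties count towards the 64 KB header allowance, so a real implementation would need to check that the copied properties still fit in the first sub message.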

Using Asynchronous Methods

The current implementation uses the synchronous send and receive methods of the QueueClient class. It would be much more performant to use the asynchronous methods. However, doing so may well affect the sequence in which the sub messages are enqueued, which would require the implementation of a resequencer in the aggregator to restore the correct message sequence.

Handling Exceptions

In order to keep the code readable no exception handling was added to the implementations. In a real-world scenario exceptions should be handled accordingly.

Posted On Tuesday, April 10, 2012 11:35 AM | Feedback (0) |

Monday, April 02, 2012

Transactional Messaging in the Windows Azure Service Bus

Introduction

I’m currently working on broadening the content in the Windows Azure Service Bus Developer Guide. One of the features I have been looking at over the past week is the support for transactional messaging. When using the direct programming model and the WCF interface some, but not all, messaging operations can participate in transactions. This allows developers to improve the reliability of messaging systems. There are some limitations in the transactional model: transactions can only include one top level messaging entity (such as a queue or topic; subscriptions are not top level entities), and transactions cannot include other systems, such as databases.

As the transaction model is currently not well documented I have had to figure out how things work through experimentation, with some help from the development team to confirm any questions I had. Hopefully I’ve got the content mostly correct. I will update the content in the e-book if I find any errors or improvements that can be made (any feedback would be very welcome). I’ve not had a chance to look into the code for transactions and asynchronous operations; maybe that would make a nice challenge lab for my Windows Azure Service Bus course.

Transactional Messaging

Messaging entities in the Windows Azure Service Bus provide support for participation in transactions. This allows developers to perform several messaging operations within a transactional scope, and ensure that all the actions are committed or, if there is a failure, none of the actions are committed. There are a number of scenarios where the use of transactions can increase the reliability of messaging systems.

Using TransactionScope

In .NET the TransactionScope class can be used to perform a series of actions in a transaction. The using statement is typically used to define the scope of the transaction. Any transactional operations that are contained within the scope can be committed by calling the Complete method. If the Complete method is not called, any transactional methods in the scope will not commit.

 

// Create a transactional scope.
using (TransactionScope scope = new TransactionScope())
{
    // Do something.

    // Do something else.

    // Commit the transaction.
    scope.Complete();
}

 

 

In order for methods to participate in the transaction, they must provide support for transactional operations. Database and message queue operations typically provide support for transactions.

Transactions in Brokered Messaging

Transaction support in Service Bus Brokered Messaging allows message operations to be performed within a transactional scope; however there are some limitations around what operations can be performed within the transaction.

In the current release, only one top level messaging entity, such as a queue or topic can participate in a transaction, and the transaction cannot include any other transaction resource managers, making transactions spanning a messaging entity and a database not possible.

When sending messages, the send operations can participate in a transaction allowing multiple messages to be sent within a transactional scope. This allows for “all or nothing” delivery of a series of messages to a single queue or topic.

When receiving messages, messages that are received in the peek-lock receive mode can be completed, deadlettered or deferred within a transactional scope. In the current release the Abandon method will not participate in a transaction. The same restriction of only one top level messaging entity applies here, so the Complete method can be called transactionally on messages received from the same queue, or messages received from one or more subscriptions in the same topic.

Sending Multiple Messages in a Transaction

A transactional scope can be used to send multiple messages to a queue or topic. This will ensure that all the messages will be enqueued or, if the transaction fails to commit, no messages will be enqueued.

image

 

 

An example of the code used to send 10 messages to a queue as a single transaction from a console application is shown below.

 

QueueClient queueClient = messagingFactory.CreateQueueClient(Queue1);

Console.Write("Sending");

// Create a transaction scope.
using (TransactionScope scope = new TransactionScope())
{
    for (int i = 0; i < 10; i++)
    {
        // Send a message
        BrokeredMessage msg = new BrokeredMessage("Message: " + i);
        queueClient.Send(msg);
        Console.Write(".");
    }
    Console.WriteLine("Done!");
    Console.WriteLine();

    // Should we commit the transaction?
    Console.WriteLine("Commit send 10 messages? (yes or no)");
    string reply = Console.ReadLine();
    if (reply.ToLower().Equals("yes"))
    {
        // Commit the transaction.
        scope.Complete();
    }
}
Console.WriteLine();
messagingFactory.Close();

 

 

The transaction scope is used to wrap the sending of 10 messages. Once the messages have been sent the user has the option to either commit the transaction or abandon the transaction. If the user enters “yes”, the Complete method is called on the scope, which will commit the transaction and result in the messages being enqueued. If the user enters anything other than “yes”, the transaction will not commit, and the messages will not be enqueued.
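The "all or nothing" behaviour can be explored locally with an in-memory stand-in for the queue. The sketch below is not how the Service Bus implements transactions; InMemoryTransactionalQueue and TransactionDemo are hypothetical names, and the class simply enlists in the ambient transaction as a volatile resource using the System.Transactions API (on the .NET Framework this needs a reference to the System.Transactions assembly).

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

// Hypothetical in-memory stand-in for a queue; not part of the Service Bus SDK.
public class InMemoryTransactionalQueue : IEnlistmentNotification
{
    private readonly List<string> m_Committed = new List<string>();
    private readonly List<string> m_Pending = new List<string>();
    private bool m_Enlisted;

    // Number of messages that have actually been enqueued.
    public int Count { get { return m_Committed.Count; } }

    public void Send(string message)
    {
        if (Transaction.Current == null)
        {
            // No ambient transaction: enqueue immediately.
            m_Committed.Add(message);
            return;
        }
        if (!m_Enlisted)
        {
            // Join the ambient transaction so we are told how it ends.
            Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
            m_Enlisted = true;
        }
        m_Pending.Add(message);
    }

    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        // Vote to commit.
        preparingEnlistment.Prepared();
    }

    public void Commit(Enlistment enlistment)
    {
        // The transaction committed: enqueue everything sent in the scope.
        m_Committed.AddRange(m_Pending);
        Reset();
        enlistment.Done();
    }

    public void Rollback(Enlistment enlistment)
    {
        // The transaction did not commit: discard the pending messages.
        Reset();
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {
        Reset();
        enlistment.Done();
    }

    private void Reset()
    {
        m_Pending.Clear();
        m_Enlisted = false;
    }
}

public static class TransactionDemo
{
    public static void Main()
    {
        InMemoryTransactionalQueue queue = new InMemoryTransactionalQueue();

        // Scope disposed without calling Complete.
        using (TransactionScope scope = new TransactionScope())
        {
            for (int i = 0; i < 10; i++) queue.Send("Message: " + i);
        }
        Console.WriteLine("After scope without Complete: " + queue.Count);

        // Scope completed before it is disposed.
        using (TransactionScope scope = new TransactionScope())
        {
            for (int i = 0; i < 10; i++) queue.Send("Message: " + i);
            scope.Complete();
        }
        Console.WriteLine("After completed scope: " + queue.Count);
    }
}
```

With this stand-in the first count is expected to be 0 and the second 10, matching the behaviour described for the queue: messages sent inside the scope only become visible when the transaction commits.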

Receiving Multiple Messages in a Transaction

The receiving of multiple messages is another scenario where the use of transactions can improve reliability. When receiving a group of related messages, perhaps in the same message session, it is possible to receive the messages in the peek-lock receive mode, and then complete, defer, or deadletter the messages in one transaction. (In the current version of Service Bus, abandon is not transactional.)

image

 

The following code shows how this can be achieved.

using (TransactionScope scope = new TransactionScope())
{
    while (true)
    {
        // Receive a message.
        BrokeredMessage msg = q1Client.Receive(TimeSpan.FromSeconds(1));
        if (msg != null)
        {
            // Write the message body and complete the message.
            string text = msg.GetBody<string>();
            Console.WriteLine("Received: " + text);
            msg.Complete();
        }
        else
        {
            break;
        }
    }
    Console.WriteLine();

    // Should we commit?
    Console.WriteLine("Commit receive? (yes or no)");
    string reply = Console.ReadLine();
    if (reply.ToLower().Equals("yes"))
    {
        // Commit the transaction.
        scope.Complete();
    }
    Console.WriteLine();
}

 

 

Note that if there are a large number of messages to be received, there is a chance that the transaction may time out before it can be committed. It is possible to specify a longer timeout when the transaction is created, but it may be better to receive and commit smaller numbers of messages within each transaction.

It is also possible to complete, defer, or deadletter messages received from more than one subscription, as long as all the subscriptions are contained in the same topic. As subscriptions are not top level messaging entities, this scenario will work.

image

The following code shows how this can be achieved.

try
{
    using (TransactionScope scope = new TransactionScope())
    {
        // Receive one message from each subscription.
        BrokeredMessage msg1 = subscriptionClient1.Receive();
        BrokeredMessage msg2 = subscriptionClient2.Receive();

        // Complete the message receives.
        msg1.Complete();
        msg2.Complete();

        Console.WriteLine("Msg1: " + msg1.GetBody<string>());
        Console.WriteLine("Msg2: " + msg2.GetBody<string>());

        // Commit the transaction.
        scope.Complete();
    }
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}

 

 

Unsupported Scenarios

The restriction of only one top level messaging entity being able to participate in a transaction makes some useful scenarios unsupported. As the Windows Azure Service Bus is under continuous development and new releases are expected to be frequent it is possible that this restriction may not be present in future releases.

The first is the scenario where messages are to be routed to two different systems.

image

The following code attempts to do this.

 

try
{
    // Create a transaction scope.
    using (TransactionScope scope = new TransactionScope())
    {
        BrokeredMessage msg1 = new BrokeredMessage("Message1");
        BrokeredMessage msg2 = new BrokeredMessage("Message2");

        // Send a message to Queue1
        Console.WriteLine("Sending Message1");
        queue1Client.Send(msg1);

        // Send a message to Queue2
        Console.WriteLine("Sending Message2");
        queue2Client.Send(msg2);

        // Commit the transaction.
        Console.WriteLine("Committing transaction...");
        scope.Complete();
    }
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}

 

 

The results of running the code are shown below.

image

When attempting to send a message to the second queue the following exception is thrown:

No active Transaction was found for ID '35ad2495-ee8a-4956-bbad-eb4fedf4a96e:1'. The Transaction may have timed out or attempted to span multiple top-level entities such as Queue or Topic. The server Transaction timeout is: 00:01:00..TrackingId:947b8c4b-7754-4044-b91b-4a959c3f9192_3_3,TimeStamp:3/29/2012 7:47:32 AM.

 

Another scenario where transactional support could be useful is when forwarding messages from one queue to another queue. This would also involve more than one top level messaging entity, and is therefore not supported.

image

 

Another scenario that developers may wish to implement is performing transactions across messaging entities and other transactional systems, such as an on-premise database. In the current release this is not supported.

image

 

Workarounds for Unsupported Scenarios

There are some techniques that developers can use to work around the one top level entity limitation of transactions. When sending two messages to two systems, topics and subscriptions can be used. If the same message is to be sent to two destinations, then the subscriptions would use the default filter, and the client would only send one message. If two different messages are to be sent, then filters on the subscriptions can route the messages to the appropriate destination. The client can then send the two messages to the topic in the same transaction.

image

 

In scenarios where a message needs to be received and then forwarded to another system within the same transaction, topics and subscriptions can also be used. A message can be received from a subscription, and then sent to a topic within the same transaction. As a topic is a top level messaging entity, and a subscription is not, this scenario will work.

image

 

Posted On Monday, April 02, 2012 9:46 AM | Feedback (2) |

Friday, March 23, 2012

Windows Azure Service Bus Resequencer

Introduction

I’ll be presenting a session at Sweden Windows Azure Group (SWAG) on Monday, as well as presenting on the Windows Azure Service Bus at various other events. I thought it would be fun to look at implementing some of the Enterprise Integration Patterns using the brokered messaging capabilities. I’ll use this article in the next release of Windows Azure Service Bus Developer Guide, and it will probably end up as a “challenge lab” for my Service Bus course.

As a long-time BizTalk developer I have seen many scenarios where the order of messages sent to a destination system is critical to business operations. Updating orders is a good example: if the first update to an order is sent after the second update, then data may well be corrupted. Many BizTalk adapters, such as the file and FTP adapters, have the potential to shuffle the sequence of messages as they enter BizTalk Server, and message channels must be developed to resequence these messages.

The Enterprise Integration Patterns website provides a description of the Resequencer pattern here.

 

There are also scenarios where the sequence of messages can get mixed up when working with Service Bus brokered messaging. When sending a stream of messages using the asynchronous send method there is a good chance that the messages will be placed on the queue out of sequence. The following scenario will use this as an example, and a resequencer will be implemented to restore the order of the messages.

Resequencer Scenario

The scenario used for the resequencer implementation is the transfer of a photo over a Service Bus queue. The photo is broken down into 64 tiles (8 x 8), with each tile being sent in a separate message. When the tiles are received from the queue they are reassembled to form the original image.

I have used this scenario previously when demoing resequencer patterns in BizTalk Server after seeing Shy Cohen use a similar scenario to demo reliable messaging in WCF. It’s great to use for presentations, and the use of an image makes it easy to see when messages are out of sequence.

The application that sends and receives the messages is built using Windows Presentation Foundation (WPF), with a basic user interface to show the original image, and the reassembled image after it has been sent and received on the queue. The application has the option to send messages using the synchronous or asynchronous send methods.

A screenshot of the application after sending the messages synchronously is shown below.

clip_image002

The received image has been assembled correctly from the sequence of message tiles, indicating that there was not a disturbance in the order of the messages. The sending of the messages, however, was not optimal. As they were sent synchronously, with the send operation on one having to complete before the next message can be sent, it took almost 7 seconds to send all 64 messages, at about 9.5 messages per second.

Sending messages asynchronously will provide much better throughput for the sending application. The results of this are shown below.

clip_image004

Using asynchronous send the 64 messages were sent in under half a second, at a rate of over 150 messages per second. In this test, sending messages asynchronously provides better than 15 times the throughput. Sending the messages asynchronously, however, has affected the order in which the messages were received. While most of the messages are in order, the first four messages containing the first half of the book title were the last four messages to be received.

In some scenarios the order of messages is not important and we are only concerned with throughput and reliability, but in this scenario it affects the display of the image. For these kinds of scenarios we need to implement a resequencer.

Resequencer Implementation

In this scenario the resequencer will be implemented as an intermediary service between the source system and the target system. I am using the Image Transfer WPF application to act as the source and target system, but the principle is the same.

image

The pseudo-code for a possible resequencer is shown below.

while (true)
{
    ReceiveMessage;
    if (message is in sequence)
    {
        ForwardMessage;
        Forward any stored in sequence messages;
    }
    else
    {
        Store message;
    }
}

 

The design decisions we have to make when implementing a resequencer are as follows:

·       How is the sequence of messages determined?

·       How should the message store be implemented?
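To make the pseudo-code concrete before looking at the Service Bus specifics, here is a minimal in-memory sketch of one possible answer to both questions. It assumes the sender numbers the messages from 1 and that a dictionary is good enough as a message store; InMemoryResequencer and ResequencerDemo are hypothetical names, and strings stand in for brokered messages.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory resequencer; strings stand in for messages.
public class InMemoryResequencer
{
    // Out of sequence messages, keyed by their sequence ID.
    private readonly Dictionary<long, string> m_Stored = new Dictionary<long, string>();

    // The sequence ID of the next message that may be forwarded.
    private long m_NextSequenceId = 1;

    // Accepts one message and returns the messages that can now be
    // forwarded, in sequence order. The list is empty if the message
    // had to be stored.
    public List<string> Accept(long sequenceId, string body)
    {
        List<string> forwarded = new List<string>();

        if (sequenceId != m_NextSequenceId)
        {
            // Out of sequence: store the message for later.
            m_Stored[sequenceId] = body;
            return forwarded;
        }

        // In sequence: forward the message.
        forwarded.Add(body);
        m_NextSequenceId++;

        // Forward any stored messages that are now in sequence.
        string stored;
        while (m_Stored.TryGetValue(m_NextSequenceId, out stored))
        {
            forwarded.Add(stored);
            m_Stored.Remove(m_NextSequenceId);
            m_NextSequenceId++;
        }
        return forwarded;
    }
}

public static class ResequencerDemo
{
    public static void Main()
    {
        InMemoryResequencer resequencer = new InMemoryResequencer();

        // Messages arrive in the order 3, 1, 2, 5, 4.
        long[] arrivalOrder = { 3, 1, 2, 5, 4 };
        string[] bodies = { "C", "A", "B", "E", "D" };

        for (int i = 0; i < arrivalOrder.Length; i++)
        {
            foreach (string body in resequencer.Accept(arrivalOrder[i], bodies[i]))
            {
                Console.Write(body + " ");
            }
        }
        Console.WriteLine();
    }
}
```

An in-memory store like this loses its contents if the process stops, so it is only useful for illustrating the loop.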

Determining the Sequence of Messages

In order for a resequencer to work it must have a means of determining the correct sequence of messages. The BrokeredMessage class provides a SequenceNumber property, which is set by the queue or topic when the message is enqueued, starting with 1 for the first message enqueued.

In some scenarios the messages could have been enqueued in the correct order, and then the sequence changed by a multi-threaded receive. In those scenarios the SequenceNumber could be used to resequence the messages.

In this scenario the messages are enqueued out of sequence by the multi-threaded asynchronous sending operations, so the SequenceNumber property of the dequeued messages will not reflect the correct order of the messages. The message sender will therefore have to provide some means of identifying the sequence of each message.

The following code creates messages from the encoded image streams and assigns an incrementing send sequence ID value to the Label property of the message header. The message is then sent synchronously or asynchronously depending on the selection made by the user.

for (int y = 0; y < ImageTiles; y++)
{
    for (int x = 0; x < ImageTiles; x++)
    {
        MemoryStream ms = new MemoryStream();

        // Use a delegate as we are accessing UI elements in a different thread.
        SimpleDelegate getImageCrop = delegate()
        {
            // Create a cropped bitmap of the image tile.
            CroppedBitmap cb = null;
            cb = new CroppedBitmap(FileBitmapImage, new Int32Rect
                (x * blockWidth, y * blockHeight, blockWidth, blockHeight));

            // Encode the bitmap to the memory stream.
            PngBitmapEncoder encoder = new PngBitmapEncoder();
            encoder.Frames.Add(BitmapFrame.Create(cb));
            encoder.Save(ms);
        };
        this.Dispatcher.Invoke(DispatcherPriority.Send, getImageCrop);

        // Create a brokered message using the stream.
        ms.Seek(0, SeekOrigin.Begin);
        BrokeredMessage blockMsg = new BrokeredMessage(ms, true);

        // Set the send sequence ID to the message label.
        blockMsg.Label = sendSequenceId.ToString();

        // Send the message using either sync or async.
        if (SendAsync == true)
        {
            queueClient.BeginSend(blockMsg, OnSendComplete,
                new Tuple<QueueClient, string>(queueClient, blockMsg.MessageId));
        }
        else
        {
            queueClient.Send(blockMsg);
        }

        // Increment the send sequence ID
        sendSequenceId++;

        // Update the progress bar.
        SimpleDelegate updateBar = delegate()
        {
            prgTransfer.Value++;
        };
        this.Dispatcher.BeginInvoke(DispatcherPriority.Send, updateBar);
    }
}

 

 

Delegates have been used here as the code is running on a background worker thread and needs to access the UI elements in the WPF application.

Storing Messages

Any messages that are received out of sequence will need to be stored, and then forwarded to the target system once the previous message in the sequence has been received and forwarded. The message deferral functionality available when receiving messages from queues and subscriptions provides a nice way to store messages that are out of sequence. Provided the messages are received using the peek-lock receive mode the messages can be deferred, and then received again by specifying the appropriate SequenceNumber in the receive method.

Implementing the Resequencer Loop

The resequencer is implemented as a separate WPF application. It receives messages from an inbound queue, resequences them, and then sends them on an outbound queue. The code for the main loop of the resequencer is shown below. The receiveSequenceId variable is used to keep track of the sequence of received messages. If the sendSequenceId, which is retrieved from the Label property of the message, matches the receiveSequenceId, the message is cloned and forwarded and receiveSequenceId is incremented.

When a message has been forwarded, the next message in the sequence may have been received earlier and been deferred. The deferred messages are checked using the dictionary to see if the next message is present. If so it is received, copied, and forwarded, and the process repeated.

If the receiveSequenceId does not match sendSequenceId then the message is out of sequence. When this happens the message is deferred, and the SequenceNumber of the message added to a dictionary with the sendSequenceId used as a key. The SequenceNumber is required to receive the deferred message.

// Initialize the receive sequence ID.
long receiveSequenceId = 1;

while (true)
{
    BrokeredMessage msg = inbloudQueueClient.Receive(TimeSpan.FromSeconds(3));
    if (msg != null)
    {
        long sendSequenceId = long.Parse(msg.Label);

        // Is the message in sequence?
        if (sendSequenceId == receiveSequenceId)
        {
            // Clone the message and forward it.
            Debug.WriteLine("Forwarding: " + sendSequenceId);
            BrokeredMessage outMsg = CloneBrokeredMessage(msg);
            outboundQueueClient.Send(outMsg);
            msg.Complete();

            // Increment the receive sequence ID.
            receiveSequenceId++;

            // Check for deferred messages in sequence.
            while (true)
            {
                if (deferredMessageSequenceNumbers.ContainsKey(receiveSequenceId))
                {
                    Console.WriteLine("Sending deferred message: " + receiveSequenceId);

                    // Receive the deferred message from the queue using the
                    // SequenceNumber retrieved from the dictionary.
                    long deferredMessageSequenceNumber =
                        deferredMessageSequenceNumbers[receiveSequenceId];
                    BrokeredMessage msgDeferred =
                        inbloudQueueClient.Receive(deferredMessageSequenceNumber);

                    // Clone the deferred message and send it.
                    BrokeredMessage outMsgDeferred = CloneBrokeredMessage(msgDeferred);
                    outboundQueueClient.Send(outMsgDeferred);
                    msgDeferred.Complete();
                    receiveSequenceId++;
                }
                else
                {
                    // The next message in the sequence is not deferred.
                    break;
                }
            }
        }
        else
        {
            // Add the message's SequenceNumber to the dictionary, keyed by the
            // send sequence ID, then defer the message. The SequenceNumber is
            // needed to receive the deferred message later.
            deferredMessageSequenceNumbers.Add(sendSequenceId, msg.SequenceNumber);
            msg.Defer();
        }
    }
}

 

 

Testing the Implementation

To test the resequencer, the image transfer application sends messages to the inbound queue and receives them from the outbound queue. The results of testing with the application sending messages asynchronously are shown below.

[Image: results of testing with the application sending messages asynchronously]

 

When the application was tested with 16 tiles (4 x 4) and tracing added, the forwarding and deferring of the messages can clearly be seen.

Deferring: 3
Deferring: 4
Deferring: 5
Deferring: 6
Deferring: 7
Deferring: 8
Deferring: 9
Deferring: 10
Deferring: 11
Deferring: 12
Deferring: 13
Deferring: 14
Deferring: 15
Deferring: 16
Forwarding: 1
Forwarding: 2
Sending deferred message: 3
Sending deferred message: 4
Sending deferred message: 5
Sending deferred message: 6
Sending deferred message: 7
Sending deferred message: 8
Sending deferred message: 9
Sending deferred message: 10
Sending deferred message: 11
Sending deferred message: 12
Sending deferred message: 13
Sending deferred message: 14
Sending deferred message: 15
Sending deferred message: 16

 

Issues with Cloning Messages

When receiving messages from one messaging entity and forwarding them to another, the following code should not be used.

// Receive a message from the inbound queue.
BrokeredMessage msg = inbloudQueueClient.Receive(TimeSpan.FromSeconds(3));

// Forward the received message directly to the outbound queue.
outboundQueueClient.Send(msg);

It will result in an InvalidOperationException being thrown with the message “A received message cannot be directly sent to another entity. Construct a new message object instead.”

The resequencer uses a quick and dirty message clone method; the code for this is shown below.

private BrokeredMessage CloneBrokeredMessage(BrokeredMessage msg)
{
    Stream stream = msg.GetBody<Stream>();
    BrokeredMessage clonedMsg = new BrokeredMessage(stream, true);
    clonedMsg.Label = msg.Label;
    return clonedMsg;
}

 

The code seems to work fine in this scenario, but care must be taken to ensure that the appropriate message properties are copied from the header of the source message to that of the destination message.
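
As a rough illustration of what copying "the appropriate message properties" might look like, here is a minimal sketch. To keep it runnable without the Service Bus SDK, it uses a hypothetical Message class as a stand-in for BrokeredMessage; the property names mirror BrokeredMessage members, but which of them need copying depends on what the target system relies on, so treat the list as an assumption rather than a complete set.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for BrokeredMessage, so the sketch runs without the SDK.
public class Message
{
    public byte[] Body { get; set; }
    public string Label { get; set; }
    public string ContentType { get; set; }
    public string CorrelationId { get; set; }
    public string SessionId { get; set; }
    public string ReplyTo { get; set; }
    public TimeSpan TimeToLive { get; set; }

    // Application-defined (custom) properties.
    public IDictionary<string, object> Properties { get; } =
        new Dictionary<string, object>();
}

public static class MessageCloner
{
    // Builds a new message and copies the body, the header values,
    // and every custom property from the source message.
    public static Message Clone(Message source)
    {
        var clone = new Message
        {
            Body = (byte[])source.Body?.Clone(),
            Label = source.Label,
            ContentType = source.ContentType,
            CorrelationId = source.CorrelationId,
            SessionId = source.SessionId,
            ReplyTo = source.ReplyTo,
            TimeToLive = source.TimeToLive
        };

        foreach (KeyValuePair<string, object> property in source.Properties)
        {
            clone.Properties[property.Key] = property.Value;
        }

        return clone;
    }
}
```

In the resequencer the same idea would apply inside CloneBrokeredMessage: after creating the new BrokeredMessage from the body stream, copy each header value and loop over the source message's Properties collection.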

 

Alternative Resequencer Implementations

The implementation in the previous section was developed to demonstrate the principles of a resequencer in a presentation. For this reason it is hosted in a WPF application, and no error handling code has been added. In a real-world scenario the resequencer would be hosted either in a service or in the target system. Using a worker role in Windows Azure would allow for a cloud-based solution, but the hourly costs may make this prohibitive.

Handling Errors

As well as the standard error handling on sending and receiving messages, a resequencer should also handle the scenario where one of the messages in the sequence is missing. If this happens in my scenario, all subsequent messages will be deferred, and the system will never recover. There are a number of ways that this could be handled better.

One option would be to set a threshold of a specific number of deferred messages or a specific time interval that would indicate that the missing message is probably lost. When this threshold is reached, an error or warning could be raised, and the sequence could be resumed. This could either be by an administrative action, or automatically. In either case, if the missing message does eventually arrive at the resequencer it can be dead-lettered and another error or warning raised.
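
The threshold idea could be sketched roughly as follows. This is not part of my demo; the MissingMessagePolicy class, its method names, and the choice to resume at the lowest deferred send sequence ID are all assumptions made for illustration. The dictionary has the same shape as the one in the resequencer loop (send sequence ID as the key, SequenceNumber as the value).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical policy object that decides when a missing message is probably lost.
public class MissingMessagePolicy
{
    private readonly int maxDeferred;
    private readonly TimeSpan maxWait;

    public MissingMessagePolicy(int maxDeferred, TimeSpan maxWait)
    {
        this.maxDeferred = maxDeferred;
        this.maxWait = maxWait;
    }

    // waitingSince is the time the first out-of-sequence message was deferred.
    // With nothing deferred there is no gap, only an idle queue.
    public bool IsProbablyLost(int deferredCount, DateTime waitingSince, DateTime now)
    {
        if (deferredCount == 0)
        {
            return false;
        }

        return deferredCount >= maxDeferred || now - waitingSince >= maxWait;
    }

    // When the sequence is resumed, continue from the lowest deferred send sequence ID.
    public static long NextIdAfterGap(IDictionary<long, long> deferred, long expectedId)
    {
        return deferred.Count == 0 ? expectedId + 1 : deferred.Keys.Min();
    }

    // A message whose ID is below the expected ID arrived after it was given up on.
    public static bool IsLateArrival(long sendSequenceId, long expectedId)
    {
        return sendSequenceId < expectedId;
    }
}
```

In the main loop, IsProbablyLost would be checked after each deferral or receive timeout; if it returns true, a warning is raised and receiveSequenceId is set to the value returned by NextIdAfterGap. A message for which IsLateArrival returns true would be the one to dead-letter.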

Storing State of Deferred Messages

One of the disadvantages of the message deferral design is that the resequencer needs to hold the state of the SequenceNumber values for the deferred messages. If this is lost there is no way to receive these messages from the queue. In my demo scenario I use an in-memory dictionary for this. In a real-world implementation the resequencer should store the SequenceNumber values in a durable store.
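
As a minimal sketch of what a durable store could look like, the class below writes the dictionary to a local text file, one "sendSequenceId=SequenceNumber" pair per line. The DeferredMessageStore name and the file format are assumptions for illustration; a real deployment would more likely use a database or table storage, and would need to consider what happens if the process fails between saving the state and deferring the message.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;

// Hypothetical file-backed store for the deferred message dictionary.
public class DeferredMessageStore
{
    private readonly string path;

    public DeferredMessageStore(string path)
    {
        this.path = path;
    }

    // Writes every entry as "sendSequenceId=sequenceNumber", replacing the file.
    public void Save(IDictionary<long, long> deferred)
    {
        IEnumerable<string> lines = deferred.Select(entry =>
            entry.Key.ToString(CultureInfo.InvariantCulture) + "=" +
            entry.Value.ToString(CultureInfo.InvariantCulture));

        File.WriteAllLines(path, lines);
    }

    // Reads the entries back; returns an empty dictionary if nothing was saved.
    public Dictionary<long, long> Load()
    {
        var deferred = new Dictionary<long, long>();
        if (!File.Exists(path))
        {
            return deferred;
        }

        foreach (string line in File.ReadAllLines(path))
        {
            string[] parts = line.Split('=');
            if (parts.Length != 2)
            {
                continue; // Skip blank or malformed lines.
            }

            long sendSequenceId = long.Parse(parts[0], CultureInfo.InvariantCulture);
            long sequenceNumber = long.Parse(parts[1], CultureInfo.InvariantCulture);
            deferred[sendSequenceId] = sequenceNumber;
        }

        return deferred;
    }
}
```

The resequencer would call Save each time an entry is added to or removed from the dictionary, and Load once at start-up before entering the receive loop.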

Using SessionId for Send Sequence Id

As an alternative to using message deferral to store messages, the resequencer could be implemented using message sessions. Each session would contain one message, and the SessionId would be set to the sending sequence ID. The resequencer (or the receiving application) could then use a session receiver and receive the messages in order by incrementing the SessionId it is listening for. The disadvantage of this design is that it would not be possible to use sessions for another purpose in the implementation.
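
The session-based approach can be sketched in memory as follows. InMemorySessionQueue is a hypothetical stand-in for a session-enabled queue, used here so the sketch runs without a Service Bus namespace; with the real SDK I would expect the receive step to accept a session by its SessionId (for example via QueueClient.AcceptMessageSession) and handle the timeout when the session does not yet exist, but check that against the SDK version in use.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical stand-in for a session-enabled queue holding one message per session.
public class InMemorySessionQueue
{
    private readonly Dictionary<string, string> sessions =
        new Dictionary<string, string>();

    // The sender sets the SessionId to the send sequence ID.
    public void Send(string sessionId, string body)
    {
        sessions[sessionId] = body;
    }

    // Returns null when no message with that SessionId has arrived yet.
    public string ReceiveFromSession(string sessionId)
    {
        string body;
        if (!sessions.TryGetValue(sessionId, out body))
        {
            return null;
        }

        sessions.Remove(sessionId);
        return body;
    }
}

public static class SessionResequencer
{
    // Receives messages in send order by asking for session nextId, nextId + 1, ...
    // and stops at the first session that has not arrived yet.
    public static List<string> ReceiveInOrder(InMemorySessionQueue queue, ref long nextId)
    {
        var received = new List<string>();
        while (true)
        {
            string sessionId = nextId.ToString(CultureInfo.InvariantCulture);
            string body = queue.ReceiveFromSession(sessionId);
            if (body == null)
            {
                break;
            }

            received.Add(body);
            nextId++;
        }

        return received;
    }
}
```

Note that no dictionary of SequenceNumber values is needed with this design: the only state the receiver holds is the next SessionId it is waiting for.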

 

Posted On Friday, March 23, 2012 10:47 AM | Feedback (1) |

Tuesday, March 13, 2012

Sweden Windows Azure Group Meeting - Windows Azure Service Bus, 26th March, Stockholm

I’ll be presenting a session on “Windows Azure Service Bus” for the Sweden Windows Azure Group (SWAG) at AddSkills in Stockholm on the 26th March. It will be a demo-intensive session looking at the relayed and brokered messaging capabilities of the Service Bus.

Register for the event here.

Sign up to Sweden Windows Azure Group (SWAG) for notifications of future events here.

Read more about the Windows Azure Service Bus in my e-book “Windows Azure Service Bus Developer Guide”.

Posted On Tuesday, March 13, 2012 9:20 AM | Feedback (0) |

Friday, March 02, 2012

PDF and CHM versions of Windows Azure Service Bus Developer Guide Available & Azure Service Bus 2-day Course

I’ve just added PDF and CHM versions of “Windows Azure Service Bus Developer Guide”; you can get them here.

The HTML browsable version is here.

I have the first delivery of my 2-day course “SOA, Connectivity and Integration using the Windows Azure Service Bus” scheduled for 3-4 May in Stockholm. Feel free to contact me via my blog if you have any questions about the course, or would be interested in an on-site delivery. Details of the course are here.

Posted On Friday, March 02, 2012 12:54 AM | Feedback (0) |

Tuesday, February 28, 2012

Windows Azure Service Bus Developer Guide

I’ve just published a web-browsable version of “Windows Azure Service Bus Developer Guide”. “The Developers Guide to AppFabric” has been re-branded, and has a new title of “Windows Azure Service Bus Developer Guide”. There is not that much new in the way of content, but I have made changes to the overall structure of the guide. More content will follow, along with updated PDF and CHM versions of the guide.

Posted On Tuesday, February 28, 2012 10:32 PM | Feedback (0) |

Monday, February 13, 2012

Sweden Windows Azure Group (SWAG) Meeting, 20th September in Stockholm

For the first SWAG meeting of 2012, Ludwig Ahrle and Mikael Eriksson will present two developer-focused sessions covering the work they have been doing for Curvande on the Windows Azure platform. This will be a great opportunity to see real-world Azure development hands-on.

The presentations will take place between 18:00 and 20:30, with a 30-minute break for food. There will then be a chance to have a drink, chat, and mingle with the presenters and other SWAG members.

Register here.

Sign up to join Sweden Windows Azure Group here.

 

Posted On Monday, February 13, 2012 9:48 AM | Feedback (0) |
