CloudCasts Blog

Webcasts in the Cloud

AppFabric Duplicate Message Detection

I’ve just published three webcasts looking at AppFabric Messaging: Introduction to Azure AppFabric Queues, AppFabric Duplicate Message Detection, and AppFabric Messaging Message Expiration.
More webcasts on the AppFabric June CTP are also available.
This article will take a look at the code used in the duplicate detection webcast and explain the concepts involved.
Bear in mind that this code is based on the AppFabric June CTP; things may change when the production version is released.
AppFabric Duplicate Message Detection
When developing message processing systems, there can be scenarios where an identical message is sent by an application more than once. In computer science, an operation is said to be idempotent if it can handle multiple instances of the same input without changing the result. In AppFabric Service Bus messaging, we can create idempotent messaging channels by leveraging the duplicate detection functionality that is built into queues and topics.

 

The Enterprise Integration Patterns website provides a description of the Idempotent Receiver pattern.
 
Duplicate Message Detection
Duplicate message detection in queues and topics is based on the MessageId property of the messages that are sent by the sending application. When a message is created using one of the static CreateMessage methods, the string value of a new GUID is assigned to the MessageId property.
The following code will create 10 new messages and output their MessageId values.

 

 
for (int i = 0; i <= 9; i++)
{
    // Create a test message.
    BrokeredMessage testMsg = BrokeredMessage.CreateMessage("Test");
 
    // Output the message ID
    Console.WriteLine("Message {0} has MessageId {1}", i, testMsg.MessageId);
}
 
 
The output from the code is shown below.
[Figure: Message IDs]
Detecting duplicate messages based on the default message ID can be useful in scenarios where an application sending messages to a queue or topic needs to ensure that exactly one message is delivered.
Consider the following code.

 

 
try
{
    // Send a message to a queue.
    sender.Send(msg);
 
}
catch (TimeoutException ex)
{
    // Has the message been sent?                   
}
 
 
If the message is sent to the queue and a timeout exception is thrown, there is no way for the sending application to know whether the message was sent successfully. If the sending application chooses to resend the message, it achieves at-least-once delivery, at the risk of duplicates. If it chooses not to resend the message, it achieves at-most-once delivery, at the risk of losing the message.
If the queue or topic that the message is being sent to is configured to detect duplicate messages, the client can retry sending the same message and the queue or topic will ignore any duplicates that arrive within the duplicate detection time window. This gives exactly-once delivery of the message to the queue or topic.
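The retry behaviour described above can be sketched without a Service Bus connection. The following is a minimal simulation, not AppFabric code: DedupQueue and SendWithRetry are hypothetical names, and the queue is assumed to lose the acknowledgement of the first send after it has already stored the message.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a queue with duplicate detection enabled.
class DedupQueue
{
    private readonly HashSet<string> seenIds = new HashSet<string>();
    private bool failNextAck = true;

    public List<string> Accepted { get; } = new List<string>();

    // Stores the message unless its ID has been seen before, then
    // simulates a lost acknowledgement on the very first call.
    public void Send(string messageId, string body)
    {
        if (seenIds.Add(messageId)) Accepted.Add(body);
        if (failNextAck)
        {
            failNextAck = false;
            throw new TimeoutException("Simulated lost acknowledgement.");
        }
    }
}

static class RetryDemo
{
    // Retries the send with the SAME message ID on every attempt and
    // returns the number of attempts that were needed.
    public static int SendWithRetry(DedupQueue queue, string messageId,
                                    string body, int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                queue.Send(messageId, body);
                return attempt;
            }
            catch (TimeoutException)
            {
                // Outcome unknown: the queue may already hold the message.
            }
        }
        throw new TimeoutException("All attempts timed out.");
    }

    static void Main()
    {
        var queue = new DedupQueue();
        int attempts = SendWithRetry(queue, Guid.NewGuid().ToString(), "Test", 3);
        // Prints "Attempts: 2, messages stored: 1"
        Console.WriteLine("Attempts: {0}, messages stored: {1}",
            attempts, queue.Accepted.Count);
    }
}
```

The important detail is that the message ID is created once, outside the retry loop. Creating a fresh message (and so a fresh ID) on each attempt would defeat the detection.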
In some scenarios, message duplication may occur before the application places the message on the queue or topic. If a web service that is called by a client places a message on the queue, there is a chance that the client may make two calls to the service if a timeout exception occurs. As the two messages created in the service will have different message IDs, the duplicate detection logic will not detect the second message as a duplicate. In order for duplicate detection to work in these scenarios, we must replace the default message ID with an identifier that is linked to the content of the messages we are sending. The next scenario will explore how this can be implemented.
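Where the content has no natural unique key, one possible approach is to derive the identifier from the fields that define "the same message". The sketch below hashes two such fields with SHA-256. ContentId, customerId and orderNumber are hypothetical names chosen for illustration, and the resulting string is assumed to be assigned to the MessageId property before sending.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

static class ContentId
{
    // Derives a deterministic identifier from the fields that define
    // "the same message" in the business sense (hypothetical fields).
    public static string FromContent(string customerId, string orderNumber)
    {
        string key = customerId + "|" + orderNumber;
        using (SHA256 sha = SHA256.Create())
        {
            byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(key));
            return BitConverter.ToString(hash).Replace("-", "").ToLowerInvariant();
        }
    }

    static void Main()
    {
        string first = FromContent("customer-42", "order-1001");
        string retry = FromContent("customer-42", "order-1001");
        string other = FromContent("customer-42", "order-1002");

        Console.WriteLine(first == retry);  // True: a repeated call maps to the same ID
        Console.WriteLine(first == other);  // False: different content, different ID
    }
}
```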
Duplicate Messaging Scenario
Consider a scenario where Radio Frequency Identification (RFID) tags are used for tracking goods passing an RFID reader at a store checkout. It is very easy for some of the RFID tags to be read more than once as a basket of goods is moved past the RFID reader. If the application processing the tag reads is not able to detect and remove these duplicate tag read messages, the customer will be billed too much for the transaction, and the inventory and other systems that depend on this data will also be thrown into chaos.
A simple console application has been created to simulate this scenario. The RFID tag reads are represented using a data contract. When an instance of the RfidTag class is created it is assigned a unique tag id, and the price and product can be set.

 

 
[DataContract]
class RfidTag
{
    [DataMember]
    public string TagId { get; set; }
 
    [DataMember]
    public string Product { get; set; }
 
    [DataMember]
    public double Price { get; set; }
 
    public RfidTag()
    {
        TagId = Guid.NewGuid().ToString();
    }
}
 
 
In the Main method of the console application an order batch is created as an array of RfidTag objects using the following code. The order is for equipment for two children to play a ball game. The number of items in the order and the total value for the order are calculated and displayed.

 

 
// Create a sample order
RfidTag[] orderItems = new RfidTag[]
{
    new RfidTag() { Product = "Ball", Price = 4.99 },
    new RfidTag() { Product = "Whistle", Price = 1.95 },
    new RfidTag() { Product = "Bat", Price = 12.99 },
    new RfidTag() { Product = "Bat", Price = 12.99 },
    new RfidTag() { Product = "Gloves", Price = 7.99 },
    new RfidTag() { Product = "Gloves", Price = 7.99 },
    new RfidTag() { Product = "Cap", Price = 9.99 },
    new RfidTag() { Product = "Cap", Price = 9.99 },
    new RfidTag() { Product = "Shirt", Price = 14.99 },
    new RfidTag() { Product = "Shirt", Price = 14.99 },
};
 
// Display the order data.
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("Order contains {0} items.", orderItems.Length);
Console.ForegroundColor = ConsoleColor.Yellow;
 
double orderTotal = 0.0;
foreach (RfidTag tag in orderItems)
{
    Console.WriteLine("{0} - £{1}", tag.Product, tag.Price);
    orderTotal += tag.Price;
}
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("Order value = £{0}.", orderTotal);
Console.WriteLine();
Console.ResetColor();
 
 
The reading of the order tags is then simulated using a random number generator to create duplicate tag reads. The while loop will send messages containing the RfidTag objects to the queue. After each send, the random number generator is used to decide whether or not to advance the position in the array, so that a tag read is followed by a duplicate read of the same tag 40% of the time. During the sending process a count of messages is maintained, and it is displayed in the console when the messages have been sent.

 

 
// Send the order with random duplicate tag reads
int sentCount = 0;
int position = 0;
Random random = new Random(DateTime.Now.Millisecond);
 
Console.WriteLine("Reading tags...");
Console.WriteLine();
Console.ForegroundColor = ConsoleColor.Cyan;
while (position < orderItems.Length)
{
    // Create a new brokered message from the order item RFID tag.
    BrokeredMessage tagRead =
        BrokeredMessage.CreateMessage(orderItems[position]);
 
    // Send the message
    sender.Send(tagRead);
    Console.WriteLine("Sent: {0}", orderItems[position].Product);
 
    // Advance to the next tag 60% of the time; otherwise the same tag
    // is sent again, simulating a duplicate read.
    if (random.NextDouble() > 0.4) position++;
    sentCount++;
}
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("{0} total tag reads.", sentCount);
Console.WriteLine();
Console.ResetColor();
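
As a rough check on how many tag reads to expect from this loop, assume that NextDouble() is uniform on [0, 1) and that successive draws are independent. After each send the loop advances with probability p = 0.6, so the number of reads N_i sent for each tag follows a geometric distribution:

```latex
P(N_i = k) = (1 - p)^{k-1}\, p, \qquad
E[N_i] = \frac{1}{p} = \frac{1}{0.6} \approx 1.67

E[\text{sentCount}] = \sum_{i=1}^{10} E[N_i] = \frac{10}{0.6} \approx 16.7, \qquad
E[\text{duplicates}] = E[\text{sentCount}] - 10 \approx 6.7
```

These are averages over many runs. Any single run can send noticeably fewer or more reads.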
 
 
Once the tag read messages have been sent they can be received by the point of sale application and the customer billed. The code shown below will receive messages from the queue and display the product names. It will also count the number of items in the invoice and the invoice total, and display this information in the console.

 

 
Console.WriteLine("Receiving tag read messages...");
Console.WriteLine();
double billTotal = 0.0;
int receivedCount = 0;
BrokeredMessage receivedTagRead;
 
// Receive the order batch tag read messages.
Console.ForegroundColor = ConsoleColor.Magenta;
while (receiver.TryReceive(TimeSpan.FromSeconds(1), out receivedTagRead))
{
    RfidTag tag = receivedTagRead.GetBody<RfidTag>();
    Console.WriteLine("Bill for {0}", tag.Product);
    receivedCount++;
    billTotal += tag.Price;
}
 
// Bill the customer.
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("Bill customer £{0} for {1} items.", billTotal, receivedCount);
Console.WriteLine();
Console.ResetColor();
 
 
When the application is run the output is as follows.
[Figure: Duplicate Messages Received]
The duplicate RFID tag reads meant that the unlucky customer was billed for 13 items instead of 10, and overcharged £44.97. In order to generate an invoice for the correct amount the application will have to be modified to handle the duplicate RFID tag read messages.
Enabling Duplicate Detection
AppFabric queues and topics can be made to suppress duplicate messages by setting the RequiresDuplicateDetection property of the relevant description class to true when creating the queue or topic. The default value of RequiresDuplicateDetection is false. The description classes also contain a DuplicateDetectionHistoryTimeWindow property that specifies the time window for the detection of duplicate messages; the default value is 10 minutes.
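To make the idea of a detection history window concrete, here is a minimal in-memory model. It is a sketch only and not how the service is implemented: DuplicateDetector and TryAccept are hypothetical names, and the window is assumed to be measured from the time an ID was last accepted.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of time-windowed duplicate detection.
class DuplicateDetector
{
    private readonly TimeSpan window;
    private readonly Dictionary<string, DateTime> acceptedAt =
        new Dictionary<string, DateTime>();

    public DuplicateDetector(TimeSpan window)
    {
        this.window = window;
    }

    // Returns true if the message is accepted, or false if a message with
    // the same ID was accepted within the history window.
    public bool TryAccept(string messageId, DateTime now)
    {
        DateTime seenAt;
        if (acceptedAt.TryGetValue(messageId, out seenAt) && now - seenAt < window)
        {
            return false;
        }
        acceptedAt[messageId] = now;
        return true;
    }
}

static class Program
{
    static void Main()
    {
        var detector = new DuplicateDetector(TimeSpan.FromMinutes(1));
        DateTime start = new DateTime(2011, 8, 17, 0, 0, 0, DateTimeKind.Utc);

        Console.WriteLine(detector.TryAccept("tag-1", start));                 // True
        Console.WriteLine(detector.TryAccept("tag-1", start.AddSeconds(30)));  // False
        Console.WriteLine(detector.TryAccept("tag-1", start.AddSeconds(90)));  // True
    }
}
```

The third call is accepted because 90 seconds have passed since the ID was last accepted, which is outside the one-minute window. The window should therefore be at least as long as the longest delay at which a duplicate could plausibly arrive.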
In the RFID messaging scenario the code used to create the queue is modified as follows. This will create a queue which will suppress any messages that are sent to the queue that have the same message ID as a message that was sent during the past minute.

 

 
// Create a QueueDescription that will suppress duplicate messages
// with a detection history window of one minute.
QueueDescription rfidCheckOutQueueDescription = new QueueDescription()
{
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = new TimeSpan(0, 1, 0)
 
};
 
// Create an order queue using QueueDescription
Queue rfidCheckoutQueue = serviceBusNamespaceClient.CreateQueue
    ("rfidcheckout", rfidCheckOutQueueDescription);
 
 
With this modification alone, the application will still be susceptible to duplicate messages, as the MessageId property of each message that is created will have its default value. This means the message ID will be unique even for duplicate RFID tag reads. In order for duplicate messages to be detected, the message ID is set to the TagId property of the RFID tag, which is unique to that tag. This ensures that the messages for duplicate RFID tag reads have the same message ID, so the duplicates are detected by the queue.

 

 
while (position < orderItems.Length)
{
    // Create a new brokered message from the order item RFID tag.
    BrokeredMessage tagRead =
        BrokeredMessage.CreateMessage(orderItems[position]);
 
    // Set the MessageId of the message to the TagId of the RFID tag.
    tagRead.MessageId = orderItems[position].TagId;
 
    // Send the message
    sender.Send(tagRead);
    Console.WriteLine("Sent: {0}", orderItems[position].Product);
 
    // Advance to the next tag 60% of the time; otherwise the same tag
    // is sent again, simulating a duplicate read.
    if (random.NextDouble() > 0.4) position++;
    sentCount++;
}
 
 
When running the modified application the output is as follows.
[Figure: Duplicates Eliminated]
The queue now successfully suppresses duplicate messages and the amount billed to the customer is correct.
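The overall effect can also be reproduced without a Service Bus namespace. The sketch below is a simulation rather than the webcast code: RfidSimulation and SimulateReads are hypothetical names, a List stands in for a queue without duplicate detection, and a HashSet keyed on the tag ID stands in for a queue with duplicate detection enabled.

```csharp
using System;
using System.Collections.Generic;

static class RfidSimulation
{
    // Sends each tag ID one or more times, using the same 60/40 rule as
    // the article's loop, and returns how many reads were sent in total.
    public static int SimulateReads(string[] tagIds, Random random,
                                    Action<string> send)
    {
        int sentCount = 0;
        int position = 0;
        while (position < tagIds.Length)
        {
            send(tagIds[position]);
            sentCount++;
            if (random.NextDouble() > 0.4) position++;
        }
        return sentCount;
    }

    static void Main()
    {
        var tagIds = new string[10];
        for (int i = 0; i < tagIds.Length; i++)
        {
            tagIds[i] = Guid.NewGuid().ToString();
        }

        var withoutDetection = new List<string>();  // keeps every read
        var withDetection = new HashSet<string>();  // keeps one read per tag ID

        int sent = SimulateReads(tagIds, new Random(), id =>
        {
            withoutDetection.Add(id);
            withDetection.Add(id);
        });

        Console.WriteLine("Sent {0} tag reads.", sent);
        Console.WriteLine("Billed without detection: {0} items.", withoutDetection.Count);
        Console.WriteLine("Billed with detection: {0} items.", withDetection.Count);
    }
}
```

The number of reads sent varies from run to run, but the count with detection is always 10, because every duplicate read carries the tag ID of a read that has already been accepted.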
 

Posted on Wednesday, August 17, 2011 12:08 AM | Filed Under [ Azure VS2010 AppFabic ]
