Geeks With Blogs

Vitaly Dilmukhametov

Today I’ll tell you about BlockingCollection<T>, a more complex and more interesting data structure.

BlockingCollection<T> is a thread-safe data structure. It is called “blocking” because it is based on the following principles:

  • if the collection is empty and some code tries to take an element, the thread executing that code is blocked until at least one element is added;
  • if the collection already contains the maximum number of elements, a thread attempting to add a new element is blocked until free space for it becomes available.

Of course, this is not a complete description of the collection’s capabilities.
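Both blocking rules can be observed without starting a second thread by using the timed TryAdd/TryTake overloads, which give up after a timeout instead of waiting forever. This is a minimal sketch: the capacity of 2 and the 100 ms timeout are arbitrary illustrative values, and the class name BlockingRulesDemo is hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

public static class BlockingRulesDemo
{
    // Returns the outcome of each timed attempt, in order.
    public static bool[] Run()
    {
        // Bounded collection: at most 2 elements at a time.
        using (var collection = new BlockingCollection<int>(2))
        {
            collection.Add(1);
            collection.Add(2);

            // Full: Add(3) would block here, so TryAdd gives up after 100 ms and returns false.
            bool addedWhileFull = collection.TryAdd(3, TimeSpan.FromMilliseconds(100));

            // Taking one element frees a slot, so the next attempt succeeds immediately.
            collection.Take();
            bool addedAfterTake = collection.TryAdd(3, TimeSpan.FromMilliseconds(100));

            // Drain the collection; now Take() would block, so TryTake times out.
            collection.Take();
            collection.Take();
            int ignored;
            bool tookWhileEmpty = collection.TryTake(out ignored, TimeSpan.FromMilliseconds(100));

            return new[] { addedWhileFull, addedAfterTake, tookWhileEmpty };
        }
    }
}
```

Run() should return { false, true, false }: the first and last attempts run into the two blocking rules, while the middle one succeeds because Take() freed a slot.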

Let’s get acquainted with BlockingCollection<T>. It has a few methods for manipulating data:

Add

  • blocking: Add(T), Add(T, CancellationToken)
  • non-blocking: TryAdd(T), TryAdd(T, Int32), TryAdd(T, TimeSpan), TryAdd(T, Int32, CancellationToken)

Take

  • blocking: Take(), Take(CancellationToken)
  • non-blocking: TryTake(out T), TryTake(out T, Int32), TryTake(out T, TimeSpan), TryTake(out T, Int32, CancellationToken)

As you can see, there are both blocking and non-blocking methods (the latter start with the “Try” prefix). A Try* method returns true if the element was successfully added or taken, and false otherwise. Both kinds have overloads that accept a CancellationToken, which lets another thread cancel a pending call.
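A minimal sketch of cancelling a blocked call, assuming a hypothetical CancellationDemo class and an arbitrary 200 ms delay. It uses CancellationTokenSource.CancelAfter, which appeared in .NET 4.5; on the .NET 4 beta you would call Cancel() from another thread instead. Note that a cancelled Take(CancellationToken) does not return a value; it throws OperationCanceledException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class CancellationDemo
{
    // Returns true if a blocked Take() was interrupted through its token.
    public static bool Run()
    {
        using (var collection = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource())
        {
            // Request cancellation after 200 ms (CancelAfter requires .NET 4.5 or later).
            cts.CancelAfter(200);
            try
            {
                // The collection is empty and nobody adds to it,
                // so Take blocks until the token is cancelled.
                collection.Take(cts.Token);
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }
}
```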

BlockingCollection<T> has a completion mechanism. A collection is marked as completed when we know that no more data will be added to it. After completion, every attempt to add data throws an InvalidOperationException. Calling Take() on a collection that is both completed and empty also throws an exception. A newly created collection is not completed; completion happens only when you call the CompleteAdding() method, so the collection must be completed explicitly. How can you use the completion mechanism? It lets you synchronize the producer and the consumer, i.e. the parts of code that add data to and take data from the collection. By marking the collection as completed, the producer signals that no new data will be added, so the consumer does not wait for new elements.
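The completion rules can be checked in a few lines. This sketch uses a hypothetical CompletionDemo class that records what each call does after CompleteAdding(); note the difference between TryTake(), which simply returns false on a completed, empty collection, and Take(), which throws.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class CompletionDemo
{
    // Records what each call does once the collection has been completed.
    public static string[] Run()
    {
        var log = new List<string>();
        var collection = new BlockingCollection<int>();
        collection.Add(1);
        collection.CompleteAdding(); // no more data will be added

        // Adding after completion is rejected.
        try { collection.Add(2); }
        catch (InvalidOperationException) { log.Add("Add: InvalidOperationException"); }

        // Elements added before completion can still be taken.
        log.Add("Take: " + collection.Take());

        // Completed and empty: TryTake returns false at once instead of waiting...
        int item;
        log.Add("TryTake: " + collection.TryTake(out item));

        // ...and Take throws instead of blocking forever.
        try { collection.Take(); }
        catch (InvalidOperationException) { log.Add("Take: InvalidOperationException"); }

        return log.ToArray();
    }
}
```

Run() should return four entries: the rejected Add, "Take: 1", "TryTake: False" and the rejected Take.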

OK, here is some code that demonstrates how to work with BlockingCollection<T>. First of all, we instantiate the collection and set the maximum number of elements. Alternatively, we can use the parameterless constructor, which lets the collection grow without a fixed limit:

// create the collection
BlockingCollection<int> collection = new BlockingCollection<int>(10);


Next, we add some elements and print some of the collection’s properties (I’ll talk about them later):

collection.Add(200);
collection.Add(300);
collection.Add(400);
collection.Add(500);
 
Console.WriteLine("Count: " + collection.Count);
Console.WriteLine("BoundedCapacity: " + collection.BoundedCapacity);
Console.WriteLine("IsCompleted: " + collection.IsCompleted);
Console.WriteLine("IsAddingCompleted: " + collection.IsAddingCompleted);
Console.WriteLine();


After that, we start a timer. Its callback adds more elements and marks the collection as completed:

Timer timer = new Timer(delegate
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine(DateTime.Now.ToLongTimeString() + " Adding " + " 600");
    collection.Add(600);
    // busy-wait to simulate some work before producing the next element
    Thread.SpinWait(300000000);

    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine(DateTime.Now.ToLongTimeString() + " Adding " + " 700");
    collection.Add(700);

    // tell consumers that no more elements will be added
    collection.CompleteAdding();
},
  null, 3000, Timeout.Infinite); // no state, fire once after 3000 ms, no periodic repeat


The timer callback is executed on a different thread than the Main() method, which lets us model concurrent access to the collection. Then we start taking elements from the collection (I use different font colors for different threads):

foreach (var item in collection.GetConsumingEnumerable())
{
    Console.ForegroundColor = ConsoleColor.Gray;
    Console.WriteLine(DateTime.Now.ToLongTimeString() + " Taking " + item);
}

Here is the result:

[Screenshot: console output of the example]

A few words on how this example works. We create a collection and add 4 elements to it. Next, we start the timer, so we have 2 threads:

  • the main thread, where the Main() method executes and takes elements from the collection;
  • the timer thread, where the timer callback executes and adds elements.

On the screenshot you can see that the first 4 elements were taken in less than 1 second. After that, the main thread is blocked, waiting for new elements. The timer starts with a 3-second delay, which is visible in the output: element 500 was taken at the 40th second and element 600 was added at the 43rd second. In that same 43rd second the main thread is unblocked and takes the newly added element 600. Next, the timer thread executes SpinWait(), then adds element 700 and completes the collection by calling collection.CompleteAdding(). The main thread consumes this element and then leaves the foreach loop, because the collection is completed and all elements have been processed.

If we comment out the CompleteAdding() call, the main thread will be blocked forever: the timer callback has returned, nobody marks the collection as completed, and the main thread waits for new elements indefinitely. The completion mechanism avoids this situation and provides a clean way to signal the end of the data.
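For reference, the fragments above can be assembled into one self-contained method. This is a sketch, not the author’s exact program: the class name TimerProducerDemo is hypothetical, Thread.Sleep(1000) replaces the CPU-dependent SpinWait(300000000), the console colors are dropped, and the timer is kept in a using block so that it stays reachable (and cannot be garbage-collected) until the consumer loop finishes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class TimerProducerDemo
{
    // Runs the example end to end and returns the consumed items in order.
    public static List<int> Run()
    {
        var consumed = new List<int>();
        var collection = new BlockingCollection<int>(10);
        collection.Add(200);
        collection.Add(300);
        collection.Add(400);
        collection.Add(500);

        // Producer: a one-shot timer that fires on a thread-pool thread after 3000 ms.
        using (var timer = new Timer(delegate
        {
            Console.WriteLine(DateTime.Now.ToLongTimeString() + " Adding 600");
            collection.Add(600);
            Thread.Sleep(1000); // stands in for SpinWait(300000000)
            Console.WriteLine(DateTime.Now.ToLongTimeString() + " Adding 700");
            collection.Add(700);
            collection.CompleteAdding(); // without this call the loop below never ends
        }, null, 3000, Timeout.Infinite))
        {
            // Consumer: blocks while the collection is empty, exits once it is completed and drained.
            foreach (var item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine(DateTime.Now.ToLongTimeString() + " Taking " + item);
                consumed.Add(item);
            }
        }

        Console.WriteLine("IsAddingCompleted: " + collection.IsAddingCompleted);
        Console.WriteLine("IsCompleted: " + collection.IsCompleted);
        return consumed;
    }
}
```

Run() should return 200, 300, 400, 500, 600, 700 after roughly four seconds; removing the CompleteAdding() call makes it block forever, as described above.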

A few words on the collection’s properties. I think the most interesting of them are the following:

  • IsAddingCompleted – returns true if the collection has been marked as completed, otherwise false;
  • IsCompleted – returns true if the collection has been marked as completed and is empty, otherwise false;
  • BoundedCapacity – returns the collection’s maximum capacity, or -1 for a collection without a fixed limit.

Take a look at the screenshot and you will notice that while the collection contains elements, both bool properties are false, because the collection is not marked as completed. At the very end both are true, because all elements have been taken and the collection is completed.
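The transitions of the two bool properties can also be reproduced without a screenshot. A sketch with a hypothetical PropertiesDemo class that reads BoundedCapacity once and then samples IsAddingCompleted/IsCompleted before completion, after CompleteAdding() while one element is still inside, and after that element has been taken:

```csharp
using System.Collections.Concurrent;

public static class PropertiesDemo
{
    // Returns BoundedCapacity followed by "IsAddingCompleted/IsCompleted" at three moments.
    public static string Run()
    {
        var collection = new BlockingCollection<int>(10);
        string log = "capacity=" + collection.BoundedCapacity;

        collection.Add(1);
        log += " | " + Sample(collection);   // not marked as completed yet

        collection.CompleteAdding();
        log += " | " + Sample(collection);   // marked as completed, one element remains

        collection.Take();
        log += " | " + Sample(collection);   // marked as completed and empty
        return log;
    }

    private static string Sample(BlockingCollection<int> c)
    {
        return c.IsAddingCompleted + "/" + c.IsCompleted;
    }
}
```

Run() returns "capacity=10 | False/False | True/False | True/True".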

OK, and what about the order of elements in the collection? You may notice that it works as a FIFO queue. Is this behavior predefined, or can we change it? A closer look at the constructor answers these questions. It has an overload that takes an IProducerConsumerCollection<T> parameter, which implements the storage for the elements. You can use one of the following collections (described in part 1), included in .NET 4 beta 2:

  • ConcurrentStack<T> – a thread-safe stack;
  • ConcurrentQueue<T> – a thread-safe queue;
  • ConcurrentBag<T> – a thread-safe unordered collection that allows duplicates.
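To see that the ordering comes from the underlying storage, the same three items can be drained through differently backed collections. A minimal sketch; the helper name OrderingDemo.Drain is hypothetical:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class OrderingDemo
{
    // Adds 1, 2, 3, completes the collection and returns the order in which items come out.
    public static List<int> Drain(BlockingCollection<int> collection)
    {
        collection.Add(1);
        collection.Add(2);
        collection.Add(3);
        collection.CompleteAdding();

        var order = new List<int>();
        foreach (var item in collection.GetConsumingEnumerable())
            order.Add(item);
        return order;
    }
}
```

Draining new BlockingCollection<int>() yields 1, 2, 3 (the default ConcurrentQueue<T> storage), while new BlockingCollection<int>(new ConcurrentStack<int>()) yields 3, 2, 1. With a ConcurrentBag<T> the order is unspecified.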

So BlockingCollection<T> is not a data storage itself: it implements the logic of access to the storage and the completion mechanism. By default it uses ConcurrentQueue<T> as the storage, which is why the samples show FIFO behavior. We can change it to a stack in the constructor:

BlockingCollection<int> collection = 
                new BlockingCollection<int>(new ConcurrentStack<int>(), 10);

and we get the following result:

[Screenshot: console output with the ConcurrentStack<T>-backed collection]

As you can see, the order of elements is now LIFO. You can implement a custom data storage by implementing the following interface:

public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
    bool TryAdd(T item);
    bool TryTake(out T item);
}
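As an illustration of a custom storage, here is a sketch of a hypothetical MinFirstCollection<T> whose TryTake() always removes the smallest element, which turns BlockingCollection<T> into a simple blocking priority queue. It guards a List<T> with one lock and scans the whole list on every take, so it is meant to show the interface rather than to be efficient; SyncRoot throws NotSupportedException, as it does in ConcurrentQueue<T>.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical storage: TryTake always removes the smallest element.
public class MinFirstCollection<T> : IProducerConsumerCollection<T>
{
    private readonly List<T> _items = new List<T>();
    private readonly object _sync = new object();
    private readonly IComparer<T> _comparer = Comparer<T>.Default;

    public bool TryAdd(T item)
    {
        lock (_sync) { _items.Add(item); }
        return true; // the storage is unbounded; BlockingCollection enforces any capacity limit
    }

    public bool TryTake(out T item)
    {
        lock (_sync)
        {
            if (_items.Count == 0) { item = default(T); return false; }
            // Linear scan for the smallest element.
            int min = 0;
            for (int i = 1; i < _items.Count; i++)
                if (_comparer.Compare(_items[i], _items[min]) < 0) min = i;
            item = _items[min];
            _items.RemoveAt(min);
            return true;
        }
    }

    public T[] ToArray() { lock (_sync) { return _items.ToArray(); } }

    public void CopyTo(T[] array, int index) { lock (_sync) { _items.CopyTo(array, index); } }

    // ICollection members
    public int Count { get { lock (_sync) { return _items.Count; } } }
    public bool IsSynchronized { get { return false; } }
    public object SyncRoot { get { throw new NotSupportedException(); } }
    void ICollection.CopyTo(Array array, int index)
    {
        lock (_sync) { ((ICollection)_items).CopyTo(array, index); }
    }

    // Enumerate a snapshot so callers never observe the list changing underneath them.
    public IEnumerator<T> GetEnumerator() { return ((IEnumerable<T>)ToArray()).GetEnumerator(); }
    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}
```

Wrapped as new BlockingCollection<int>(new MinFirstCollection<int>()), adding 5, 1, 3 and then calling Take() three times yields 1, 3, 5.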


In addition, I want to talk about some interesting static methods of the BlockingCollection<T> class. They let you work with several collections at once:

  • Blocking
    • int BlockingCollection<T>.AddToAny(BlockingCollection<T>[], T);
    • int BlockingCollection<T>.TakeFromAny(BlockingCollection<T>[], out T);
  • Non-Blocking
    • int BlockingCollection<T>.TryAddToAny(BlockingCollection<T>[], T);
    • int BlockingCollection<T>.TryTakeFromAny(BlockingCollection<T>[], out T);

All of these methods take an array of BlockingCollection<T> instances, i.e. the set of collections you want to interact with. They work like the Add()/TryAdd() and Take()/TryTake() methods described earlier, with the same blocking logic. The methods return the index of the collection to which data was added or from which it was taken, or -1 when the attempt fails. You can find a more detailed description in MSDN.
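A short sketch of these static methods, assuming two collections with a capacity of 1 each (the class name AnyDemo is hypothetical). The non-blocking versions return -1 as soon as no collection can accept or supply an element:

```csharp
using System;
using System.Collections.Concurrent;

public static class AnyDemo
{
    // Returns the indices reported by TryAddToAny, TakeFromAny and TryTakeFromAny.
    public static int[] Run()
    {
        var full = new BlockingCollection<int>(1);
        var roomy = new BlockingCollection<int>(1);
        full.Add(1); // "full" has no free space left
        var collections = new[] { full, roomy };

        // Only collections[1] has room, so 42 goes there and index 1 is returned.
        int addIndex = BlockingCollection<int>.TryAddToAny(collections, 42);

        // Both collections are now full: the non-blocking add fails with -1.
        int failedAddIndex = BlockingCollection<int>.TryAddToAny(collections, 43);

        full.Take(); // empty "full"; 42 is still in "roomy"
        int item;
        // Blocking take; returns immediately because collections[1] holds 42.
        int takeIndex = BlockingCollection<int>.TakeFromAny(collections, out item);

        // Both collections are empty: the non-blocking take fails with -1.
        int emptyIndex = BlockingCollection<int>.TryTakeFromAny(collections, out item);

        return new[] { addIndex, failedAddIndex, takeIndex, emptyIndex };
    }
}
```

Run() should return { 1, -1, 1, -1 }.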

And one more interesting fact. MSDN says, for example, that the TakeFromAny() method returns -1 if the take operation is not successful. But it is a blocking method: when you try to take an element from a set of empty, non-completed collections, it waits indefinitely until an element is added to any of them. And if you complete any of the collections, it throws an ArgumentException. So you either wait, take an element, or get an exception, and I couldn’t understand when TakeFromAny() could return -1. I created a topic on the MSDN Parallel Extensions forums, and Reed Copsey, Jr. confirmed that it is just a documentation issue. He created a corresponding item on MS Connect. So be careful when reading the current MSDN docs about thread-safe collections: they are just a preview and may contain errors.

In conclusion, you can find other interesting samples of BlockingCollection<T> usage in the MSDN Code Gallery. For now there are only samples for beta 1, but a refreshed sample pack should be available soon. I advise you to download Visual Studio 2010 beta 2, which was published this week, and try the new features. It contains the latest Parallel Extensions library, which lets you use the full power of modern multi-core hardware.

Posted on Sunday, October 25, 2009 4:01 AM | Parallel Extension, .NET, data, concurrency


Comments on this post: Thread-safe data structures .NET 4.0 (part 2)

# re: Thread-safe data structures .NET 4.0 (part 2)
Earlier I said that you are a real geek, and this post is one more confirmation of it! :)
Left by Sergey Zwezdin on Oct 25, 2009 4:34 AM

# re: Thread-safe data structures .NET 4.0 (part 2)
Very good site to learn behind the scenes.
Left by Rajesh on Jul 22, 2010 1:00 AM



Copyright © Vitus | Powered by: GeeksWithBlogs.net