Tuesday, January 24, 2012

Inside the Concurrent Collections: ConcurrentQueue


ConcurrentQueue is, like ConcurrentStack, a lockless collection, in that it is implemented without using any locks at all. However, the semantics required for a queue impose a quite different approach; unlike ConcurrentStack, which has a single point of concurrent contention, a queue can be changed at both the head and tail. This means that at least two variables are involved, most likely more. The simple approach of atomically modifying a single variable won't work here.

What does System.Collections.Generic.Queue do?

Well, let's have a look at the non-concurrent queue. This is implemented as a circular buffer. However, this design is very hard to use in a lockless way; when a new item is enqueued and the backing array is full, the array is doubled in size to accommodate the new item.

To resize the array, the entire contents of the queue have to be read sequentially, without using locks, and copied into a brand new array by a single thread, while other threads may enqueue and dequeue items many times. This is very tricky to implement without introducing race conditions while still ensuring high performance.

So, instead, ConcurrentQueue forgoes the circular buffer, and uses a linear array instead. More specifically, it needs to be a linear array with a (conceptually) infinite capacity - although there will only be a finite number of items stored in the queue, items can be enqueued and dequeued an unlimited number of times. Such an array can't be created as a single object.

However, you don't need to keep the entire infinite array in memory; you only need the section of the array that is actually storing items. By splitting the array into finite segments, you can create and discard segments as items are enqueued and dequeued. This is what ConcurrentQueue does.

Queue Segments

To demonstrate, I'll be using a queue with a segment size of 4. To start off with, three items are enqueued. These are added to the leftmost available slots of the tail segment.

Three more items are then enqueued. The first fills the remaining slot, then a new segment is created and assigned to the tail to hold the other two.

Another three are enqueued, filling the second segment and starting a third.

Five items are then dequeued from the head. The head segment is now empty, so it is discarded.

Using this method, the illusion of an infinite linear array can be created using a finite number of segments.
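To make the segment bookkeeping concrete, here is a minimal single-threaded sketch of the idea. It is not the ConcurrentQueue implementation: SegmentedQueue, SegmentCount and the field names are hypothetical, there is no thread-safety at all, and the segment size of 4 just matches the walkthrough above.

```csharp
using System;

// Single-threaded sketch of a queue built from fixed-size segments.
// All names here are illustrative; this is not the ConcurrentQueue source.
public class SegmentedQueue<T>
{
    private const int SegmentSize = 4; // matches the walkthrough, not the real size

    private sealed class Segment
    {
        public readonly T[] Items = new T[SegmentSize];
        public int Low;       // next slot to dequeue from
        public int High;      // next free slot to enqueue into
        public Segment Next;  // following segment, or null for the tail
    }

    private Segment _head;
    private Segment _tail;

    public SegmentedQueue()
    {
        _head = _tail = new Segment();
    }

    // Number of segments currently linked from head to tail.
    public int SegmentCount
    {
        get
        {
            int count = 0;
            for (Segment s = _head; s != null; s = s.Next) count++;
            return count;
        }
    }

    public void Enqueue(T item)
    {
        if (_tail.High == SegmentSize)
        {
            // Tail segment is full: link a fresh segment and make it the tail.
            Segment next = new Segment();
            _tail.Next = next;
            _tail = next;
        }
        _tail.Items[_tail.High++] = item;
    }

    public bool TryDequeue(out T item)
    {
        // Discard a fully consumed head segment if a later one exists.
        if (_head.Low == SegmentSize && _head.Next != null)
            _head = _head.Next;

        if (_head.Low == _head.High)
        {
            item = default(T);
            return false; // nothing left to dequeue
        }

        item = _head.Items[_head.Low++];
        return true;
    }
}
```

Replaying the walkthrough against this sketch: after nine calls to Enqueue there are three segments, and after five calls to TryDequeue the first segment has been dropped, leaving two. The sketch discards the head lazily, on the first dequeue after it has been emptied, which is a simplification chosen for brevity.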

Achieving thread-safety

ConcurrentQueue uses a segment size of 32. Along with the backing array, each segment stores the index at which items are to be added (m_high) and the index from which they are to be removed (m_low):

private class Segment {
    volatile T[] m_array;
    volatile int m_high;
    volatile int m_low;
}

Thread-safety, in this case, depends on the operation of both enqueuing and dequeuing. So we'll look at the operations in isolation, then put them both together.

Enqueuing

To enqueue an item, you need to:

  1. Increment the m_high variable. This is now the index the item should be inserted at.
  2. Assign the item to the specified slot in m_array.

The key to this is step 1. If step 1 can be done atomically, returning the new incremented value, then each thread trying to enqueue an item at the same time gets a separate slot. Step 2 can then be performed concurrently by each thread without having to worry about conflicts. Interlocked.Increment performs this increment atomically:

public void Enqueue(T item) {
    int insertIndex = Interlocked.Increment(ref m_high);
    m_array[insertIndex] = item;
}

That's it! Simple, really...
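As a quick sanity check of the claim that Interlocked.Increment hands every thread its own slot, the following sketch has many parallel writers claim an index and records how often each index was claimed. SlotClaimDemo and its method are hypothetical names, and starting the counter at -1 (so the first claimed index is 0) is an assumption made for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SlotClaimDemo
{
    // Returns true if each of `writers` concurrent callers claimed a distinct slot.
    public static bool EveryWriterGetsOwnSlot(int writers)
    {
        int high = -1; // assumed start value, so the first claimed index is 0
        int[] claimsPerSlot = new int[writers];

        Parallel.For(0, writers, i =>
        {
            // Step 1: atomically claim an index; no two callers see the same value.
            int slot = Interlocked.Increment(ref high);

            // Step 2 stand-in: count the claim (atomically, so the check itself
            // stays reliable even if the claim step were ever to hand out duplicates).
            Interlocked.Increment(ref claimsPerSlot[slot]);
        });

        foreach (int count in claimsPerSlot)
        {
            if (count != 1) return false;
        }
        return high == writers - 1;
    }
}
```

Calling SlotClaimDemo.EveryWriterGetsOwnSlot(1000) should return true on every run, because the atomic increment never returns the same value twice.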

Dequeuing

Dequeuing acts in a similar way to enqueuing:

  1. Increment the m_low variable. The previous index is the index of the item to be dequeued.
  2. Return the item stored at the specified slot in m_array.

And, similarly, if each thread trying to dequeue an item can be atomically 'assigned' a slot by performing step 1 using Interlocked.Increment, then step 2 can be performed concurrently by several threads:

public T Dequeue() {
    // Increment returns the incremented value of the variable
    int index = Interlocked.Increment(ref m_low);
    return m_array[index-1];
}

But hang on, if we're dequeuing items, we need to check whether the queue is empty first. So checks need to be added to determine when a queue is empty. This depends on the segment state that represents an empty segment. In the case of ConcurrentQueue, this state is when m_low > m_high.

The exact reasons why this specific state represents an empty segment are a gory implementation detail; if you're wondering why, start by working out exactly what m_high and m_low represent.

public bool TryDequeue(out T item) {
    if (m_low > m_high) {
        item = default(T);
        return false;
    }
    
    int index = Interlocked.Increment(ref m_low);
    item = m_array[index-1];
    return true;
}

All good? Well, no. We've actually introduced a subtle race condition. Say we've got a queue with one item in it, and two threads trying to dequeue that one item. If both threads perform the if (m_low > m_high) check before either has incremented m_low, both checks pass; one thread will dequeue the item, and the other will 'successfully' dequeue from what is now an empty queue.

In ConcurrentStack, a similar race condition was solved by using Interlocked.CompareExchange; taking a snapshot, doing some work, then atomically making that work visible to other threads only if the state is still valid. By judicious use of local variables and loops, we can do the same thing here:

public bool TryDequeue(out T item) {
    int index;
    do {
        index = m_low;
        
        // check if the queue is empty
        if (index > m_high) {
            item = default(T);
            return false;
        }
        
        // I think m_low still has the same value as index. If it does, replace it with index+1.
        // Either way, return to me what m_low actually was.
        if (Interlocked.CompareExchange(ref m_low, index+1, index) == index) {
            // success - index is now a valid index to dequeue, specific to this thread
            item = m_array[index];
            return true;
        }
        
        // m_low has been changed in the meantime. Go back to the start and try again.
    }
    while (true);
}

In this case, CompareExchange is acting as a conditional Increment.
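The 'conditional Increment' pattern can be pulled out into a small helper, which makes it easier to see in isolation. This is only a sketch: ConditionalIncrement and TryClaim are hypothetical names, and the use of Volatile.Read (available from .NET 4.5) to take the snapshots is a choice made for this example rather than something taken from the ConcurrentQueue source.

```csharp
using System.Threading;

public static class ConditionalIncrement
{
    // Atomically advances `low` by one, but only while low <= high.
    // On success, `claimed` is the value `low` held before the increment.
    public static bool TryClaim(ref int low, ref int high, out int claimed)
    {
        while (true)
        {
            // Take a snapshot of the current state.
            int snapshot = Volatile.Read(ref low);

            if (snapshot > Volatile.Read(ref high))
            {
                claimed = -1;
                return false; // nothing left to claim
            }

            // Publish snapshot+1 only if nobody changed `low` since the snapshot.
            if (Interlocked.CompareExchange(ref low, snapshot + 1, snapshot) == snapshot)
            {
                claimed = snapshot;
                return true;
            }

            // Another thread won the race; loop round and take a fresh snapshot.
        }
    }
}
```

With low = 0 and high = 2, three calls succeed and return 0, 1 and 2 in turn; a fourth call returns false and leaves low at 3.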

Combining the two

This is where it gets complicated. The most obvious issue is another race condition, between enqueuing and dequeuing. To reiterate, the steps performed are:

Enqueuing:
  1. Increment the m_high variable. This is now the index the item should be inserted at.
  2. Assign the item to the specified slot in m_array.
Dequeuing:
  1. Check if the queue is empty (m_low > m_high). If it is, return false.
  2. Increment m_low (if m_low hasn't changed) and return the item.

Start with an empty queue. Thread 1 executes step 1 of enqueuing; m_high is incremented. Thread 2 then tries to dequeue an item; step 1 of dequeuing thinks the queue has items because m_low <= m_high. Step 2 of dequeuing then executes, and the item dequeued is the default value (null for a reference type), because thread 1 hasn't yet assigned the enqueued item to the backing array.
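Because real races are hard to reproduce on demand, the sketch below replays that interleaving by hand on a single thread, with comments marking which 'thread' each statement belongs to. The class name, the string payload and the initial values (m_high at -1, m_low at 0) are assumptions made for illustration.

```csharp
using System.Threading;

public static class EnqueueDequeueRaceReplay
{
    // Replays the problematic interleaving step by step on one thread and
    // returns whatever the dequeuing "thread" ended up reading.
    public static string Replay()
    {
        string[] array = new string[4];
        int high = -1; // assumed initial values for an empty segment
        int low = 0;

        // Thread 1, enqueue step 1: claim slot 0.
        int insertIndex = Interlocked.Increment(ref high);

        // Thread 2, dequeue step 1: the emptiness check passes, since 0 <= 0.
        bool looksNonEmpty = low <= high;

        // Thread 2, dequeue step 2: claim slot 0 and read it before it is written.
        string dequeued = looksNonEmpty ? array[low++] : "(empty)";

        // Thread 1, enqueue step 2: the item arrives too late.
        array[insertIndex] = "item";

        return dequeued;
    }
}
```

Replay() returns null rather than "item": the dequeue reported success but handed back the slot's default value.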

Hmm, how to solve this? There's more than one variable involved here; the race condition involves the relationship between two variables, m_high and m_low, not a single variable being changed behind the scenes, which was the problem solved previously using CompareExchange and loops.

Well, we can't swap round the order of operations for enqueuing, because step 1 is what atomically assigns a slot when several threads are enqueuing items at once.

What we need is some sort of notification that enqueuing has finished assigning the item to the backing array. Then the dequeuing thread can wait for that notification before dequeuing the item from the queue:

Enqueuing:
  1. Increment the m_high variable. This is now the index the item should be inserted at.
  2. Assign the item to the specified slot in m_array.
  3. Set state indicating the item at that index has been enqueued.
Dequeuing:
  1. Check if the queue is empty (m_low > m_high). If it is, return false.
  2. Increment m_low (if m_low hasn't changed).
  3. Wait for the state to be set at the index to dequeue.
  4. Return the item.

This is the role played by the m_state array in ConcurrentQueue.Segment. Each segment has an int[] m_state alongside the T[] m_array; a 1 in a particular slot indicates the item has been set, and a 0 (the default value) indicates it isn't set. This turns the Enqueue method into this:

public void Enqueue(T item) {
    int insertIndex = Interlocked.Increment(ref m_high);
    m_array[insertIndex] = item;
    m_state[insertIndex] = 1;
}

and TryDequeue into this:

public bool TryDequeue(out T item) {
    int index;
    do {
        index = m_low;
        
        if (index > m_high) {
            item = default(T);
            return false;
        }
        
        if (Interlocked.CompareExchange(ref m_low, index+1, index) == index) {
            // busywait until m_state[index] is set
            while (m_state[index] == 0) {}
            
            item = m_array[index];
            return true;
        }
    }
    while (true);
}

Marking both m_array and m_state as volatile stops the JIT from reordering accesses to those arrays, ensuring that the invariant implicit in this code (m_state[index] == 1 if and only if m_array[index] has an item in it) doesn't get broken by the JIT or the processor reordering instructions.

Tweak the control flow a bit, sprinkle calls to SpinWait.SpinOnce() around as the abstraction of a busywait, and you've got the code of the enqueue and dequeue methods of ConcurrentQueue.
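Putting the pieces together, here is a self-contained sketch of a single fixed-capacity segment. It is an illustration under stated assumptions, not the ConcurrentQueue source: BoundedSegment, TryEnqueue and the underscore-prefixed fields are hypothetical names, it never grows, and it swaps in Volatile.Read and Volatile.Write (.NET 4.5 and later) on the individual state slots in place of volatile array fields, so that the ordering between the item write and the state write is explicit.

```csharp
using System;
using System.Threading;

// Fixed-capacity, single-segment sketch; does not grow and is not the real Segment.
public class BoundedSegment<T>
{
    private readonly T[] _items;
    private readonly int[] _state; // 1 = slot has been written, 0 = not yet
    private int _high = -1;        // index of the most recently claimed enqueue slot
    private int _low = 0;          // index of the next slot to dequeue

    public BoundedSegment(int capacity)
    {
        _items = new T[capacity];
        _state = new int[capacity];
    }

    public bool TryEnqueue(T item)
    {
        // Step 1: atomically claim a slot.
        int index = Interlocked.Increment(ref _high);
        if (index >= _items.Length) return false; // segment is full

        // Step 2: store the item. Step 3: publish it.
        // The volatile write keeps the item store from moving after the state store.
        _items[index] = item;
        Volatile.Write(ref _state[index], 1);
        return true;
    }

    public bool TryDequeue(out T item)
    {
        SpinWait spin = new SpinWait();
        while (true)
        {
            int index = Volatile.Read(ref _low);
            // Failed enqueues can push _high past the end, so clamp it.
            int high = Math.Min(Volatile.Read(ref _high), _items.Length - 1);

            if (index > high)
            {
                item = default(T);
                return false; // empty
            }

            // Conditional increment: claim `index` only if _low is unchanged.
            if (Interlocked.CompareExchange(ref _low, index + 1, index) == index)
            {
                // The enqueuer may have claimed the slot but not yet written it.
                while (Volatile.Read(ref _state[index]) == 0) spin.SpinOnce();

                item = _items[index];
                return true;
            }

            // Lost the race for this index; back off briefly and retry.
            spin.SpinOnce();
        }
    }
}
```

As in the article's snippets, dequeued items are left in the backing array, so the state/item invariant is never undone. Returning false from TryEnqueue when the segment is full stands in for the point at which a real queue would move on to a new segment.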

Final notes

There are a couple of things I need to briefly mention. Firstly, I haven't yet covered growing the queue. Creating new segments when the current segment gets full is largely handled by enqueuing. When an item is added to the last slot in a segment, the Enqueue method creates a new segment and assigns it as the next segment in the queue using the m_next variable. When the dequeue method reaches the end of the current segment, it waits for the m_next variable to be set before looking inside that segment to see if there's an item to dequeue.

Secondly, the eagle-eyed among you might have noticed that dequeued items don't get removed from the backing array; they are still referenced from the array after they have been dequeued. I can't see any specific threading reason for this, other than giving one less location for possible threading issues. However, more importantly, this guarantees that the invariant I mentioned above is never broken, which can be important if you're reasoning about the collection.

Keeping references around longer than needed isn't so big an issue; the items referenced in the array are all released together when the containing segment is no longer referenced. However, it is something to bear in mind if you have large objects referenced in a ConcurrentQueue for a long time; they might not be garbage collected when they otherwise could be.

Concurrent principles

So what principles can we extract from this examination of ConcurrentQueue?

  1. Separating threads

    Increment and CompareExchange are used to atomically assign each thread performing an action a separate slot. Each thread can then do work on that slot concurrently without conflicting with each other.

  2. Invariants

    An invariant is used to eliminate a race condition between enqueuing and dequeuing; in this case, m_state[index] == 1 iff m_array[index] has an item in it. This ensures enqueuing and dequeuing on the same slot works as it should. Note that this invariant is maintained even at the cost of keeping things in memory longer than necessary.

  3. volatile used as a memory barrier

    Furthermore, volatile is used to add memory barriers, ensuring the JIT doesn't reorder m_state and m_array array accesses and inadvertently break the invariant.

Well, that's the core of the ConcurrentQueue class. I haven't even begun to cover the supporting methods, and glossed over most of the gory details. These I leave for you to explore and figure out. It's time to move on! We finally get some locks involved, and peer under the covers at ConcurrentDictionary.

Cross posted from Simple Talk

 

 

Copyright © simonc