Alois Kraus

blog

Simple Producer Consumer With Tasks And .NET 4

Threading has never been as easy as it is since .NET 4 shipped with the TPL. I know I am a bit late, but there are so many nice things which might still be new to many of us. The IEnumerable<T> interface has become famous with the introduction of LINQ, but many of us have not yet realized that IEnumerable<T> and T[] or List<T> can be exchanged in many cases. There are cases, though, where it is important to fall back to a pure IEnumerable<T> if you want to support lazy evaluation. .NET 4 has for example taken advantage of the lazy nature of IEnumerable<T> with the introduction of Directory.EnumerateFiles, which starts returning results as soon as the first file is found. Previously you had only the option to call Directory.GetFiles, which can potentially search for a long time and returns only when all matching files have been found. This can make a big difference if you search recursively in a big file tree or a directory with many files. I have seen delays of up to 40s in some applications which did process a large directory. 40s of waiting until you can process the first file is certainly not something you want. I did solve this issue in .NET 3.5 with DirectorySearcherAsync, which worked quite well.
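The difference is easy to see with a small sketch (the paths below are only examples; the current directory is used so the snippet runs anywhere):

```csharp
using System;
using System.IO;
using System.Linq;

class LazyVsEager
{
    static void Main()
    {
        // Eager: GetFiles blocks until the complete search has finished
        // and only then returns the fully populated array.
        // string[] all = Directory.GetFiles(@"C:\", "*.*", SearchOption.AllDirectories);

        // Lazy: EnumerateFiles hands out matches while the search is still
        // running; Take(5) even stops the search after five matches.
        foreach (string file in Directory.EnumerateFiles(".", "*.*").Take(5))
        {
            Console.WriteLine(file);
        }
    }
}
```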

One additional optimization is to continue searching for your files on another thread while the files found so far are already returned by the enumerator. My DirectorySearcherAsync does this as well. It works, but the code is quite complex. We can get the same functionality much more easily with .NET 4. The pattern applied here is a simple producer consumer pattern where the producer and the consumer live on different threads. It is easy to transform an IEnumerable<T> into another IEnumerable<T> which does the enumeration on another thread and gives you access to the returned items via a blocking queue in a thread safe manner. The new concurrent collections in .NET 4 already contain such a blocking queue under the name BlockingCollection<T>, which was made for exactly this purpose. Since I did not want to return a BlockingCollection<T> directly but wanted to give you the possibility to

  • Block with a timeout to get all elements fetched so far
  • Block until the next item was found
  • Cancel a running operation

I have created an extension method, not with the signature

static BlockingCollection<T> EnumerateAsync(this IEnumerable<T> source)

but these two

public static IEnumerable<T> EnumerateAsync<T>(this IEnumerable<T> source)
public static IEnumerable<T[]> EnumerateAsync<T>(this IEnumerable<T> source, CancellationToken token, int millisecondsTimeout=-1, int maxCount=0)

The first one is simply a convenience wrapper over the second one, which covers the use case you expect from a responsive UI. I need the ability to cancel the current operation, which can be done nicely with a CancellationToken when the user has changed his mind and impatiently pressed the stop button. At the same time he wants to see the results as fast as possible when they are ready. That is what the millisecondsTimeout parameter is for: it unblocks the enumerator e.g. every 500ms and hands you the files found so far. If nothing has been found yet you are not woken up, since there is no work to do. On the other hand it is also disturbing to update the UI with 10 000 entries at once, even if gathering them took only 500ms. For this case the maxCount parameter is there to give you at most e.g. 100 items at once. For your exact use case the numbers may turn out differently, but for file based operations they proved to be quite good. Did you notice that the return value of the second method is not IEnumerable<T> but IEnumerable<T[]>?

The reason is simple: chunking. What would happen if you got 10 000 items in 500ms? Most likely you would start with some code like this:

void WorkerThread()
{
    foreach(var x in Directory.EnumerateFiles("C:\\", "*.*", SearchOption.AllDirectories)
                              .EnumerateAsync(token, 500, 100))
    {
        UpdateUI(x);
    }
}

You will find that 10K updates in 500ms render your UI unusable, so you need a way to update your UI less often with a chunk of gathered data. Every UpdateUI call needs to post a message into the message loop of your application to reach the UI thread, which is a costly undertaking. If x is not of type T but of type T[] you get a coarse grained UI update. In the previous example we reduced the pressure on the window message pump by a factor of 100, which feels much better.
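Before looking at the full implementation it helps to see the bare producer consumer handshake which BlockingCollection<T> gives us for free. A minimal sketch (the item count is arbitrary):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class ProducerConsumer
{
    static void Main()
    {
        var queue = new BlockingCollection<int>();

        // Producer: adds items on a background thread and signals completion.
        Task producer = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 5; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding(); // lets the consumer loop terminate once drained
        });

        // Consumer: blocks until the next item arrives and exits when the
        // producer has completed adding and the queue is empty.
        foreach (int item in queue.GetConsumingEnumerable())
        {
            Console.WriteLine(item); // prints 0 1 2 3 4, one per line
        }

        producer.Wait();
    }
}
```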

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;

namespace AsyncEnumeration
{
    public static class Extensions
    {
        /// <summary>
        /// Start enumerating on another thread. The returned enumerator blocks until the next element
        /// is available or the source enumeration has no more elements.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="source">Source enumerable</param>
        /// <returns>Async enumerable</returns>
        public static IEnumerable<T> EnumerateAsync<T>(this IEnumerable<T> source)
        {
            foreach (var chunk in EnumerateAsync(source, CancellationToken.None, -1, 1))
            {
                // with maxCount == 1 each chunk contains a single element, but
                // yielding all elements keeps this correct in any case
                foreach (var item in chunk)
                {
                    yield return item;
                }
            }
        }

        /// <summary>
        /// Start enumerating on another thread. The returned enumerator blocks until the next element
        /// is available or the source enumeration has no more elements.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="source">source enumerable</param>
        /// <param name="token">Cancellation token if cancellation support is desired or CancellationToken.None</param>
        /// <param name="millisecondsTimeout">Maximum number of milliseconds to wait until the so far fetched elements are returned</param>
        /// <param name="maxCount">Maximum number of elements to fetch before they are returned</param>
        /// <returns>An enumerable with an array of fetched elements to support chunking in asynchronous operations.</returns>
        public static IEnumerable<T[]> EnumerateAsync<T>(this IEnumerable<T> source, CancellationToken token, int millisecondsTimeout=-1, int maxCount=0)
        {
            BlockingCollection<T> coll = new BlockingCollection<T>();

            Task t = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        // enumerator on other thread and add element to thread safe collection
                        foreach (var item in source)
                        {
                            token.ThrowIfCancellationRequested();
                            coll.Add(item);
                        }
                    }
                    finally
                    { 
                        // signal collection that no more elements will be added.
                        coll.CompleteAdding();
                    }
                },
                token);

            DateTime last = DateTime.Now;
            TimeSpan timeout = millisecondsTimeout == -1 ? TimeSpan.MaxValue : new TimeSpan(0, 0, 0, 0, millisecondsTimeout);

            List<T> chunk = new List<T>();
            T got = default(T);

            while (!coll.IsCompleted)
            {
                // Get next element or block until timeout
                if (coll.TryTake(out got, millisecondsTimeout, token))
                {
                    chunk.Add(got);
                }

                // if the timeout has elapsed or the maximum chunk size has been
                // reached yield the fetched elements
                if( (chunk.Count > 0 && DateTime.Now - last > timeout ) ||
                    (maxCount > 0 && chunk.Count >= maxCount))
                {
                    yield return chunk.ToArray();
                    chunk.Clear();
                    last = DateTime.Now;
                }
            }

            // if there are pending elements yield them as well
            if (chunk.Count > 0)
            {
                yield return chunk.ToArray();
            }
            
            // not really necessary but it is a good idea to wait here 
            // to allow exceptions to be thrown by the task here if there were any.
            t.Wait();
        }

        /// <summary>
        /// Wrap an enumerable to cache any yielded elements so far for faster retrieval.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="source">Source enumerable</param>
        /// <returns>Caching enumerable</returns>
        public static IEnumerable<T> Cache<T>(this IEnumerable<T> source)
        {
            return new MemoizingEnumerator<T>(source);
        }
    }
}

 

Those are quite simple but powerful methods. I have not yet talked about the last method, which caches the returned elements of the source enumerator. It is sometimes useful to check whether there are any elements at all before you start doing something. To stay with the directory example: I want to validate that the directory query matches any files at all. I can do this by

        IEnumerable<string[]> CheckQuery()
        {
            var cachedQuery = Directory.EnumerateFiles(@"C:\", "*.*", SearchOption.AllDirectories)
                                       .EnumerateAsync(Cancel.Token, 500, 100)
                                       .Cache();

            if (cachedQuery.FirstOrDefault() == null)
            {
                throw new ArgumentException("No files found");
            }

            return cachedQuery;
        }

I start the async operation and check if there are any matches. If there are none I throw an exception; if there are, I do not want to start the file search all over again but continue where it left off. This is something you would do during parameter validation in a console application to verify that the entered file queries yield at least one file.

The MemoizingEnumerator behind the Cache method is very simple:

using System.Collections;
using System.Collections.Generic;

namespace AsyncEnumeration
{
    class MemoizingEnumerator<T> : IEnumerable<T>, IEnumerable
    {
        public MemoizingEnumerator(IEnumerable<T> input)
        {
            _Input = input;
        }


        #region IEnumerable<T> Members

        public IEnumerator<T> GetEnumerator()
        {
            if (_Cache == null)
            {
                _enumerator = _Input.GetEnumerator();
                _Cache = new List<T>();
            }
            else
            { 
                foreach (var cachedValue in _Cache)
                {
                    yield return cachedValue;
                }
            }

            if (_enumerator != null)
            {
                while (_enumerator.MoveNext())
                {
                    _Cache.Add(_enumerator.Current);
                    yield return _enumerator.Current;
                }
                _enumerator = null;
            }
        }

        #endregion

        #region IEnumerable Members

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        #endregion

        IEnumerable<T> _Input;
        IEnumerator<T> _enumerator;
        List<T> _Cache;
    }
}

Now we can create a simple but responsive UI with only a few lines of code to display the file names of our hard drive.

(Screenshot: the simple search UI)

The code to make it work is not much, but you have to be careful where you need locking and where not. When you press the Start/Stop button we toggle the text between Start and Stop. When the operation has finished on the worker thread we switch back to Start. But in the code below you will not find any locks. Why? Since we know that our UI runs (in fact can only run) on the one thread with the UI message pump, there is no need to protect ourselves from other threads: the UI is always single threaded.

If you want to dive deeper into threading with .NET I can recommend

If you want to know more about the changes in the TPL coming with .NET 4.5 I can recommend TPL Performance Improvements in .NET 4.5, which contains a detailed analysis of what was changed to make continuations as fast as possible. Continuations are supported in .NET 4.5 via the await keyword, which would make the code below even simpler.

using System;
using System.Linq;
using System.Windows.Forms;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncEnumeration
{
    public partial class AsyncEnumeration : Form
    {
        CancellationTokenSource Cancel = new CancellationTokenSource();
        Task RunningTask = null;

        public AsyncEnumeration()
        {
            InitializeComponent();
            TaskScheduler.UnobservedTaskException += new EventHandler<UnobservedTaskExceptionEventArgs>(TaskScheduler_UnobservedTaskException);
        }

        private void StartStopButton_Click(object sender, EventArgs e)
        {
            if (!TryCancelCurrentOperation())
            {
                StartSearch();
            }
        }

        private void StartSearch()
        {
            StartStopButton.Text = "Stop"; // switch start button to stop button
            ListViewDisplay.Clear();     // delete old content
            string dirName = cDirectory.Text; // get from UI start directory

            // Do the file search not on the UI thread
            RunningTask = Task.Factory.StartNew(() =>
            {
                // EnumerateAsync starts another task which lets the enumeration running while you
                // can fetch data from the chunked results.
                foreach (var dirArray in Directory.EnumerateFiles(dirName, "*.*", SearchOption.AllDirectories)
                                                  .EnumerateAsync(Cancel.Token, 500, 100))
                {
                    // EnumerateAsync returns either when 500ms have elapsed and some data was found
                    // OR 100 items were found. 
                    var listViewItems = (from x in dirArray
                                         select new ListViewItem(x)).ToArray();

                    // block the UI thread as short as possible by adding a bunch of results
                    this.Invoke(new MethodInvoker(() => ListViewDisplay.Items.AddRange(listViewItems)));
                }
            },
            Cancel.Token);

            // When the enumeration task has finished (with or without error)
            // display the Start button again
            var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
            RunningTask.ContinueWith((t) =>
            {
                StartStopButton.Text = "Start";
                if (t.IsFaulted)
                {
                    MessageBox.Show(t.Exception.ToString());
                }
                RunningTask = null;
            }, uiScheduler);
        }

        bool TryCancelCurrentOperation()
        {
            if (RunningTask != null)
            {
                Cancel.Cancel();
                Cancel = new CancellationTokenSource();
                return true;
            }

            return false;
        }

        void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
        {
            MessageBox.Show(e.Exception.ToString());
            e.SetObserved();
        }
    }
}
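To give an impression of the await version mentioned above, here is a hedged sketch of how StartSearch could look with .NET 4.5, assuming the same form, control names and EnumerateAsync extension as in the listing. It is not a drop-in replacement but illustrates how the explicit ContinueWith with the UI TaskScheduler disappears:

```csharp
// Hedged sketch: StartSearch rewritten with .NET 4.5 async/await.
// await resumes on the UI thread automatically, so the explicit
// TaskScheduler.FromCurrentSynchronizationContext continuation goes away.
private async void StartSearch()
{
    StartStopButton.Text = "Stop";
    ListViewDisplay.Clear();
    string dirName = cDirectory.Text;

    try
    {
        await Task.Run(() =>
        {
            foreach (var dirArray in Directory.EnumerateFiles(dirName, "*.*", SearchOption.AllDirectories)
                                              .EnumerateAsync(Cancel.Token, 500, 100))
            {
                var listViewItems = dirArray.Select(x => new ListViewItem(x)).ToArray();
                this.Invoke(new MethodInvoker(() => ListViewDisplay.Items.AddRange(listViewItems)));
            }
        }, Cancel.Token);
    }
    catch (OperationCanceledException)
    {
        // the user pressed Stop
    }
    catch (Exception ex)
    {
        MessageBox.Show(ex.ToString());
    }

    StartStopButton.Text = "Start"; // we are back on the UI thread here
}
```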
posted on Friday, December 2, 2011 12:24 PM

Feedback

# re: Simple Producer Consumer With Tasks And .NET 4 12/3/2011 7:48 PM Volker von Einem
You might want to have a look at Rx. Especially the Window extension. E.g.:

Directory.EnumerateFiles(@"C:\Windows", "*.*", SearchOption.AllDirectories)
.ToObservable()
.Window(TimeSpan.FromSeconds(1))
.Subscribe(x => x.Subscribe(
Console.WriteLine,
e2 => Console.WriteLine(e2.Message)),
e => Console.WriteLine(e.Message));

# re: Simple Producer Consumer With Tasks And .NET 4 12/5/2011 4:09 AM Marty Neal
Your MemoizingEnumerator<T> idea is a good one, but it has been done before. One problem with yours is if you enumerate partially, and then later want to enumerate fully, you will only be able to enumerate up to where you enumerated to last time. Also, it lacks thread safety. Have a look at: http://blog.nerdbank.net/2009/05/caching-results-of-net-ienumerable.html Cheers.

# re: Simple Producer Consumer With Tasks And .NET 4 12/5/2011 6:45 PM Alois Kraus
@Volker: RX is very nice but it is quite implicit about its threading behaviour. This is partially a documentation problem but also a problem of what the library tries to solve. Originally it was created to make events composable. But it can now create from any IEnumerable<T> also an IObservable which may be helpful but does deviate from the original design goal. RX is now a library aimed at high skilled programmers with interest in functional programming and composable events with thread scheduling support.

@Marty: Yes I do know that I am not the first. RX had also an overload called Memoize for enumerables. But the released version does no longer contain it (perhaps naming conflicts with other existing libs). I have tested the Cache operator and subsequent calls to it will return the cached values and missing values as well. The _enumerator value will only be set to null when no more items are in the sequence.
Yes the returned enumerator for Cache is not thread safe but in my case it was ok. I could also use a BlockingQueue here but that seems like overkill for most cases.

# re: Simple Producer Consumer With Tasks And .NET 4 12/13/2011 10:18 PM Igor
Agree with Volker. Rx is pretty simple to use but is hard to understand. If you spend a time to learn examples then you will love this library :)
