Geeks With Blogs
Scott Wojan DotRant BLOG
Below is a multithreaded queue with progress reporting and nice exception handling that I implemented for a project. I hope it might help someone else. You can copy and paste the following code into a console app to see how to use it.
 
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Threading;
 
namespace WorkQueueTest
{
    class Program
    {
        static readonly object locker = new object();
        static void Main(string[] args)
        {
            //You can play with these values to test
            var workQueueSettings = new {NumberOfTasks = 6, NumberOfTaskSteps = 3, TaskStepSleepTime = 500, NumberOfThreads = 3};
            var nonThreadingTime = workQueueSettings.NumberOfTasks * workQueueSettings.NumberOfTaskSteps * workQueueSettings.TaskStepSleepTime;
            Console.WriteLine("====================\nWithout a work queue, this would take {0} milliseconds\n====================", nonThreadingTime);
 
            var workQueue = new WorkQueue(workQueueSettings.NumberOfThreads);
            var stopWatch = new Stopwatch();
            stopWatch.Start();
            for (var i = 0; i < workQueueSettings.NumberOfTasks; i++)
            {
                var index = i;
                workQueue.Enqueue(new WorkItem
                                      {
                                          OnException = OnException,
                                          OnProgressChanged = OnProgressChanged,
                                          Task = onProgressChangedCallBack =>
                                          {
                                              #region - Code to actually do stuff goes here... the rest of this is sample fluff
                                              //
                                              var taskName = "Task #" + index;
                                              //fake some long running process
                                              for (var j = 0; j < workQueueSettings.NumberOfTaskSteps; j++)
                                              {
                                                  Thread.Sleep(workQueueSettings.TaskStepSleepTime);
                                                  var percent = (int)(((decimal)j / workQueueSettings.NumberOfTaskSteps) * 100);
                                                  onProgressChangedCallBack(new ProgressChangedEventArgs(percent, taskName));
                                              }
 
                                              if (workQueueSettings.NumberOfTasks>2 && index == 2)//fake a sample exception
                                                  throw new Exception("Some fake exception.");
 
                                              onProgressChangedCallBack(new ProgressChangedEventArgs(100, taskName));
                                              #endregion
                                          }
                                             
                                      });
            }
 
            workQueue.WaitForTasksToComplete();//wait for the tasks to complete before continuing
            workQueue.Stop(false);
            stopWatch.Stop();
            Console.WriteLine("====================\nTotal time = {0} milliseconds\n====================\nPress any key to exit", stopWatch.ElapsedMilliseconds);
            Console.ReadLine();
        }
 
        private static void OnException(Exception ex)
        {
            lock (locker)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("EXCEPTION: {0}", ex);
                Console.ResetColor();
            }
        }
 
        private static void OnProgressChanged(ProgressChangedEventArgs eventArgs)
        {
            lock (locker)
            {
                switch (eventArgs.ProgressPercentage)
                {
                    case 0:
                        Console.ForegroundColor = ConsoleColor.Yellow;
                        Console.WriteLine("Starting {0}", eventArgs.UserState);
                        break;
                    case 100:
                        Console.ForegroundColor = ConsoleColor.Green;
                        Console.WriteLine("Completed 100% of {0}", eventArgs.UserState);
                        break;
                    default:
                        Console.WriteLine("Completed {0} of {1}", eventArgs.ProgressPercentage, eventArgs.UserState);
                        break;
                }
                Console.ResetColor();
            }
        }
    }
 
    public class WorkItem
    {
        public virtual Action<Exception> OnException { get; set; }
        public virtual Action<ProgressChangedEventArgs> OnProgressChanged { get; set; }
        public virtual Action<Action<ProgressChangedEventArgs>> Task { get; set; }
    }
 
    public class WorkQueue
    {
        private readonly object locker = new object();
        private readonly Thread[] threads;
        private readonly Queue<WorkItem> queue = new Queue<WorkItem>();
        internal class QueueStopWorkItem : WorkItem{}
 
        public WorkQueue(int workerCount)
        {
            threads = new Thread [workerCount];
 
            // Create and start a separate thread for each worker
            for (var i = 0; i < workerCount; i++)
                (threads [i] = new Thread (RunWorkItem)).Start();
        }
 
        public void Stop (bool waitForWorkersToComplete)
        {
            if (waitForWorkersToComplete == false)
            {
                lock (locker)
                {
                    queue.Clear();
                }
            }
 
            // Enqueue stop worker item for each worker to make each exit.
            for (var i = 0; i < threads.Length; i++)
            {
                Enqueue(new QueueStopWorkItem());
            }
 
            // Wait for threads to finish
            if (waitForWorkersToComplete)
            {
                for (var i = 0; i < threads.Length; i++)
                {
                    threads[i].Join();
                }
            }
        }
 
        public void Enqueue (WorkItem workItem)
        {
            lock (locker)
            {
                queue.Enqueue(workItem); //Push one element into the queue.
                Monitor.Pulse(locker);   //Release the waiting thread
            }
        }
 
        void RunWorkItem()
        {
            while (true)// loop until a stop work item is found.
            {
                WorkItem item;
                lock (locker)
                {
                    while (queue.Count == 0)
                        Monitor.Wait(locker); ////Wait, if the queue is busy.
                    item = queue.Dequeue();
                }
 
                if (item is QueueStopWorkItem)
                    return; // Time to exit.
 
                try
                {
                    item.Task(item.OnProgressChanged); // Execute item.
                }
                catch(Exception ex)
                {
                    if (item.OnException != null)
                        item.OnException(ex);
                    else
                        throw;
                }
            }
        }
 
        public void WaitForTasksToComplete()
        {
            Stop(true);
        }
    }
}
 
 
 
The result should look like the following:
 
 
Posted on Wednesday, September 8, 2010 11:46 AM | Back to top

Copyright © Scott Wojan | Powered by: GeeksWithBlogs.net