Below is a multithreaded queue with progress reporting and nice exception handling that I implemented for a project. I hope it might help someone else. You can copy and paste the following code into a console app to see how to use it.
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Threading;
namespace WorkQueueTest
{
/// <summary>
/// Console demo that pushes a handful of simulated long-running tasks through a
/// <see cref="WorkQueue"/> and prints their progress, any failure, and the total elapsed time.
/// </summary>
class Program
{
    // Serializes console output: the callbacks below are invoked from the worker threads,
    // and the colour change + write + reset sequence must not interleave between threads.
    static readonly object locker = new object();

    /// <summary>
    /// Enqueues the sample tasks, waits for all of them to finish and reports the elapsed time.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        //You can play with these values to test
        var workQueueSettings = new {NumberOfTasks = 6, NumberOfTaskSteps = 3, TaskStepSleepTime = 500, NumberOfThreads = 3};
        var nonThreadingTime = workQueueSettings.NumberOfTasks * workQueueSettings.NumberOfTaskSteps * workQueueSettings.TaskStepSleepTime;
        Console.WriteLine("====================\nWithout a work queue, this would take {0} milliseconds\n====================", nonThreadingTime);

        var workQueue = new WorkQueue(workQueueSettings.NumberOfThreads);
        var stopWatch = new Stopwatch();
        stopWatch.Start();

        for (var i = 0; i < workQueueSettings.NumberOfTasks; i++)
        {
            // Copy the loop variable so each lambda captures its own task number.
            var index = i;
            workQueue.Enqueue(new WorkItem
            {
                OnException = OnException,
                OnProgressChanged = OnProgressChanged,
                Task = onProgressChangedCallBack =>
                {
                    #region - Code to actually do stuff goes here... the rest of this is sample fluff
                    var taskName = "Task #" + index;

                    //fake some long running process
                    for (var j = 0; j < workQueueSettings.NumberOfTaskSteps; j++)
                    {
                        Thread.Sleep(workQueueSettings.TaskStepSleepTime);
                        var percent = (int)(((decimal)j / workQueueSettings.NumberOfTaskSteps) * 100);
                        onProgressChangedCallBack(new ProgressChangedEventArgs(percent, taskName));
                    }

                    //fake a sample exception (the third task never reports 100%)
                    if (workQueueSettings.NumberOfTasks > 2 && index == 2)
                        throw new Exception("Some fake exception.");

                    onProgressChangedCallBack(new ProgressChangedEventArgs(100, taskName));
                    #endregion
                }
            });
        }

        // WaitForTasksToComplete drains the queue, then stops and joins every worker,
        // so no further Stop call is needed afterwards.
        workQueue.WaitForTasksToComplete();
        stopWatch.Stop();

        // Console.ReadLine only returns on Enter, so the prompt says exactly that.
        Console.WriteLine("====================\nTotal time = {0} milliseconds\n====================\nPress Enter to exit", stopWatch.ElapsedMilliseconds);
        Console.ReadLine();
    }

    /// <summary>
    /// Prints an exception raised by a task in red.
    /// </summary>
    /// <param name="ex">The exception thrown by the task.</param>
    private static void OnException(Exception ex)
    {
        lock (locker)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("EXCEPTION: {0}", ex);
            Console.ResetColor();
        }
    }

    /// <summary>
    /// Prints a progress line for a task: yellow at 0%, green at 100%, default colour otherwise.
    /// </summary>
    /// <param name="eventArgs">Progress percentage plus the task name carried in <c>UserState</c>.</param>
    private static void OnProgressChanged(ProgressChangedEventArgs eventArgs)
    {
        lock (locker)
        {
            switch (eventArgs.ProgressPercentage)
            {
                case 0:
                    Console.ForegroundColor = ConsoleColor.Yellow;
                    Console.WriteLine("Starting {0}", eventArgs.UserState);
                    break;
                case 100:
                    Console.ForegroundColor = ConsoleColor.Green;
                    Console.WriteLine("Completed 100% of {0}", eventArgs.UserState);
                    break;
                default:
                    // Include the percent sign so intermediate lines match the 100% line.
                    Console.WriteLine("Completed {0}% of {1}", eventArgs.ProgressPercentage, eventArgs.UserState);
                    break;
            }
            Console.ResetColor();
        }
    }
}
/// <summary>
/// One unit of work for a <see cref="WorkQueue"/>: the delegate to run plus the
/// callbacks used to report its progress and its failure.
/// </summary>
public class WorkItem
{
    /// <summary>
    /// The work to execute. It receives the progress callback as its only argument.
    /// </summary>
    public virtual Action<Action<ProgressChangedEventArgs>> Task { get; set; }

    /// <summary>
    /// Callback handed to <see cref="Task"/> so the work can report how far along it is.
    /// </summary>
    public virtual Action<ProgressChangedEventArgs> OnProgressChanged { get; set; }

    /// <summary>
    /// Callback invoked by the queue with the exception thrown by <see cref="Task"/>.
    /// When it is not set, the queue rethrows the exception on the worker thread.
    /// </summary>
    public virtual Action<Exception> OnException { get; set; }
}
/// <summary>
/// A fixed set of dedicated threads that dequeue <see cref="WorkItem"/>s in FIFO order
/// from a shared, monitor-protected queue and execute them.
/// </summary>
public class WorkQueue
{
    // Guards 'queue' and is also the monitor that idle workers wait on.
    private readonly object locker = new object();
    private readonly Thread[] threads;
    private readonly Queue<WorkItem> queue = new Queue<WorkItem>();

    // Sentinel item: a worker that dequeues one of these leaves its loop and ends.
    internal class QueueStopWorkItem : WorkItem{}

    /// <summary>
    /// Creates the queue and immediately starts <paramref name="workerCount"/> worker threads.
    /// </summary>
    /// <param name="workerCount">Number of worker threads; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="workerCount"/> is less than 1 (such a queue could never run an item).
    /// </exception>
    public WorkQueue(int workerCount)
    {
        if (workerCount < 1)
            throw new ArgumentOutOfRangeException("workerCount", workerCount, "At least one worker thread is required.");

        threads = new Thread[workerCount];
        // Create and start a separate thread for each worker
        for (var i = 0; i < workerCount; i++)
        {
            threads[i] = new Thread(RunWorkItem);
            threads[i].Start();
        }
    }

    /// <summary>
    /// Tells every worker to exit by queueing one stop item per worker.
    /// </summary>
    /// <param name="waitForWorkersToComplete">
    /// <c>true</c>: items already queued are processed first and the call blocks until all
    /// workers have ended. <c>false</c>: items not yet dequeued are discarded and the call
    /// returns without waiting; items already being executed still run to completion.
    /// </param>
    public void Stop (bool waitForWorkersToComplete)
    {
        lock (locker)
        {
            // Discarding pending items and posting the stop items happen under a single
            // lock so that nothing enqueued concurrently can slip in between the two steps.
            if (!waitForWorkersToComplete)
                queue.Clear();

            // One stop item per worker; each worker consumes exactly one and exits.
            for (var i = 0; i < threads.Length; i++)
                queue.Enqueue(new QueueStopWorkItem());

            // Wake every waiting worker so each can pick up its stop item.
            Monitor.PulseAll(locker);
        }

        // Wait for threads to finish
        if (waitForWorkersToComplete)
        {
            for (var i = 0; i < threads.Length; i++)
                threads[i].Join();
        }
    }

    /// <summary>
    /// Appends a work item to the queue and wakes one waiting worker.
    /// </summary>
    /// <param name="workItem">The item to execute; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void Enqueue (WorkItem workItem)
    {
        // Reject null here, on the caller's thread; otherwise a worker would hit an
        // unhandled NullReferenceException when it later dequeues the item.
        if (workItem == null)
            throw new ArgumentNullException("workItem");

        lock (locker)
        {
            queue.Enqueue(workItem);
            Monitor.Pulse(locker); // wake one worker waiting for an item
        }
    }

    /// <summary>
    /// Worker thread body: takes items off the queue and executes them until a stop item is found.
    /// </summary>
    void RunWorkItem()
    {
        while (true)
        {
            WorkItem item;
            lock (locker)
            {
                // Sleep while there is nothing to do; re-check after every wake-up.
                while (queue.Count == 0)
                    Monitor.Wait(locker);
                item = queue.Dequeue();
            }

            if (item is QueueStopWorkItem)
                return; // Time to exit.

            try
            {
                // The progress callback is optional, just like OnException: give the task
                // a no-op when none was supplied so it can report unconditionally.
                var reportProgress = item.OnProgressChanged;
                if (reportProgress == null)
                    reportProgress = IgnoreProgress;

                item.Task(reportProgress); // Execute item.
            }
            catch (Exception ex)
            {
                var handler = item.OnException;
                if (handler != null)
                    handler(ex);
                else
                    throw; // no handler supplied: let the failure surface on this worker thread
            }
        }
    }

    // Stand-in progress callback for items that did not supply one.
    private static void IgnoreProgress(ProgressChangedEventArgs eventArgs)
    {
    }

    /// <summary>
    /// Lets all queued items finish, then stops and joins the workers (same as
    /// <c>Stop(true)</c>). The worker threads have ended once this returns, so items
    /// enqueued afterwards are never executed.
    /// </summary>
    public void WaitForTasksToComplete()
    {
        Stop(true);
    }
}
}
The result should look like the following: