The Architect's Napkin

Software Architecture on the Back of a Napkin

Going asynchronous - AOP made easy with Event-Based Components – Part III

Logging, validation, exception handling: those are aspects that are easy to insert into an Event-Based Components design, as I've shown in my previous post. But what about multi-threading? Or better: parallel and asynchronous processing?

In this article I want to show you how you could approach multi-core programming using aspects you insert into an existing EBC architecture.

Asynchronous processing

Why use multiple threads at all? It's because you either want to hide latency, decrease latency, or increase throughput. Hiding latency means you want some client not to have to wait for a service to finish before it can go on and do other stuff.

It´s like when you call the pizza delivery service: You order a pizza, then you hang up the phone, and do whatever you like until the pizza is delivered. However long it takes to bake the pizza – you need not freeze during that period. The latency of pizza baking is hidden from you. The pizza service runs asynchronously with regard to its customers.

How could hiding latency help the file indexing scenario? Well, it could make the whole process asynchronous. So whoever calls the indexer does not need to wait for it to finish. This might not be a pressing issue as long as the indexing is wrapped up in a console application. But what if I made the indexing functionality a library?

The Indexer.Console project references the Indexer library. The whole indexing functionality is hidden in class IndexFiles:

[Image: the Indexer.Console project referencing the Indexer library, with the indexing functionality in class IndexFiles]

Now any client application can use the indexer like this:

var index = new IndexFiles();
index.Out_Statistics += stats =>
    {
        System.Console.WriteLine("Successfully indexed {0} words.", stats.WordCount);
    };

index.Out_ValidationError += err =>
    {
        System.Console.WriteLine("*** Aborted indexing! Validation error: {0}", err);
    };

index.Out_UnhandledException += ex =>
    {
        System.Console.WriteLine("*** Aborted indexing! Unexpected exception: {0}. See log for details.", ex.Message);
    };

index.In_Process(args[0], args[1]);

And surely a client application would not want to wait for the indexer to finish its work of indexing a whole hard disk with tens of thousands of files. So making the indexing process asynchronous would be a very nice thing to do. But how?

Well, just put an Asynchronize activity right at the start of the feature process and you're done:

[Image: feature process with an Asynchronize activity at its start]

Any activities the Asynchronize activity passes data on to are run on a different thread than the one from which the data originated. However, all data passing through Asynchronize is processed on the same thread! That means the data is processed sequentially although in parallel to other code.

The Asynchronize activity creates a single thread and a queue. Incoming data is queued up, and the thread picks data items from the queue whenever it becomes idle. If nothing's left to do, it waits for a notification that new data has arrived.

public class Asynchronize<T>
{
    private readonly Thread worker;

    private readonly Queue<T> dataToProcess;
    private readonly AutoResetEvent dataAvailable;


    public Asynchronize()
    {
        this.dataToProcess = new Queue<T>();
        this.dataAvailable = new AutoResetEvent(false);

        this.worker = new Thread(DispatchDataSequentially) {IsBackground = true};
        this.worker.Start();
    }


    private void DispatchDataSequentially()
    {
        while (true)
        {
            this.dataAvailable.WaitOne();

            T data;
            while (TryGetDataToDispatch(out data))
            {
                this.Out_ProcessSequentially(data);
            }
        }
    }


    private bool TryGetDataToDispatch(out T data)
    {
        data = default(T);

        lock (this.dataToProcess)
        {
            if (this.dataToProcess.Count == 0) return false;

            data = this.dataToProcess.Dequeue();
            return true;
        }
    }


    public void In_Process(T data)
    {
        lock (this.dataToProcess)
        {
            this.dataToProcess.Enqueue(data);
        }
        this.dataAvailable.Set();
    }


    public event Action<T> Out_ProcessSequentially;
}

This way the latency of processing the data is hidden from its source, but at the same time the order of data item processing is retained.
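
Here is a tiny standalone usage sketch – not from the indexer code, just to illustrate the behaviour of Asynchronize<T> in isolation: the handler runs on the activity's single worker thread while the caller returns immediately, and the items are still processed in order.

var async = new Asynchronize<string>();
async.Out_ProcessSequentially += msg =>
    System.Console.WriteLine("{0} [Thread {1}]", msg, Thread.CurrentThread.ManagedThreadId);

async.In_Process("a");
async.In_Process("b");
async.In_Process("c");   // "a", "b", "c" are processed in order, but asynchronously

System.Console.WriteLine("Caller continues on thread {0}", Thread.CurrentThread.ManagedThreadId);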

Plugging in this aspect is as easy as plugging in the validation aspect or the exception handling aspect:

public IndexFiles()
{
    // Build
    ...
    var asyncCompileFiles = new Asynchronize<Tuple<string, string>>();
    ...

    // Bind
    this.in_Process = _ => asyncCompileFiles.In_Process(_);

    asyncCompileFiles.Out_ProcessSequentially += handleEx.In_Process;
    handleEx.Out_Process += compileFiles.In_Process;
...

The Asynchronize aspect activity passes on the data it receives – but does so on a single different thread. It's a generic functional unit. Very simple. It does one job.

If you want more, like exception handling for background processing, then you can combine it with the Handle exception activity into a composite activity, e.g.

[Image: composite activity combining Asynchronize with the Handle exception activity]

EBC activities lend themselves to aggregation. It's very easy to compose them into activities of higher order. So feel free to start implementing your own pluggable application building blocks.
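
To give you an idea of such a composite, here is a minimal sketch. It is not the implementation from the sample solution; the class and pin names are mine, and instead of wiring in the Handle exception activity from the previous post I simply inline a try/catch:

// A minimal sketch of a composite activity - not the original implementation.
// It reuses the Asynchronize<T> activity from above.
public class AsynchronizeWithExceptionHandling<T>
{
    private readonly Asynchronize<T> async = new Asynchronize<T>();

    public AsynchronizeWithExceptionHandling()
    {
        // Everything downstream runs on the Asynchronize worker thread,
        // so exceptions have to be caught there and routed to an output pin.
        this.async.Out_ProcessSequentially += data =>
            {
                try
                {
                    if (this.Out_Process != null) this.Out_Process(data);
                }
                catch (Exception ex)
                {
                    if (this.Out_UnhandledException != null) this.Out_UnhandledException(ex);
                }
            };
    }

    public void In_Process(T data) { this.async.In_Process(data); }

    public event Action<T> Out_Process;
    public event Action<Exception> Out_UnhandledException;
}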

So now the whole indexing process is running on a background thread. Client code is free to work on other stuff and will receive notifications upon completion or exceptional situations.

To exploit a multi-core CPU, though, indexing should happen on more than one thread. By using the Asynchronize activity a second time this can easily be accomplished. With it building the index can be decoupled from Crawling the directory tree and Extracting words:

[Image: feature process with a second Asynchronize activity decoupling Build index]

See how the stages of the feature process are running on different threads (marked with their own background color). That means they are working in parallel: the initiator can continue while the compiling and extracting is happening. The compiling and extracting can continue while the building is happening.

Please note: Build index is also working sequentially. So only 3 threads are in use while indexing: initiator thread, compile thread, index builder thread. This ensures maximum speed for both stages on a two-core CPU.

In addition, Build index does not have to take precautions against concurrent access to its resource, the index. Only a single thread is working on it. That would have been different if Asynchronize had just used the ThreadPool to dispatch data to a background thread. Stateful asynchronous operations thus do not inevitably lead to headaches ;-)
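
For illustration, the bind phase could now look roughly like this. Only the aspect activities are taken from this post; the domain activity instances and their pin names (compileFiles, extractWords, buildIndex, Out_FilenameFound, Out_WordsExtracted) are assumptions, and exception handling is omitted for brevity:

var asyncCompileFiles = new Asynchronize<Tuple<string, string>>();
var asyncBuildIndex = new Asynchronize<Tuple<string, string[]>>();

this.in_Process = _ => asyncCompileFiles.In_Process(_);
asyncCompileFiles.Out_ProcessSequentially += compileFiles.In_Process;

compileFiles.Out_FilenameFound += extractWords.In_Process;
extractWords.Out_WordsExtracted += asyncBuildIndex.In_Process;

// Build index runs on its own single worker thread - no locking needed inside it.
asyncBuildIndex.Out_ProcessSequentially += buildIndex.In_Process;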

Switching to a data flow protocol

In my first posting I mentioned two different possible ways of passing data through the feature process: using a data flow protocol or IEnumerable<>. So far IEnumerable<> has been a perfect choice. The index could be written as soon as the IEnumerable<> passed to Build index was exhausted.

Now that Build index runs on its own thread, though, it's not possible to pass an IEnumerable<> to it anymore. Each data item has to be delivered separately. Thus I need to switch to a data flow protocol between Extract words and Build index.

That makes Compile words look a little ugly:

public class Compile_words
{
    private Index index = new Index();


    public void In_Process(Tuple<string, string[]> input)
    {
        if(input != null)
        {
            Trace.TraceInformation("Compile words({0}, {1} words) [Thread {2}]",
                input.Item1, input.Item2.Length, Thread.CurrentThread.GetHashCode());

            foreach (var word in input.Item2)
                this.index.Add(word, input.Item1);
        }
        else
        {
            this.Out_IndexCompiled(this.index);
            this.Out_Statistics(new IndexStats(this.index.WordCount));

            this.index = new Index();
        }
    }


    public event Action<Index> Out_IndexCompiled;
    public event Action<IndexStats> Out_Statistics;
}

Nevertheless it's straightforward, I'd say. Only the check for the terminating null feels a bit strange.
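
For completeness, this is what the producing side of the data flow protocol might look like. The real Extract words activity is not shown in this post, so take this as a sketch with assumed class, pin, and splitting logic:

public class Extract_words
{
    public void In_Process(string filename)
    {
        if (filename == null)
        {
            // Forward the terminating null so downstream activities
            // know the stream of files has ended.
            this.Out_WordsExtracted(null);
            return;
        }

        var words = File.ReadAllText(filename)
                        .Split(new[] {' ', '\t', '\r', '\n'},
                               StringSplitOptions.RemoveEmptyEntries);

        this.Out_WordsExtracted(Tuple.Create(filename, words));
    }

    public event Action<Tuple<string, string[]>> Out_WordsExtracted;
}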

Parallel processing

What if you have more than two cores available? What if you not only want to hide latency but also decrease it? Then not just asynchronous but parallel processing is your friend.

This can be introduced into an EBC design just as easily. We could, for example, detach Extract words from Compile files so that all files are analyzed in parallel. A Parallelize activity is very simple to implement:

public class Parallelize<T>
{
    public void In_Process(T data)
    {
        ThreadPool.QueueUserWorkItem(_ => this.Out_ProcessInParallel((T)_), data);
    }

    public event Action<T> Out_ProcessInParallel;
}

ThreadPool is the only help you need from the .NET framework. Each data item is dispatched for processing on another thread (within the limits of the thread pool). Thus they are all handled in parallel.

Technically it's easy to insert the Parallelize aspect activity into the EBC design: just plug it between the two activities.

[Image: Parallelize plugged in between Compile files and Extract words]

But this is not sufficient. It wouldn't work. Don't do it just like that!

The problem is that it's no longer guaranteed that the terminating null is the last data item output by Extract words. There are potentially many Extract words activities running at the same time, and any one of them could issue the null – even while others are still analyzing a file.

[Image: the terminating null overtaking data items from parallel Extract words activities]

To guarantee that the null is sent on to Build index only after all other data items have been processed by the Extract words threads, a trick needs to be employed: the feature process needs to count how many “tasks” were sent to Extract words…

public class CountItemsUntilNullForScatter<T> where T : class
{
    private int counter = 0;


    public void In_Count(T item)
    {
        if (item == null)
        {
            Trace.TraceInformation("CountUntilNull({0}) [Thread {1}]", this.counter, Thread.CurrentThread.GetHashCode());

            this.Out_Count(this.counter);
            this.counter = 0;
        }
        else
        {
            this.counter++;
            this.Out_Counted(item);
        }
    }


    public event Action<T> Out_Counted;
    public event Action<int> Out_Count;
}

and then count how many results are output by Extract words. Once as many results have left the activity as tasks entered it, the null must be inserted into the data stream:

public class InsertNullAfterItemsForGather<T> where T : class
{
    // Until the expected number of items is known, never issue the null prematurely.
    private int numberOfItemsToGather = int.MaxValue;
    private int count;


    public void In_Process(T item)
    {
        this.Out_Gather(item);

        lock (this)
        {
            this.count++;
            IssueNull();
        }
    }


    public void In_NumberOfItemsToGather(int numberOfItemsToGather)
    {
        lock (this)
        {
            this.numberOfItemsToGather = numberOfItemsToGather;
            IssueNull();
        }
    }


    private void IssueNull()
    {
        if (this.count >= this.numberOfItemsToGather)
        {
            this.Out_Gather(null);

            // Reset for the next batch of data items.
            this.count = 0;
            this.numberOfItemsToGather = int.MaxValue;
        }
    }


    public event Action<T> Out_Gather;
}

This is a common pattern and is called scatter-gather. Parallelize scatters work to be done across a number of threads. And a downstream activity gathers the results for further processing – because that's supposed to be done sequentially, not in parallel.

[Image: scatter-gather feature process with Count data items, Parallelize and Insert null activities]

The feature process now works as follows:

  1. A single path is passed to Compile files and is processed asynchronously in some thread T0.
  2. Compile files outputs n filenames terminated by a null.
  3. Count data items counts the number of filenames and passes them on to Parallelize.
  4. Parallelize dispatches processing of those filenames by Extract words to threads T1..Tn (or possibly fewer depending on the thread pool size and the time it takes to process each file).
  5. Extract words is run n times in parallel and issues n tuples containing the words found in the files. The activity “swallows” the null filename, and no null results are output. This made the Extract words activities a bit simpler.
  6. Insert null counts the number of results flowing out of Extract words and inserts a null into the data stream after n items. n had been passed to it by Count data items once it determined how many filenames were generated.
  7. Sequential processing of the words found is dispatched to thread Tn+1 by Asynchronize. Once Build index encounters the inserted null, it writes the index to a file.
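
Translated into the bind phase, the wiring could look roughly like this. Again, only the aspect activities are taken from this post; the domain activity instances and their pin names are assumptions:

var countTasks = new CountItemsUntilNullForScatter<string>();
var scatter = new Parallelize<string>();
var gather = new InsertNullAfterItemsForGather<Tuple<string, string[]>>();
var asyncBuildIndex = new Asynchronize<Tuple<string, string[]>>();

compileFiles.Out_FilenameFound += countTasks.In_Count;    // n filenames + terminating null
countTasks.Out_Counted += scatter.In_Process;             // the n filenames; the null is swallowed
countTasks.Out_Count += gather.In_NumberOfItemsToGather;  // tell the gatherer how many items to expect

scatter.Out_ProcessInParallel += extractWords.In_Process; // runs on thread pool threads
extractWords.Out_WordsExtracted += gather.In_Process;     // n results, arrival order undefined

gather.Out_Gather += asyncBuildIndex.In_Process;          // null re-inserted after n items
asyncBuildIndex.Out_ProcessSequentially += buildIndex.In_Process;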

If you look at the code of the activities you realize they are free of any multi-threading concerns. That makes them easy to test. Asynchronous/parallel processing is an orthogonal aspect and has been completely wrapped into a couple of standard activities which can be combined with domain activities.

However, asynchronous processing does not come for free. Some changes needed to be made, like moving from IEnumerable<> to a data flow protocol. But I'd argue they did not complicate the business logic.

Sure, async/parallel processing can be more complex. But my feeling is that EBC make it easier to deal with. With EBC the async/parallel aspect is made visible and very explicit, as the above diagrams attest. It's clearly separate from any domain code. You can reason about the async/parallel processing by walking along the graphical feature processes – which are then translated almost mechanically into code.

Also it's obvious where synchronization between threads needs to take place.

[Image: feature process with the synchronization points between threads marked]

Each dot marks where two running threads meet. At these points you need to think about how to organize access to shared resources, because data can be passed from one thread to another only via such resources. Which brings me to another issue…

Synchronization with WinForms main thread

There is a special kind of resource to which access from different threads not only needs to be synchronized, but which may only be accessed from one particular thread: most controls on WinForms forms. Access to them is not thread-safe by default. So if data arrives on some other thread, it needs to be transferred to the main WinForms thread before it can be displayed.

This is another aspect and can be solved by plugging an activity between an output and a WinForms input:

[Image: aspect activity plugged in between an output and a WinForms input]

I've added another project to the indexer sample solution to implement a WinForms frontend:

[Image: solution with the added WinForms frontend project]

The form is the initiator/data source of the feature process as well as its sink. It's easy to wire it up to the IndexFiles process (which I've adorned with an output for each file found as a progress signal):

var dlg = new WinIndex();
var indexer = new IndexFiles();


dlg.Out_Index += indexer.In_Process;

indexer.Out_FileFoundToIndex += SwitchBackToSyncContext<string>.Wrap(dlg.In_FileFound);
indexer.Out_Statistics += SwitchBackToSyncContext<IndexStats>.Wrap(dlg.In_IndexStats);
indexer.Out_ValidationError += SwitchBackToSyncContext<string>.Wrap(dlg.In_ValidationError);
indexer.Out_UnhandledException += SwitchBackToSyncContext<Exception>.Wrap(dlg.In_Exception);

SwitchBackToSyncContext<>.Wrap creates an aspect activity to be inserted before the one passed to it.

public class SwitchBackToSyncContext<T>
{
    private readonly SynchronizationContext ctx;


    public SwitchBackToSyncContext() : this(SynchronizationContext.Current) { }
    internal SwitchBackToSyncContext(SynchronizationContext ctx)
    {
        this.ctx = ctx;
    }


    public void In_Process(T msg)
    {
        if (this.ctx != null)
            this.ctx.Send(_ => this.Out_ContinueInSyncContext(msg), null);
        else
            this.Out_ContinueInSyncContext(msg);
    }


    public event Action<T> Out_ContinueInSyncContext;


    public static Action<T> Wrap(Action<T> continuation)
    {
        var switchBack = new SwitchBackToSyncContext<T>();
        switchBack.Out_ContinueInSyncContext += continuation;
        return switchBack.In_Process;
    }
}

The aspect activity switches execution back to the synchronization context on which it was created (which is the same as the sync ctx of the WinIndex form).

Check out the code at Google Projects here. It's the branch leading to the complete async solution including the WinForms frontend.

Summary

This concludes my introduction to Aspect-Oriented Programming (AOP) with Event-Based Components. I hope I was able to show you how easy it is to insert aspects into EBC designs. But it's not only technically easy. The resulting design is also easy to understand, because aspects are made explicit.

No special tooling is required, and yet you're very flexible: quickly swap aspects in and out without burdening your domain logic.

posted on Sunday, August 8, 2010 2:12 PM | Filed Under [ Event-Based Components ]
