My previous article introduced the Flow Execution Engine NPantaRhei (download here from github). It showed how to define a data flow and register its operations.
By default such flows are executed synchronously and sequentially, although in the background with regard to their initiator. But it's easy to parallelize the execution of operations. Here's a first suggestion for how to run the operations of the example from the first article concurrently.
The dots in the operations signify asynchronous operation:
- A single dot means the operation runs on its own single thread. Arriving messages do not hold up other operations; they are executed in the order they arrive.
- Two dots mean the operation runs on multiple threads. Arriving messages do not hold up other operations; they are executed in parallel. Although their execution is started in the order they arrive, the order of the results might differ.
- No dot means the operation runs on the Execution Engine thread only.
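The two asynchronous modes can be illustrated with plain .NET primitives. This is a hedged sketch, not NPantaRhei code: a single worker thread (one dot) preserves arrival order, while parallel workers (two dots) produce the same results in a possibly different order:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class DotSemantics
{
    // One dot: a single worker drains the messages, so results keep arrival order.
    static int[] ProcessOnOneThread(int[] messages)
    {
        var results = new ConcurrentQueue<int>();
        Task.Run(() => {
            foreach (var m in messages) results.Enqueue(m * m);
        }).Wait();
        return results.ToArray();
    }

    // Two dots: many workers start in arrival order, but may finish in any order.
    static int[] ProcessInParallel(int[] messages)
    {
        var results = new ConcurrentBag<int>();
        Parallel.ForEach(messages, m => results.Add(m * m));
        return results.ToArray(); // same elements, order not guaranteed
    }

    static void Main()
    {
        var input = Enumerable.Range(1, 5).ToArray();
        Console.WriteLine(string.Join(",", ProcessOnOneThread(input)));             // 1,4,9,16,25
        Console.WriteLine(string.Join(",", ProcessInParallel(input).OrderBy(x => x)));
    }
}
```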
What the design above is supposed to mean is: there are two time consuming operations, finding all relevant files and counting the words in them. These operations should run concurrently so that counting words can start before all files have been found. And counting the words in one file can be done in parallel to counting the words in another file.
Totalling the word counts is not performance-sensitive. It need not run on its own thread.
As easy as the above design is to understand and as simple as it would be to implement, it won't work. Find Files and Count Words currently return a list of results. That's not compatible with running concurrently. They need to be switched to stream output: Find Files needs to output a stream of filenames so each one can be processed on a different thread by Count Words. And then there is no longer a single Count Words method call running, but several, so Count Words can no longer output a list, but needs to switch to a stream too.
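The difference between list output and stream output can be sketched like this. The signatures are hypothetical stand-ins (word counting is stubbed as string length), not NPantaRhei's actual operation signatures:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class ListVsStream
{
    // List output: downstream work cannot start before the whole list exists.
    static IEnumerable<int> Count_words_list(IEnumerable<string> files)
        => files.Select(f => f.Length).ToList();   // word counting stubbed as Length

    // Stream output: each result is pushed downstream as soon as it is ready,
    // so several Count_words calls can run side by side on different filenames.
    static void Count_words_stream(string file, Action<int> output)
        => output(file.Length);

    static void Main()
    {
        var files = new[] { "a.txt", "bb.txt", "ccc.txt" };

        Console.WriteLine(string.Join(",", Count_words_list(files)));

        foreach (var f in files)             // in the engine each of these calls
            Count_words_stream(f,            // could run on its own thread
                n => Console.WriteLine(n));
    }
}
```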
Totalling, however, needs to work on a list of word counts, or at least it needs to know how many word counts to expect; its purpose is to output a single result.
For these reasons the design has to change:
The Scatter operation turns the list of file names produced by Find Files into a stream. And the Gather operation turns the stream of word counts back into a list. Neither the output of Find Files nor the input to Total need to change. Just Count Words has to be switched from list processing to stream processing.
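Conceptually, Scatter and Gather can be sketched like this. This is a simplified illustration of the semantics, not NPantaRhei's actual implementation; the count passed alongside each item stands in for however the real operations correlate a stream with its expected length:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class ScatterGatherSketch
{
    // Scatter: turn a list into a stream of single items, announcing the count.
    static void Scatter<T>(IList<T> list, Action<T, int> output)
    {
        foreach (var item in list) output(item, list.Count);
    }

    // Gather: collect streamed items until the announced count is reached,
    // then emit the complete list in one message.
    class Gather<T>
    {
        readonly List<T> buffer = new List<T>();
        public void Input(T item, int expected, Action<IList<T>> output)
        {
            buffer.Add(item);
            if (buffer.Count == expected) output(buffer);
        }
    }

    static void Main()
    {
        var wordCounts = new[] { 3, 5, 8 };
        var gather = new Gather<int>();
        Scatter(wordCounts, (n, total) =>
            gather.Input(n, total, all =>
                Console.WriteLine("total = " + all.Sum())));
    }
}
```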
Here's how to set up this design with the Flow Execution Engine:
using(var fr = new FlowRuntime()) {
    // flow definition (streams) registered as shown in the first article

    var foc = new FlowOperationContainer()
        .AddFunc<string, IEnumerable<string>>("Find_files", Find_files).MakeAsync() /*2*/
        .AddFunc<string, int>("Count_words", Count_words).MakeParallel() /*3*/
        .AddFunc<IEnumerable<int>, Tuple<int,int>>("Total", Total);
    fr.AddOperations(foc.Operations);

    fr.AddOperation(new Scatter<string>("scatter")); /*1*/
    fr.AddOperation(new Gather<int>("gather")); /*1*/

    fr.Process(new Message(".in", "…"));

    Tuple<int,int> result = null;
    fr.WaitForResult(5000, _ => result = (Tuple<int,int>)_.Data);
}
In this code you can see…
- /*1*/ …how the Scatter and Gather operations are standard operations which can be instantiated manually.
- /*2*/ …how any operation can be made async using the fluent interface of the operation container.
- /*3*/ …how any operation can be made parallel using the fluent interface of the operation container.
There is no need to “take back” the parallel processing for Total. But if you need to interface with a GUI framework you might be compelled to switch an operation back to the GUI thread. You can do that by applying .MakeSync() to the operation (optionally providing a SynchronizationContext).
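Marshalling back to a GUI thread works through a SynchronizationContext. The sketch below uses a toy single-threaded context to stand in for a GUI thread; it only illustrates the mechanism that a MakeSync(context) style switch would rely on, not NPantaRhei's implementation:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// A toy single-threaded SynchronizationContext, standing in for a GUI thread:
// Post() enqueues work; one dedicated thread executes it in order.
class QueuedContext : SynchronizationContext
{
    readonly BlockingCollection<(SendOrPostCallback, object)> queue
        = new BlockingCollection<(SendOrPostCallback, object)>();

    public override void Post(SendOrPostCallback d, object state) => queue.Add((d, state));

    public void Run(int count)
    {
        for (int i = 0; i < count; i++)
        {
            var (d, state) = queue.Take();
            d(state);
        }
    }
}

class Demo
{
    static void Main()
    {
        var ctx = new QueuedContext();
        int guiThreadId = Thread.CurrentThread.ManagedThreadId;

        // A worker thread hands its result back to the "GUI" thread via Post.
        new Thread(() => ctx.Post(_ =>
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId == guiThreadId),
            null)).Start();

        ctx.Run(1);
    }
}
```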
Flow execution is done on 1+n threads. Without any intervention, messages are taken from an internal queue by the Execution Engine running on its own thread. Each message is passed to an operation. Executing an operation results in 0..n output messages which are put back into the internal queue. All of this happens on the Execution Engine thread. Once the operation finishes, the call stack collapses back to the message loop of the Engine, which picks the next message from the queue, and so on. At any time there is thus only one operation executing. That's the default.
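This default can be sketched with a tiny message loop. Again a hedged illustration with made-up "split" and "print" operations, not the engine's actual code: one thread takes messages from a queue, executes an operation, and feeds the output messages back into the same queue:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class EngineLoopSketch
{
    static void Main()
    {
        var queue = new BlockingCollection<string>();
        queue.Add("hello world");

        // The engine loop: one thread takes a message, executes an operation,
        // and puts the operation's 0..n output messages back into the queue.
        var engine = Task.Run(() =>
        {
            foreach (var msg in queue.GetConsumingEnumerable())
            {
                if (msg.Contains(" "))                   // "split" operation
                    foreach (var word in msg.Split(' '))
                        queue.Add(word);
                else
                {                                        // "print" operation
                    Console.WriteLine(msg);
                    if (msg == "world") queue.CompleteAdding();
                }
            }
        });
        engine.Wait();
    }
}
```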
But if you like, you can have operations running on their own threads. Make them async (1 thread) or even parallel (n threads) so they don't hold up message processing by the Engine. It's as simple as switching on concurrency using the fluent interface shown above.
But beware: concurrent operations come with their own problems. Data races and conflicting access to shared data need to be taken into consideration. The Execution Engine just makes it easy to implement your designs; you need to get the concepts right yourself ;-)
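A classic example of such a race, independent of any flow engine: two parallel operations incrementing a shared counter with plain `++` can lose updates, while an atomic increment stays correct:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class RaceSketch
{
    static void Main()
    {
        int unsafeTotal = 0, safeTotal = 0;

        Parallel.For(0, 100_000, _ =>
        {
            unsafeTotal++;                         // read-modify-write race: updates can be lost
            Interlocked.Increment(ref safeTotal);  // atomic, always correct
        });

        Console.WriteLine(safeTotal);              // always 100000
        Console.WriteLine(unsafeTotal <= safeTotal);
    }
}
```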
Because that's not always easy and bugs will creep in, I'll show you how to handle exceptions in my next article.