The Architect´s Napkin

Software Architecture on the Back of a Napkin
posts - 69 , comments - 229 , trackbacks - 0

My Links


Article Categories


Post Categories

Catch Exceptions in Concurrent Flows Using Causalities

The previous article showed you how to easily parallelize flow operation execution with NPantaRhei (download the sources from github).

Once operations run concurrently, though, you´ve to pay your dues. Benefits or concurrent execution like higher performance, lower latency, or increased throughput come at a price. What that means conceptually e.g. for access to shared data, I leave to you to research. Here I want to show you how to practically handle errors in such flows. Because error handling cannot rely on the usual try-catch statement, you need to take care of it in your flow definition.

Causalities: Contexts for Exceptions

The question is: How to catch exceptions when there is no call stack anymore? You could do it by providing each operation with its own exception port and catch the exceptions yourself. This is ok, if you don´t want to check on too many operations. But what if an exeption occurs in an unchecked operation? One that you thought would not fail?

Wouldn´t it be better to just be able to say: “From here on, however the data flows, if an exception occurs, it should be handled such and such.”

That´s what try-catch does in a call stack world. And that´s what CCR causalities do in a concurrent world.

Causalities have been introduced by Microsofts Concurrency Coordination Runtime of ill fate. It was a great approach to more manageable concurrent programming, but did not gain much attention due to being buried inside Microsoft Robotics Studio. Nevertheless the concept stuck with me, and so I introduced it to the Flow Execution Engine.

Think of a causality as a very flexible sleeve or tube around a flow:


All operations inside this sleeve share a common exception handler. If an exception occurs it is caught by the causality and relayed to an exception handling operation. Nothing has to be changed within the operations inside the causality sleeve. The information on where to pass an exception is out of band. Any exception is caught by the Execution Engine which checks if a causality is active, and if so passes the exception wrapped inside a FlowRuntimeException to the causality´s exception port.

Once switched on, causalities stretch across the flow regardless where messages flow – until they are switched off. Even though the above drawing shows the causality as part of the flow it is in fact part of the messages flowing. When a message passes through the standard operation to switch on a causality, the causality is attached to the message. And when the message is worked on by an operation, the causality is copied to any messages produced by the operation. But that´s technical details. Better to think of a causality as a sleeve stretched over parts of a flow.

Like try-catch clauses causalities can be nested. Switching on a causality in fact is pushing a causality on a stack:


An exception occurring in operations C,E,F will be relayed to the red inner causality´s exception port, an exception in A,B,D,G will be relayed to the green outer causality´s port.

Please note: A causality is just a context for exceptions. It catches exceptions in any encompassed operations and relays them to a port. What you do with the exception is your business. The Execution Engine won´t halt because an exception occurred; it will continue processing messages. So if you want to change flow execution due to an exception, you have to build that into your flows.

Putting Causalities into a Flow

Back to the sample flow to show you, how to implement causalities using the Flow Execution Engine:


First define the streams flowing to/from the causality operations. To push the causality on the stack, just use the push causality operation´s name. The output port to connect to the exception handler is called “.exception”, though.

fr.AddStream(".in", "Pushc");
fr.AddStream("Pushc", "Find Files");
fr.AddStream("Pushc.exception", "Handle Exception");
fr.AddStream("Find Files", "Count Words");
fr.AddStream("Count Words", "Popc");
fr.AddStream("Popc", "Total");
fr.AddStream("Total", ".out");

Pushing and poping causalities is a matter of standard operations. Call them anyway you like. You can even reuse them in different parts of the flow:

var foc = new FlowOperationContainer()
    .AddFunc<string, IEnumerable<String>>("Find Files", Find_files)
    .AddFunc<IEnumerable<string>, IEnumerable<int>>("Count Words", Count_words)
    .AddFunc<IEnumerable<int>, Tuple<int, int>>("Total", Total)
    .AddAction<FlowRuntimeException>("Handle Exception", Handle_exception);

The exception handler is a method accepting a FlowRuntimeException:

void Handle_exception(FlowRuntimeException ex)

Exceptions flow like any other data. Once the are caught by a causality you can process them in their own sub-flows.

Handling Unhandled Exceptions

If you don´t set up causalities or there are parts of your flows not covered by them, you can still catch exceptions. Just register an handler with the UnhandledException event of the Execution Engine:

fr.UnhandledException += ex => {…}

It´s that easy.

Just remember: Regardless of how you catch exceptions, they arrive on some background thread. So if you want to display some information on them in a GUI, you probably need to switch processing to the GUI thread. Use MakeSync() on a causality exception handler operation; do it your own way for unhandled exceptions.

Print | posted on Saturday, May 5, 2012 2:19 PM | Filed Under [ Software modelling ]


No comments posted yet.
Post A Comment

Powered by: