BACKGROUND
Recently, I have been working with Windows Workflow Foundation. We are building a custom workflow host, and it needs a facility for resuming a workflow, from the Admin UI, at the exact point where it failed. Something similar to the BizTalk HAT.
So the question is: how can something like this be achieved?
What I present here is one way of achieving it. There may well be better ways; needless to say, I am still learning my way through Windows Workflow Foundation.
The idea came to me after going through some of the MSDN articles on Workflow by Matt Milner. {Personal Opinion: He is damn good at explaining}
I highly recommend reading these articles before reading further. Even if you are not interested in this particular problem, I would still recommend them; they give a hell of an insight into workflow.
SOLUTION
So now you have the background, and we have a problem at hand that we want to solve. The idea is based on RetryActivity. We create a general-purpose WFFaultReportingSystem service and register it with the workflow runtime. This service is the medium through which the activity reports to the host that something went wrong.
using System;
namespace Coderslog.Workflow.Services
{
public interface IWFFaultReportingSystem
{
void ReportError(Exception ex, object message);
void ReportResumableError(Guid wfInstanceId, string queueName,Exception ex, object message);
}
}
Now let's look at the activity itself.
using System;
using System.ComponentModel;
using System.ComponentModel.Design;
using System.Collections;
using System.Drawing;
using System.Linq;
using System.Workflow.ComponentModel;
using System.Workflow.ComponentModel.Design;
using System.Workflow.ComponentModel.Compiler;
using System.Workflow.ComponentModel.Serialization;
using System.Workflow.Runtime;
using System.Workflow.Activities;
using System.Workflow.Activities.Rules;
using Coderslog.Workflow.Services;
namespace Coderslog.Workflow.Activities
{
[Designer(typeof(SequenceDesigner), typeof(IDesigner))]
public partial class ResumableActivity : CompositeActivity, IActivityEventListener<QueueEventArgs>
{
public ResumableActivity()
{
//The call to the partial generated class
InitializeComponent();
QueueName = "ResumableActivity";
CurrentActivityIndex = 0;
}
//The Last Error that happened while executing any child activity
[Browsable(false)]
public Exception LastError { get; set; }
//The Current Activity that I am trying to process now
[Browsable(false)]
public int CurrentActivityIndex { get; set; }
protected override ActivityExecutionStatus Execute(ActivityExecutionContext executionContext)
{
if (EnabledActivities.Count == 0)
return ActivityExecutionStatus.Closed;
//try to execute the child activities in a new context
BeginIteration(executionContext);
return ActivityExecutionStatus.Executing;
}
//Initiating the Iteration
private void BeginIteration(ActivityExecutionContext executionContext)
{
//create a new activity execution context
Activity child = EnabledActivities[CurrentActivityIndex];
ActivityExecutionContext newContext = executionContext.ExecutionContextManager.CreateExecutionContext(child);
//register for both closed and faulted events
newContext.Activity.Closed += new EventHandler<ActivityExecutionStatusChangedEventArgs>(child_Closed);
newContext.Activity.Faulting += new EventHandler<ActivityExecutionStatusChangedEventArgs>(Activity_Faulting);
//execute the clone activity
newContext.ExecuteActivity(newContext.Activity);
}
void Activity_Faulting(object sender, ActivityExecutionStatusChangedEventArgs e)
{
//the child faulted: stop listening, then capture and clear the exception so the fault does not propagate
e.Activity.Faulting -= Activity_Faulting;
//Get hold of the error
LastError = (Exception)e.Activity.GetValue(ActivityExecutionContext.CurrentExceptionProperty);
//Clear the Error, We don't want anyone to know anything about the Error
e.Activity.SetValue(ActivityExecutionContext.CurrentExceptionProperty, null);
}
/// <summary>
/// Creates the queue and registers for items to be available.
/// </summary>
/// <param name="parentContext"></param>
/// <param name="parentEventHandler"></param>
public void Subscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler)
{
WorkflowQueuingService qService = parentContext.GetService<WorkflowQueuingService>();
if (qService != null)
{
if (!qService.Exists(QueueName))
{
WorkflowQueue q = qService.CreateWorkflowQueue(QueueName, false);
q.RegisterForQueueItemAvailable(parentEventHandler, QualifiedName);
}
}
}
//Property for holding the QueueName on which to Subscribe and Expect the response
[Browsable(false)]
public string QueueName { get; private set; }
/// <summary>
/// Removes the listener from the queue and deletes the queue.
/// </summary>
/// <param name="parentContext"></param>
/// <param name="parentEventHandler"></param>
public void Unsubscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler)
{
WorkflowQueuingService qService = parentContext.GetService<WorkflowQueuingService>();
if (qService != null)
{
WorkflowQueue queue = qService.GetWorkflowQueue(QueueName);
if (queue != null)
{
queue.UnregisterForQueueItemAvailable(parentEventHandler);
qService.DeleteWorkflowQueue(QueueName);
}
}
}
// Helper Method to Get Hold of the Queue
private WorkflowQueue GetQueue(ActivityExecutionContext ctx)
{
WorkflowQueuingService qService = ctx.GetService<WorkflowQueuingService>();
if (qService != null && qService.Exists(QueueName))
return qService.GetWorkflowQueue(QueueName);
else
return null;
}
//The Big Bang Close Method
void child_Closed(object sender, ActivityExecutionStatusChangedEventArgs e)
{
ActivityExecutionContext thisContext = sender as ActivityExecutionContext;
ActivityExecutionContext childContext = thisContext.ExecutionContextManager.GetExecutionContext(e.Activity);
e.Activity.Closed -= child_Closed;
thisContext.ExecutionContextManager.CompleteExecutionContext(childContext);
//if the current child completed successfully and more EnabledActivities remain, move to the next one
if (e.ExecutionResult == ActivityExecutionResult.Succeeded && CurrentActivityIndex < EnabledActivities.Count - 1)
{
this.SetValue(ActivityExecutionContext.CurrentExceptionProperty, null);
CurrentActivityIndex += 1;
BeginIteration(thisContext);
return;
}
//if the last child completed successfully, then we can close
if (e.ExecutionResult == ActivityExecutionResult.Succeeded && CurrentActivityIndex == EnabledActivities.Count - 1)
{
this.SetValue(ActivityExecutionContext.CurrentExceptionProperty, null);
thisContext.CloseActivity();
return;
}
//otherwise the child did not succeed: report the error and wait
//for the host to tell us whether to resume from this child
if (this.ExecutionStatus == ActivityExecutionStatus.Executing)
{
SetResumePoint(e, thisContext);
}
}
// We try to get the instance of the Fault Reporting System; if it is available, we report the error and subscribe to the queue
// so that, based on the user's input, we can either resume from the point where it broke or terminate
private void SetResumePoint(ActivityExecutionStatusChangedEventArgs e, ActivityExecutionContext thisContext)
{
IWFFaultReportingSystem wfFRS = thisContext.GetService<IWFFaultReportingSystem>();
var error = LastError;
if (wfFRS == null)
{
throw new Exception("This activity requires that the Service IWFFaultReportingSystem is registered");
}
if (error != null)
{
this.SetValue(ActivityExecutionContext.CurrentExceptionProperty, null);
Subscribe(thisContext, this);
wfFRS.ReportResumableError(WorkflowInstanceId, QueueName, error, null);
}
}
// Fires when an item becomes available in the queue
public void OnEvent(object sender, QueueEventArgs e)
{
//start a new iteration after the event happens
ActivityExecutionContext ctx = sender as ActivityExecutionContext;
//check the cast result, not the raw sender: a sender of the wrong type would otherwise slip through
if (ctx == null)
throw new ArgumentException("Sender must be ActivityExecutionContext");
WorkflowQueue q = GetQueue(ctx);
if (q != null)
{
object data = q.Dequeue();
//use the data - Resume
if (data != null && bool.Parse(data.ToString()))
{
Unsubscribe(ctx, this);
BeginIteration(ctx);
}
else
{
throw new ApplicationException("The Workflow cannot be Resumed from this point onwards!");
}
}
}
}
}
The other peripherals:
using System;
using System.ComponentModel;
using System.ComponentModel.Design;
using System.Collections;
using System.Drawing;
using System.Reflection;
using System.Workflow.ComponentModel;
using System.Workflow.ComponentModel.Design;
using System.Workflow.ComponentModel.Compiler;
using System.Workflow.ComponentModel.Serialization;
using System.Workflow.Runtime;
using System.Workflow.Activities;
using System.Workflow.Activities.Rules;
namespace Coderslog.Workflow.Activities
{
public partial class ResumableActivity
{
#region Designer generated code
/// <summary>
/// Required method for Designer support - do not modify
/// the contents of this method with the code editor.
/// </summary>
[System.Diagnostics.DebuggerNonUserCode]
private void InitializeComponent()
{
this.Name = "ResumableActivity";
}
#endregion
}
}
This covers the activity. I have tried to put comments wherever required. Hopefully it is self-explanatory.
IMPORTANT PART OF CLIENT CODE - TEST HARNESS
A sample implementation of IWFFaultReportingSystem:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Coderslog.Workflow.Services; // namespace that declares IWFFaultReportingSystem above
using System.Workflow.Runtime;
namespace Coderslog.Client.ResumableWorkflow
{
public class FaultReportingSystem:IWFFaultReportingSystem
{
WorkflowRuntime _runtime;
Guid _instanceId;
string _queueName;
public FaultReportingSystem(WorkflowRuntime runtime)
{
_runtime = runtime;
}
#region IWFFaultReportingSystem Members
public void ReportError(Exception ex, object message)
{
Console.WriteLine(ex.Message);
}
public void ReportResumableError(Guid wfInstanceId, string queueName, Exception ex, object message)
{
Console.WriteLine("WF ID :" + wfInstanceId.ToString());
Console.WriteLine("Queue Name :" + queueName);
Console.WriteLine("Error :" + ex.Message);
_instanceId = wfInstanceId;
_queueName = queueName;
var action = new Action(ProcessError);
action.BeginInvoke(CallBack, action);
}
void ProcessError()
{
Console.WriteLine("What do you want to do? Enter R to resume, or anything else to exit");
var key = Console.ReadLine();
var instance = _runtime.GetWorkflow(_instanceId);
if (key == "R" || key == "r")
{
instance.EnqueueItem(_queueName, true, null, null);
}
else
{
instance.EnqueueItem(_queueName, false, null, null);
}
}
#endregion
static void CallBack(IAsyncResult ar)
{
var action = ar.AsyncState as Action;
if (action != null)
action.EndInvoke(ar);
}
}
}
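The EnqueueItem calls in ProcessError show what the user needs to do to resume the workflow: just drop a message in the workflow queue. A value of true resumes from the failed child; anything else makes the activity throw and the workflow stop.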
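The host wiring itself is not shown above, so here is a minimal sketch of how the service might be registered with the runtime, assuming .NET Framework 3.5 with references to the System.Workflow.* assemblies and to the FaultReportingSystem class from this post. MyWorkflow is a hypothetical placeholder; a real workflow would contain a ResumableActivity with child activities.

```csharp
using System;
using System.Threading;
using System.Workflow.Activities;
using System.Workflow.Runtime;
using Coderslog.Client.ResumableWorkflow;

// Hypothetical placeholder workflow (not from the original post).
public class MyWorkflow : SequentialWorkflowActivity
{
}

public static class HostSketch
{
    public static void Main()
    {
        using (var runtime = new WorkflowRuntime())
        using (var done = new AutoResetEvent(false))
        {
            // Register the fault reporting service so that
            // GetService<IWFFaultReportingSystem>() inside the activity finds it.
            runtime.AddService(new FaultReportingSystem(runtime));

            // Release the host when the workflow finishes either way.
            runtime.WorkflowCompleted += (s, e) => done.Set();
            runtime.WorkflowTerminated += (s, e) =>
            {
                Console.WriteLine("Terminated: " + e.Exception.Message);
                done.Set();
            };

            WorkflowInstance instance = runtime.CreateWorkflow(typeof(MyWorkflow));
            instance.Start();

            // Block until the workflow completes or terminates.
            done.WaitOne();
        }
    }
}
```

An Admin UI would do the same thing ProcessError does, only later and from another place: look the instance up with GetWorkflow and enqueue true or false on the reported queue name.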
You remember the previous post on dynamic. Today we will look into it in a bit more detail. Yeah, let's ILDASM it and try to identify what is going on under the hood. But for the context of this post, let's look at a simpler example.
namespace Coderslog.Net4.Samples
{
public class SimpleDynamic
{
public void CallDoSomething()
{
var instance = GetDoSomething();
instance.DoSometing();
}
private dynamic GetDoSomething()
{
return new BehaviourA();
}
}
}
A very basic class, which the compiler translates into roughly the following. I might not be absolutely correct, but it gives a fair idea:
public class SimpleDynamic
{
// Method: A Bit more involved than the reflection way, with Optimization
public void CallDoSomething()
{
object instance = this.GetDoSomething();
if (CallDoSometning_SiteContainer.delegatePlaceHolder == null)
{
CallDoSometning_SiteContainer.delegatePlaceHolder =
CallSite<Action<CallSite, object>>.Create(
new CSharpInvokeMemberBinder(
CSharpCallFlags.None,
"DoSometing",
typeof(SimpleDynamic),
null,
new []
{
new CSharpArgumentInfo(
CSharpArgumentInfoFlags.None
, null
)
}
));
}
CallDoSometning_SiteContainer.
delegatePlaceHolder.
Target.Invoke(
CallDoSometning_SiteContainer.delegatePlaceHolder
, instance);
}
//But there is an attribute on the returned object, of
//Type:System.Runtime.CompilerServices.DynamicAttribute
private object GetDoSomething()
{
return new BehaviourA();
}
// Nested Types
//Compiler Generated Class- Internal Class
private static class CallDoSometning_SiteContainer
{
// Fields
public static CallSite<Action<CallSite, object>> delegatePlaceHolder;
}
}
As you can see, the internal static class is there so that the call site is created only once; after the first call, the cached delegate is reused instead of being rebuilt every time.
If we look at the GetDoSomething method, the DynamicAttribute on the object it returns carries this meaning (MSDN): "Indicates that the use of Object on a member is meant to be treated as a dynamically dispatched type." Notice also that the static field is of type CallSite<Action<CallSite, object>>, i.e. System.Runtime.CompilerServices.CallSite<T>, which derives from CallSite {MSDN: A dynamic call site base class. This type is used as a parameter type to the dynamic site targets.}.
Let's look at the place where all the action is happening, i.e. the method CallDoSomething. The method checks whether CallDoSometning_SiteContainer.delegatePlaceHolder is populated; if not, it initializes it with the Create method, passing in a CSharpInvokeMemberBinder {CompleteNamespace: Microsoft.CSharp.RuntimeBinder.CSharpInvokeMemberBinder} {MSDN: Represents a dynamic method call in C#, providing the binding semantics and the details about the operation. Instances of this class are generated by the C# compiler }.
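The approximation above cannot be compiled as written, because CSharpInvokeMemberBinder is not something user code can construct. As a hedged sketch of the same pattern using only the public factory methods in Microsoft.CSharp.RuntimeBinder, one could write the call site by hand as below. The class name ManualCallSite and the field name _site are my own; the project needs a reference to Microsoft.CSharp.dll.

```csharp
using System;
using System.Runtime.CompilerServices;
using Microsoft.CSharp.RuntimeBinder;

public class BehaviourA
{
    public void DoSometing()
    {
        Console.WriteLine("Behaviour A - Do Something!");
    }
}

public static class ManualCallSite
{
    // Plays the role of the compiler-generated site container: created once, then reused.
    private static CallSite<Action<CallSite, object>> _site;

    public static void CallDoSometing(object instance)
    {
        if (_site == null)
        {
            _site = CallSite<Action<CallSite, object>>.Create(
                Microsoft.CSharp.RuntimeBinder.Binder.InvokeMember(
                    CSharpBinderFlags.ResultDiscarded,  // the call is used as a statement
                    "DoSometing",                       // member name to bind at run time
                    null,                               // no generic type arguments
                    typeof(ManualCallSite),             // context used for accessibility checks
                    new[]
                    {
                        // one entry for the receiver of the call
                        CSharpArgumentInfo.Create(CSharpArgumentInfoFlags.None, null)
                    }));
        }

        // The first invocation binds the call; later ones reuse the cached rule.
        _site.Target(_site, instance);
    }

    public static void Main()
    {
        CallDoSometing(new BehaviourA());
        CallDoSometing(new BehaviourA());
    }
}
```

Both calls go through the same static site, which is the point of the container class: the binding work is paid for on the first call only.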
This feels like .NET Reflection, and one tends to ask how optimized it is. That is something Microsoft can enlighten us more about.
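One rough way to get a feel for the answer is to time both styles of call yourself. The sketch below is only that, a sketch: the iteration count is arbitrary, the numbers depend on machine and runtime, and BehaviourA here increments a counter instead of writing to the console (a change from the sample class) so that console I/O does not dominate the measurement.

```csharp
using System;
using System.Diagnostics;
using System.Reflection;

public class BehaviourA
{
    public int Calls;

    // Counter instead of Console.WriteLine, so we time dispatch rather than I/O.
    public void DoSometing()
    {
        Calls++;
    }
}

public static class DispatchTiming
{
    public static void Main()
    {
        const int iterations = 1000000; // arbitrary
        var target = new BehaviourA();

        // Dynamic dispatch: warm up once so call-site creation is not timed.
        dynamic d = target;
        d.DoSometing();
        Stopwatch sw = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++)
        {
            d.DoSometing();
        }
        sw.Stop();
        Console.WriteLine("dynamic:    {0} ms", sw.ElapsedMilliseconds);

        // Reflection: look the method up once, then invoke it repeatedly.
        MethodInfo method = typeof(BehaviourA).GetMethod("DoSometing");
        method.Invoke(target, null);
        sw = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++)
        {
            method.Invoke(target, null);
        }
        sw.Stop();
        Console.WriteLine("reflection: {0} ms", sw.ElapsedMilliseconds);
    }
}
```

Run it in a Release build and treat the result as an indication, not a benchmark.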
Let's examine the dynamic keyword introduced in C# 4.0 (.NET 4.0). A dynamic variable can hold an instance of any type. Then what is the difference between it and System.Object? The essential difference is dynamic method dispatch, which enables late binding: it allows you to write method, operator and indexer calls, property and field accesses, and even object invocations that bypass the normal static binding of C# and are resolved at run time instead.
It provides a unified approach to selecting operations dynamically.
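To make the list of operations concrete, here is a small sketch (my own, not from the sample project) in which an operator, a property access, an indexer and a delegate invocation are each resolved at run time from the actual type held in the dynamic variable. It needs a reference to Microsoft.CSharp.dll.

```csharp
using System;
using System.Collections.Generic;

public static class DynamicOperations
{
    public static void Main()
    {
        // Operator: '+' is bound at run time to integer addition.
        dynamic a = 2, b = 3;
        Console.WriteLine(a + b);            // prints 5

        // The same operator on a string binds to concatenation.
        dynamic text = "ab";
        Console.WriteLine(text + "cd");      // prints abcd

        // Property access.
        Console.WriteLine(text.Length);      // prints 2

        // Indexer.
        dynamic list = new List<int> { 10, 20, 30 };
        Console.WriteLine(list[1]);          // prints 20

        // Object invocation: the dynamic value is itself a delegate.
        dynamic square = new Func<int, int>(x => x * x);
        Console.WriteLine(square(4));        // prints 16
    }
}
```

With System.Object in place of dynamic, none of these lines would compile without a cast.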
Let's see an example:
//Class Program.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Coderslog.Net4.Samples
{
class Program
{
private static Type _toggle;
static void Main(string[] args)
{
var val = "";
do
{
//Dynamic way
dynamic d = GetMeMyDynamicObject();
Console.WriteLine(d.GetType().FullName);
d.DoSometing();
//Reflection way
object inst = GetMeMyDynamicObject();
Console.WriteLine(inst.GetType().FullName);
Invoke(inst, "DoSometing" );
Console.WriteLine(
"Press Q/q to quit and y/Y to continue!");
val = Console.ReadLine();
} while (val.ToLower() != "q");
}
private static void Invoke(object inst, string methodName)
{
var methodInfo = inst.GetType().GetMethod(methodName);
methodInfo.Invoke(inst, null);
}
private static dynamic GetMeMyDynamicObject()
{
if (_toggle == null)
_toggle = typeof(BehaviourB);
if (typeof(BehaviourB) == _toggle)
{
_toggle = typeof(BehaviourA);
return new BehaviourA();
}
else
{
_toggle = typeof(BehaviourB);
return new BehaviourB();
}
}
}
}
//Class BehaviourA.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Coderslog.Net4.Samples
{
public class BehaviourA
{
public void DoSometing()
{
Console.WriteLine("Behaviour A - Do Something!");
}
}
}
//Class BehaviourB.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Coderslog.Net4.Samples
{
public class BehaviourB
{
public void DoSometing()
{
Console.WriteLine("Behaviour B - Do Something!");
}
}
}
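Notice the dynamic way of invoking the method and the reflection way. The concise call syntax is one benefit of dynamic, and there are many more. But it comes with its own problem, the same one the reflection way has: the compiler cannot check the call, so a misspelled or missing member only shows up as an error at run time.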
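A small sketch of that shared problem, using a deliberately "corrected" method name, DoSomething, that does not exist on the sample class (the class really declares DoSometing). Both versions compile; both only fail once the program runs. The class name LateFailure is my own.

```csharp
using System;
using Microsoft.CSharp.RuntimeBinder;

public class BehaviourA
{
    public void DoSometing()
    {
        Console.WriteLine("Behaviour A - Do Something!");
    }
}

public static class LateFailure
{
    public static void Main()
    {
        // Dynamic way: compiles fine, fails when the call is bound at run time.
        dynamic d = new BehaviourA();
        try
        {
            d.DoSomething(); // no such member on BehaviourA
        }
        catch (RuntimeBinderException ex)
        {
            Console.WriteLine("dynamic failed at run time: " + ex.Message);
        }

        // Reflection way: the lookup simply returns null for an unknown name.
        object inst = new BehaviourA();
        var methodInfo = inst.GetType().GetMethod("DoSomething");
        Console.WriteLine(methodInfo == null); // prints True
    }
}
```

In the reflection case, calling Invoke on the null result, as the Invoke helper in Program.cs does, would throw a NullReferenceException.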
We will also look at some other aspects of dynamic in the following posts.