Learn.Develop.Share

Twitter












Resumable Workflow Activity

BACKGROUND
Recently, I have been working on Windows Workflow Foundation. We are trying to create a custom host for workflow, and this workflow host is supposed to have a facility, such that it shold be able to provide a way to  resume from the exact point where a workflow failed from the Admin UI. Something similar to the Biztalk HAT.
Well, having said that the question how can something like this can be achieved ?
What I am going to present here is one of the ways of achieving it. There would be other ways which would be better than the way I am going present here, need less to say, that I am still trying to learn my way through Windows Workflow Foundation.
Well, the idea came to me after going through some of msdn articles about Workflow from Matt Milner. {Personal Opinion: He is damn good at explaining}
1. Error Handling In Workflows - Matt Milner
2. ActivityExecutionContext in Workflows - Matt Milner
3. Build Custom Activities To Extend The Reach Of Your Workflows - Matt Milner
I would highly recommend to read these articles before reading below. Even if you are not interested, I would recommend you to read the above articles, they give hell of an insight into workflow.
SOLUTION
So now you have the background information of the required stuff and as well as we have a problem at hand that we want to solve. This idea is based of RetryActivity. So we create a general purpose, WFFaultReportingSystem Service. I am registering the service to the run time. This service becomes the medium of reporting the error to the Host, that something wrong happened.
 
using System;
namespace Coderslog.Workflow.Services
{
    public interface IWFFaultReportingSystem
    {
        void ReportError(Exception ex, object message);
        void ReportResumableError(Guid wfInstanceId, string queueName,Exception ex, object message);
    }
}
Now lets look at the activity itself.
using System;
using System.ComponentModel;
using System.ComponentModel.Design;
using System.Collections;
using System.Drawing;
using System.Linq;
using System.Workflow.ComponentModel;
using System.Workflow.ComponentModel.Design;
using System.Workflow.ComponentModel.Compiler;
using System.Workflow.ComponentModel.Serialization;
using System.Workflow.Runtime;
using System.Workflow.Activities;
using System.Workflow.Activities.Rules;
using Coderslog.Workflow.Services;

namespace Coderslog.Workflow.Activities
{
        
[Designer(typeof(SequenceDesigner), typeof(IDesigner))]
        public partial class ResumableActivity : CompositeActivity, IActivityEventListener<QueueEventArgs>
        {
                public ResumableActivity()
                {
                                                  //The call to the partial generated class                         
                                                 InitializeComponent();
            
                        
                                                QueueName               = "ResumableActivity";
            
                        CurrentActivityIndex    = 0;
                }
                               
                                //The Last Error that happened while executing any child activity
        
               [Browsable(false)]
        
               public Exception LastError { get; set; }
                              
                               //The Current Activity that I am trying to process now
        
               [Browsable(false)]
        
               public int CurrentActivityIndex { get; set; }

                
        protected override ActivityExecutionStatus Execute(ActivityExecutionContext executionContext)
        {
            if (EnabledActivities.Count == 0)
                return ActivityExecutionStatus.Closed;

            
//try to execute the child activities in a new context
            BeginIteration(executionContext);
            return ActivityExecutionStatus.Executing;
        }
                //Initiating the Iteration
        private void BeginIteration(ActivityExecutionContext executionContext)
        {
            /
/create a new activity execution context
            Activity child = EnabledActivities[CurrentActivityIndex];
            ActivityExecutionContext newContext = executionContext.ExecutionContextManager.CreateExecutionContext(child);

            
//register for both closed and faulted events
            newContext.Activity.Closed += new EventHandler<ActivityExecutionStatusChangedEventArgs>(child_Closed);
            newContext.Activity.Faulting += new EventHandler<ActivityExecutionStatusChangedEventArgs>(Activity_Faulting);

            
//execute the clone activity
            newContext.ExecuteActivity(newContext.Activity);
        }

        void Activity_Faulting(object sender, ActivityExecutionStatusChangedEventArgs e)
        {
            
//if the child faults, clear the exception if retrying
            e.Activity.Faulting -= Activity_Faulting;
            
            
//Get hold of the error
            LastError = (Exception)e.Activity.GetValue(ActivityExecutionContext.CurrentExceptionProperty);
                         //Clear the Error, We don't want anyone to know anything about the Error            
                         e.Activity.SetValue(ActivityExecutionContext.CurrentExceptionProperty, null);
        }

        
/// <summary>
        /// Creates the queue and registers for items to be available.
        /// </summary>
        /// <param name="parentContext"></param>
        /// <param name="parentEventHandler"></param>

        public void Subscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler)
        {
            WorkflowQueuingService qService = parentContext.GetService<WorkflowQueuingService>();
            if (qService != null)
            {
                if (!qService.Exists(QueueName))
                {
                    WorkflowQueue q = qService.CreateWorkflowQueue(QueueName, false);
                    q.RegisterForQueueItemAvailable(parentEventHandler, QualifiedName);
                }

            }
        }
                //Property for holding the QueueName on which to Subscribe and Expect the response
        [Browsable(false)]
        public string QueueName { get; private set; }
        

   
     /// <summary>
        /// Removes the listener from the queue and deletes the queue.  
        /// </summary>
        /// <param name="parentContext"></param>
        /// <param name="parentEventHandler"></param>

        public void Unsubscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler)
        {
            WorkflowQueuingService qService = parentContext.GetService<WorkflowQueuingService>();
            if (qService != null)
            {
                WorkflowQueue queue = qService.GetWorkflowQueue(QueueName);
                if (queue != null)
                {
                    queue.UnregisterForQueueItemAvailable(parentEventHandler);
                    qService.DeleteWorkflowQueue(QueueName);
                }
            }
        }
               
               // Helper Method to Get Hold of the Queue
        private WorkflowQueue GetQueue(ActivityExecutionContext ctx)
        {
            WorkflowQueuingService qService = ctx.GetService<WorkflowQueuingService>();
            if (qService != null && qService.Exists(QueueName))
                return qService.GetWorkflowQueue(QueueName);
            else
                return null;
        }


                //The Big Bang Close Method
        void child_Closed(object sender, ActivityExecutionStatusChangedEventArgs e)
        {
            ActivityExecutionContext thisContext = sender as ActivityExecutionContext;
            ActivityExecutionContext childContext = thisContext.ExecutionContextManager.GetExecutionContext(e.Activity);
            e.Activity.Closed -= child_Closed;

            thisContext.ExecutionContextManager.CompleteExecutionContext(childContext);
                        
                         //if of the EnabledActivities, the current one has completed successfully, move to the next one
            if (e.ExecutionResult == ActivityExecutionResult.Succeeded && CurrentActivityIndex < EnabledActivities.Count - 1)
            {
                this.SetValue(ActivityExecutionContext.CurrentExceptionProperty, null);
                CurrentActivityIndex += 1;
                BeginIteration(thisContext);
                return;
            }

            
//if the child completed successfully, then we can close
            if (e.ExecutionResult == ActivityExecutionResult.Succeeded && CurrentActivityIndex == EnabledActivities.Count - 1)
            {
                this.SetValue(ActivityExecutionContext.CurrentExceptionProperty, null);
                thisContext.CloseActivity();
                return;
            }

            
            
//otherwise, if we are not done retrying
            //we need to resume again and make sure to clean up errors

            if (this.ExecutionStatus == ActivityExecutionStatus.Executing)
            {
                SetResumePoint(e, thisContext);
            }
        }
               
                // We try and get the instance of the Fault Reposrting System, if yes, we report the error and subscribe to the Queue
                // so that on basis of the users input we would be able to resume from the point where it broke or terminate
        private void SetResumePoint(ActivityExecutionStatusChangedEventArgs e, ActivityExecutionContext thisContext)
        {
            IWFFaultReportingSystem wfFRS = thisContext.GetService<IWFFaultReportingSystem>();
            var error = LastError;
            if (wfFRS == null)
            {
                throw new Exception(
"This activity requires that the Service IWFFaultReportingSystem is registered");
            }
            if (error != null)
            {
                this.SetValue(ActivityExecutionContext.CurrentExceptionProperty, null);
                Subscribe(thisContext, this);
                wfFRS.ReportResumableError(WorkflowInstanceId, QueueName, error, null);
            }
        }
               
               // Fires when there is the required actvity in the Queue
        public void OnEvent(object sender, QueueEventArgs e)
        {
            
//start a new iteration after the event happens
            ActivityExecutionContext ctx = sender as ActivityExecutionContext;
            if (sender == null)
                throw new ArgumentException(
"Sender must be ActivityExecutionContext");
            
            WorkflowQueue q = GetQueue(ctx);
            if (q != null)
            {
                object data = q.Dequeue();
                
//use the data - Resume
                if (data != null && bool.Parse(data.ToString()))
                {
                    Unsubscribe(ctx, this);
                    BeginIteration(ctx);
                }
                else
                {
                    throw new ApplicationException(
"The Workflow cannot be Resumed from this point onwards!");
                }
            }
        }
    }
}
The other peripherals:
using System;
using System.ComponentModel;
using System.ComponentModel.Design;
using System.Collections;
using System.Drawing;
using System.Reflection;
using System.Workflow.ComponentModel;
using System.Workflow.ComponentModel.Design;
using System.Workflow.ComponentModel.Compiler;
using System.Workflow.ComponentModel.Serialization;
using System.Workflow.Runtime;
using System.Workflow.Activities;
using System.Workflow.Activities.Rules;

namespace Coderslog.Workflow.Activities
{
    
        public partial class ResumableActivity
        {
                #region Designer generated code
                
                
/// <summary>
                /// Required method for Designer support - do not modify
                /// the contents of this method with the code editor.
                /// </summary>
        
                [System.Diagnostics.DebuggerNonUserCode]
                private void InitializeComponent()
                {
                        this.Name = "ResumableActivity";
                }

                #endregion
        }
}
This covers the Activity. I have tried and put comments where ever required. Hopefully it should be self explanatory.
IMPORTANT PART OF CLIENT CODE - TEST HARNESS
Sample IWFFaultReportingSystem.cs implementation.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using CP.Services;
using System.Workflow.Runtime;

namespace Coderslog.Client.ResumableWorkflow
{
        public class FaultReportingSystem:IWFFaultReportingSystem
        {
        WorkflowRuntime _runtime;
        
Guid            _instanceId;
        string          _queueName;



        public FaultReportingSystem(WorkflowRuntime runtime)
        {
            _runtime = runtime;
        }

        #region IWFFaultReportingSystem Members

        public void ReportError(Exception ex, object message)
        {
            Console.WriteLine(ex.Message);
        }

        public void ReportResumableError(Guid wfInstanceId, string queueName, Exception ex, object message)
        {
            Console.WriteLine("WF ID :" +  wfInstanceId.ToString());
            Console.WriteLine("Queue Name :" + queueName);
            Console.WriteLine("Error :" + ex.Message);

            
_instanceId = wfInstanceId;
            _queueName = queueName;


            var action = new Action(ProcessError);
            action.BeginInvoke(CallBack, action);   
        }

        void ProcessError()
        {
            Console.WriteLine("What you want to do ? R for Resuming or any key for Exit");
            var key = Console.ReadLine();
            
var instance = _runtime.GetWorkflow(_instanceId);
            if (key == "R" || key == "r")
            {
                
instance.EnqueueItem(_queueName, true, null, null);
            }
            else
            {
                
instance.EnqueueItem(_queueName, false, null, null);
            }
        }
        #endregion

        static void CallBack(IAsyncResult ar)
        {
            var action = ar.AsyncState as Action;
            if (action != null)
                action.EndInvoke(ar);
        }
    }
}
The Highlighted part shows what the user needs to do to resume the workflow. Just drop a message in the Workflow Queue.

  • Share This Post:
  • Share on Twitter
  • Share on Facebook
  • Share on Technorati

dynamic - II - introduction

You remember the previous post on dynamic. We will today look into it in a bit more detail. Yeah, let's ILDASM it and try and identify what is going on under the hood. But for the context of the post let's look at a simpler tale.
 
namespace Coderslog.Net4.Samples
{
    public class SimpleDynamic
    {

        public void CallDoSomething()
        {
            var instance = GetDoSomething();
            instance.DoSometing();
        }

        private dynamic GetDoSomething()
        {
            return new BehaviourA();
        }
    }
}
A very basic class, hmmm which roughly translates to something like, I might not be absolutely correct, but to get a fair idea, it is something like the following
 
public class SimpleDynamic
{
    // Method: A Bit more involved than the reflection way, with Optimization    
    public void CallDoSomething()
    {
        object instance = this.GetDoSomething();
        if (CallDoSometning_SiteContainer.delegatePlaceHolder == null)
        {
            CallDoSometning_SiteContainer.delegatePlaceHolder =
                CallSite<Action<CallSite, object>>.Create
(
                        new CSharpInvokeMemberBinder
(
                                
CSharpCallFlags.None,
                                "
DoSometing",
                                typeof(
SimpleDynamic),
                                null
,
                                new []
                                        {
                                                
new CSharpArgumentInfo(
                                                        CSharpArgumentInfoFlags.None
                                                        , null
                                                )

                                        }
                        
));
        }
        CallDoSometning_SiteContainer.
                delegatePlaceHolder.
                        Target.Invoke(
                                CallDoSometning_SiteContainer.delegatePlaceHolder
                                , instance);
    }

    
    //But theres an attribute on the object of
    //Type:System.Runtime.CompilerServices.DynamicAttribute
    private object GetDoSomething()
    {
        return new BehaviourA();
    }

    // Nested Types
    //Compiler Generated Class- Internal Class
    private static class CallDoSometning_SiteContainer
    {
        // Fields
        public static CallSite<Action<CallSite, object>> delegatePlaceHolder;
    }

}
 
So as you can see, the internal static class is there so as to ensure that, we don't always spend resources on the delegate, once it is evaluated.
If we look at the GetDomethingMethod, the Dydnamic Attribute on the returned object from the method GetDoSomething tells the run time that "Indicates that the use of  Object on a member is meant to be treated as a dynamically dispatched type.". If you notice the type of the static field is of type CallSite<Action<CallSite, object>> is  System.Runtime.CompilerServices..::.CallSite<(Of <(T>)>) { MSDN: A dynamic call site base class. This type is used as a parameter type to the dynamic site targets.}.  
 
Lets look at the place where all the action is happening, i.e. the method  CallDoSomething. the method checks if the class CallDoSometning_SiteContainer.delegatePlaceHolder is populated or not, but if not it initializes it with the Create Method and you can notice ... CSharpInvokeMemberBinder {CompleteNamespace: Microsoft.CSharp.RuntimeBinder.CSharpInvokeMemberBinder} {MSDN:  Represents a dynamic method call in C#, providing the binding semantics and the details about the operation. Instances of this class are generated by the C# compiler }.
 
This feels like .Net Reflection and one tends to ask, how optimized is it. Well, something Microsoft can enlighten us more about.

  • Share This Post:
  • Share on Twitter
  • Share on Facebook
  • Share on Technorati

dynamic - Introduction

Lets examine the dynamic key word introduced in .Net 4.0. It can keep an instance of any type. Then what is the difference between it and System.Object. Well the essential difference between them is the dynamic method dispatching which enables lazy binding, i.e. it allows you to write method, operator and indexer calls, property and field accesses, and even object invocations which bypass the normal static binding of C# and instead gets resolved dynamically.
It provides a unified approach to selecting opeartions dynamically.
 
Lets see an example:
 
//Class Program.cs
 
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Coderslog.Net4.Samples
{
    class Program
    {
        private static Type _toggle;

        static void Main(string[] args)
        {
            var val = "";
            do
            {
                                 //Dynamic way
                dynamic d = GetMeMyDynamicObject();
                Console.WriteLine(d.GetType().FullName);
                d.DoSometing();
                                 
                                //Reflection way
                object inst = GetMeMyDynamicObject();
                Console.WriteLine(inst.GetType().FullName);
                Invoke(inst, "DoSometing" );       
         
                
                                 Console.WriteLine(
                                             "Press Q/q to quit and y/Y to continue!");
                val = Console.ReadLine();

            } while (val.ToLower() != "q");
        }

        
private static void Invoke(object inst, string methodName)
        {
            var methodInfo = inst.GetType().GetMethod(methodName);
            methodInfo.Invoke(inst, null);
        }


        private static dynamic GetMeMyDynamicObject()
        {
            if (_toggle == null)
                _toggle = typeof(BehaviourB);
            if (typeof(BehaviourB) == _toggle)
            {
                _toggle = typeof(BehaviourA);
                return new BehaviourA();
            }
            else
            {
                _toggle = typeof(BehaviourB);
                return new BehaviourB();
            }
        }
    }
}
 
//Class BehaviourA.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Coderslog.Net4.Samples
{
    public class BehaviourA
    {
        public void DoSometing()
        {
            Console.WriteLine("Behaviour A - Do Something!");
        }
    }
}
 
//Class BehaviourB.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Coderslog.Net4.Samples
{
    public class BehaviourB
    {
        public void DoSometing()
        {
            Console.WriteLine("Behaviour B - Do Something!");
        }
    }
}
 
Notice the dynamic way of invoking the method and Reflection way. Thats the benefit of dyanmic and many more. But it comes with its own problem. The same problem that is there with the reflection way is still there.
 
We will as well look at some other aspects of the dynamic in the following posts.

  • Share This Post:
  • Share on Twitter
  • Share on Facebook
  • Share on Technorati