BeanFlow is a lightweight Java library for building workflows using beans to orchestrate events. You can think of BeanFlow as a simple alternative to BPEL where the workflows are all specified and implemented using Java code rather than declarative XML.

Motivation

When building highly concurrent or distributed applications, it is very common for many events to be happening at once, often asynchronously and in different threads, and it is just as common to need some kind of workflow or orchestration across those events.

You certainly can use things like BPEL to solve these kinds of problems. However, this is often a bit heavyweight and complex when all you want is a bean-based workflow that uses regular Java code to represent the activities involved.

Transforming the traditional workflow approach

One of the main goals of BeanFlow is to reuse what the Java platform is already good at in the workflow space, then supplement it with the missing abstractions rather than reinventing the wheel in a sub-optimal way.

Traditional approach | BeanFlow approach
use a declarative workflow language to specify a workflow | write a Java class
use XML markup or pictures to define loops and flows | use regular Java code (if/for/while)
implement a generic state and persistence framework for workflow instances | use regular Java fields in your workflow class, then use JDBC/DAO/JPA to deal with persistence

Use cases

When working with concurrent or distributed applications there are many situations where you need to orchestrate multiple concurrent events. Here are a few examples from our own use cases implementing ServiceMix and ActiveMQ:

  • starting dependent services, where a parent component depends on its child components starting and the start process may be asynchronous in different threads. For example, there may be a recovery process on startup which you need to wait for.
  • implementing master-slave type protocols, where you need to monitor the state of the master and slave to decide what to do, together with handling transitions from Started to Recovering to Running and then maybe to FailingOver.
  • implementing message orchestration. You may want to implement some simple orchestrations, such as waiting for either a response to arrive or a timeout to fire.

Code overview

First we'll give an overview of the two main interfaces in BeanFlow then we'll progress to concepts like timeouts and composing activities.

State class

We have a State<T> interface whose API mirrors the AtomicReference<T> class in java.util.concurrent.atomic (introduced in Java 5), except that it also lets you register listeners on the state. This makes it easy to respond to changes in state.

One added detail is that State<T> allows you to specify the Notifier implementation used to perform the notifications (which always occur outside of the semaphore so that listeners can be re-entrant).

The default notifier is the SynchronousNotifier, but it's easy to switch to an AsynchronousNotifier by passing it into the constructor of the State<T> object.
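To make the idea concrete, here is a minimal sketch of a listenable state built on AtomicReference. It is not BeanFlow's State<T>: the class name ObservableState, its methods, and the use of a plain Executor in place of a Notifier are all assumptions made for illustration, and it uses Java 8 lambdas for brevity.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch, not BeanFlow's State<T>: an AtomicReference plus listeners.
class ObservableState<T> {
    private final AtomicReference<T> value;
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();
    private final Executor notifier; // stands in for a Notifier (assumption)

    // Synchronous notification: listeners run on the thread that changed the state.
    ObservableState(T initial) {
        this(initial, Runnable::run);
    }

    // Pass a thread-pool backed Executor for asynchronous notification.
    ObservableState(T initial, Executor notifier) {
        this.value = new AtomicReference<>(initial);
        this.notifier = notifier;
    }

    T get() {
        return value.get();
    }

    void addListener(Consumer<T> listener) {
        listeners.add(listener);
    }

    void set(T newValue) {
        value.set(newValue);
        fireChanged(newValue);
    }

    boolean compareAndSet(T expected, T newValue) {
        boolean changed = value.compareAndSet(expected, newValue);
        if (changed) {
            fireChanged(newValue);
        }
        return changed;
    }

    // No lock is held while notifying, so a listener may change the state again.
    private void fireChanged(T newValue) {
        for (Consumer<T> listener : listeners) {
            notifier.execute(() -> listener.accept(newValue));
        }
    }
}
```

Keeping the notification step outside any lock is what allows a listener to call set() from within its own callback without deadlocking.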

Activity class

An activity is just a POJO which implements the Activity interface. It has a simple lifecycle: you can start it, stop it and ask for its status.

Once a flow has been started via the start() method it can take an arbitrarily long time to complete; you can see if the flow has completed via the isStopped() method. You can explicitly complete a flow using the stop() method whenever you like; typically when you are responding to state changes.

An activity could fail for some reason (such as a timeout or some other error), so there is an isFailed() method which can be used to inspect the activity. Rather like the stop() method, you can call fail(reason) if you wish to terminate an activity and leave it marked as Failed rather than Stopped.
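The lifecycle described above can be sketched as a tiny state machine. This is a hypothetical stand-in rather than BeanFlow's Activity interface: the class name LifecycleSketch and the Status enum are made up, and it assumes that a failed activity does not also report isStopped(), which the real library may treat differently.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in; BeanFlow's own Activity interface may differ in detail.
class LifecycleSketch {
    enum Status { INITIALISED, STARTED, STOPPED, FAILED }

    private final AtomicReference<Status> status =
            new AtomicReference<>(Status.INITIALISED);
    private volatile String failedReason;

    void start() {
        status.compareAndSet(Status.INITIALISED, Status.STARTED);
    }

    void stop() {
        finish(Status.STOPPED, null);
    }

    void fail(String reason) {
        finish(Status.FAILED, reason);
    }

    // Assumption: Stopped and Failed are distinct terminal states.
    boolean isStopped() {
        return status.get() == Status.STOPPED;
    }

    boolean isFailed() {
        return status.get() == Status.FAILED;
    }

    String getFailedReason() {
        return failedReason;
    }

    // Only the first terminal transition wins; later stop()/fail() calls are ignored.
    private void finish(Status terminal, String reason) {
        if (status.compareAndSet(Status.STARTED, terminal)) {
            failedReason = reason;
        }
    }
}
```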

Using timeouts

It's very common to add timeouts to activities so that if the activity has not completed by a certain time it is stopped and marked as failed. Often a parent activity will then do something different if one of its child activities fails.

There is a useful base activity called TimeoutActivity which you can derive from to make your own activity which can be started/stopped/failed and which can be timed out.

You can explicitly register the activity with a timer and call the onTimedOut() method yourself, or just call the startWithTimeout() method to start the activity and register the timeout in one step.
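As a rough illustration of the mechanism, the sketch below schedules a java.util.TimerTask that fails the activity unless it has already been stopped. The class TimedSketch and its awaitCompletion() helper are hypothetical; only the method names startWithTimeout() and onTimedOut() are borrowed from the text, and the real TimeoutActivity may be implemented quite differently.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; BeanFlow's TimeoutActivity may be implemented differently.
class TimedSketch {
    enum Status { INITIALISED, STARTED, STOPPED, FAILED }

    private final AtomicReference<Status> status =
            new AtomicReference<>(Status.INITIALISED);
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile TimerTask timeoutTask;

    // Starts the activity and registers a task that fails it after the timeout.
    void startWithTimeout(Timer timer, long timeoutMillis) {
        if (!status.compareAndSet(Status.INITIALISED, Status.STARTED)) {
            return; // already started
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                onTimedOut();
            }
        };
        timeoutTask = task;
        timer.schedule(task, timeoutMillis);
    }

    void onTimedOut() {
        finish(Status.FAILED);
    }

    void stop() {
        finish(Status.STOPPED);
    }

    boolean isStopped() {
        return status.get() == Status.STOPPED;
    }

    boolean isFailed() {
        return status.get() == Status.FAILED;
    }

    // Waits for stop() or the timeout; returns false if neither happened in time.
    boolean awaitCompletion(long millis) {
        try {
            return finished.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // The first of stop() and the timeout wins; the loser becomes a no-op.
    private void finish(Status terminal) {
        if (status.compareAndSet(Status.STARTED, terminal)) {
            TimerTask task = timeoutTask;
            if (task != null) {
                task.cancel();
            }
            finished.countDown();
        }
    }
}
```

The compare-and-set in finish() is what keeps a late timer from failing an activity that has already stopped.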

Composing activities

One of the main reasons for using an object-oriented language is to make composition and reuse possible. Similarly, BeanFlow lets you compose activities to make modular and reusable workflow constructs easily. BeanFlow therefore provides a collection of reusable activities which you can derive from or aggregate to make whatever activities you need.

Composition activity | Description
JoinAll | Performs a join on all the given child activities: the activity waits until all the child activities have completed, then completes itself. You can add additional constraints using derivation. The default is to wait for all child activities to complete, though you can enable fast-fail mode so that the activity fails as soon as a child activity fails.
JoinQuorum | Useful for implementing clustering-style activities where you want a quorum of activities to complete. For example, with 5 child activities you might wait for at least 3 to complete successfully before continuing.
AsynchronousActivity | Executes any Runnable or Callable on an Executor and completes the activity when it has completed, so that it can be used in joins.
ParallelActivity | Allows a collection of parallel Runnable or Callable objects to be performed on an Executor, then performs some kind of join on their completion.

Here is an example of performing a join on a collection of child workflows:

// lets create a join on a number of child flows completing
JoinAll flow = new JoinAll(child1, child2, child3);
flow.startWithTimeout(timer, timeout);

// now lets test the flow
child1.stop();
assertStarted(flow);

child2.stop();
assertStarted(flow);

child3.stop();
assertStopped(flow);
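The quorum idea behind JoinQuorum can be shown with the same kind of counting logic. The sketch below does not use BeanFlow at all: it swaps in the JDK's CompletableFuture for child activities, and QuorumJoin is a made-up name. It completes once the required number of children succeed and fails as soon as too many have failed for the quorum to be reachable.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper using CompletableFuture, not BeanFlow's JoinQuorum.
final class QuorumJoin {
    private QuorumJoin() {
    }

    static CompletableFuture<Void> join(int quorum,
                                        List<? extends CompletableFuture<?>> children) {
        if (quorum < 1 || quorum > children.size()) {
            throw new IllegalArgumentException("quorum must be between 1 and " + children.size());
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        AtomicInteger succeeded = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        int maxFailures = children.size() - quorum;

        for (CompletableFuture<?> child : children) {
            child.whenComplete((value, error) -> {
                if (error == null) {
                    // Enough children finished successfully: the join is done.
                    if (succeeded.incrementAndGet() >= quorum) {
                        result.complete(null);
                    }
                } else if (failed.incrementAndGet() > maxFailures) {
                    // Too many failures: the quorum can no longer be reached.
                    result.completeExceptionally(
                            new IllegalStateException("quorum of " + quorum + " not reachable"));
                }
            });
        }
        return result;
    }
}
```

With 5 children and a quorum of 3, the returned future completes on the third success and fails on the third failure, whichever comes first.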

Join conditions

In workflows you often want to fork parallel steps and then join on certain events. You often have complex logic to decide on what the join condition is. e.g. in pseudo code

fork steps A, B, C
join on A and (B or C)

In BeanFlow we implement join conditions using regular Java code. The basic idea is that an Activity listens to changes in a number of different State objects, such as fields a, b and c. You can then write an activity bean to evaluate the join condition you need. When the condition is met you can perform whatever logic you wish, such as

  • stopping the activity
  • forking off a new child activity
  • changing your state
  • changing the state of some other bean
  • calling arbitrary Java code

Here is an example activity implemented in Java code for the pseudo code above.

// lets create some child workflows
final Activity a = new TimeoutActivity();
final Activity b = new TimeoutActivity();
final Activity c = new TimeoutActivity();

// lets create the activity with a custom join condition
Activity activity = new JoinSupport(a, b, c) {
    @Override
    protected void onChildStateChange(int childCount, int stoppedCount, int failedCount) {

        if (a.isStopped() && (b.isStopped() || c.isStopped())) {
            // lets stop the activity we're done
            stop();
        }
    }
};

// lets start the activities
activity.startWithTimeout(timer, timeout);

// now lets test things behave properly
assertStarted(activity);

a.stop();
assertStarted(activity);

b.stop();
assertStopped(activity);

Building workflows

So far we've been over the basics of activities and shown how to join and compose them. Remember that they are just Java beans, so you can implement them however you wish.

However, folks often think of workflows as a number of discrete steps, using a kind of state machine to move from step to step. One really simple approach to writing workflows in this model is to use a specific method for each workflow step.

So we have a base class called Workflow which adds a new State object for the current step (which allows activities to listen to the step changing), together with helper methods for moving to different steps, for suspending and so forth.

Moving steps

Inside a step you can call any Java methods on any objects. However, to move from step to step in a declarative fashion you can call the setNextStep() method.
Another option is to use a method which returns a String or enum value; a non-null return value is then interpreted as the next step to navigate to.

Note that the movement to the next step always takes place outside of the step method to avoid deeply recursive methods.

Suspending workflows

Often in a workflow you have to wait for some external event such as

  • a user enters some text
  • a message is received

Once that happens you re-awaken the workflow and commence execution at some step. To implement this, just call the setNextStep() method to cause the workflow to resume at the given step.

Simple Example

Here is a simple workflow example:

public class SimpleWorkflow extends Workflow<SimpleWorkflow.Step> {

    public static enum Step {
        stepA, stepB, stepC, stop
    };

    public SimpleWorkflow() {
        super(Step.stepA);
    }

    // Workflow steps
    // -------------------------------------------------------------------------
    public Step stepA() {
        return Step.stepB;
    }
    
    public Step stepB() {
        return Step.stepC;
    }
    
    public Step stepC() {
        return Step.stop;
    }
}

As you can see the step is defined by an enumeration Step and we have a regular Java method to implement each step. The step method can then return the step to go to next after the current method terminates. If you return null or use a void method then the workflow suspends.
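To show how such a dispatch could work, here is a minimal, single-threaded sketch of a driver that maps each enum constant to a method of the same name and runs the steps in a loop rather than recursively. It is not BeanFlow's Workflow class: StepRunner, resumeAt() and CountingWorkflow are hypothetical names invented for this illustration.

```java
import java.lang.reflect.Method;

// Hypothetical driver for illustration; this is not BeanFlow's Workflow class.
abstract class StepRunner<S extends Enum<S>> {
    private S nextStep;
    private boolean running;

    StepRunner(S firstStep) {
        this.nextStep = firstStep;
    }

    // Queue a step and resume a suspended workflow.
    void resumeAt(S step) {
        nextStep = step;
        run();
    }

    // Runs steps in a loop, so a step never invokes its successor recursively.
    void run() {
        if (running) {
            return; // called from inside a step: the loop below picks up nextStep
        }
        running = true;
        try {
            while (nextStep != null) {
                S step = nextStep;
                nextStep = null;
                S returned = invoke(step);
                if (returned != null) {
                    nextStep = returned;
                }
            }
            // nextStep is null here, so the workflow is suspended
        } finally {
            running = false;
        }
    }

    // Calls the public no-argument method named after the enum constant.
    @SuppressWarnings("unchecked")
    private S invoke(S step) {
        try {
            Method method = getClass().getMethod(step.name());
            method.setAccessible(true);
            return (S) method.invoke(this); // null for void methods
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot run step " + step, e);
        }
    }
}

// Example workflow: loops three times, then suspends until resumed externally.
class CountingWorkflow extends StepRunner<CountingWorkflow.Step> {
    enum Step { loop, waitForInput, finished }

    int loopCount;
    boolean complete;

    CountingWorkflow() {
        super(Step.loop);
    }

    public Step loop() {
        return ++loopCount < 3 ? Step.loop : Step.waitForInput;
    }

    public void waitForInput() {
        // void return: the workflow suspends here
    }

    public void finished() {
        complete = true;
    }
}
```

Calling run() on a CountingWorkflow executes loop() three times and then parks at waitForInput(); a later resumeAt(Step.finished) wakes it up again.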

Complete Example

The following example is a fairly large one, but it shows a variety of different concepts:

  • suspending workflows (such as when they wait for external events such as user input or an asynchronous message to arrive)
  • looping & using predicates
  • forking and joining child activities
  • declaratively moving from one step to another

public class ExampleWorkflow extends Workflow<ExampleWorkflow.Step> {
    private static final Log log = LogFactory.getLog(ExampleWorkflow.class);

    private int loopCount;
    private long timeout = 500;
    private String userEmailAddress;

    public static enum Step {
        startStep, afterEnteredEmailStep, loopStep, waitForUserInputStep, forkStep, 
        aCompletedStep, abcCompletedStep, stop
    };

    public ExampleWorkflow() {
        super(Step.startStep);
    }

    // Workflow steps
    // -------------------------------------------------------------------------

    public void startStep() {
        // let's explicitly call addStep() to tell the workflow
        // which step to go to next; though we could just return the Step
        addStep(Step.loopStep);
    }

    // lets use the return value to specify the next step
    public Step loopStep() {
        if (++loopCount > 3) {
            return Step.waitForUserInputStep;
        }
        // lets keep looping
        return Step.loopStep;
    }

    public void waitForUserInputStep() {
        // we are going to park here until a user
        // enters a valid email address
        // so lets park the workflow engine
    }

    public Step afterEnteredEmailStep() {
        // the user has now entered a valid email address,
        // so the workflow can carry on
        log.info("User entered email address: " + userEmailAddress);
        return Step.forkStep;
    }

    public void forkStep() {
        // lets fork some child flows
        TimeoutActivity a = new TimeoutActivity();
        TimeoutActivity b = new TimeoutActivity();
        TimeoutActivity c = new TimeoutActivity();

        log.info("Forking off processes a, b, c");
        fork(a, b, c);

        // now lets add some joins
        joinAll(Step.aCompletedStep, timeout, a);
        joinAll(Step.abcCompletedStep, timeout, a, b, c);
    }

    public void aCompletedStep() {
        log.info("child flow A completed!");
    }

    public Step abcCompletedStep() {
        log.info("child flows A, B and C completed!");

        // we are completely done now
        return Step.stop;
    }

    // External events
    // -------------------------------------------------------------------------
    public void userEntered(String emailAddress) {
        if (emailAddress != null && emailAddress.indexOf("@") > 0) {
            this.userEmailAddress = emailAddress;

            log.info("Lets re-start the suspended workflow");
            addStep(Step.afterEnteredEmailStep);
        }
    }
}

In the above example, each workflow step is implemented as a method. If the method returns a step value, it is interpreted as the next step to execute. Returning null (or using a void return type) will suspend the workflow.

You can also explicitly call suspend() or stop() yourself.

Calling setNextStep(step) instructs the workflow engine to move to the given step.

Validating Workflows

To avoid typos between the workflow step enum and the methods in the workflow class, on startup we check that there is a suitable method available for each enum value. Many thanks to Sam for this idea! For example, the following bad workflow is caught automatically and a useful error is generated.

public class BadWorkflow extends Workflow<BadWorkflow.Step> {
    
    public static enum Step {
        startStep, fooStep, doesNotExistStep
    };

    public BadWorkflow() {
        super(Step.startStep);
    }

    // Workflow steps
    // -------------------------------------------------------------------------
    public Step startStep() {
        return Step.fooStep;
    }
    
    public Step fooStep() {
        return Step.doesNotExistStep;
    }
}
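A check of this kind can be written with a few lines of reflection. The sketch below is only a guess at the approach, not BeanFlow's actual validation code: StepValidator and BadSketch are hypothetical names, and it assumes each step must map to a public no-argument method with the same name as the enum constant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator; BeanFlow's own startup check may work differently.
final class StepValidator {
    private StepValidator() {
    }

    // Returns the names of enum constants that have no matching public no-arg method.
    static <S extends Enum<S>> List<String> missingStepMethods(Class<?> workflowClass,
                                                               Class<S> stepType) {
        List<String> missing = new ArrayList<>();
        for (S step : stepType.getEnumConstants()) {
            try {
                workflowClass.getMethod(step.name());
            } catch (NoSuchMethodException e) {
                missing.add(step.name());
            }
        }
        return missing;
    }
}

// Same shape as the bad workflow above, without the BeanFlow base class.
class BadSketch {
    enum Step { startStep, fooStep, doesNotExistStep }

    public Step startStep() {
        return Step.fooStep;
    }

    public Step fooStep() {
        return Step.doesNotExistStep;
    }
}
```

A constructor could call missingStepMethods() and throw an exception listing the missing names, so the typo surfaces at startup rather than when the step is first reached.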