BeanFlow is a lightweight Java library for building workflows using beans to orchestrate events. You can think of BeanFlow as a simple alternative to BPEL where the workflows are all specified and implemented using Java code rather than declarative XML.
Motivation
When building highly concurrent or distributed applications, many events typically happen at once, often asynchronously and in different threads, and it's very common to need some kind of workflow or orchestration across these events.
You certainly can use things like BPEL to solve these kinds of problems. However, this is often a bit heavyweight and complex, and you just want a bean-based workflow that uses regular Java code to represent the activities involved.
Transforming the traditional workflow approach
One of the main goals of BeanFlow is to reuse what the Java platform is already good at in the workflow space, then supplement it with the missing abstractions rather than reinventing the wheel in a sub-optimal way.
| Traditional approach | BeanFlow approach |
|---|---|
| Use a declarative workflow language to specify a workflow | Write a Java class |
| Use XML markup or pictures to define loops and flows | Use regular Java code (if/for/while) |
| Implement a generic state and persistence framework for workflow instances | Use regular Java fields in your workflow class, then use JDBC/DAO/JPA to deal with persistence |
Use cases
When working with concurrent or distributed applications there are many situations where you need to orchestrate multiple concurrent events. Here are a few examples from our own use cases implementing ServiceMix and ActiveMQ:
- starting dependent services, where a parent component depends on its child components starting and the start process may be asynchronous in different threads. e.g. there may be a recovery process on startup which you need to wait for.
- implementing master-slave type protocols, where you need to monitor the state of the master and slave to decide what to do, together with dealing with transitions from Started to Recovering to Running, then maybe to FailingOver etc.
- implementing message orchestration. You may want to implement some simple orchestrations, such as waiting for either a response to arrive or a timeout to fire.
Code overview
First we'll give an overview of the two main interfaces in BeanFlow then we'll progress to concepts like timeouts and composing activities.
State class
We have a State<T> interface which mirrors the API of the AtomicReference<T> class (part of java.util.concurrent.atomic since Java 5), except that it also allows you to register listeners on the state. This makes it easy to respond to changes in state.
One added detail is that State<T> allows you to specify the Notifier implementation used to perform the notifications (which always occur outside of the semaphore to allow things to be re-entrant).
The default notifier is the SynchronousNotifier, but it's easy to switch to an AsynchronousNotifier by just passing it into the constructor of the State<T> object.
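To make the idea concrete, here is a minimal, self-contained sketch of a listenable state holder. It is not BeanFlow's implementation: the class ObservableState, its method names, and the use of java.util.concurrent.Executor in place of a Notifier are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical stand-in for State<T>; names and signatures are illustrative only.
class ObservableState<T> {
    private final Object lock = new Object();
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
    private final Executor notifier; // plays the role of the Notifier
    private T value;

    // Default: listeners run synchronously on the thread that changed the state.
    ObservableState(T initial) {
        this(initial, Runnable::run);
    }

    // Pass e.g. an ExecutorService to have listeners notified asynchronously.
    ObservableState(T initial, Executor notifier) {
        this.value = initial;
        this.notifier = notifier;
    }

    T get() {
        synchronized (lock) { return value; }
    }

    void set(T newValue) {
        synchronized (lock) { value = newValue; }
        fireChanged(); // outside the lock, so listeners may call back into the state
    }

    // Unlike AtomicReference, this sketch compares with equals() rather than ==.
    boolean compareAndSet(T expected, T update) {
        synchronized (lock) {
            if (!Objects.equals(value, expected)) return false;
            value = update;
        }
        fireChanged();
        return true;
    }

    void addListener(Runnable listener) { listeners.add(listener); }

    private void fireChanged() {
        for (Runnable listener : listeners) notifier.execute(listener);
    }
}

class ObservableStateDemo {
    public static void main(String[] args) {
        ObservableState<String> state = new ObservableState<>("Started");
        List<String> seen = new ArrayList<>();
        state.addListener(() -> seen.add(state.get()));

        state.set("Recovering");
        state.compareAndSet("Recovering", "Running"); // succeeds, listener fires
        state.compareAndSet("Started", "Stopped");    // fails, no notification
        System.out.println(seen); // prints [Recovering, Running]
    }
}
```

Firing listeners only after the lock is released is what lets a listener read or change the state again without deadlocking.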
Activity class
An activity is just a POJO which implements the Activity interface. It has a simple lifecycle: you can start it, stop it and ask for its status.
Once a flow has been started via the start() method it can take an arbitrarily long time to complete; you can see if the flow has completed via the isStopped() method. You can explicitly complete a flow using the stop() method whenever you like; typically when you are responding to state changes.
An activity could fail for some reason (for example it timed out or some error occurred), so there is an isFailed() method which can be used to easily inspect the activity. Rather like the stop() method, you can call fail(reason) if you wish to terminate an activity and have it marked as Failed rather than Stopped.
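The lifecycle described above can be sketched as a small state machine. The class below is hypothetical and only borrows the method names mentioned in the text (start, stop, fail, isStopped, isFailed); the Status enum, getFailedReason() and the rule that a failed activity also reports isStopped() are assumptions.

```java
// Hypothetical sketch of the lifecycle; not BeanFlow's actual Activity interface.
class LifecycleSketch {
    enum Status { INITIALISED, STARTED, STOPPED, FAILED }

    private Status status = Status.INITIALISED;
    private String failedReason;

    synchronized void start() {
        if (status == Status.INITIALISED) status = Status.STARTED;
    }

    // Normal completion; ignored once the activity has stopped or failed.
    synchronized void stop() {
        if (status == Status.STARTED) status = Status.STOPPED;
    }

    // Abnormal completion; the reason is kept for later inspection.
    synchronized void fail(String reason) {
        if (status == Status.STARTED) {
            status = Status.FAILED;
            failedReason = reason;
        }
    }

    // Assumption: a failed activity also counts as stopped (no longer running).
    synchronized boolean isStopped() {
        return status == Status.STOPPED || status == Status.FAILED;
    }

    synchronized boolean isFailed() { return status == Status.FAILED; }

    synchronized String getFailedReason() { return failedReason; }
}

class LifecycleSketchDemo {
    public static void main(String[] args) {
        LifecycleSketch activity = new LifecycleSketch();
        activity.start();
        System.out.println(activity.isStopped()); // prints false
        activity.fail("no response from child");
        activity.stop(); // ignored: the activity already failed
        System.out.println(activity.isStopped() + " " + activity.isFailed()); // prints true true
        System.out.println(activity.getFailedReason()); // prints no response from child
    }
}
```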
Using timeouts
It's very common to add timeouts to activities, so that if the activity has not completed by a certain time it is stopped and marked as failed. Often a parent activity will then do something differently if one of its child activities fails.
There is a useful base activity called TimeoutActivity which you can derive from to make your own activity which can be started/stopped/failed and which can be timed out.
You can either register the activity with a timer explicitly and call the onTimedOut() method yourself, or just call the startWithTimeout() method, which starts the activity and registers the timeout.
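As a rough illustration of the timeout pattern, the sketch below uses a ScheduledExecutorService as the timer. The class TimeoutSketch, the parameter list of startWithTimeout() and the join() helper are assumptions; only the method names onTimedOut() and startWithTimeout() come from the text above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the real TimeoutActivity may be structured differently.
class TimeoutSketch {
    private final CountDownLatch finished = new CountDownLatch(1);
    private boolean stopped;
    private String failedReason;

    // Starts the activity and registers the timeout in one call.
    void startWithTimeout(ScheduledExecutorService timer, long timeout, TimeUnit unit) {
        timer.schedule(this::onTimedOut, timeout, unit);
    }

    // Invoked by the timer; has no effect if the activity already finished.
    void onTimedOut() { fail("timed out"); }

    synchronized void stop() {
        if (!stopped) { stopped = true; finished.countDown(); }
    }

    synchronized void fail(String reason) {
        if (!stopped) { stopped = true; failedReason = reason; finished.countDown(); }
    }

    synchronized boolean isStopped() { return stopped; }

    synchronized boolean isFailed() { return failedReason != null; }

    // Waits until the activity stops or fails; returns false if the wait itself ran out.
    boolean join(long time, TimeUnit unit) {
        try {
            return finished.await(time, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class TimeoutSketchDemo {
    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            TimeoutSketch slow = new TimeoutSketch();
            slow.startWithTimeout(timer, 50, TimeUnit.MILLISECONDS);
            slow.join(5, TimeUnit.SECONDS);      // nobody stops it, so the timer fails it
            System.out.println(slow.isFailed()); // prints true

            TimeoutSketch fast = new TimeoutSketch();
            fast.startWithTimeout(timer, 50, TimeUnit.MILLISECONDS);
            fast.stop();                         // completes before the timeout fires
            fast.onTimedOut();                   // a late timeout is ignored
            System.out.println(fast.isFailed()); // prints false
        } finally {
            timer.shutdownNow();
        }
    }
}
```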
Composing activities
One of the main reasons for using an object-oriented language is to make composition and reuse possible. Similarly, BeanFlow allows you to compose activities to easily build modular and reusable workflow constructs. To that end, BeanFlow provides a collection of reusable activities which you can derive from or aggregate to make whatever activities you need.
| Composition Activities | Description |
|---|---|
| JoinAll | Performs a join on all the given child activities: the activity waits until all the child activities have completed, then completes itself. You can add additional constraints to the activity using derivation. The default is to wait for all the child activities to complete, though you can enable fast-fail mode so that the activity fails as soon as a child activity fails |
| JoinQuorum | Useful for implementing clustering-style activities where you want a quorum of activities to complete. e.g. if you have 5 child activities, you want to wait for at least 3 of them to complete successfully before continuing |
| AsynchronousActivity | Executes any Runnable or Callable on an Executor and completes the activity when it has completed, so that it can be used in joins |
| ParallelActivity | Allows a collection of parallel Runnable or Callable objects to be performed on an Executor, then performs some kind of join on them completing |
Here is an example of performing joins using collections of child workflows
[Code snippet unavailable: the snippet macro failed to render in the source page.]
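As a rough illustration of the two join styles, the following sketch swaps in java.util.concurrent.CompletableFuture as a stand-in for child activities. It does not use BeanFlow's JoinAll or JoinQuorum classes; the helper names joinAll and joinQuorum are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in sketch: CompletableFuture plays the role of a child activity.
class JoinSketch {
    // Completes once every child has completed (not fast-fail: it waits for all of them).
    static CompletableFuture<Void> joinAll(List<? extends CompletableFuture<?>> children) {
        return CompletableFuture.allOf(children.toArray(new CompletableFuture<?>[0]));
    }

    // Completes once `quorum` children have succeeded; fails once that becomes impossible.
    static CompletableFuture<Void> joinQuorum(List<? extends CompletableFuture<?>> children, int quorum) {
        if (quorum < 1 || quorum > children.size()) {
            throw new IllegalArgumentException("quorum must be between 1 and " + children.size());
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        AtomicInteger succeeded = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        int tolerated = children.size() - quorum; // failures we can absorb
        for (CompletableFuture<?> child : children) {
            child.whenComplete((value, error) -> {
                if (error == null) {
                    if (succeeded.incrementAndGet() >= quorum) result.complete(null);
                } else if (failed.incrementAndGet() > tolerated) {
                    result.completeExceptionally(new IllegalStateException("quorum unreachable"));
                }
            });
        }
        return result;
    }
}

class JoinSketchDemo {
    public static void main(String[] args) {
        List<CompletableFuture<String>> children = new ArrayList<>();
        for (int i = 0; i < 5; i++) children.add(new CompletableFuture<>());
        CompletableFuture<Void> all = JoinSketch.joinAll(children);
        CompletableFuture<Void> quorum = JoinSketch.joinQuorum(children, 3);

        for (int i = 0; i < 3; i++) children.get(i).complete("ok");
        System.out.println(quorum.isDone() + " " + all.isDone()); // prints true false

        children.get(3).complete("ok");
        children.get(4).complete("ok");
        System.out.println(all.isDone()); // prints true
    }
}
```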
Join conditions
In workflows you often want to fork parallel steps and then join on certain events. You often have complex logic to decide on what the join condition is. e.g. in pseudo code
fork steps A, B, C
join on A and (B or C)
In BeanFlow we implement join conditions using regular Java code. The basic idea is that an Activity listens to changes in a number of different State objects, such as fields a, b and c. You can then write an activity bean to evaluate the join condition you need. When the condition is met, you can perform whatever logic you wish, such as:
- stopping the activity
- forking off a new child activity
- changing your state
- changing the state of some other bean
- calling arbitrary Java code
Here is an example activity implemented in Java code for the above pseudocode.
[Code snippet unavailable: the snippet macro failed to render in the source page.]
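A minimal sketch of the "A and (B or C)" join condition could look like the following. Flag is a hypothetical observable boolean standing in for a State field, and JoinConditionSketch is an illustrative class rather than a BeanFlow activity; here the only action taken when the condition is met is to mark the activity as stopped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; Flag stands in for a State<Boolean> field.
class Flag {
    private final List<Runnable> listeners = new ArrayList<>();
    private boolean value;

    synchronized boolean get() { return value; }

    synchronized void addListener(Runnable listener) { listeners.add(listener); }

    void set(boolean newValue) {
        List<Runnable> snapshot;
        synchronized (this) {
            value = newValue;
            snapshot = new ArrayList<>(listeners);
        }
        for (Runnable listener : snapshot) listener.run(); // notify outside the lock
    }
}

class JoinConditionSketch {
    final Flag a = new Flag();
    final Flag b = new Flag();
    final Flag c = new Flag();
    private boolean stopped;

    // Subscribes to every state the join condition depends on.
    void start() {
        a.addListener(this::onStateChange);
        b.addListener(this::onStateChange);
        c.addListener(this::onStateChange);
    }

    // The join condition is plain Java: A and (B or C).
    private synchronized void onStateChange() {
        if (!stopped && a.get() && (b.get() || c.get())) {
            stopped = true; // or fork a child, update another bean, call any Java code
        }
    }

    synchronized boolean isStopped() { return stopped; }
}

class JoinConditionDemo {
    public static void main(String[] args) {
        JoinConditionSketch join = new JoinConditionSketch();
        join.start();
        join.c.set(true);
        System.out.println(join.isStopped()); // prints false: A has not completed yet
        join.a.set(true);
        System.out.println(join.isStopped()); // prints true: A and C are both done
    }
}
```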
Building workflows
So far we've covered the basics of activities and shown how to perform joins and compose them. Remember that they are just Java beans, so you can implement them however you wish.
However, people often think of workflows as a number of discrete steps, using a kind of state machine to move from step to step. One really simple approach to writing workflows in this model is to use a specific method for each workflow step.
So we have a base class called Workflow which adds a new State object for the current step (which allows activities to listen to the step changing), together with helper methods for moving to different steps, for suspending and so forth.
Moving steps
Inside a step you can call any Java methods on any objects. However, to move from step to step in a declarative fashion, you can call the setNextStep() method.
Another option is to have the step method return a String or enum value; a non-null return value is interpreted as the step to navigate to next.
Note that the movement to the next step always takes place outside of the step method to avoid deeply recursive methods.
Suspending workflows
Often in a workflow you have to wait for some external event such as
- a user enters some text
- a message is received
Once that happens you re-awaken the workflow and resume execution at some step. To implement this, just call the setNextStep() method, which causes the workflow to resume at the given step.
Simple Example
There now follows a simple workflow example...
[Code snippet unavailable: the snippet macro failed to render in the source page.]
As you can see the step is defined by an enumeration Step and we have a regular Java method to implement each step. The step method can then return the step to go to next after the current method terminates. If you return null or use a void method then the workflow suspends.
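The mechanics described here can be sketched with a tiny engine that looks up one method per enum value via reflection. Everything below (MiniWorkflow, SignupStep, SignupWorkflow, the trace list) is hypothetical and much simpler than BeanFlow's Workflow base class; it only illustrates that a step returns the next step, that a null or void return suspends, and that the move to the next step happens in a loop outside the step method.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini engine; BeanFlow's Workflow base class is richer than this.
abstract class MiniWorkflow<T extends Enum<T>> {
    private T nextStep;
    private boolean running;

    // Schedules a step; if no step is currently executing, drives the workflow.
    void setNextStep(T step) {
        nextStep = step;
        if (!running) runLoop();
    }

    // Steps run from this loop, never from inside another step, so there is no recursion.
    private void runLoop() {
        running = true;
        try {
            while (nextStep != null) {
                T step = nextStep;
                nextStep = null;
                T returned = invoke(step);
                if (returned != null) nextStep = returned;
            }
            // Nothing left to run: the workflow is suspended until setNextStep() is called.
        } finally {
            running = false;
        }
    }

    @SuppressWarnings("unchecked")
    private T invoke(T step) {
        try {
            Method method = getClass().getDeclaredMethod(step.name());
            method.setAccessible(true);
            return (T) method.invoke(this); // void methods yield null, which suspends
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot run step " + step, e);
        }
    }
}

enum SignupStep { startStep, afterEnteredEmailStep, doneStep }

class SignupWorkflow extends MiniWorkflow<SignupStep> {
    final List<String> trace = new ArrayList<>();

    // Returns null: suspend until the user has entered an email address.
    SignupStep startStep() { trace.add("start"); return null; }

    SignupStep afterEnteredEmailStep() { trace.add("email"); return SignupStep.doneStep; }

    void doneStep() { trace.add("done"); }
}

class SignupWorkflowDemo {
    public static void main(String[] args) {
        SignupWorkflow workflow = new SignupWorkflow();
        workflow.setNextStep(SignupStep.startStep);
        System.out.println(workflow.trace); // prints [start]

        // Later, when the external event arrives, resume at the given step.
        workflow.setNextStep(SignupStep.afterEnteredEmailStep);
        System.out.println(workflow.trace); // prints [start, email, done]
    }
}
```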
Complete Example
The following example is fairly large, but it shows a variety of different concepts:
- suspending workflows (such as when they wait for external events such as user input or an asynchronous message to arrive)
- looping & using predicates
- forking and joining child activities
- declaratively moving from one step to another
[Code snippet unavailable: the snippet macro failed to render in the source page.]
In the above example, each workflow step is implemented as a method. If the method returns a String it is interpreted as the next step which will be executed. Returning null (or using a void return type) will suspend the workflow.
You can also explicitly call suspend() or stop() yourself.
The setNextStep(step) method instructs the workflow engine to move to the given step.
Validating Workflows
To avoid typos between the workflow step enum and the methods in the workflow class, on startup we check that a suitable method exists for each enum value. Many thanks to Sam for this idea! For example, the following bad workflow is caught automatically and a useful error is generated.
[Code snippet unavailable: the snippet macro failed to render in the source page.]
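The kind of startup check described above can be approximated with plain reflection. The helper below is a sketch under assumptions: missingSteps, Step and BadWorkflow are hypothetical names, and BeanFlow's own validation may check more (for example return types) and report errors differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator; it only checks that a no-argument method exists per step.
class WorkflowValidatorSketch {
    enum Step { startStep, finishStep }

    static class BadWorkflow {
        void startStep() { }
        void finnishStep() { } // typo: does not match Step.finishStep
    }

    // Returns the names of enum constants that have no matching no-argument method.
    static <T extends Enum<T>> List<String> missingSteps(Class<?> workflowClass, Class<T> stepEnum) {
        List<String> missing = new ArrayList<>();
        for (T step : stepEnum.getEnumConstants()) {
            try {
                workflowClass.getDeclaredMethod(step.name());
            } catch (NoSuchMethodException e) {
                missing.add(step.name());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingSteps(BadWorkflow.class, Step.class);
        System.out.println(missing); // prints [finishStep]
    }
}
```

Running such a check when the workflow starts turns a misspelt step method into an immediate, descriptive failure instead of a workflow that silently never reaches that step.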