Chapter 7. Scaling and Parallel Processing

Many batch processing problems can be solved with single-threaded, single-process jobs, so it is worth checking whether that meets your needs before thinking about more complex implementations. Measure the performance of a realistic job first and see if the simplest implementation is good enough: you can read and write a file of several hundred megabytes in well under a minute, even with standard hardware.

When you are ready to start implementing a job with some parallel processing, Spring Batch offers a range of options, which are described in this chapter, although some features are covered elsewhere. At a high level there are two modes of parallel processing: single-process, multi-threaded; and multi-process. These break down into categories as well, as follows:

- Multi-threaded Step (single process)
- Parallel Steps (single process)
- Remote Chunking of Step (multi process)
- Partitioning a Step (single or multi process)

We review the single-process options first, and then the multi-process options.

7.1. Multi-threaded Step

The simplest way to start parallel processing is to add a TaskExecutor to your Step configuration, e.g. as an attribute of the tasklet:

<step id="loading">
    <tasklet reader="stagingReader" 
             processor="stagingProcessor" 
             writer="tradeWriter" 
             commit-interval="1" 
             task-executor="taskExecutor"/>
</step>

In this example the taskExecutor is a reference to another bean definition, implementing the TaskExecutor interface. TaskExecutor is a standard Spring interface, so consult the Spring User Guide for details of available implementations. The simplest multi-threaded TaskExecutor is a SimpleAsyncTaskExecutor.

The result of the above configuration will be that the Step executes by reading, processing and writing each chunk of items (each commit interval) in a separate thread of execution.
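
For reference, the taskExecutor bean referred to above can be as simple as the following sketch. SimpleAsyncTaskExecutor starts a new thread for each chunk; in practice you would often prefer a pooled implementation (such as ThreadPoolTaskExecutor) so that the number of concurrent threads is bounded:

<beans:bean id="taskExecutor"
      class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>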

There are some practical limitations of using multi-threaded Steps for some common Batch use cases. Many participants in a Step (e.g. readers and writers) are stateful, and if the state is not segregated by thread, then those components are not usable in a multi-threaded Step. In particular most of the off-the-shelf readers and writers from Spring Batch are not designed for multi-threaded use. It is, however, possible to work with stateless or thread-safe readers and writers, and there is a sample (parallelJob) in the Spring Batch Samples that shows the use of a process indicator (see Section 6.12, “Preventing State Persistence”) to keep track of items that have been processed in a database input table.
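
As a rough illustration of the process-indicator idea, the writing side can flip a flag on the input table inside the chunk transaction, so that a row is never handed out twice across threads or restarts. The STAGING table, its columns and the bean names below are invented for illustration, and the items are assumed to expose an id property; in the real sample this is combined with the business write, and the reader takes care of parcelling out unprocessed rows safely across threads:

<beans:bean id="stagingProcessedWriter"
      class="org.springframework.batch.item.database.JdbcBatchItemWriter">
    <beans:property name="dataSource" ref="dataSource"/>
    <!-- Mark each row as processed in the same transaction as the business write -->
    <beans:property name="sql"
          value="UPDATE STAGING SET PROCESSED = 'Y' WHERE ID = :id"/>
    <beans:property name="itemSqlParameterSourceProvider">
        <beans:bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
    </beans:property>
</beans:bean>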

7.2. Parallel Steps

As long as the application logic that needs to be parallelized can be split into distinct responsibilities and assigned to individual steps, then it can be parallelized in a single process. Parallel Step execution is easy to configure and use. For example, to execute steps (step1, step2) in parallel with step3, you could configure a flow like this:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

The "task-executor" attribute is used to specify which TaskExecutor implementation should be used to execute the individual flows. The default is SyncTaskExecutor, but an asynchronous TaskExecutor is required to run the steps in parallel. Note that the job will ensure that every flow in the split completes before aggregating the exit statuses and transitioning.

See the section on Section 5.3.5, “Split Flows” for more detail.

7.3. Remote Chunking

In Remote Chunking the Step processing is split across multiple processes, communicating with each other through some middleware. Here is a picture of the pattern in action:

The Master component is a single process, and the Slaves are multiple remote processes. Clearly this pattern works best if the Master is not a bottleneck, so the processing must be more expensive than the reading of items (this is often the case in practice).

The Master is just an implementation of a Spring Batch Step, with the ItemWriter replaced with a generic version that knows how to send chunks of items to the middleware as messages. The Slaves are standard listeners for whatever middleware is being used (e.g. with JMS they would be MessageListeners), and their role is to process the chunks of items using a standard ItemWriter or ItemProcessor plus ItemWriter, through the ChunkProcessor interface. One of the advantages of using this pattern is that the reader, processor and writer components are off-the-shelf (the same as would be used for a local execution of the step). The items are divided up dynamically and work is shared through the middleware, so if the listeners are all eager consumers, then load balancing is automatic.

The middleware has to be durable, with guaranteed delivery and single consumer for each message. JMS is the obvious candidate, but other options exist in the grid computing and shared memory product space (e.g. Java Spaces).
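
To give a flavour of the master side, the sketch below swaps the normal writer for one that publishes each chunk to a JMS queue. The ChunkMessageSendingItemWriter class and its properties are invented purely for illustration (the real, evolving implementations live in the Spring Batch Integration sub-project mentioned below); the reader and processor are exactly the beans you would use for a local execution of the step:

<!-- Master-side step: reading and processing stay local, writing sends chunks away -->
<step id="remoteLoading">
    <tasklet reader="stagingReader"
             processor="stagingProcessor"
             writer="chunkSendingWriter"
             commit-interval="50"/>
</step>

<!-- Hypothetical ItemWriter (invented class name) that wraps each chunk of
     items in a message and sends it to the middleware for a Slave to process -->
<beans:bean id="chunkSendingWriter"
      class="com.example.batch.ChunkMessageSendingItemWriter">
    <beans:property name="jmsTemplate" ref="jmsTemplate"/>
    <beans:property name="destinationName" value="chunk.requests"/>
</beans:bean>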

Spring Batch has a sub-project (Spring Batch Integration), providing implementations of various patterns like this one using Spring Integration. Spring Batch Integration is available in subversion for people to use, but is not intended to be part of the official general release of Spring Batch until it builds up more of a community of users.

7.4. Partitioning

Spring Batch also provides an SPI for partitioning a Step execution and executing it remotely. In this case the remote participants are simply Step instances that could just as easily have been configured and used for local processing. Here is a picture of the pattern in action:

The Job is executing on the left hand side as a sequence of Steps, and one of the Steps is labelled as a Master. The Slaves in this picture are all identical instances of a Step, which could in fact take the place of the Master resulting in the same outcome for the Job. The Slaves are typically going to be remote services, but could also be local threads of execution. The messages sent by the Master to the Slaves in this pattern do not need to be durable, or have guaranteed delivery: Spring Batch meta-data in the JobRepository will ensure that each Slave is executed once and only once for each Job execution.

The SPI in Spring Batch consists of a special implementation of Step (the PartitionStep), and two strategy interfaces that need to be implemented for the specific environment. The strategy interfaces are PartitionHandler and StepExecutionSplitter, and their role is shown in the sequence diagram below:

The Step on the right in this case is the "remote" Slave, so potentially there are many objects and/or processes playing this role, and the PartitionStep is shown driving the execution. The PartitionStep configuration looks like this:

<bean name="step1:master" class="org.sfw...PartitionStep">
    <property name="partitionHandler" ref="partitionHandler"/>
    <property name="stepExecutionSplitter" ref="stepExecutionSplitter"/>
    <property name="jobRepository" ref="jobRepository" />
</bean>

There is a simple example which can be copied and extended in the unit test suite for Spring Batch Core (see org.springframework.batch.core.partition package).

7.4.1. PartitionHandler

The PartitionHandler is the component that knows about the fabric of the remoting or grid environment. It is able to send StepExecution requests to the remote Steps, wrapped in some fabric-specific format, like a DTO. It does not have to know how to split up the input data, or how to aggregate the result of multiple Step executions. Generally speaking it probably also doesn't need to know about resilience or failover, since those are features of the fabric in many cases, and anyway Spring Batch always provides restartability independent of the fabric: a failed Job can always be restarted and only the failed Steps will be re-executed.

The PartitionHandler interface can have specialized implementations for a variety of fabric types: e.g. simple RMI remoting, EJB remoting, custom web service, JMS, Java Spaces, shared memory grids (like Terracotta or Coherence), grid execution fabrics (like GridGain). Spring Batch does not contain implementations for any proprietary grid or remoting fabrics.

Spring Batch does however provide a useful implementation of PartitionHandler that executes Steps locally in separate threads of execution, using the TaskExecutor strategy from Spring. The implementation is called TaskExecutorPartitionHandler, and it can be configured like this:

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

The gridSize determines the number of separate step executions to create, so it can be matched to the size of the thread pool in the TaskExecutor, or else it can be set to be larger than the number of threads available, in which case the blocks of work are smaller.
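
For instance, a pooled TaskExecutor sized to match the gridSize above might be configured as in the sketch below (ThreadPoolTaskExecutor is a standard Spring implementation; the bean name matches the taskExecutor reference in the handler configuration above):

<bean id="taskExecutor"
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <!-- Pool sized to match the gridSize, so all ten partitions can run at once -->
    <property name="corePoolSize" value="10"/>
    <property name="maxPoolSize" value="10"/>
</bean>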

The TaskExecutorPartitionHandler is quite useful for IO intensive Steps, like copying large numbers of files or replicating filesystems into content management systems.

7.4.2. StepExecutionSplitter

The StepExecutionSplitter is responsible for splitting up a StepExecution into blocks of work, and providing input parameters for the remote Slaves in the form of an ExecutionContext for each one. The principal method in the interface is:

public interface StepExecutionSplitter {
    ...
    Set<StepExecution> split(StepExecution stepExecution, int gridSize) 
          throws JobExecutionException;
}

So an execution instance for the Master step is passed in, along with a hint about the grid size, and the splitter has to create a set of partitioned StepExecution instances, each with a different ExecutionContext.

A convenient generic implementation of StepExecutionSplitter is provided by Spring Batch, which handles concerns like interpreting the grid size and restart. It is recommended that you use this implementation (the SimpleStepExecutionSplitter) and inject specific knowledge of the input data through a Partitioner. The Partitioner has a simpler responsibility: to generate execution contexts as input parameters for new step executions only (no need to worry about restarts). It has a single method:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

The return value from this method associates a unique name for each step execution (the String), with input parameters in the form of an ExecutionContext. The names show up later in the Batch meta data as the step name in the partitioned StepExecutions. The ExecutionContext is just a bag of name-value pairs, so it might contain a range of primary keys, or line numbers, or the location of an input file. The remote Step then normally binds to the context input using #{...} placeholders (late binding in step scope), as illustrated in the next section.

The names of the step executions (the keys in the Map returned by Partitioner) need to be unique amongst the step executions of a Job, but do not have any other specific requirements. The easiest way to do this, and to make the names meaningful for users, is to use a prefix+suffix naming convention, where the prefix is the name of the step that is being executed (which itself is unique in the Job), and the suffix is just a counter. There is a SimplePartitioner in the framework that uses this convention.
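
Tying this back to the PartitionStep configuration shown earlier, the stepExecutionSplitter bean might be wired up roughly as follows. This is only a sketch: the exact constructor arguments of SimpleStepExecutionSplitter vary between versions (check the Javadoc), and the partitioner bean is assumed to be your own Partitioner implementation (or the SimplePartitioner mentioned above):

<bean id="stepExecutionSplitter"
      class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
    <!-- The repository, the step being partitioned, and a Partitioner -->
    <constructor-arg ref="jobRepository"/>
    <constructor-arg ref="step1"/>
    <constructor-arg ref="partitioner"/>
</bean>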

7.4.3. Binding Input Data to Steps

It is very efficient for the steps that are executed by the PartitionHandler to have identical configuration, and for their input parameters to be bound at runtime from the ExecutionContext. This is easy to do with the StepScope feature of Spring Batch (covered in more detail in the section on Late Binding). For example, if the Partitioner creates ExecutionContext instances with an attribute key fileName, pointing to a different file (or directory) for each step invocation, the Partitioner output might look like this:

Table 7.1. Example step execution name to execution context provided by Partitioner targeting directory processing

Step Execution Name (key)      ExecutionContext (value)
filecopy:partition0            fileName=/home/data/one
filecopy:partition1            fileName=/home/data/two
filecopy:partition2            fileName=/home/data/three

Then the file name can be bound to a step using late binding to the execution context:

<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resource" value="#{stepExecutionContext[fileName]}/*"/>
</bean>