Parallel Data Processing

This type of scalability is currently available only for ETL graphs. Jobflow and Profiler jobs cannot run in parallel.

When a transformation is processed in parallel, the whole graph (or parts of it) runs in parallel on multiple cluster nodes, with each node processing just a part of the data.

So the more nodes the cluster has, the more data can be processed in a given time.

The data may be split (partitioned) before the graph is executed, or by the graph itself on the fly. The resulting data may be stored in partitions or gathered and stored as a single body of data.

The scalability curve differs according to the type of transformation. It may be almost linear, which is the ideal case in almost all situations; the exception is a single data source that cannot be read by multiple readers in parallel, which limits the speed of the downstream transformation. In such cases parallel data processing brings no benefit, since the parallel branches would simply wait for input data.

ETL Graph Allocation

Each ETL graph executed in a cluster environment is automatically subjected to transformation analysis. The main goal of this analysis is to find the so-called ETL graph allocation. The graph allocation is a set of instructions telling the cluster environment how the transformation should be executed. To understand how parallel data processing works, it is necessary to look more closely at the graph analysis and the resulting allocation.

First of all, the analysis finds an allocation for each individual component. The component allocation is the set of cluster nodes where the component should run. There are several ways to specify the component allocation; see the following section of the documentation. The important point for now is that a component can be requested to run in multiple instances, which is what makes parallel data processing possible. The next step of the analysis is to find an optimal graph decomposition which satisfies all component allocations while minimising the number of remote edges between graph instances.

The resulting analysis determines how many instances (workers) of the graph need to be executed, on which cluster nodes these instances will run, and which components will be present in each instance. In other words, one executed graph can run as many instances, each instance can be processed on an arbitrary cluster node, and each instance contains only the components relevant to it.

Component allocations example

Figure 29.1. Component allocations example


This figure shows a sample graph whose components have various component allocations. The first component, FlatFileReader, requests to be executed on node1; the following Reformat component should run on cluster node2. The ParallelPartition component is a special component which makes it possible to change the cardinality of allocation between two interconnected components (a detailed description of cluster partitioning and gathering follows this section). FlatFileWriter, the last component, requires execution on three cluster nodes: node1, node2 and node3. The following figure visualises the result of the transformation analysis. Three workers (graphs) will be executed, each on a different cluster node (this is not strictly necessary; multiple workers may even be associated with a single node). The worker on cluster node1 contains only FlatFileReader and the first of the three instances of the FlatFileWriter component; both are connected by remote edges to components running on node2. The worker running on node3 contains only FlatFileWriter, fed by data transferred remotely from the ParallelPartition running on node2.

Graph decomposition based on component allocations

Figure 29.2. Graph decomposition based on component allocations
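
To summarise the example, the component allocations and their cardinalities, as read from the figures above, are:

FlatFileReader     node1                   (cardinality 1)
Reformat           node2                   (cardinality 1)
ParallelPartition  changes cardinality from 1 to 3
FlatFileWriter     node1, node2, node3     (cardinality 3)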


Component Allocation

The allocation of a single component can be derived in several ways (the list is ordered by priority):

  • Explicit definition - all components have the common attribute Allocation. CloverETL Designer provides a convenient dialog for setting it.

    Component allocation dialog

    Figure 29.3. Component allocation dialog


    Three different approaches are available for explicit allocation definition:

    • Allocation based on number of workers - the component will be executed in the requested number of instances on cluster nodes preferred by CloverETL Cluster. The Server can use its built-in load-balancing algorithm to ensure the fastest data processing.

    • Allocation based on a reference to a partitioned sandbox - the component allocation corresponds to the locations of the given partitioned sandbox. Each partitioned sandbox has a list of locations, each bound to a specific cluster node, so the allocation is equivalent to that list of locations. See "Partitioned sandbox" in Partitioned and Local Sandboxes for details.

    • Allocation defined by a list of cluster node identifiers (a single cluster node may be used multiple times).

  • Reference to a partitioned sandbox - the FlatFileReader, FlatFileWriter and ParallelReader components derive their allocation from the fileURL attribute. If the URL refers to a file in a partitioned sandbox, the component allocation is automatically derived from the locations of that sandbox. So whenever one of these components works with a file in a partitioned sandbox, a suitable allocation is used automatically (see the example after this list).

  • Adoption from neighbouring components - by default, allocation is inherited from neighbouring components, with components on the left side having higher priority. Cluster partitioners and cluster gatherers are natural boundaries for this recursive inheritance.
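
As an example of the partitioned-sandbox rule, suppose "data" is a partitioned sandbox with three locations bound to node1, node2 and node3 (the sandbox code and path below are hypothetical). A FlatFileReader with the following fileURL is automatically given a three-node allocation, one instance per sandbox location:

sandbox://data/input/customers.dat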

Partitioning/Gathering Data

As mentioned before, data may be partitioned and gathered in multiple ways. It may be prepared before the graph is executed or it may be partitioned on the fly.

Partitioning/gathering "on the fly"

There are six special components to consider: ParallelPartition, ParallelLoadBalancingPartition, ParallelSimpleCopy, ParallelSimpleGather, ParallelMerge and ParallelRepartition. All of these components work similarly to their non-cluster counterparts, but their splitting or gathering nature is used to change the data-flow allocation, so they can change the distribution of the data among workers.

ParallelPartition and ParallelLoadBalancingPartition work similarly to a common partitioner; they change the data allocation from 1 to N. The component preceding the ParallelPartition runs on just one node, whereas the components behind it run in parallel according to the node allocation. The ParallelSimpleCopy component can be used in similar places; it does not distribute the data records, but copies them to all output workers.

ParallelSimpleGather and ParallelMerge work in the opposite way: they change the data allocation from N to 1. The components preceding the gather/merge run in parallel, while the component behind it runs on just one node.
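
As a minimal sketch (the node assignments are illustrative), a typical on-the-fly pipeline could be arranged like this:

FlatFileReader [node1] -> ParallelPartition -> Reformat [node1, node2, node3] -> ParallelSimpleGather -> FlatFileWriter [node2]

The reader runs as a single instance, ParallelPartition raises the allocation cardinality from 1 to 3, the three Reformat instances process their partitions in parallel, and ParallelSimpleGather collapses the streams back to a single writer instance.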

Partitioning/gathering data by external tools

Partitioning data on the fly may in some cases be an unnecessary bottleneck. Splitting the data beforehand using low-level tools can be much better for scalability. In the optimal case, each running worker reads data from an independent data source, so no ParallelPartition component is needed and the graph runs in parallel from the very beginning.

Likewise, the whole graph may run in parallel to the very end, with the results left partitioned rather than gathered.
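
For example, a large delimited file could be pre-split with a low-level tool such as the GNU coreutils split command, and the parts then placed into the locations of a partitioned sandbox (file names and target paths are hypothetical):

split -d -n l/3 input.dat part_     # split into 3 parts on line boundaries (GNU split)
# distribute the parts, one per partitioned sandbox location:
#   part_00 -> node1:/data/part1/input.dat
#   part_01 -> node2:/data/part2/input.dat
#   part_02 -> node3:/data/part3/input.dat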

Node Allocation Limitations

As described above, each component may have its own node allocation specified, which may result in conflicts. The following limitations apply:

  • Node allocations of neighbouring components must have the same cardinality. They do not have to be the same allocation, but the cardinality must match. For example, take an ETL graph with two components, DataGenerator and Trash: a DataGenerator allocated on NodeA sending data to a Trash allocated on NodeB works fine, whereas a DataGenerator allocated on NodeA sending data to a Trash allocated on NodeA and NodeB fails.

  • The node allocation behind ParallelSimpleGather and ParallelMerge must have cardinality 1. It may be any allocation, but its cardinality must be exactly 1.

  • The node allocation of components in front of ParallelPartition, ParallelLoadBalancingPartition and ParallelSimpleCopy must likewise have cardinality 1; see the sketch below.
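
The cardinality rules can be summarised with a short sketch (reusing the components from the example above):

valid:   DataGenerator [NodeA] -> Trash [NodeB]                                (cardinality 1 -> 1)
valid:   DataGenerator [NodeA] -> ParallelPartition -> Trash [NodeA, NodeB]    (1 -> 2 via a cluster partitioner)
invalid: DataGenerator [NodeA] -> Trash [NodeA, NodeB]                         (1 -> 2 without a cluster partitioner)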

Partitioned and Local Sandboxes

Partitioned and local sandboxes were mentioned in the previous sections. These sandbox types were introduced in version 3.0 and are vital for parallel data processing.

Together with shared sandboxes, we have three sandbox types in total.

Shared sandbox

This type of sandbox must be used for all data which is supposed to be accessible on all cluster nodes. This includes all graphs, jobflows, metadata, connections, classes, and the input/output data for graphs which should support HA, as described above. All shared sandboxes reside in a directory which must be properly shared among all cluster nodes. You can use a suitable sharing/replicating tool according to the operating system and filesystem.

Dialog form for creating a new shared sandbox

Figure 29.4. Dialog form for creating a new shared sandbox


As you can see in the screenshot above, you specify the root path on the filesystem; you can use placeholders or an absolute path. The available placeholders are environment variables, system properties, and sandboxes.home, the CloverETL Server config property intended for this use. The default path is set to [user.data.home]/CloverETL/sandboxes/[sandboxID], where sandboxID is the ID specified by the user. The user.data.home placeholder refers to the home directory of the user running the Java Virtual Machine process (a subdirectory of /home on Unix-like systems); it is determined as the first writable directory among the following values:

  • USERPROFILE environment variable on Windows OS

  • user.home system property (user home directory)

  • user.dir system property (JVM process working directory)

  • java.io.tmpdir system property (JVM process temporary directory)

Note that the path must be valid on all cluster nodes: not just the nodes currently connected to the cluster, but also any nodes that may be connected later. When the placeholders are resolved on a node, the resulting path must exist on that node and must be readable/writable for the JVM process.
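
For example, to place all shared sandboxes on a network mount that is available under the same path on every node, the CloverETL Server configuration could contain an entry like this (the mount path is hypothetical):

sandboxes.home=/mnt/clover-shared/sandboxes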

Local sandbox

This sandbox type is intended for data accessible only on certain cluster nodes; it may include massive input/output files. Any cluster node may access the content of this type of sandbox, but only one node has local (fast) access, and that node must be up and running to provide the data. A graph may use resources from multiple sandboxes that are physically stored on different nodes, since cluster nodes can create network streams transparently, as if the resource were a local file. See Using a Sandbox Resource as a Component Data Source for details.

Do not use a local sandbox for common project data (graphs, metadata, connections, lookups, properties files, etc.). It would cause odd behaviour. Use shared sandboxes instead.

Dialog form for creating a new local sandbox

Figure 29.5. Dialog form for creating a new local sandbox


The sandbox location path is pre-filled with the sandboxes.home.local placeholder, which by default points to [user.data.home]/CloverETL/sandboxes-local. The placeholder can be configured like any other CloverETL configuration property.
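
For example, to keep local sandboxes on a node-local disk, the configuration could contain an entry like this (the path is hypothetical):

sandboxes.home.local=/data/clover-local-sandboxes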

Partitioned sandbox

This type of sandbox is actually an abstract wrapper for a set of physical locations, typically existing on different cluster nodes (although there may be multiple locations on the same node). A partitioned sandbox has two purposes, both closely related to parallel data processing.

  1. Node allocation specification - the locations of a partitioned sandbox define the workers which will run the graph or its parts, so each physical location causes a single worker to run. The worker does not have to actually store any data in "its" location; the locations are simply a way to tell CloverETL Server: "execute this part of the ETL graph in parallel on these nodes".

  2. Storage for parts of the data during parallel data processing - each physical location contains only a part of the data. In a typical use case, the input data is split into several input files; each file is put into a different location and each worker processes its own file.
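
For example, a partitioned sandbox with three locations, one per cluster node, might be laid out as follows (node names, paths and file names are hypothetical):

location on node1: /data/part1/   (holds the first part of the data, input.dat)
location on node2: /data/part2/   (second part, input.dat)
location on node3: /data/part3/   (third part, input.dat)

A graph whose allocation is driven by this sandbox runs three workers, each processing the input.dat stored in its own location.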

Dialog form for creating a new partitioned sandbox

Figure 29.6. Dialog form for creating a new partitioned sandbox


As you can see in the screenshot above, you can specify one or more physical locations on different cluster nodes for a partitioned sandbox.

The sandbox location path is pre-filled with the sandboxes.home.partitioned placeholder, which by default points to [user.data.home]/CloverETL/sandboxes-partitioned. The sandboxes.home.partitioned config property may be configured like any other CloverETL Server configuration property. Note that the directory must be readable/writable for the user running the JVM process.
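
Since each cluster node has its own configuration, the placeholder may resolve to a different path on each node, for instance (paths are hypothetical):

sandboxes.home.partitioned=/mnt/fastdisk/clover-partitioned    (node1)
sandboxes.home.partitioned=/data/clover-partitioned            (node2)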

Do not use a partitioned sandbox for common project data (graphs, metadata, connections, lookups, properties files, etc.). It would cause odd behaviour. Use shared sandboxes instead.

Using a Sandbox Resource as a Component Data Source

A sandbox resource, whether in a shared, local or partitioned sandbox (or in an ordinary sandbox on a standalone server), is specified in the graph in the fileURL attribute as a so-called sandbox URL, like this:

sandbox://data/path/to/file/file.dat

where "data" is a code for the sandbox and "path/to/file/file.dat" is the path to the resource from the sandbox root. URL is evaluated by CloverETL Server during job execution and a component (reader or writer) obtains the opened stream from the server. This may be a stream to a local file or to some other remote resource. Thus, a job does not have to run on the node which has local access to the resource. There may be more sandbox resources used in the job and each of them may be on a different node.

The sandbox URL has a specific use in parallel data processing. When a sandbox URL referring to a resource in a partitioned sandbox is used, that part of the graph/phase runs in parallel, according to the node allocation given by the list of partitioned sandbox locations. Each worker thus has its own local sandbox resource: CloverETL Server evaluates the sandbox URL on each worker and provides the component with an open stream to the local resource.
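
For example, with the three-location partitioned sandbox sketched earlier (the sandbox code, locations and paths are hypothetical), the same URL resolves to a different local file on each worker:

sandbox://data/input.dat
    worker on node1 reads /data/part1/input.dat
    worker on node2 reads /data/part2/input.dat
    worker on node3 reads /data/part3/input.dat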

The sandbox URL may be used on a standalone server as well. It is an excellent choice when a graph references resources from different sandboxes, such as metadata, lookup definitions or input/output data. Of course, the referenced sandbox must be accessible to the user who executes the graph.