public abstract class AbstractParallelNodeModel extends NodeModel

An extension of NodeModel that offers parallel processing of DataTables. To use it, the executeByChunk(BufferedDataTable, BufferedDataTable[], RowAppender[], ExecutionMonitor) method must be overridden. This method is called as often as necessary, each time with a DataTable containing only a part of the input rows. A default value for the maximal chunk size (i.e. the number of rows in each chunked data table) is given in the constructor. Only the first input table is split into chunks; any additional input tables are passed to executeByChunk(BufferedDataTable, BufferedDataTable[], RowAppender[], ExecutionMonitor) completely.
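For orientation, here is a minimal sketch of a subclass, not taken from the KNIME sources. The class is declared abstract because the remaining NodeModel methods (configure, settings handling, reset, internals) are omitted; the import location of AbstractParallelNodeModel and the KNIMEConstants.GLOBAL_THREAD_POOL pool are assumptions that may need to be adapted.

```java
// Hypothetical skeleton of a parallel node model. The import for
// AbstractParallelNodeModel is omitted because its package depends on the
// KNIME version; GLOBAL_THREAD_POOL is assumed to be available.
import org.knime.core.data.DataTable;
import org.knime.core.data.DataTableSpec;
import org.knime.core.data.container.RowAppender;
import org.knime.core.node.BufferedDataTable;
import org.knime.core.node.ExecutionMonitor;
import org.knime.core.node.KNIMEConstants;

public abstract class ExampleParallelNodeModel extends AbstractParallelNodeModel {

    /** One data input, one data output, chunks of 1000 rows; worker threads
     *  are taken from the global KNIME thread pool (assumed constant). */
    protected ExampleParallelNodeModel() {
        super(1, 1, 1000, KNIMEConstants.GLOBAL_THREAD_POOL);
    }

    /** Called once before the first chunk; returns the output table spec(s). */
    @Override
    protected DataTableSpec[] prepareExecute(final DataTable[] data) throws Exception {
        // In this sketch the output has the same structure as the first input table.
        return new DataTableSpec[]{data[0].getDataTableSpec()};
    }

    /** Called concurrently, once for each chunk of the first input table. */
    @Override
    protected void executeByChunk(final BufferedDataTable inDataChunk,
            final BufferedDataTable[] additionalData, final RowAppender[] outputTables,
            final ExecutionMonitor exec) throws Exception {
        // Process the rows of inDataChunk and append result rows to outputTables[0].
    }
}
```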
Field Summary

| Modifier and Type | Field and Description |
|---|---|
| protected ThreadPool | m_workers: The execution service that is used. |
Constructor Summary

| Constructor and Description |
|---|
| AbstractParallelNodeModel(int nrDataIns, int nrDataOuts, int chunkSize, ThreadPool workers): Creates a new AbstractParallelNodeModel. |
Method Summary

| Modifier and Type | Method and Description |
|---|---|
| protected BufferedDataTable[] | execute(BufferedDataTable[] data, ExecutionContext exec): This function is invoked by the Node#executeNode() method of the node (through the #executeModel(BufferedDataTable[], ExecutionMonitor) method) only after all predecessor nodes have been successfully executed and all data is therefore available at the input ports. |
| protected abstract void | executeByChunk(BufferedDataTable inDataChunk, BufferedDataTable[] additionalData, RowAppender[] outputTables, ExecutionMonitor exec): This method is called as often as necessary by multiple threads. |
| int | getChunkSize(): Returns the current chunk size. |
| protected abstract DataTableSpec[] | prepareExecute(DataTable[] data): This method is called before the first chunk is processed. |
| void | setChunkSize(int newValue): Sets the chunk size of the split data tables. |

Methods inherited from class NodeModel:
addWarningListener, computeFinalOutputSpecs, configure, configure, continueLoop, createInitialStreamableOperatorInternals, createMergeOperator, createStreamableOperator, execute, finishStreamableExecution, getAvailableFlowVariables, getAvailableInputFlowVariables, getCredentialsProvider, getInHiLiteHandler, getInPortType, getInputPortRoles, getInteractiveNodeView, getLogger, getLoopEndNode, getLoopStartNode, getNrInPorts, getNrOutPorts, getOutHiLiteHandler, getOutPortType, getOutputPortRoles, getWarningMessage, iterate, loadInternals, loadValidatedSettingsFrom, notifyViews, notifyWarningListeners, onDispose, peekFlowVariableDouble, peekFlowVariableInt, peekFlowVariableString, pushFlowVariableDouble, pushFlowVariableInt, pushFlowVariableString, removeWarningListener, reset, resetAndConfigureLoopBody, saveInternals, saveSettingsTo, setInHiLiteHandler, setWarningMessage, stateChanged, validateSettings
Field Detail

protected final ThreadPool m_workers

The execution service that is used.
Constructor Detail

public AbstractParallelNodeModel(int nrDataIns, int nrDataOuts, int chunkSize, ThreadPool workers)

Creates a new AbstractParallelNodeModel.

Parameters:
nrDataIns - The number of DataTable elements expected as inputs.
nrDataOuts - The number of DataTable objects expected at the output.
chunkSize - The default number of rows in the DataTables that are passed to executeByChunk(BufferedDataTable, BufferedDataTable[], RowAppender[], ExecutionMonitor).
workers - A thread pool from which the threads for processing the chunks are taken.
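As a hedged illustration of these parameters (not from the KNIME sources), a node with two data inputs could bound its parallelism with a sub-pool of the global pool; ThreadPool.createSubPool(int) and KNIMEConstants.GLOBAL_THREAD_POOL are assumptions about the KNIME core API.

```java
// Hypothetical constructor: two data inputs (only the first is chunked),
// one output, chunks of 500 rows, at most four worker threads.
import org.knime.core.node.KNIMEConstants;
import org.knime.core.util.ThreadPool;

public abstract class TwoInputParallelNodeModel extends AbstractParallelNodeModel {

    private static final ThreadPool WORKERS =
            KNIMEConstants.GLOBAL_THREAD_POOL.createSubPool(4); // assumed API

    protected TwoInputParallelNodeModel() {
        super(2, 1, 500, WORKERS); // nrDataIns, nrDataOuts, chunkSize, workers
    }
}
```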
Method Detail

protected abstract DataTableSpec[] prepareExecute(DataTable[] data) throws Exception

This method is called before the first chunk is processed. It must return the specs of the output tables, because the RowAppenders passed to executeByChunk(BufferedDataTable, BufferedDataTable[], RowAppender[], ExecutionMonitor) must be constructed accordingly.

Parameters:
data - the input data tables
Throws:
Exception - if something goes wrong during preparation
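A hedged sketch of a prepareExecute implementation for a node that appends one string column to the first input table; the column name "Category" and the lack of name-collision handling are simplifications for illustration. The method belongs in a subclass such as the skeleton shown above.

```java
import org.knime.core.data.DataColumnSpec;
import org.knime.core.data.DataColumnSpecCreator;
import org.knime.core.data.DataTable;
import org.knime.core.data.DataTableSpec;
import org.knime.core.data.def.StringCell;

@Override
protected DataTableSpec[] prepareExecute(final DataTable[] data) throws Exception {
    // Output spec = spec of the first input table plus one appended string column,
    // so that the RowAppender for the single output can be created accordingly.
    DataColumnSpec appended =
            new DataColumnSpecCreator("Category", StringCell.TYPE).createSpec();
    DataTableSpec outSpec =
            new DataTableSpec(data[0].getDataTableSpec(), new DataTableSpec(appended));
    return new DataTableSpec[]{outSpec};
}
```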
protected final BufferedDataTable[] execute(BufferedDataTable[] data, ExecutionContext exec) throws Exception

This function is invoked by the Node#executeNode() method of the node (through the #executeModel(BufferedDataTable[], ExecutionMonitor) method) only after all predecessor nodes have been successfully executed and all data is therefore available at the input ports. Implement this function with your task in the derived model.

The input data is available in the given array argument inData and is ensured to be neither null nor to contain null elements (with a few non-standard exceptions, which are described in more detail in NodeModel.execute(PortObject[], ExecutionContext)).

In order to create output data, you need to create objects of class BufferedDataTable. Use the execution context argument to create BufferedDataTable.

Overrides:
execute in class NodeModel

Parameters:
data - An array holding DataTable elements, one for each input.
exec - The execution monitor for this execute method. It provides us with means to create new BufferedDataTable. Additionally, it should be asked frequently whether the execution should be interrupted; it throws an exception in that case. This exception may be caught and, after closing all data streams, be thrown again. Also, if you can tell the progress of your task, set it in this monitor.

Returns:
Non-null DataTable elements with the size of the number of outputs; the result of this execution.

Throws:
Exception - If you must fail the execution. Try to provide a meaningful error message in the exception, as it will be displayed to the user. Please be advised to check the canceled status frequently by invoking ExecutionMonitor#checkCanceled, which will throw a CanceledExecutionException and abort the execution.
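The same cancellation and progress contract applies to the sub-progress monitor handed to executeByChunk. A hedged sketch of the typical pattern (variable names are illustrative; BufferedDataTable.size() is assumed, older API versions use getRowCount()):

```java
import org.knime.core.data.DataRow;
import org.knime.core.node.BufferedDataTable;
import org.knime.core.node.CanceledExecutionException;
import org.knime.core.node.ExecutionMonitor;

final class ProgressPattern {
    /** Polls for cancellation and reports fractional progress while iterating. */
    static void processWithProgress(final BufferedDataTable table,
            final ExecutionMonitor exec) throws CanceledExecutionException {
        final long rowCount = table.size();
        long done = 0;
        for (DataRow row : table) {
            exec.checkCanceled();                         // throws if the user cancelled
            exec.setProgress(++done / (double) rowCount); // progress in [0, 1]
            // ... per-row work goes here ...
        }
    }
}
```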
protected abstract void executeByChunk(BufferedDataTable inDataChunk, BufferedDataTable[] additionalData, RowAppender[] outputTables, ExecutionMonitor exec) throws Exception

This method is called as often as necessary by multiple threads. The inDataChunk table contains at most maxChunkSize rows from the first table in the array passed to execute(BufferedDataTable[], ExecutionContext); the additionalData tables are passed completely.

Parameters:
inDataChunk - the chunked input data table
additionalData - the complete tables of additional data
outputTables - data containers for the output tables to which the computed rows must be added
exec - an execution monitor, which is actually a sub-progress monitor
Throws:
Exception - if an exception occurs
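A hedged sketch of an executeByChunk implementation matching the prepareExecute example above (one output table, one appended string column); the labelling logic and the use of the first cell are invented for illustration. The method belongs in the same subclass.

```java
import org.knime.core.data.DataCell;
import org.knime.core.data.DataRow;
import org.knime.core.data.container.RowAppender;
import org.knime.core.data.def.DefaultRow;
import org.knime.core.data.def.StringCell;
import org.knime.core.node.BufferedDataTable;
import org.knime.core.node.ExecutionMonitor;

@Override
protected void executeByChunk(final BufferedDataTable inDataChunk,
        final BufferedDataTable[] additionalData, final RowAppender[] outputTables,
        final ExecutionMonitor exec) throws Exception {
    for (DataRow row : inDataChunk) {
        exec.checkCanceled(); // cooperative cancellation inside each chunk
        // Invented logic: label each row depending on whether its first cell is missing.
        DataCell label = new StringCell(row.getCell(0).isMissing() ? "missing" : "present");
        // Copy the original cells and append the new label cell.
        DataCell[] cells = new DataCell[row.getNumCells() + 1];
        for (int i = 0; i < row.getNumCells(); i++) {
            cells[i] = row.getCell(i);
        }
        cells[cells.length - 1] = label;
        outputTables[0].addRowToTable(new DefaultRow(row.getKey(), cells));
    }
}
```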
public void setChunkSize(int newValue)

Sets the chunk size of the split data tables.

Parameters:
newValue - the new value, which is the number of rows

public int getChunkSize()

Returns the current chunk size.
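Purely as a hedged usage note (not part of this documentation), a subclass could tune the chunk size at run time; the thresholds below are invented.

```java
// Hypothetical tuning inside a subclass of AbstractParallelNodeModel:
// smaller chunks for small tables so that several worker threads get work,
// larger chunks for big tables to reduce per-chunk overhead.
protected void tuneChunkSize(final long inputRowCount) {
    if (inputRowCount < 10_000) {
        setChunkSize(250);
    } else {
        setChunkSize(2_000);
    }
}
```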