esper.codehaus.org and espertech.com Documentation
Data flows in Esper EPL have the following purposes:
Support for data flow programming and flow-based programming.
Declarative and runtime manageable integration of Esper input and output adapters that may be provided by EsperIO or by an application.
Remove the need to use an event bus, achieving data-flow-only visibility of events and event types for performance gains.
Data flow operators communicate via streams of either underlying event objects or wrapped events. Underlying event objects are POJO, Map, Object-array or DOM/XML. Wrapped events are represented by EventBean instances that associate type information with underlying event objects.
For more information on data flow programming or flow-based programming please consult the Wikipedia FBP Article.
Esper offers a number of useful built-in operators that can be combined in a graph to program a data flow. In addition, EsperIO offers prebuilt operators that act as sources or sinks of events. An application can easily create and use its own data flow operators.
Using data flows, an application can provide events to the data flow operators directly, without using the engine's event bus. Not using the event bus (as represented by EPRuntime.sendEvent) can achieve performance gains, as the engine does not need to match events to statements and does not need to wrap underlying event objects in EventBean instances.
Data flows also allow for finer-grained control over threading, synchronous and asynchronous operation.
Data flows are new in release 4.6 and may be subject to evolutionary change.
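The following EPL is a simple "hello world" data flow in which BeaconSource generates a single event with a text property and LogSink outputs it:
create dataflow HelloWorldDataFlow
  BeaconSource -> helloworld.stream { text: 'hello world' , iterations: 1}
  LogSink(helloworld.stream) {}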
The next program code snippet declares the data flow to the engine:
String epl = "create dataflow HelloWorldDataFlow\n" +
    "BeaconSource -> helloworldStream { text: 'hello world' , iterations: 1}\n" +
    "LogSink(helloworldStream) {}";
epService.getEPAdministrator().createEPL(epl);
The following program code snippet instantiates the data flow:
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime().instantiate("HelloWorldDataFlow");
A data flow instance is represented by an EPDataFlowInstance object.
The next code snippet executes the data flow instance:
instance.run();
The synopsis for declaring a data flow is:
create dataflow name [schema_declarations] [operator_declarations]
Schema declarations define an event type. Specify any number of create schema clauses as part of the data flow declaration, followed by a comma character to end each schema declaration. The syntax for create schema is described in Section 5.16, “Declaring an Event Type: Create Schema”.
All event types that are defined as part of a data flow are private to the data flow and not available to other EPL statements. To define event types that are available across data flows and other EPL statements, use a create schema EPL statement, or runtime or static configuration.
Annotations, expression declarations and scripts can also be prepended to the data flow declaration.
For each operator, declare the operator name, input streams, output streams and operator parameters.
The syntax for declaring a data flow operator is:
operator_name [(input_streams)] [-> output_streams] { [parameter_name : parameter_value_expr] [, ...] }
The operator name is an identifier that identifies an operator.
If the operator accepts input streams, list them in parentheses after the operator name; see Section 13.2.2.2, “Declaring Input Streams”.
If the operator can produce output streams, specify -> followed by a list of output stream names and types. See Section 13.2.2.3, “Declaring Output Streams”.
Following the input and output stream declarations, provide curly brackets ({}) containing operator parameters. See Section 13.2.2.4, “Declaring Operator Parameters”.
An operator that receives no input streams, produces no output streams and has no parameters assigned to it is shown in this EPL example data flow:
create dataflow MyDataFlow MyOperatorSimple {}
The next EPL shows a data flow that consists of an operator MyOperator that receives a single input stream myInStream and produces a single output stream myOutStream holding MyEvent events.
The EPL configures the operator parameter myParameter with a value of 10:
create dataflow MyDataFlow
  create schema MyEvent as (id string, price double),
  MyOperator(myInStream) -> myOutStream<MyEvent> {
    myParameter : 10
  }
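To make the operator syntax concrete, the following plain-Java sketch splits a single operator declaration into its name, input streams, output streams and raw parameter text. It is a hypothetical illustration, not Esper's EPL parser: OperatorDecl and OperatorDeclParser are made-up names, and the sketch assumes simple declarations without parenthesized input groups such as (streamA, streamB) as streamsAB and without commas inside output type arguments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical holder for the parts of one operator declaration.
final class OperatorDecl {
    final String name;
    final List<String> inputs;
    final List<String> outputs;
    final String parameters; // raw text between the outermost curly brackets

    OperatorDecl(String name, List<String> inputs, List<String> outputs, String parameters) {
        this.name = name;
        this.inputs = inputs;
        this.outputs = outputs;
        this.parameters = parameters;
    }
}

// Hypothetical parser for: operator_name [(input_streams)] [-> output_streams] { parameters }
final class OperatorDeclParser {
    private static final Pattern DECL = Pattern.compile(
            "\\s*(\\w+)\\s*(?:\\(([^()]*)\\))?\\s*(?:->\\s*([^{]*?))?\\s*\\{(.*)\\}\\s*",
            Pattern.DOTALL);

    static OperatorDecl parse(String text) {
        Matcher m = DECL.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a simple operator declaration: " + text);
        }
        return new OperatorDecl(m.group(1), split(m.group(2)), split(m.group(3)), m.group(4).trim());
    }

    // Splits a comma-separated stream list; an absent or empty list yields no streams.
    private static List<String> split(String list) {
        List<String> result = new ArrayList<>();
        if (list == null || list.trim().isEmpty()) {
            return result;
        }
        for (String part : list.split(",")) {
            result.add(part.trim());
        }
        return result;
    }
}
```

For example, parsing MyOperator(myInStream) -> myOutStream<MyEvent> { myParameter : 10 } yields the name MyOperator, the input myInStream, the output myOutStream<MyEvent> and the parameter text myParameter : 10.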
The next sections outline input stream, output stream and parameter assignment in greater detail.
The EPL shown next declares myInStream and assigns the alias mis:
create dataflow MyDataFlow MyOperator(myInStream as mis) {}
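The next EPL declares two input streams, each with its own alias:
create dataflow MyDataFlow MyOperator(streamOne as one, streamTwo as two) {}
The next EPL assigns two streams to the same input port under the alias streamsAB:
create dataflow MyDataFlow MyOperator( (streamA, streamB) as streamsAB) {}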
Input and output stream names can have the dot-character in their name.
The following is also valid EPL:
create dataflow MyDataFlow MyOperator(my.in.stream) -> my.out.stream {}
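An operator can declare multiple output streams, separated by commas:
create dataflow MyDataFlow MyOperator -> my.out.one, my.out.two {}
The event type of an output stream can be provided in angle brackets. In the next EPL the stream rfid.stream carries underlying object-array events of type RFIDSchema:
create dataflow MyDataFlow create objectarray schema RFIDSchema (tagId string, locX double, locY double), MyOperator -> rfid.stream<RFIDSchema> {}
To have the stream carry EventBean instances instead of underlying event objects, use eventbean<type>:
create dataflow MyDataFlow create objectarray schema RFIDSchema (tagId string, locX double, locY double), MyOperator -> rfid.stream<eventbean<RFIDSchema>> {}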
Use a question mark (?) to indicate that the type of events is not known in advance.
In the next EPL the stream my.stream carries EventBean instances of any type:
create dataflow MyDataFlow MyOperator -> my.stream<eventbean<?>> {}
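The difference between streams of underlying events and streams of wrapped events can be pictured with the following plain-Java sketch. TypedEvent is a hypothetical stand-in for EventBean that pairs an underlying object array with the name of its event type, which is the extra information an operator declared with eventbean<?> relies on when the type is not known in advance. The sketch does not use the Esper API.

```java
import java.util.Arrays;

// Hypothetical stand-in for a wrapped event: type information plus the underlying event object.
final class TypedEvent {
    final String typeName;
    final Object underlying;

    TypedEvent(String typeName, Object underlying) {
        this.typeName = typeName;
        this.underlying = underlying;
    }
}

final class EventDescriber {
    // An operator receiving underlying object arrays only sees the property values.
    static String describeUnderlying(Object[] event) {
        return Arrays.toString(event);
    }

    // An operator receiving wrapped events of unknown type can also inspect the type name.
    static String describeWrapped(TypedEvent event) {
        String values = event.underlying instanceof Object[]
                ? Arrays.toString((Object[]) event.underlying)
                : String.valueOf(event.underlying);
        return event.typeName + values;
    }
}
```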
The synopsis for operator parameters is:
name : value_expr [,...]
The next EPL demonstrates operator parameters that are scalar values:
create dataflow MyDataFlow MyOperator { stringParam : 'sample', secondString : "double-quotes are fine", intParam : 10 }
The EPL shown below lists operator parameters that are expressions:
create dataflow MyDataFlow
  MyOperator {
    intParam : 24*60*60,
    threshold : var_threshold // a variable defined in the engine
  }
The following EPL sets operator parameters to a value obtained from a system property:
create dataflow MyDataFlow MyOperator { someSystemProperty : systemProperties('mySystemProperty') }
The EPL below demonstrates operator parameters that are JSON values:
create dataflow MyDataFlow
  MyOperator {
    myStringArray: ['a', "b"],
    myMapOrObject: {
      a : 10,
      b : 'xyz',
    },
    myInstance: {
      class: 'com.myorg.myapp.MyImplementation',
      myValue : 'sample'
    }
  }
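A JSON object that contains a class key, such as myInstance above, describes an object to instantiate. The plain-Java sketch below shows one plausible way such a value could be turned into an instance: load the class reflectively and assign the remaining keys to same-named fields. This is an assumption for illustration only; ObjectFromJsonMap and SampleImpl are hypothetical names, and the sketch does not reproduce how the engine actually binds operator parameters.

```java
import java.lang.reflect.Field;
import java.util.Map;

// Hypothetical target class standing in for com.myorg.myapp.MyImplementation.
class SampleImpl {
    String myValue;
}

// Hypothetical binder: instantiates the class named by "class" and copies other entries into fields.
final class ObjectFromJsonMap {
    static Object instantiate(Map<String, Object> json) {
        String className = (String) json.get("class");
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> entry : json.entrySet()) {
                if (entry.getKey().equals("class")) {
                    continue; // the class name is not a field value
                }
                Field field = instance.getClass().getDeclaredField(entry.getKey());
                field.setAccessible(true);
                field.set(instance, entry.getValue());
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

A design note on the sketch: unknown keys fail fast with an exception instead of being ignored, which surfaces typos in parameter names early.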
The table below summarizes the built-in data flow operators (Esper only):
Table 13.1. Esper Built-in Operators
Operator | Description |
---|---|
BeaconSource | Utility source that generates events. See Section 13.3.1, “BeaconSource”. |
Emitter | Special operator for injecting events into a stream. See Section 13.4.5, “Start Captive”. |
EPStatementSource | One or more EPL statements act as event sources. See Section 13.3.2, “EPStatementSource”. |
EventBusSink | The event bus is the sink: Sends events from the data flow into the event bus. See Section 13.3.3, “EventBusSink”. |
EventBusSource | The event bus is the source: Receives events from the event bus into the data flow. See Section 13.3.4, “EventBusSource”. |
Filter | Filters an input stream and produces an output stream containing the events passing the filter criteria. See Section 13.3.5, “Filter”. |
LogSink | Utility sink that outputs events to console or log. See Section 13.3.6, “LogSink”. |
Select | An EPL select statement that executes on the input stream events. See Section 13.3.7, “Select”. |
The table below summarizes the built-in EsperIO data flow operators. Please see the EsperIO documentation and source for more information.
Table 13.2. EsperIO Built-in Operators
Operator | Description |
---|---|
AMQPSource | Attaches to an AMQP broker to receive messages to process. |
AMQPSink | Attaches to an AMQP broker to send messages. |
FileSource | Reads one or more files and produces events from file data. |
FileSink | Writes one or more files from events received. |
The BeaconSource operator generates events and populates event properties.
The BeaconSource operator does not accept any input streams and has no input ports.
All parameters of the BeaconSource operator are optional.
Event properties to be populated can simply be added to the parameters.
If your declaration provides an event type for the output stream then BeaconSource will populate event properties of the underlying events. If no event type is specified, BeaconSource creates an anonymous object-array event type to carry the event properties that are generated and associates this type with its output stream.
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double), // sample type

  // BeaconSource that produces empty object-array events without delay
  // or interval until cancelled.
  BeaconSource -> stream.one {}

  // BeaconSource that produces one SampleSchema event populating event properties
  // from a user-defined function "generateTagId" and the provided values.
  BeaconSource -> stream.two<SampleSchema> {
    iterations : 1,
    tagId : generateTagId(),
    locX : 10
  }

  // BeaconSource that produces 10 object-array events populating
  // the price property with a random value.
  BeaconSource -> stream.three {
    iterations : 10,
    interval : 10, // every 10 seconds
    initialDelay : 5, // start after 5 seconds
    price : Math.random() * 100
  }
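The timing parameters of stream.three can be read as a simple schedule. The plain-Java sketch below assumes that the first event is produced after initialDelay seconds and each further event interval seconds after the previous one; BeaconSchedule is a hypothetical name, the sketch uses no timers, and it does not model the unbounded case where iterations is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of a beacon's schedule; times are in seconds relative to the start.
final class BeaconSchedule {
    // Returns the emission time of each iteration: initialDelay, then one event per interval.
    static List<Double> emissionTimes(int iterations, double initialDelay, double interval) {
        List<Double> times = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            times.add(initialDelay + i * interval);
        }
        return times;
    }

    // Produces one object-array event per iteration, populated by the value generator.
    static List<Object[]> generate(int iterations, Supplier<Object> valueGenerator) {
        List<Object[]> events = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            events.add(new Object[] {valueGenerator.get()});
        }
        return events;
    }
}
```

Under these assumptions, stream.three would produce its events at 5, 15, 25, ... seconds, each carrying a price generated by Math.random() * 100.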
The EPStatementSource operator does not accept any input streams and has no input ports.
Either the statement name or the statement filter parameter is required:
If a statement name is provided, the operator subscribes to output events of the statement if the statement exists, or when it gets created at a later point in time.
If a statement filter is provided instead, the operator subscribes to output events of all statements that currently exist and pass the filter's pass method, or that get created at a later point in time and pass the filter's pass method.
The collector can be specified to transform output events. If no collector is specified, the operator submits the underlying events of the insert stream received from the statement. The collector object must implement the interface EPDataFlowIRStreamCollector.
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double), // sample type

  // Consider only the statement named MySelectStatement when it exists.
  // No transformation.
  EPStatementSource -> stream.one<eventbean<?>> {
    statementName : 'MySelectStatement'
  }

  // Consider all statements that match the filter object provided.
  // No transformation.
  EPStatementSource -> stream.two<eventbean<?>> {
    statementFilter : {
      class : 'com.mycompany.filters.MyStatementFilter'
    }
  }

  // Consider all statements that match the filter object provided.
  // With collector that performs transformation.
  EPStatementSource -> stream.three<SampleSchema> {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    },
    statementFilter : {
      class : 'com.mycompany.filters.MyStatementFilter'
    }
  }
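The statement filter behavior described above (subscribe to existing statements that pass the filter, and to statements created later that pass it) can be modeled in plain Java. In this hypothetical sketch statements are identified by name only; StatementNameFilter and StatementSubscriptions are made-up names and are not the interfaces the engine uses, which receive statement objects rather than names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a statement filter, simplified to statement names.
interface StatementNameFilter {
    boolean pass(String statementName);
}

final class StatementSubscriptions {
    private final StatementNameFilter filter;
    private final List<String> subscribed = new ArrayList<>();

    StatementSubscriptions(StatementNameFilter filter) {
        this.filter = filter;
    }

    // Called once for each statement existing at start and again for each statement created later.
    void onStatementAvailable(String statementName) {
        if (filter.pass(statementName)) {
            subscribed.add(statementName);
        }
    }

    List<String> subscribed() {
        return subscribed;
    }
}
```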
The EventBusSink operator cannot declare any output streams.
All parameters of the EventBusSink operator are optional.
The collector can be specified to transform data flow events to event bus events. If no collector is specified, the operator submits the events directly to the event bus. The collector object must implement the interface EPDataFlowEventCollector.
Examples are:
create dataflow MyDataFlow
  BeaconSource -> instream<SampleSchema> {} // produces a sample stream

  // Send SampleSchema events produced by beacon to the event bus.
  EventBusSink(instream) {}

  // Send SampleSchema events produced by beacon to the event bus.
  // With collector that performs transformation.
  EventBusSink(instream) {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    }
  }
The EventBusSource operator does not accept any input streams and has no input ports.
All parameters of the EventBusSource operator are optional.
The collector can be specified to transform output events. If no collector is specified, the operator submits the underlying events of the stream received from the event bus. The collector object must implement the interface EPDataFlowEventBeanCollector.
The filter is an expression that the event bus compiles and efficiently matches even in the presence of a large number of event bus sources. The filter expression must return a boolean-typed value, returning true for those events that the event bus passes to the operator.
Examples are:
create dataflow MyDataFlow
  // Receive all SampleSchema events from the event bus.
  // No transformation.
  EventBusSource -> stream.one<SampleSchema> {}

  // Receive all SampleSchema events with tag id '001' from the event bus.
  // No transformation.
  EventBusSource -> stream.two<SampleSchema> {
    filter : tagId = '001'
  }

  // Receive all SampleSchema events from the event bus.
  // With collector that performs transformation.
  EventBusSource -> stream.three<SampleSchema> {
    collector : {
      class : 'com.mycompany.filters.MyCollector'
    },
  }
The Filter operator accepts a single input stream.
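The Filter operator has a single required parameter, filter, which provides the filter expression. Events for which the expression returns true are output to the first output stream; if a second output stream is declared, all other events are output to it.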
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double), // sample type
  BeaconSource -> samplestream<SampleSchema> {} // sample source

  // Filter all events that have a tag id of '001'
  Filter(samplestream) -> tags_001 {
    filter : tagId = '001'
  }

  // Filter all events that have a tag id of '001',
  // putting all other events into the second stream
  Filter(samplestream) -> tags_001, tags_other {
    filter : tagId = '001'
  }
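The routing of the second example, where passing events go to tags_001 and all other events to tags_other, amounts to partitioning the input by a predicate. The plain-Java sketch below models that behavior; FilterRouter is a hypothetical name and, for brevity, each event is represented only by its tag id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of a filter with a first (passing) and second (non-passing) output stream.
final class FilterRouter<E> {
    final List<E> passing = new ArrayList<>();
    final List<E> other = new ArrayList<>();
    private final Predicate<E> filter;

    FilterRouter(Predicate<E> filter) {
        this.filter = filter;
    }

    // Each input event goes to exactly one of the two output streams.
    void onInput(E event) {
        if (filter.test(event)) {
            passing.add(event);
        } else {
            other.add(event);
        }
    }
}
```

With only one output stream declared, the non-passing events would simply be discarded instead of collected.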
The LogSink operator cannot declare any output streams.
All parameters of the LogSink operator are optional.
Examples are:
create dataflow MyDataFlow
  BeaconSource -> instream {} // produces sample stream to use below

  // Output textual event to log using defaults.
  LogSink(instream) {}

  // Output JSON-formatted to console.
  LogSink(instream) {
    format : 'json',
    layout : '%t [%e]',
    log : false,
    linefeed : true,
    title : 'My Custom Title:'
  }
The Select operator accepts one or more input streams.
The Select operator requires a single output stream.
The Select operator requires the select parameter; all other parameters are optional.
Set the optional iterate flag to false (the default) to have the operator output results continuously. Set the iterate flag to true to have the operator output results only when the final marker arrives. If iterate is true, output rate limiting clauses are not supported.
The select parameter is required and provides an EPL select statement within parentheses.
For each input port, the statement should list the input stream name or the alias name in the from clause. Only filter-based streams are allowed in the from clause; patterns and named windows are not supported. The insert into clause, the irstream keyword and subselects are also not allowed.
The Select operator determines the event type of output events based on the select clause. It is not necessary to declare an event type for the output stream.
Examples are:
create dataflow MyDataFlow
  create schema SampleSchema(tagId string, locX double), // sample type
  BeaconSource -> instream<SampleSchema> {} // sample stream
  BeaconSource -> secondstream<SampleSchema> {} // sample stream

  // Simple continuous count of events
  Select(instream) -> outstream {
    select: (select count(*) from instream)
  }

  // Demonstrate use of alias
  Select(instream as myalias) -> outstream {
    select: (select count(*) from myalias)
  }

  // Output only when the final marker arrives
  Select(instream as myalias) -> outstream {
    select: (select count(*) from myalias),
    iterate: true
  }

  // Same input port for the two sample streams
  Select( (instream, secondstream) as myalias) -> outstream {
    select: (select count(*) from myalias)
  }

  // A join with multiple input streams,
  // joining the last event per stream forming pairs
  Select(instream, secondstream) -> outstream {
    select: (select a.tagId, b.tagId
             from instream.std:lastevent() as a, secondstream.std:lastevent() as b)
  }

  // A join with multiple input streams and using aliases.
  Select(instream as S1, secondstream as S2) -> outstream {
    select: (select a.tagId, b.tagId
             from S1.std:lastevent() as a, S2.std:lastevent() as b)
  }
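The effect of the iterate flag on select count(*) can be modeled in a few lines of plain Java: without iterate a result is output for every arriving event, with iterate a single result is output when the final marker arrives. CountSelect is a hypothetical name; the sketch ignores windows, grouping and output rate limiting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "select count(*)" with and without the iterate flag.
final class CountSelect {
    private final boolean iterate;
    private long count;
    final List<Long> output = new ArrayList<>();

    CountSelect(boolean iterate) {
        this.iterate = iterate;
    }

    void onInput(Object event) {
        count++;
        if (!iterate) {
            output.add(count); // continuous: one result per arriving event
        }
    }

    void onFinalMarker() {
        if (iterate) {
            output.add(count); // iterate: a single result when the final marker arrives
        }
    }
}
```

Feeding three events and then the final marker produces the outputs 1, 2, 3 in continuous mode and only 3 in iterate mode.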
This section outlines the steps to declare, instantiate, execute and cancel or complete data flows.
The next program code snippet declares a data flow to the engine:
String epl = "@Name('MyStatementName') create dataflow HelloWorldDataFlow\n" +
    "BeaconSource -> helloworldStream { text: 'hello world' , iterations: 1}\n" +
    "LogSink(helloworldStream) {}";
EPStatement stmt = epService.getEPAdministrator().createEPL(epl);
The following code snippet instantiates the data flow:
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime().instantiate("HelloWorldDataFlow");
The following table outlines all states:
The next code snippet executes the data flow instance as a blocking call:
instance.run();
The next code snippet executes the data flow instance as a non-blocking call:
instance.start();
Use the cancel method to cancel execution of a running data flow instance:
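instance.cancel();
To block until a data flow instance executing as a non-blocking call completes or is cancelled, use the join method:
instance.join();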
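The lifecycle of a data flow instance (instantiate, execute, then complete or cancel) can be pictured as a small state machine. The plain-Java sketch below is a hypothetical model: the enum FlowState and the class FlowLifecycle are made-up names, and the transition rules (start only from the instantiated state, cancel only while running) are simplifying assumptions rather than the engine's documented behavior.

```java
// Hypothetical lifecycle states of a data flow instance.
enum FlowState { INSTANTIATED, RUNNING, COMPLETE, CANCELLED }

final class FlowLifecycle {
    private FlowState state = FlowState.INSTANTIATED;

    FlowState state() {
        return state;
    }

    // In this sketch both run() and start() move an instantiated flow to running.
    void start() {
        if (state != FlowState.INSTANTIATED) {
            throw new IllegalStateException("Cannot start from state " + state);
        }
        state = FlowState.RUNNING;
    }

    // Reached when the sources are done, for example after a final marker.
    void complete() {
        if (state == FlowState.RUNNING) {
            state = FlowState.COMPLETE;
        }
    }

    // Cancelling only affects a running flow in this sketch.
    void cancel() {
        if (state == FlowState.RUNNING) {
            state = FlowState.CANCELLED;
        }
    }
}
```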
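Operator parameters can also be provided at instantiation time. The next code snippet passes a value for the file parameter of the FileSource operator, using a parameter URI of the form operator-name/parameter-name, and then runs the instance:
EPDataFlowInstantiationOptions options = new EPDataFlowInstantiationOptions();
options.addParameterURI("FileSource/file", filename);
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime()
    .instantiate("MyFileReaderDataFlow", options);
instance.run();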
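A parameter URI such as FileSource/file names an operator and one of its parameters. The plain-Java sketch below shows how such URIs could be resolved to values; ParameterUris is a hypothetical name and the sketch only illustrates the operator-name/parameter-name convention, not the engine's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical resolution of "operatorName/parameterName" URIs to parameter values.
final class ParameterUris {
    private final Map<String, Map<String, Object>> byOperator = new HashMap<>();

    void add(String uri, Object value) {
        int slash = uri.indexOf('/');
        if (slash <= 0 || slash == uri.length() - 1) {
            throw new IllegalArgumentException("Expected operatorName/parameterName: " + uri);
        }
        String operator = uri.substring(0, slash);
        String parameter = uri.substring(slash + 1);
        byOperator.computeIfAbsent(operator, k -> new HashMap<>()).put(parameter, value);
    }

    // Returns null when no value was provided for the operator parameter.
    Object lookup(String operator, String parameter) {
        Map<String, Object> params = byOperator.get(operator);
        return params == null ? null : params.get(parameter);
    }
}
```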
Set the operatorStatistics flag of the instantiation options to true to obtain statistics for operator execution. Set the cpuStatistics flag to true to obtain CPU statistics for operator execution.
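Operator statistics of this kind can be pictured as counting and timing each invocation of an operator. The plain-Java sketch below wraps an input handler to do that; OperatorStats is a hypothetical name, and it measures elapsed wall-clock time with System.nanoTime as a stand-in, which is not the same as CPU time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical per-operator statistics: number of events handled and elapsed time handling them.
final class OperatorStats {
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, Long> nanos = new HashMap<>();

    // Wraps an operator's input handler so that every invocation is counted and timed.
    <E> Consumer<E> instrument(String operatorName, Consumer<E> handler) {
        return event -> {
            long start = System.nanoTime();
            try {
                handler.accept(event);
            } finally {
                counts.merge(operatorName, 1L, Long::sum);
                nanos.merge(operatorName, System.nanoTime() - start, Long::sum);
            }
        };
    }

    long count(String operatorName) {
        return counts.getOrDefault(operatorName, 0L);
    }

    long nanos(String operatorName) {
        return nanos.getOrDefault(operatorName, 0L);
    }
}
```

Collecting timings adds overhead to every event, which is one reason to keep such statistics behind an opt-in flag.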
The example EPL below creates a data flow that uses an Emitter operator named myemitter:
create dataflow HelloWorldDataFlow
  create objectarray schema SampleSchema(text string), // sample type

  Emitter -> helloworld.stream<SampleSchema> { name: 'myemitter' }

  LogSink(helloworld.stream) {}
The next code snippet instantiates the data flow, starts it in captive mode, obtains the emitter named myemitter and submits an object-array event:
EPDataFlowInstance instance = epService.getEPRuntime().getDataFlowRuntime().instantiate("HelloWorldDataFlow");
EPDataFlowInstanceCaptive captiveStart = instance.startCaptive();
Emitter emitter = captiveStart.getEmitters().get("myemitter");
emitter.submit(new Object[] {"this is some text"});
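In captive mode the application, rather than a source operator, pushes events into the flow through emitters looked up by name. The plain-Java sketch below models that lookup-and-submit pattern; CaptiveFlow is a hypothetical name, and each emitter is simply a Consumer wired to the downstream operator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of captive execution: the application holds named emitters and pushes events itself.
final class CaptiveFlow {
    private final Map<String, Consumer<Object[]>> emitters = new HashMap<>();

    // Registers an emitter under its "name" parameter, wired to the downstream operator.
    void addEmitter(String name, Consumer<Object[]> downstream) {
        emitters.put(name, downstream);
    }

    Consumer<Object[]> getEmitter(String name) {
        Consumer<Object[]> emitter = emitters.get(name);
        if (emitter == null) {
            throw new IllegalArgumentException("No emitter named " + name);
        }
        return emitter;
    }
}
```

Because submission happens on the application's thread, the application also decides the pace and threading of event delivery.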
The next example, RollingTopWords, receives words through an Emitter operator, counts each word over a sliding 30-second time window and outputs the three words with the highest count as a snapshot every 2 seconds:
create dataflow RollingTopWords
  create objectarray schema WordEvent (word string),

  Emitter -> wordstream<WordEvent> {name:'a'} // Produces word stream

  Select(wordstream) -> wordcount { // Sliding time window count per word
    select: (select word, count(*) as wordcount
             from wordstream.win:time(30) group by word)
  }

  Select(wordcount) -> wordranks { // Rank of words
    select: (select window(*) as rankedWords
             from wordcount.ext:sort(3, wordcount desc)
             output snapshot every 2 seconds)
  }

  LogSink(wordranks) {}
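The computation performed by RollingTopWords can be approximated in plain Java: keep the words of the last 30 seconds, count them per word and rank by count. RollingTopWordsModel is a hypothetical name; the sketch takes explicit timestamps instead of using engine time, assumes a word expires exactly one window length after it arrived, and breaks ties alphabetically, which the EPL does not specify.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of RollingTopWords: per-word counts over a sliding time window, then top-N.
final class RollingTopWordsModel {
    private static final class Timed {
        final long timeMillis;
        final String word;

        Timed(long timeMillis, String word) {
            this.timeMillis = timeMillis;
            this.word = word;
        }
    }

    private final long windowMillis;
    private final List<Timed> window = new ArrayList<>();

    RollingTopWordsModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void onWord(long timeMillis, String word) {
        window.add(new Timed(timeMillis, word));
    }

    // Expires old words, then returns up to n words ordered by count descending.
    List<String> topWords(long nowMillis, int n) {
        window.removeIf(t -> t.timeMillis <= nowMillis - windowMillis);
        Map<String, Integer> counts = new HashMap<>();
        for (Timed t : window) {
            counts.merge(t.word, 1, Integer::sum);
        }
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> b.getValue().equals(a.getValue())
                ? a.getKey().compareTo(b.getKey())
                : Integer.compare(b.getValue(), a.getValue()));
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }
}
```

Calling topWords every 2 seconds would correspond to the output snapshot clause of the second Select operator.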
The next example, VWAPSample, splits a stream of trades and quotes into a trade stream and a quote stream, computes the volume-weighted average price (VWAP) per ticker over the last four trades, and joins it with the most recent quote for the same ticker to compute a bargain index. MyObjectArrayGraphSource is an application-provided source operator:
create dataflow VWAPSample
  create objectarray schema TradeQuoteType as (type string, ticker string, price double, volume long, askprice double, asksize long),

  MyObjectArrayGraphSource -> TradeQuoteStream<TradeQuoteType> {}

  Filter(TradeQuoteStream) -> TradeStream {
    filter: type = "trade"
  }

  Filter(TradeQuoteStream) -> QuoteStream {
    filter: type = "quote"
  }

  Select(TradeStream) -> VwapTrades {
    select: (select ticker, sum(price * volume) / sum(volume) as vwap, min(price) as minprice
             from TradeStream.std:groupwin(ticker).win:length(4) group by ticker)
  }

  Select(VwapTrades as T, QuoteStream as Q) -> BargainIndex {
    select: (select case when vwap > askprice then asksize * (Math.exp(vwap - askprice)) else 0.0d end as index
             from T.std:unique(ticker) as t, Q.std:lastevent() as q
             where t.ticker = q.ticker)
  }

  LogSink(BargainIndex) {}
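The arithmetic in VWAPSample is VWAP = sum(price * volume) / sum(volume) over a ticker's last four trades, and a bargain index of asksize * exp(vwap - askprice) when the VWAP exceeds the ask price, else 0. The plain-Java sketch below reproduces only that arithmetic; VwapCalculator is a hypothetical name and the sketch does not model the join or the stream plumbing.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java version of the VWAP and bargain index arithmetic in VWAPSample.
final class VwapCalculator {
    private static final int WINDOW = 4; // mirrors std:groupwin(ticker).win:length(4)
    private final Map<String, Deque<double[]>> tradesByTicker = new HashMap<>();

    // Records a trade and returns the VWAP over the ticker's last four trades.
    double onTrade(String ticker, double price, long volume) {
        Deque<double[]> trades = tradesByTicker.computeIfAbsent(ticker, k -> new ArrayDeque<>());
        trades.addLast(new double[] {price, volume});
        if (trades.size() > WINDOW) {
            trades.removeFirst(); // the oldest trade leaves the length window
        }
        double notional = 0;
        double totalVolume = 0;
        for (double[] t : trades) {
            notional += t[0] * t[1];
            totalVolume += t[1];
        }
        return notional / totalVolume;
    }

    // Same formula as the case expression: positive only when the VWAP exceeds the ask price.
    static double bargainIndex(double vwap, double askPrice, long askSize) {
        return vwap > askPrice ? askSize * Math.exp(vwap - askPrice) : 0.0d;
    }
}
```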
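The next data flow counts lines, words and characters of text using three application-provided operators, MyLineFeedSource, MyTokenizerCounter and MyWordCountAggregator, and logs the result:
create dataflow WordCount
  MyLineFeedSource -> LineOfTextStream {}
  MyTokenizerCounter(LineOfTextStream) -> SingleLineCountStream {}
  MyWordCountAggregator(SingleLineCountStream) -> WordCountStream {}
  LogSink(WordCountStream) {}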
In order to resolve application operators, add the package or operator class to imports:
// Sample code adds 'package.*' to simply import the package.
epService.getEPAdministrator().getConfiguration()
    .addImport(MyTokenizerCounter.class.getPackage().getName() + ".*");
The implementation class of a source operator must implement the DataFlowSourceOperator interface.
The implementation for the sample MyLineFeedSource with comments is:
import java.util.Iterator;

// The OutputTypes annotation can be used to specify the type of events
// that are output by the operator.
// If provided, it is not necessary to declare output types in the data flow.
// The event representation is object-array.
@OutputTypes(value = {
    @OutputType(name = "line", typeName = "String")
})
// Provide the DataFlowOpProvideSignal annotation to indicate that
// the source operator provides a final marker.
@DataFlowOpProvideSignal
public class MyLineFeedSource implements DataFlowSourceOperator {

    // Use the DataFlowContext annotation to indicate the field that receives the emitter.
    // The engine provides the emitter.
    @DataFlowContext
    private EPDataFlowEmitter dataFlowEmitter;

    // Mark a parameter using the DataFlowOpParameter annotation
    @DataFlowOpParameter
    private String myStringParameter;

    private final Iterator<String> lines;

    public MyLineFeedSource(Iterator<String> lines) {
        this.lines = lines;
    }

    // Invoked by the engine at time of data flow instantiation.
    public DataFlowOpInitializeResult initialize(DataFlowOpInitializateContext context) throws Exception {
        return null; // can return type information here instead
    }

    // Invoked by the engine at time of data flow instance execution.
    public void open(DataFlowOpOpenContext openContext) {
        // attach to input
    }

    // Invoked by the engine in a tight loop.
    // Submits the events which contain lines of text.
    public void next() {
        // read and submit events
        if (lines.hasNext()) {
            dataFlowEmitter.submit(new Object[] {lines.next()});
        }
        else {
            dataFlowEmitter.submitSignal(new EPDataFlowSignalFinalMarker() {});
        }
    }

    // Invoked by the engine at time of cancellation or completion.
    public void close(DataFlowOpCloseContext closeContext) {
        // detach from input
    }
}
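The interplay between next() and the final marker can be illustrated without the engine. The plain-Java sketch below assumes a runner that keeps calling next() until the source produces a final marker; LineSourceLoop and its FINAL_MARKER object are hypothetical names, and the real engine's loop, threading and signal types are not reproduced.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of driving a source operator: next() is called repeatedly
// until the source produces a final marker.
final class LineSourceLoop {
    static final Object FINAL_MARKER = new Object();

    // Produces one object-array event per call, or the final marker when no lines remain.
    static Object next(Iterator<String> lines) {
        return lines.hasNext() ? new Object[] {lines.next()} : FINAL_MARKER;
    }

    // Collects everything the source submits before its final marker.
    static List<Object[]> drain(Iterator<String> lines) {
        List<Object[]> submitted = new ArrayList<>();
        while (true) {
            Object out = next(lines);
            if (out == FINAL_MARKER) {
                return submitted;
            }
            submitted.add((Object[]) out);
        }
    }
}
```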
The implementation for the sample MyTokenizerCounter with comments is:
import java.util.StringTokenizer;

// Annotate with DataFlowOperator so the engine knows it's a data flow operator
@DataFlowOperator
@OutputTypes({
    @OutputType(name = "line", type = int.class),
    @OutputType(name = "wordCount", type = int.class),
    @OutputType(name = "charCount", type = int.class)
})
public class MyTokenizerCounter {

    @DataFlowContext
    private EPDataFlowEmitter dataFlowEmitter;

    // Name the method that receives data onInput(...)
    public void onInput(String line) {
        // tokenize
        StringTokenizer tokenizer = new StringTokenizer(line, " \t");
        int wordCount = tokenizer.countTokens();
        int charCount = 0;
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            charCount += token.length();
        }
        // submit count of line, words and characters
        dataFlowEmitter.submit(new Object[] {1, wordCount, charCount});
    }
}
The implementation for the sample MyWordCountAggregator with comments is:
@DataFlowOperator
@OutputTypes(value = {
    @OutputType(name = "stats", type = MyWordCountStats.class)
})
public class MyWordCountAggregator {

    @DataFlowContext
    private EPDataFlowEmitter dataFlowEmitter;

    private final MyWordCountStats aggregate = new MyWordCountStats();

    public void onInput(int lines, int words, int chars) {
        aggregate.add(lines, words, chars);
    }

    // Name the method that receives a marker onSignal
    public void onSignal(EPDataFlowSignal signal) {
        // Received punctuation, submit aggregated totals
        dataFlowEmitter.submit(aggregate);
    }
}
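The counting logic of MyTokenizerCounter and the accumulation done by MyWordCountAggregator can be exercised without the engine. The plain-Java sketch below extracts them into static and instance methods; LineCounter is a hypothetical name, and WordCountStats is a hypothetical stand-in for MyWordCountStats, whose source is not shown above, so its add method is assumed to simply sum the three counts.

```java
import java.util.StringTokenizer;

// Hypothetical stand-in for MyWordCountStats: running totals of lines, words and characters.
final class WordCountStats {
    int lines;
    int words;
    int chars;

    void add(int lines, int words, int chars) {
        this.lines += lines;
        this.words += words;
        this.chars += chars;
    }
}

final class LineCounter {
    // Same counting as MyTokenizerCounter.onInput: {1, word count, non-whitespace character count}.
    static int[] count(String line) {
        StringTokenizer tokenizer = new StringTokenizer(line, " \t");
        int wordCount = tokenizer.countTokens();
        int charCount = 0;
        while (tokenizer.hasMoreTokens()) {
            charCount += tokenizer.nextToken().length();
        }
        return new int[] {1, wordCount, charCount};
    }
}
```

Feeding the lines "hello world" and "a\tbc" through count and add yields totals of 2 lines, 4 words and 13 characters, which is what the aggregator would submit on the final marker.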