www.espertech.comDocumentation
Esper has the following primary interfaces:
The event and event type interfaces are described in Section 15.6, “Event and Event Type”.
The administrative interface to create and manage EPL and pattern statements, and set runtime configurations, is described in Section 15.3, “The Administrative Interface”.
The runtime interface to send events into the engine, set and get variable values and execute on-demand queries, is described in Section 15.4, “The Runtime Interface”.
For EPL introductory information please see Section 5.1, “EPL Introduction” and patterns are described at Section 7.1, “Event Pattern Overview”.
The JavaDoc documentation is also a great source for API information.
Create event pattern expression and EPL statements via the administrative interface EPAdministrator
.
For managing one or more related statements as a module, please consider the deployment administrative API and EPL modules as further described in Section 17.4, “Packaging and Deploying Overview”.
This code snippet gets an Esper engine then creates an event pattern and an EPL statement.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPStatement 10secRecurTrigger = admin.createPattern( "every timer:at(*, *, *, *, *, */10)"); EPStatement countStmt = admin.createEPL( "select count(*) from MarketDataBean.win:time(60 sec)");
Note that event pattern expressions can also occur within EPL statements. This is outlined in more detail in Section 5.4.2, “Pattern-based Event Streams”.
The create
methods on EPAdministrator
are overloaded and allow an optional statement name to be passed to the engine. A statement name can be useful for retrieving a statement
by name from the engine at a later time. The engine assigns a statement name if no statement name is supplied on statement creation.
The createPattern
and createEPL
methods return EPStatement
instances. Statements are automatically started and active when created. A statement can also be stopped and started again via the stop
and start
methods shown in the code snippet below.
countStmt.stop(); countStmt.start();
The create
methods on EPAdministrator
also accept a user object.
The user object is associated with a statement at time of statement creation and is a single, unnamed field that is stored with every statement.
Applications may put arbitrary objects in this field. Use the getUserObject
method on EPStatement
to obtain the user object of a statement
and StatementAwareUpdateListener
for listeners.
Your application may create new statements or stop and destroy existing statements using any thread and also within listener or subscriber code. If using POJO events, your application may not create or manage statements in the event object itself while the same event is currently being processed by a statement.
Esper provides three choices for your application to receive statement results. Your application can use all three mechanisms alone or in any combination for each statement. The choices are:
Table 15.1. Choices For Receiving Statement Results
Name | Methods on EPStatement | Description |
---|---|---|
Listener Callbacks | addListener and removeListener |
Your application provides implementations of the The engine continuously indicates results to all listeners as soon they occur, and following output rate limiting clauses if specified. |
Subscriber Object | setSubscriber |
Your application provides a POJO (plain Java object) that exposes methods to receive statement results.
The name of the method that a subscriber object provides to receive results is The engine continuously indicates results to the single subscriber as soon they occur, and following output rate limiting clauses if specified.
This is the fastest method to receive statement results, as the engine delivers strongly-typed results directly to your application objects without the need for
building an There can be at most 1 Subscriber Object registered per statement. If you require more than one listener, use the Listener Callback instead (or in addition). The Subscriber Object is bound to the statement with a strongly typed support which ensure direct delivery of new events without type conversion. This optimization is made possible because there can only be 0 or 1 Subscriber Object per statement. |
Pull API | safeIterator and iterator |
Your application asks the statement for results and receives a set of events via This is useful if your application does not need continuous indication of new results in real-time. |
Your application may attach one or more listeners, zero or one single subscriber and in addition use the Pull API on the same statement. There are no limitations to the use of iterator, subscriber or listener alone or in combination to receive statement results.
The best delivery performance can generally be achieved by attaching a subscriber and by not attaching listeners. The engine is aware of the listeners and subscriber attached to a statement. The engine uses this information internally to reduce statement overhead. For example, if your statement does not have listeners or a subscriber attached, the engine does not need to continuously generate results for delivery.
If your application attaches both a subscriber and one or more listeners then the subscriber receives the result first before any of the listeners.
If your application attaches more than one listener then the UpdateListener
listeners receive results first in the order they were added to the statement, and
StatementAwareUpdateListener
listeners receive results next in the order they were added to the statement. To change the order of delivery among listeners your application can add and remove listeners at runtime.
If you have configured outbound threading, it means a thread from the outbound thread pool delivers results to the subscriber and listeners instead of the processing or event-sending thread.
If outbound threading is turned on, we recommend turning off the engine setting preserving the order of events delivered to listeners as described in Section 16.4.10.1, “Preserving the order of events delivered to listeners”. If outbound threading is turned on statement execution is not blocked for the configured time in the case a subscriber or listener takes too much time.
A subscriber object is a direct binding of query results to a Java object. The object, a POJO, receives statement results via method invocation. The subscriber class does not need to implement an interface or extend a superclass. Only one subscriber object may be set for a statement.
Subscriber objects have several advantages over listeners. First, they offer a substantial performance benefit: Query results are delivered directly to your method(s) through Java virtual machine method calls, and there is no intermediate representation (EventBean
). Second, as subscribers receive strongly-typed parameters, the subscriber code tends to be simpler.
This chapter describes the requirements towards the methods provided by your subscriber class.
The engine can deliver results to your subscriber in two ways:
Each evert in the insert stream results in a method invocation, and each event in the remove stream results in further method invocations. This is termed row-by-row delivery.
A single method invocation that delivers all rows of the insert and remove stream. This is termed multi-row delivery.
Your subscriber class must provide a method by name update
to receive insert stream events row-by-row. The number and types of parameters declared by the update
method must match the number and types of columns as specified in the select
clause, in the same order as in the select
clause.
For example, if your statement is:
select orderId, price, count(*) from OrderEvent
Then your subscriber update
method looks as follows:
public class MySubscriber { ... public void update(String orderId, double price, long count) {...} ... }
Each method parameter declared by the update
method must be assignable from the respective column type as listed in the select
-clause, in the order selected. The assignability rules are:
Widening of types follows Java standards. For example, if your select
clause selects an integer value, the method parameter for the same column can be typed int, long, float or double (or any equivalent boxed type).
Auto-boxing and unboxing follows Java standards. For example, if your select
clause selects an java.lang.Integer
value, the method parameter for the same column can be typed int
. Note that if your select
clause column may generate null
values, an exception may occur at runtime unboxing the null
value.
Interfaces and super-classes are honored in the test for assignability. Therefore java.lang.Object
can be used to accept any select
clause column type
In the case that your subscriber class offers multiple update
method footprints, the engine selects the closest-matching footprint by comparing the output types and method parameter types. The engine prefers the update method that is an exact match of types, followed by an update method that requires boxing or unboxing, followed by an update method that requires widening and finally any other allowable update method.
For example, your statement may be:
select *, count(*) from OrderEvent
Then your subscriber update
method looks as follows:
public void update(OrderEvent orderEvent, long count) {...}
select *, count(*) from OrderEvent order, OrderHistory hist
Then your subscriber update
method should be:
public void update(OrderEvent orderEvent, OrderHistory orderHistory, long count) {...}
The stream wildcard syntax and the stream name itself can also be used:
select hist.*, order from OrderEvent order, OrderHistory hist
The matching update
method is:
public void update(OrderHistory orderHistory, OrderEvent orderEvent) {...}
The update
method for Map
delivery is:
public void update(Map row) {...}
public void update(Object[] row) {...}
select orderId, count(*) from OrderEvent.win:time(20 sec) group by orderId
Then your subscriber update
and updateRStream
methods should be:
public void update(String, long count) {...} public void updateRStream(String orderId, long count) {...}
An example set of delivery methods:
// Called by the engine before delivering events to update methods public void updateStart(int insertStreamLength, int removeStreamLength) // To deliver insert stream events public void update(String orderId, long count) {...} // To deliver remove stream events public void updateRStream(String orderId, long count) {...} // Called by the engine after delivering events public void updateEnd() {...}
In place of row-by-row delivery, your subscriber can receive all events in the insert and remove stream via a single method invocation. This is applicable when an EPL delivers multiple output rows for a given input event or time advancing, for example when multiple pattern matches occur for the same incoming event, for a join producing multiple output rows or with output rate limiting, for example.
The event delivery follow the scheme as described earlier in Section 15.3.3.1.2, “Row Delivery as Map and Object Array ”. The subscriber class must provide one of the following methods:
Your application can subscribe to updates posted by a statement via the addListener
and removeListener
methods on EPStatement
. Your application must to provide an implementation of the UpdateListener
or the StatementAwareUpdateListener
interface to the statement:
UpdateListener myListener = new MyUpdateListener(); countStmt.addListener(myListener);
EPL statements and event patterns publish old data and new data to registered UpdateListener
listeners.
New data published by statements is the events representing the new values of derived data held by the statement.
Old data published by statements constists of the events representing the prior values of derived data held by the statement.
UpdateListener
listeners receive multiple result rows in one invocation by the engine: the new data and old data parameters to your listener are array parameters. For example, if your application uses one of the batch data windows, or your application creates a pattern that matches multiple times when a single event arrives, then the engine indicates such multiple result rows in one invocation and your new data array carries two or more rows.
A second listener interface is the StatementAwareUpdateListener
interface. A StatementAwareUpdateListener
is especially useful for registering the same listener object with multiple statements,
as the listener receives the statement instance and engine instance in addition to new and old data when the engine indicates new results to a listener.
StatementAwareUpdateListener myListener = new MyStmtAwareUpdateListener(); statement.addListener(myListener);
To indicate results the engine invokes this method on StatementAwareUpdateListener
listeners: update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider)
Subscribing to events posted by a statement is following a push model. The engine pushes data to listeners when events are received that cause data to change or patterns to match. Alternatively, you need to know that statements serve up data that your application can obtain via the safeIterator
and iterator
methods on EPStatement
. This is called the pull API and can come in handy if your application is not interested in all new updates, and only needs to perform a frequent or infrequent poll for the latest data.
The safeIterator
method on EPStatement
returns a concurrency-safe iterator returning current statement results, even while concurrent threads may send events into the engine for processing. The engine employs a read-write lock per context partition and obtains a read lock for iteration. Thus safe iterator guarantees correct results even as events are being processed by other threads and other context partitions. The cost is that the iterator obtains and holds zero, one or multiple context partition locks for that statement that must be released via the close
method on the SafeIterator
instance.
The iterator
method on EPStatement
returns a concurrency-unsafe iterator. This iterator is only useful for applications that are single-threaded, or applications that themselves perform coordination between the iterating thread and the threads that send events into the engine for processing. The advantage to this iterator is that it does not hold a lock.
When statements are used with contexts and context partitions, the APIs to identify, filter and select context partitions for statement iteration are described in Section 15.19, “Context Partition Selection”.
The next code snippet shows a short example of use of safe iterators:
EPStatement statement = epAdmin.createEPL("select avg(price) as avgPrice from MyTick"); // .. send events into the engine // then use the pull API... SafeIterator<EventBean> safeIter = statement.safeIterator(); try { for (;safeIter.hasNext();) { // .. process event .. EventBean event = safeIter.next(); System.out.println("avg:" + event.get("avgPrice"); } } finally { safeIter.close(); // Note: safe iterators must be closed }
This is a short example of use of the regular iterator that is not safe for concurrent event processing:
double averagePrice = (Double) eplStatement.iterator().next().get("average");
The safeIterator
and iterator
methods can be used to pull results out of all statements, including statements that join streams, contain aggregation functions, pattern statements, and statements that contain a where
clause, group by
clause, having
clause or order by
clause.
For statements without an order by
clause, the iterator
method returns events in the order maintained by the data window. For statements that contain an order by
clause, the iterator
method returns events in the order indicated by the order by
clause.
Consider using the on-select
clause and a named window if your application requires iterating over a partial result set or requires indexed access for fast iteration; Note that on-select
requires that you sent a trigger event, which may contain the key values for indexed access.
Esper places the following restrictions on the pull API and usage of the safeIterator
and iterator
methods:
In multithreaded applications, use the safeIterator
method. Note: make sure your application closes the iterator via the close
method when done, otherwise the iterated statement context partitions stay locked and event processing for statement context partitions does not resume.
In multithreaded applications, the iterator
method does not hold any locks. The iterator returned by this method does not make any guarantees towards correctness of results and fail-behavior, if your application processes events into the engine instance by multiple threads. Use the safeIterator
method for concurrency-safe iteration instead.
Since the safeIterator
and iterator
methods return events to the application immediately, the iterator does not honor an output rate limiting clause, if present. That is, the iterator returns results as if there is no output-rate clause for the statement in statements without grouping or aggregation. For statements with grouping or aggregation, the iterator in combintion with an output clause returns last output group and aggregation results. Use a separate statement and the insert into
clause to control the output rate for iteration, if so required.
When iterating a statement that operates on an unbound stream (no data window declared), please note the following:
When iterating a statement that groups and aggregates values from an unbound stream and that specifies output snapshot
, the engine retains groups and aggregations for output as iteration results or upon the output snapshot condition .
When iterating a statement that groups and aggregates values from an unbound stream and that does not specify output snapshot
, the engine only retains the last aggregation values and the iterated result contains only the last updated group.
When iterating a statement that operates on an unbound stream the iterator returns no rows. This behavior can be changed by specifying either the @IterableUnbound
annotation or by changing the global view resources configuration.
Certain configuration changes are available to perform on an engine instance while in operation. Such configuration operations are available via the getConfiguration
method on EPAdministrator
,
which returns a ConfigurationOperations
object.
Please consult the JavaDoc of ConfigurationOperations
for further information. The section Section 16.6, “Runtime Configuration” provides a summary of available configurations.
In summary, the configuration operations available on a running engine instance are as follows:
Add new event types for all event representations, check if an event type exists, update an existing event type, remove an event type, query a list of types and obtain a type by name.
Add and remove variables (get and set variable values is done via the runtime API).
Add a variant stream.
Add a revision event type.
Add event types for all event classes in a given Java package, using the simple class name as the event name.
Add import for user-defined functions.
Add a plug-in aggregation function, plug-in single row function, plug-in event type, plug-in event type resolution URIs.
Control metrics reporting.
Additional items please see the ConfigurationOperations
interface.
For examples of above runtime configuration API functions please consider the Configuration chapter, which applies to both static configuration and runtime configuration as the ConfigurationOperations
interface is the same.
The EPRuntime
interface is used to send events for processing into an Esper engine, set and get variable values and execute on-demand queries.
The below code snippet shows how to send a Java object event to the engine. Note that the sendEvent
method is overloaded. As events can take on different representation classes in Java, the sendEvent
takes parameters to reflect the different
types of events that can be send into the engine. The Chapter 2, Event Representations section explains the types of events
accepted.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPRuntime runtime = epService.getEPRuntime(); // Send an example event containing stock market data runtime.sendEvent(new MarketDataBean('IBM', 75.0));
Events, in theoretical terms, are observations of a state change that occurred in the past. Since one cannot change an event that happened in the past, events are best modelled as immutable objects.
The engine relies on events that are sent into an engine to not change their state. Typically, applications create a new event object for every new event, to represent that new event. Application should not modify an existing event that was sent into the engine.
Another important method in the runtime interface is the route
method. This method is designed for use by UpdateListener
and
subscriber implementations as well as engine extensions that need to send events into an engine instance to avoid the possibility of a stack overflow due to nested calls
to sendEvent
and to ensure correct processing of the current and routed event.
Note that if outbound-threading is enabled, listeners and subscribers should use sendEvent
and not route
.
EventSender sender = epService.getEPRuntime().getEventSender("MyEvent"); sender.sendEvent(myEvent);
A second method to obtain an event sender is the method getEventSender(URI[])
, which takes an array of URIs. This method is for use with plug-in event representations.
The event sender returned by this method processes event objects that are of one of the types of one or more plug-in event representations. Please consult Section 18.8, “Event Type And Event Object” for more information.
Esper provides the facility to explicitly index named windows and tables to speed up on-demand and continuous queries. Please consult Section 6.9, “Explicitly Indexing Named Windows and Tables” for more information.
When named windows and tables are used with contexts and context partitions, the APIs to identify, filter and select context partitions for on-demand queries can be found in Section 15.19, “Context Partition Selection”.
The EPRuntime
interface provides three ways to run on-demand queries:
Use the executeQuery
method to executes a given on-demand query exactly once, see Section 15.5.1, “On-Demand Query Single Execution”.
Use the prepareQuery
method to prepare a given on-demand query such that the same query can be executed multiple times without repeated parsing, see Section 15.5.2, “On-Demand Query Prepared Unparameterized Execution”.
Use the prepareQueryWithParameters
method to prepare a given on-demand query that may have substitution parameters such that the same query can be parameterized and executed multiple times without repeated parsing, see Section 15.5.3, “On-Demand Query Prepared Parameterized Execution”
If your application must execute the same EPL on-demand query multiple times with different parameters use prepareQueryWithParameters
.
If your application must execute the same EPL on-demand query multiple times without use either prepareQuery
or prepareQueryWithParameters
and specify no substitution parameters.
By using any of the prepare...
methods the engine can compile an EPL query string or object model once and reuse the object and thereby speed up repeated execution.
The following limitations apply:
An on-demand EPL expression only evaluates against the named windows and tables that your application creates. On-demand queries may not specify any other streams or application event types.
The following clauses are not allowed in on-demand EPL: insert into
and output
.
Views and patterns are not allowed to appear in on-demand queries.
On-demand EPL may not perform subqueries.
The previous
and prior
functions may not be used.
Prepared on-demand queries are designed for repeated execution and may perform better then the dynamic single-execution method if running the same query multiple times. For use with parameter placeholders please see Section 15.5.3, “On-Demand Query Prepared Parameterized Execution”.
The next code snippet demonstrates prepared on-demand queries without parameter placeholder:
String query = "select * from MyNamedWindow where orderId = '123'" EPOnDemandPreparedQuery prepared = epRuntime.prepareQuery(query); EPOnDemandQueryResult result = prepared.execute(); // ...later on execute once more ... prepared.execute(); // execute a second time
?:name
If substitution parameters do not have a name assigned, the engine assigns the first substitution parameter an index of 1 and subsequent parameters increment the index by one. Please see Section 15.14, “Prepared Statement and Substitution Parameters” for additional detail and examples.
Substitution parameters can be inserted into any EPL construct that takes an expression.
All substitution parameters must be replaced by actual values before an on-demand query with substitution parameters can be executed. Substitution parameters can be replaced with an actual value using the setObject
method for each index or name. Substitution parameters can be set to new values and the query executed more than once.
While the setObject
method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the requirements of the expression the parameter resides in.
The next program listing runs a prepared and parameterized on-demand query against a named window MyNamedWindow
(this example does not assign names to substitution parameters):
String query = "select * from MyNamedWindow where orderId = ?"; EPOnDemandPreparedQueryParameterized prepared = epRuntime.prepareQueryWithParameters(query); // Set the required parameter values before each execution prepared.setObject(1, "123"); result = epRuntime.executeQuery(prepared); // ...execute a second time with new parameter values... prepared.setObject(1, "456"); result = epRuntime.executeQuery(prepared);
This second example uses the in
operator and has multiple parameters:
String query = "select * from MyNamedWindow where orderId in (?) and price > ?"; EPOnDemandPreparedQueryParameterized prepared = epRuntime.prepareQueryWithParameters(query); prepared.setObject(1, new String[] {"123", "456"}); prepared.setObject(2, 1000.0});
An EventBean
object represents a row (event) in your continuous query's result set. Each EventBean
object has an associated EventType
object providing event metadata.
An UpdateListener
implementation receives one or more EventBean
events with each invocation. Via the iterator
method on EPStatement
your application can poll or read data out of statements. Statement iterators also return EventBean
instances.
Each statement provides the event type of the events it produces, available via the getEventType
method on EPStatement
.
An event object is an EventBean
that provides:
The underlying event object of an EventBean
can be obtained via the getUnderlying
method. Please see Chapter 2, Event Representations for more information on different event representations.
From a threading perspective, it is safe to retain and query EventBean
and EventType
objects in multiple threads.
select symbol, avg(price) as avgprice, count(*) as mycount from org.sample.StockTickEvent group by symbol
The next table summarizes the property names and types as posted by the statement above:
A code snippet out of a possible UpdateListener
implementation to this statement may look as below:
String symbol = (String) newEvents[0].get("symbol"); Double price= (Double) newEvents[0].get("avgprice"); Long count= (Long) newEvents[0].get("mycount");
The engine supplies the boxed java.lang.Double
and java.lang.Long
types as property values rather than primitive Java types. This is because aggregated values can return a null
value to indicate that no data is available for aggregation. Also, in a select statement that computes expressions, the underlying event objects to EventBean
instances are either of type Object[]
(object-array) or of type java.util.Map
.
Use statement.getEventType().getUnderlyingType()
to inspect the underlying type for all events delivered to listeners. Whether the engine delivers Map or Object-array events to listeners can be specified as follows. If the statement provides the @EventRepresentation(array=true)
annotation the engine delivers the output events as object array. If the statement provides the @EventRepresentation(array=false)
annotation the engine delivers output events as a Map. If neither annotation is provided, the engine delivers the configured default event representation as discussed in Section 16.4.11.1, “Default Event Representation”.
Consider the next statement that specifies a wildcard selecting the same type of event:
select * from org.sample.StockTickEvent where price > 100
The property names and types provided by an EventBean
query result row, as posted by the statement above are as follows:
As an alternative to querying individual event properties via the get
methods, the getUnderlying
method on EventBean
returns the underlying object representing the query result.
In the sample statement that features a wildcard-select, the underlying event object is of type org.sample.StockTickEvent
:
StockTickEvent tick = (StockTickEvent) newEvents[0].getUnderlying();
// Look for a pattern where BEvent follows AEvent String pattern = "a=AEvent -> b=BEvent"; EPStatement stmt = epService.getEPAdministrator().createPattern(pattern); stmt.addListener(testListener);
// Example listener code public class MyUpdateListener implements UpdateListener { public void update(EventBean[] newData, EventBean[] oldData) { System.out.println("a event=" + newData[0].get("a")); System.out.println("b event=" + newData[0].get("b")); } }
select * from pattern[ every a=A -> (timer:interval(60 sec) and not B(id=a.id))]
EventBean a = (EventBean) newEvents[0].getFragment("a"); // ... or using a nested property expression to get a value out of A event... double value = (Double) newEvent[0].get("a.value");
select * from pattern [a=A until b=B]
A possible code to retrieve different fragments or property values:
EventBean[] a = (EventBean[]) newEvents[0].getFragment("a"); // ... or using a nested property expression to get a value out of A event... double value = (Double) newEvent[0].get("a[0].value");
Esper is designed from the ground up to operate as a component to multi-threaded, highly-concurrent applications that require efficient use of Java VM resources. In addition, multi-threaded execution requires guarantees in predictability of results and deterministic processing. This section discusses these concerns in detail.
In Esper, an engine instance is a unit of separation. Applications can obtain and discard (initialize) one or more engine instances within the same Java VM and can provide the same or different engine configurations to each instance. An engine instance efficiently shares resources between statements. For example, consider two statements that declare the same data window. The engine matches up view declarations provided by each statement and can thus provide a single data window representation shared between the two statements.
Applications can use Esper APIs to concurrently, by multiple threads of execution, perform such functions as creating and managing statements, or sending events into an engine instance for processing. Applications can use application-managed threads or thread pools or any set of same or different threads of execution with any of the public Esper APIs. There are no restrictions towards threading other than those noted in specific sections of this document.
Esper does not prescribe a specific threading model. Applications using Esper retain full control over threading, allowing an engine to be easily embedded and used as a component or library in your favorite Java container or process.
In the default configuration it is up to the application code to use multiple threads for processing events by the engine, if so desired. All event processing takes places within your application thread call stack. The exception is timer-based processing if your engine instance relies on the internal timer (default). If your application relies on external timer events instead of the internal timer then there need not be any Esper-managed internal threads.
The fact that event processing can take place within your application thread's call stack makes developing applications with Esper easier: Any common Java integrated development environment (IDE) can host an Esper engine instance. This allows developers to easily set up test cases, debug through listener code and inspect input or output events, or trace their call stack.
In the default configuration, each engine instance maintains a single timer thread (internal timer) providing for time or schedule-based processing within the engine. The default resolution at which the internal timer operates is 100 milliseconds. The internal timer thread can be disabled and applications can instead send external time events to an engine instance to perform timer or scheduled processing at the resolution required by an application.
Each engine instance performs minimal locking to enable high levels of concurrency. An engine instance locks on a context partition level to protect context partition resources. For stateless EPL select-statements the engine does not use a context-partition lock and operates lock-free for the context partition. For stateful statements, the maximum (theoretical) degree of parallelism is 2^31-1 (2,147,483,647) parallel threads working to process a single EPL statement under a hash segmented context.
You may turn off context partition locking engine-wide (also read the caution notice) as described in Section 16.4.24.3, “Disable Locking”. You may disable context partition locking for a given statement by providing the @NoLock
annotation as part of your EPL. Note, we provide the @NoLock
annotation for the purpose of identifying locking overhead, or when your application is single-threaded, or when using an external mechanism for concurrency control or for example with virtual data windows or plug-in data windows to allow customizing concurrency for a given statement or named window. Using this annotation may have unpredictable results unless your application is taking concurrency under consideration.
For an engine instance to produce predictable results from the viewpoint of listeners to statements, an engine instance by default ensures that it dispatches statement result events to listeners in the order in which a statement produced result events. Applications that require the highest possible concurrency and do not require predictable order of delivery of events to listeners, this feature can be turned off via configuration, see Section 16.4.10.1, “Preserving the order of events delivered to listeners”. For example, assume thread T1 processes an event applied to statement S producing output event O1. Assume thread T2 processes another event applied to statement S and produces output event O2. The engine employs a configurable latch system to ensure that listeners to statement S receive and may complete processing of O1 before receiving O2. When using outbound threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.
In multithreaded environments, when one or more statements make result events available via the insert into
clause to further statements, the engine preserves the order of events inserted into the generated insert-into stream, allowing statements that consume other statement's events to behave deterministic. This feature can also be turned off via configuration, see , see Section 16.4.10.2, “Preserving the order of events for insert-into streams”. For example, assume thread T1 processes an event applied to statement S and thread T2 processes another event applied to statement S. Assume statement S inserts into into stream ST. T1 produces an output event O1 for processing by consumers of ST1 and T2 produces an output event O2 for processing by consumers of ST. The engine employs a configurable latch system such that O1 is processed before O2 by consumers of ST. When using route execution threading (advanced threading options) or changing the configuration this guarantee is weakened or removed.
We generally recommended that listener implementations block minimally or do not block at all. By implementing listener code as non-blocking code execution threads can often achieve higher levels of concurrency.
We recommended that, when using a single listener or subscriber instance to receive output from multiple statements, that the listener or subscriber code is multithread-safe. If your application has shared state between listener or subscriber instances then such shared state should be thread-safe.
Please consult Section 16.4.10, “Engine Settings related to Concurrency and Threading” for instructions on how to configure threading options. Threading options take effect at engine initialization time.
There are two modes for an engine to keep track of time: The internal timer based on JVM system time (the default), and externally-controlled time giving your application full control over the concept of time within an engine or isolated service.
An isolated service is an execution environment separate from the main engine runtime, allowing full control over the concept of time for a group of statements, as further described in Section 15.10, “Service Isolation”.
By default the internal timer provides time and evaluates schedules. External clocking can be used to supply time ticks to the engine instead. The latter is useful for testing time-based event sequences or for synchronizing the engine with an external time source.
The internal timer relies on the java.util.concurrent.ScheduledThreadPoolExecutor
class for time tick events. The next section describes timer resolution for the internal timer, by default set to 100 milliseconds but is configurable via the threading options. When using externally-controlled time the timer resolution is in your control.
To disable the internal timer and use externally-provided time instead, there are two options. The first option is to use the configuration API at engine initialization time. The second option toggles on and off the internal timer at runtime, via special timer control events that are sent into the engine like any other event.
If using a timer execution thread pool as discussed above, the internal timer or external time event provide the schedule evaluation however do not actually perform the time-based processing. The time-based processing is performed by the threads in the timer execution thread pool.
This code snippet shows the use of the configuration API to disable the internal timer and thereby turn on externally-provided time (see the Configuration section for configuration via XML file):
Configuration config = new Configuration(); config.getEngineDefaults().getThreading().setInternalTimerEnabled(false); EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
After disabling the internal timer, it is wise to set a defined time so that any statements created thereafter start relative to the time defined. Use the CurrentTimeEvent
class to indicate current time to the engine
and to move time forward for the engine (a.k.a application-time model).
This code snippet obtains the current time and sends a timer event in:
long timeInMillis = System.currentTimeMillis(); CurrentTimeEvent timeEvent = new CurrentTimeEvent(timeInMillis); epService.getEPRuntime().sendEvent(timeEvent);
Alternatively, you can use special timer control events to enable or disable the internal timer. Use the TimerControlEvent
class to control timer operation at runtime.
The next code snippet demonstrates toggling to external timer at runtime, by sending in a TimerControlEvent
event:
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPRuntime runtime = epService.getEPRuntime(); // switch to external clocking runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));
Your application sends a CurrentTimeEvent
event when it desires to move the time forward. All aspects of Esper engine time related to EPL statements and patterns are driven by the time provided by the CurrentTimeEvent
that your application sends in.
The next example sequence of instructions sets time to zero, then creates a statement, then moves time forward to 1 seconds later and then 6 seconds later:
// Set start time at zero. runtime.sendEvent(new CurrentTimeEvent(0)); // create a statement here epAdministrator.createEPL("select * from MyEvent output every 5 seconds"); // move time forward 1 second runtime.sendEvent(new CurrentTimeEvent(1000)); // move time forward 5 seconds runtime.sendEvent(new CurrentTimeEvent(6000));
When sending external timer events, your application should make sure that long
-type time values are ascending. That is, each long-type value should be either the same value or a larger value then the prior value provided by a CurrentTimeEvent
.
Your application may use the getNextScheduledTime
method in EPRuntime
to determine the earliest time a schedule for any statement requires evaluation.
The following code snippet sets the current time, creates a statement and prints the next scheduled time which is 1 minute later then the current time:
// Set start time to the current time. runtime.sendEvent(new CurrentTimeEvent(System.currentTimeMillis())); // Create a statement. epService.getEPAdministrator().createEPL("select * from pattern[timer:interval(1 minute)]"); // Print next schedule time System.out.println("Next schedule at " + new Date(runtime.getNextScheduledTime());
Consider the following example:
// Set start time to Jan.1, 2010, 00:00 am for this example SimpleDateFormat format = new SimpleDateFormat("yyyy MM dd HH:mm:ss SSS"); Date startTime = format.parse("2010 01 01 00:00:00 000"); runtime.sendEvent(new CurrentTimeEvent(startTime.getTime())); // Create a statement. EPStatement stmt = epService.getEPAdministrator().createEPL("select current_timestamp() as ct " + "from pattern[every timer:interval(1 minute)]"); stmt.addListener(...); // add a listener // Advance time to 10 minutes after start time runtime.sendEvent(new CurrentTimeSpanEvent(startTime.getTime() + 10*60*1000));
To advance time according to a given resolution, you may provide the resolution as shown below:
// Advance time to 10 minutes after start time at 100 msec resolution runtime.sendEvent(new CurrentTimeSpanEvent(startTime.getTime() + 10*60*1000, 100));
The minimum resolution that all data windows, patterns and output rate limiting operate at is the millisecond.
Parameters to time window views, pattern operators or the output
clause that are less then 1 millisecond are not allowed. As stated earlier, the default frequency at which the internal timer operates is 100 milliseconds (configurable).
The internal timer thread, by default, uses the call System.currentTimeMillis()
to obtain system time. Please see the JIRA issue ESPER-191 Support nano/microsecond resolution for more information on Java system time-call performance, accuracy and drift.
The internal timer thread can be configured to use nano-second time as returned by System.nanoTime()
. If configured for nano-second time, the engine computes an offset of the nano-second ticks to wall clock time upon startup to present back an accurate millisecond wall clock time.
Please see section Section 16.4.19, “Engine Settings related to Time Source” to configure the internal timer thread to use System.nanoTime()
.
The internal timer is based on java.util.concurrent.ScheduledThreadPoolExecutor
(java.util.Timer
does not support high accuracy VM time).
Your application can achieve a higher tick rate then 1 tick per millisecond by sending external timer events that carry a long-value which is not based on milliseconds since January 1, 1970, 00:00:00 GMT. In this case, your time interval parameters need to take consideration of the changed use of engine time.
Thus, if your external timer events send long values that represents microseconds (1E-6 sec), then your time window interval must be 1000-times larger, i.e. "win:time(1000)" becomes a 1-second time window.
And therefore, if your external timer events send long values that represents nanoseconds (1E-9 sec), then your time window interval must be 1000000-times larger, i.e. "win:time(1000000)" becomes a 1-second time window.
In the default configuration, isolated service is disabled and not available. This is because there is a small overhead associated with this feature. Please review Section 16.4.24.7, “Allow Isolated Service Provider” for the configuration setting.
As discussed before, a single Java Virtual Machine may hold multiple Esper engine instances unique by engine URI. Within an Esper engine instance the default execution environment for statements is the EPRuntime
engine runtime, which coordinates all statement's reaction to incoming events and to time passing (via internal or external timer).
Subordinate to an Esper engine instance, your application can additionally allocate multiple isolated services (or execution environments), uniquely identified by a name and represented by the EPServiceProviderIsolated
interface. In the isolated service, time passes only when you application sends timer events to the EPRuntimeIsolated
instance. Only events explicitly sent to the isolated service are visible to statements added.
Your application can create new statements that start in an isolated service. You can also move existing statements back and forth between the engine and an isolated service.
Isolation does not apply to variables: Variables are global in nature. Also, as named windows are globally visibly data windows, consumers to named windows see changes in named windows even though a consumer or the named window (through the create statement) may be in an isolated service. Tables are also globally visible.
An isolated service allows an application to:
Suspend a statement without loosing its statement state that may have accumulated for the statement.
Control the concept of time separately for a set of statements, for example to simulate, backtest, adjust arrival order or compute arrival time.
Initialize statement state by replaying events, without impacting already running statements, to catch-up statements from historical events for example.
While a statement resides in an isolated runtime it receives only those events explicitly sent to the isolated runtime, and performs time-based processing based on the timer events provided to that isolated runtime.
Use the getEPServiceIsolated
method on EPServiceProvider
passing a name to obtain an isolated runtime:
EPServiceProviderIsolated isolatedService = epServiceManager.getEPServiceIsolated("name");
Set the start time for your isolated runtime via the CurrentTimeEvent
timer event:
// In this example start the time at the system time long startInMillis = System.currentTimeMillis(); isolatedService.getEPRuntime().sendEvent(new CurrentTimeEvent(startInMillis));
Use the addStatement
method on EPAdministratorIsolated
to move an existing statement out of the engine runtime into the isolated runtime:
// look up the existing statement EPStatement stmt = epServiceManager.getEPAdministrator().getStatement("MyStmt"); // move it to an isolated service isolatedService.getEPAdministrator().addStatement(stmt);
To remove the statement from isolation and return the statement back to the engine runtime, use the removeStatement
method on EPAdministratorIsolated
:
isolatedService.getEPAdministrator().removeStatement(stmt);
To create a new statement in the isolated service, use the createEPL
method on EPAdministratorIsolated
:
isolatedService.getEPAdministrator().createEPL( "@Name('MyStmt') select * from Event", null, null); // the example is passing the statement name in an annotation and no user object
The destroy
method on EPServiceProviderIsolated
moves all currently-isolated statements for that isolated service provider back to engine runtime.
When moving a statement between engine runtime and isolated service or back, the algorithm ensures that events are aged according to the time that passed and time schedules stay intact.
To use isolated services, your configuration must have view sharing disabled as described in Section 16.4.12.1, “Sharing View Resources between Statements”.
First, let's create a statement and send events:
EPStatement stmt = epServiceManager.getEPAdministrator().createEPL("select * from TemperatureEvent.win:time(30)"); epServiceManager.getEPRuntime().send(new TemperatureEvent(...)); // send some more events over time
The steps to suspend the previously created statement are as follows:
EPServiceProviderIsolated isolatedService = epServiceManager.getEPServiceIsolated("suspendedStmts"); isolatedService.getEPAdministrator().addStatement(stmt);
To resume the statement, move the statement back to the engine:
isolatedService.getEPAdministrator().removeStatement(stmt);
EPServiceProviderIsolated isolatedService = epServiceManager.getEPServiceIsolated("suspendedStmts"); isolatedService.getEPRuntime().sendEvent(new CurrentTimeEvent(myStartTime));
EPStatement stmt = epAdmin.createEPL( "select * from pattern[every a=TemperatureEvent -> b=TemperatureEvent where timer:within(60)]");
isolatedService.getEPRuntime().sendEvent(new CurrentTimeEvent(currentTime)); isolatedService.getEPRuntime().send(historyEvent); // repeat the above advancing time until no more events
Finally, when done replaying events, merge the statement back with the engine:
isolatedService.getEPAdministrator().removeStatement(stmt);
For example, assume the below two statements named A and B:
@Name('A') insert into MyStream select * from MyEvent @Name('B') select * from MyStream
For example, assume the below three statements named A, B and C:
@Name('A') create window MyNamedWindow.win:time(60) as select * from MyEvent @Name('B') insert into MyNamedWindow select * from MyEvent @Name('C') select * from MyNamedWindow
You may register one or more exception handlers for the engine to invoke in the case it encounters an exception processing a continuously-executing statement. By default and without exception handlers the engine cancels execution of the current EPL statement that encountered the exception, logs the exception and continues to the next statement, if any. The configuration is described in Section 16.4.25, “Engine Settings related to Exception Handling”.
If your application registers exception handlers as part of engine configuration, the engine invokes the exception handlers in the order they are registered passing relevant exception information such as EPL statement name, expression and the exception itself.
Exception handlers receive any EPL statement unchecked exception such as internal exceptions or exceptions thrown by plug-in aggregation functions or plug-in views. The engine does not provide to exception handlers any exceptions thrown by static method invocations for function calls, method invocations in joins, methods on event classes and listeners or subscriber exceptions.
An exception handler can itself throw a runtime exception to cancel execution of the current event against any further statements.
For on-demand queries the API indicates any exception directly back to the caller without the exception handlers being invoked, as exception handlers apply to continuous queries only. The same applies to any API calls other than sendEvent
and the EventSender
methods.
As the configuration section describes, your application registers one or more classes that implement the ExceptionHandlerFactory
interface in the engine configuration. Upon engine initialization the engine obtains a factory instance from the class name that then provides the exception handler instance. The exception handler class must implement the ExceptionHandler
interface.
You may register one or more condition handlers for the engine to invoke in the case it encounters certain conditions, as outlined below, when executing a statement. By default and without condition handlers the engine logs the condition at informational level and continues processing. The configuration is described in Section 16.4.26, “Engine Settings related to Condition Handling”.
If your application registers condition handlers as part of engine configuration, the engine invokes the condition handlers in the order they are registered passing relevant condition information such as EPL statement name, expression and the condition information itself.
Currently the only conditions indicated by this facility are raised by the pattern followed-by operator, see Section 7.5.8.1, “Limiting Sub-Expression Count” and see Section 7.5.8.2, “Limiting Engine-wide Sub-Expression Count”.
A condition handler may not itself throw a runtime exception or return any value.
As the configuration section describes, your application registers one or more classes that implement the ConditionHandlerFactory
interface in the engine configuration. Upon engine initialization the engine obtains a factory instance from the class name that then provides the condition handler instance. The condition handler class must implement the ConditionHandler
interface.
The statement object model is a set of classes that provide an object-oriented representation of an EPL or pattern statement. The object model classes are found in package com.espertech.esper.client.soda
. An instance of EPStatementObjectModel
represents a statement's object model.
The statement object model classes are a full and complete specification of a statement. All EPL and pattern constructs including expressions and sub-queries are available via the statement object model.
In conjunction with the administrative API, the statement object model provides the means to build, change or interrogate statements beyond the EPL or pattern syntax string representation. The object graph of the statement object model is fully navigable for easy querying by code, and is also serializable allowing applications to persist or transport statements in object form, when required.
The statement object model supports full round-trip from object model to EPL statement string and back to object model: A statement object model can be rendered into an EPL string representation via the toEPL
method on EPStatementObjectModel
. Further, the administrative API allows to compile a statement string into an object model representation via the compileEPL
method on EPAdministrator
.
The statement object model is fully mutable. Mutating a any list such as returned by getChildren()
, for example, is acceptable and supported.
The create
method on EPAdministrator
creates and starts a statement as represented by an object model. In order to obtain an object model from an existing statement, obtain the statement expression text of the statement via the getText
method on EPStatement
and use the compileEPL
method to obtain the object model.
The following limitations apply:
Statement object model classes are not safe for sharing between threads other than for read access.
Between versions of Esper, the serialized form of the object model is subject to change. Esper makes no guarantees that the serialized object model of one version will be fully compatible with the serialized object model generated by another version of Esper. Please consider this issue when storing Esper object models in persistent store.
Part of the statement object model package are convenient builder classes that make it easy to build a new object model or change an existing object model. The SelectClause
and FromClause
are such builder classes and provide convenient create
methods.
Within the from-clause we have a choice of different streams to select on. The FilterStream
class represents a stream that is filled by events of a certain type and that pass an optional filter expression.
We can use the classes introduced above to create a simple statement object model:
EPStatementObjectModel model = new EPStatementObjectModel(); model.setSelectClause(SelectClause.createWildcard()); model.setFromClause(FromClause.create(FilterStream.create("com.chipmaker.ReadyEvent")));
The model as above is equivalent to the EPL :
select * from com.chipmaker.ReadyEvent
Last, the code snippet below creates a statement from the object model:
EPStatement stmt = epService.getEPAdministrator().create(model);
Notes on usage:
Variable names can simply be treated as property names.
When selecting from named windows or tables, the name of the named window or table is the event type name for use in FilterStream
instances or patterns.
To compile an arbitrary sub-expression text into an Expression
object representation, simply add the expression text to a where
clause,
compile the EPL string into an object model via the compileEPL
on EPAdministrator
, and obtain the compiled where
from the EPStatementObjectModel
via the getWhereClause
method.
In the next example we add a simple where-clause to the EPL as shown earlier:
select * from com.chipmaker.ReadyEvent where line=8
And the code to add a where-clause to the object model is below.
model.setWhereClause(Expressions.eq("line", 8));
select * from com.chipmaker.ReadyEvent where (line=8) or (line=10 and age<5)
The code for building such a where-clause by means of the object model classes is:
model.setWhereClause(Expressions.or() .add(Expressions.eq("line", 8)) .add(Expressions.and() .add(Expressions.eq("line", 10)) .add(Expressions.lt("age", 5)) ));
For instance, consider the following pattern statement.
select * from pattern [every a=MyAEvent and not b=MyBEvent]
EPStatementObjectModel model = new EPStatementObjectModel(); model.setSelectClause(SelectClause.createWildcard()); PatternExpr pattern = Patterns.and() .add(Patterns.everyFilter("MyAEvent", "a")) .add(Patterns.notFilter("MyBEvent", "b")); model.setFromClause(FromClause.create(PatternStream.create(pattern)));
insert into ReadyStreamAvg(line, avgAge) select line, avg(age) as avgAge from com.chipmaker.ReadyEvent(line in (1, 8, 10)).win:time(10) as RE where RE.waverId != null group by line having avg(age) < 0 output every 10.0 seconds order by line
Finally, this code snippet builds the above statement from scratch:
EPStatementObjectModel model = new EPStatementObjectModel(); model.setInsertInto(InsertIntoClause.create("ReadyStreamAvg", "line", "avgAge")); model.setSelectClause(SelectClause.create() .add("line") .add(Expressions.avg("age"), "avgAge")); Filter filter = Filter.create("com.chipmaker.ReadyEvent", Expressions.in("line", 1, 8, 10)); model.setFromClause(FromClause.create( FilterStream.create(filter, "RE").addView("win", "time", 10))); model.setWhereClause(Expressions.isNotNull("RE.waverId")); model.setGroupByClause(GroupByClause.create("line")); model.setHavingClause(Expressions.lt(Expressions.avg("age"), Expressions.constant(0))); model.setOutputLimitClause(OutputLimitClause.create(OutputLimitSelector.DEFAULT, Expressions.timePeriod(null, null, null, 10.0, null))); model.setOrderByClause(OrderByClause.create("line"));
This sample statement creates a variable:
create variable integer var_output_rate = 10
The code to build the above statement using the object model:
EPStatementObjectModel model = new EPStatementObjectModel(); model.setCreateVariable(CreateVariableClause.create("integer", "var_output_rate", 10)); epService.getEPAdministrator().create(model);
A second statement sets the variable to a new value:
on NewValueEvent set var_output_rate = new_rate
The code to build the above statement using the object model:
EPStatementObjectModel model = new EPStatementObjectModel(); model.setOnExpr(OnClause.createOnSet("var_output_rate", Expressions.property("new_rate"))); model.setFromClause(FromClause.create(FilterStream.create("NewValueEvent"))); EPStatement stmtSet = epService.getEPAdministrator().create(model);
This sample statement creates a named window:
create window OrdersTimeWindow.win:time(30 sec) as select symbol as sym, volume as vol, price from OrderEvent
The is the code that builds the create-window statement as above:
EPStatementObjectModel model = new EPStatementObjectModel(); model.setCreateWindow(CreateWindowClause.create("OrdersTimeWindow").addView("win", "time", 30)); model.setSelectClause(SelectClause.create() .addWithName("symbol", "sym") .addWithName("volume", "vol") .add("price")); model.setFromClause(FromClause.create(FilterStream.create("OrderEvent)));
A second statement deletes from the named window:
on NewOrderEvent as myNewOrders delete from OrdersNamedWindow as myNamedWindow where myNamedWindow.symbol = myNewOrders.symbol
EPStatementObjectModel model = new EPStatementObjectModel(); model.setOnExpr(OnClause.createOnDelete("OrdersNamedWindow", "myNamedWindow")); model.setFromClause(FromClause.create(FilterStream.create("NewOrderEvent", "myNewOrders"))); model.setWhereClause(Expressions.eqProperty("myNamedWindow.symbol", "myNewOrders.symbol")); EPStatement stmtOnDelete = epService.getEPAdministrator().create(model);
on QueryEvent(volume>0) as query select count(*) from OrdersNamedWindow as win where win.symbol = query.symbol
The on-select statement is built from scratch via the object model as follows:
EPStatementObjectModel model = new EPStatementObjectModel(); model.setOnExpr(OnClause.createOnSelect("OrdersNamedWindow", "win")); model.setWhereClause(Expressions.eqProperty("win.symbol", "query.symbol")); model.setFromClause(FromClause.create(FilterStream.create("QueryEvent", "query", Expressions.gt("volume", 0)))); model.setSelectClause(SelectClause.create().add(Expressions.countStar())); EPStatement stmtOnSelect = epService.getEPAdministrator().create(model);
?:name
String stmt = "select * from com.chipmaker.ReadyEvent(line=?)"; EPPreparedStatement prepared = epService.getEPAdministrator().prepareEPL(stmt); prepared.setObject(1, 8); EPStatement statement = epService.getEPAdministrator().create(prepared);
The next example names the substitution parameter:
String stmt = "select * from ReadyEvent(line=?:lines/line1)"; EPPreparedStatement prepared = epService.getEPAdministrator().prepareEPL(stmt); prepared.setObject("lines/line1", 1);
The engine can report key processing metrics through the JMX platform mbean server by setting a single configuration flag described in Section 16.4.20, “Engine Settings related to JMX Metrics”. For additional detailed reporting and metrics events, please read on.
Metrics reporting is a feature that allows an application to receive ongoing reports about key engine-level and statement-level metrics. Examples are the number of incoming events, the CPU time and wall time taken by statement executions or the number of output events per statement.
Metrics reporting is, by default, disabled. To enable reporting, please follow the steps as outlined in Section 16.4.21, “Engine Settings related to Metrics Reporting”. Metrics reporting must be enabled at engine initialization time. Reporting intervals can be controlled at runtime via the ConfigurationOperations
interface available from the administrative API.
Your application can receive metrics at configurable intervals via EPL statement. A metric datapoint is simply a well-defined event. The events are EngineMetric
and StatementMetric
and the Java class representing the events can be found in the client API in package com.espertech.esper.client.metric
.
Since metric events are processed by the engine the same as application events, your EPL may use any construct on such events. For example, your application may select, filter, aggregate properties, sort or insert into a stream, named window or table all metric events the same as application events.
This example statement selects all engine metric events:
select * from com.espertech.esper.client.metric.EngineMetric
The next statement selects all statement metric events:
select * from com.espertech.esper.client.metric.StatementMetric
Make sure to have metrics reporting enabled since only then do listeners or subscribers to a statement such as above receive metric events.
The engine provides metric events after the configured interval of time has passed. By default, only started statements that have activity within an interval (in the form of event or timer processing) are reported upon.
The default configuration performs the publishing of metric events in an Esper daemon thread under the control of the engine instance. Metrics reporting honors externally-supplied time, if using external timer events.
Via runtime configuration options provided by ConfigurationOperations
, your application may enable and disable metrics reporting globally, provided that metrics reporting was enabled at initialization time. Your application may also enable and disable metrics reporting for individual statements by statement name.
Statement groups is a configuration feature that allows to assign reporting intervals to statements. Statement groups are described further in the Section 16.4.21, “Engine Settings related to Metrics Reporting” section. Statement groups cannot be added or removed at runtime.
The following limitations apply:
If your Java VM version does not report current thread CPU time (most JVM do), then CPU time is reported as zero (use ManagementFactory.getThreadMXBean().isCurrentThreadCpuTimeSupported()
to determine if your JVM supports this feature).
Note: In some JVM the accuracy of CPU time returned is very low (in the order of 10 milliseconds off) which can impact the usefulness of CPU metrics returned. Consider measuring CPU time in your application thread after sending a number of events in the same thread, external to the engine as an alternative.
Your Java VM may not provide high resolution time via System.nanoTime
. In such case wall time may be inaccurate and inprecise.
CPU time and wall time have nanosecond precision but not necessarily nanosecond accuracy, please check with your Java VM provider.
There is a performance cost to collecting and reporting metrics.
Not all statements may report metrics: The engine performs certain runtime optimizations sharing resources between similar statements, thereby not reporting on certain statements unless resource sharing is disabled through configuration.
JSONEventRenderer jsonRenderer = epService.getEPRuntime(). getEventRenderer().getJSONRenderer(statement.getEventType());
String jsonEventText = jsonRenderer.render("MyEvent", event);
The XML renderer works the same:
XMLEventRenderer xmlRenderer = epService.getEPRuntime(). getEventRenderer().getXMLRenderer(statement.getEventType());
String xmlEventText = xmlRenderer.render("MyEvent", event);
String json = epService.getEPRuntime().getEventRenderer().renderJSON(event); String xml = epService.getEPRuntime().getEventRenderer().renderXML(event);
As discussed in Section 5.2.7, “Annotation” an EPL annotation is an addition made to information in an EPL statement. The API and examples to interrogate annotations are described here.
You may use the getAnnotations
method of EPStatement
to obtain annotations specified for an EPL statement. Or when compiling an EPL expression to a EPStatementObjectModel
statement object model you may also query, change or add annotations.
The following example code demonstrates iterating over an EPStatement
statement's annotations and retrieving values:
String exampleEPL = "@Tag(name='direct-output', value='sink 1') select * from RootEvent"; EPStatement stmt = epService.getEPAdministrator().createEPL(exampleEPL); for (Annotation annotation : stmt.getAnnotations()) { if (annotation instanceof Tag) { Tag tag = (Tag) annotation; System.out.println("Tag name " + tag.name() + " value " + tag.value()); } }
The output of the sample code shown above is Tag name direct-output value sink 1
.
This chapter discusses how to select context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions and the reasons for context partition selection are introduced in Section 4.9, “Operations on Specific Context Partitions”.
The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts. The section uses a category context for the purpose of illustration. The API discussed herein is general and handles all different types of contexts including nested contexts.
Consider a category context that separates bank transactions into small, medium and large:
// declare category context create context TxnCategoryContext group by amount < 100 as small, group by amount between 100 and 1000 as medium, group by amount > 1000 as large from BankTxn
// retain 1 minute of events of each category separately context TxnCategoryContext select * from BankTxn.win:time(1 minute)
In order for your application to iterate one or more specific categories it is necessary to identify which category, i.e. which context partition, to iterate. Similarly for on-demand queries, to execute on-demand queries against one or more specific categories, it is necessary to identify which context partition to execute the on-demand query against.
Your application may iterate one or more specific context partitions using either the iterate
or safeIterate
method of EPStatement
by providing an implementation of the ContextPartitionSelector
interface.
For example, assume your application must obtain all bank transactions for small amounts. It may use the API to identify the category and iterate the associated context partition:
ContextPartitionSelectorCategory categorySmall = new ContextPartitionSelectorCategory() { public Set<String> getLabels() { return Collections.singleton("small"); } }; Iterator<EventBean> it = stmt.iterator(categorySmall);
Your application may execute on-demand queries against one or more specific context partitions by using the executeQuery
method on EPRuntime
or the execute
method on EPOnDemandPreparedQuery
and by providing an implementation of ContextPartitionSelector
.
On-demand queries execute against named windows and tables, therefore below EPL statement creates a named window which the engine manages separately for small, medium and large transactions according to the context declared earlier:
// Named window per category context TxnCategoryContext create window BankTxnWindow.win:time(1 min) as BankTxn
The following code demonstrates how to fire an on-demand query against the small and the medium category:
ContextPartitionSelectorCategory categorySmallMed = new ContextPartitionSelectorCategory() { public Set<String> getLabels() { return new HashSet<String>(Arrays.asList("small", "medium")); } }; epService.getEPRuntime().executeQuery( "select count(*) from BankTxnWindow", new ContextPartitionSelector[] {categorySmallMed});
The following limitations apply:
On-demand queries may not join named windows or tables that declare a context.
This chapter briefly discusses the API to manage context partitions. Contexts are discussed in Chapter 4, Context and Context Partitions.
The section is only relevant when you declare a context. It applies to all different types of hash, partitioned, category, overlapping or other temporal contexts.
The administrative API for context partitions is EPContextPartitionAdmin
. Use the getContextPartitionAdmin
method of the EPAdministrator
interface
to obtain said service.
The context partition admin API allows an application to:
Start, stop and destroy individual context partitions.
Interrogate the state and identifiers for existing context partitions.
Determine statements associated to a context and context nesting level.
Stopping individual context partitions is useful to drop state, free memory and suspend a given context partition without stopping or destroying any associated statements. For example, assume a keyed segmented context per user id. To suspend and free the memory for a given user id your application can stop the user id's context partition. The engine does not allocate a context partition for this user id again, until your application destroys or starts the context partition.
Destroying individual context partitions is useful to drop state, free memory and deregister the given context partition without stopping or destroying any associated statements. For example, assume a keyed segmented context per user id. To deregister and free the memory for a given user id your application can destroy the user id's context partition. The engine can allocate a fresh context partition for this user id when events for this user id arrive.
Please see the JavaDoc documentation for more information.
Esper offers a listener and an assertions class to facilitate automated testing of EPL rules, for example when using a test framework such as JUnit
or TestNG
.
Esper does not require any specific test framework. If your application has the JUnit
test framework in classpath Esper uses junit.framework.AssertionFailedError
to indicate assertion errors, so as to integrate with continuous integration tools.
For detailed method-level information, please consult the JavaDoc of the package com.espertech.esper.client.scopetest
.
The class com.espertech.esper.client.scopetest.EPAssertionUtil
provides methods to assert or compare event property values as well as perform various array arthithmatic, sort events and convert events or iterators to arrays.
The class com.espertech.esper.client.scopetest.SupportUpdateListener
provides an UpdateListener
implementation that collects events and returns event data for assertion.
The class com.espertech.esper.client.scopetest.SupportSubscriber
provides a subscriber implementation that collects events and returns event data for assertion. The SupportSubscriberMRD
is a subscriber that accepts events multi-row delivery. The SupportSubscriber
and SupportSubscriberMRD
work similar to SupportUpdateListener
that is introduced in more detail below.
String epl = "select personName, count(*) as cnt from PersonEvent.win:length(3) group by personName"; EPStatement stmt = epService.getEPAdministrator().createEPL(epl); SupportUpdateListener listener = new SupportUpdateListener(); stmt.addListener(listener); epService.getEPRuntime().sendEvent(new PersonEvent("Joe")); EPAssertionUtil.assertProps(listener.assertOneGetNewAndReset(), "personName,cnt".split(","), new Object[]{"Joe", 1L});
A few additional examples are shown below:
String[] fields = new String[] {"property"}; EPAssertionUtil.assertPropsPerRow(listener.getAndResetDataListsFlattened(), fields, new Object[][]{{"E2"}}, new Object[][]{{"E1"}});
EPAssertionUtil.assertPropsPerRow(listener.getAndResetLastNewData(), fields, new Object[][]{{"E1"}, {"E2"}, {"E3"}});
assertTrue(listener.getAndClearIsInvoked());