esper.codehaus.org and espertech.com Documentation
As outlined in Chapter 14, API Reference, the interface for listeners is com.espertech.esper.client.UpdateListener. Implementations must provide a single update method, update(EventBean[] newEvents, EventBean[] oldEvents), that the engine invokes when results become available.
A second, strongly-typed and native, highly-performant method of result delivery is provided: A subscriber object is a direct binding of query results to a Java object. The object, a POJO, receives statement results via method invocation. The subscriber class need not implement an interface or extend a superclass. Please see Section 14.3.3, “Setting a Subscriber Object”.
The engine provides statement results to update listeners by placing results in com.espertech.esper.client.EventBean instances. A typical listener implementation queries the EventBean instances via getter methods to obtain the statement-generated results.
The get method on the EventBean interface can be used to retrieve result columns by name. The property name supplied to the get method can also be used to query nested, indexed or array properties of object graphs, as discussed in more detail in Chapter 2, Event Representations and Section 14.6, “Event and Event Type”.
The getUnderlying method on the EventBean interface allows update listeners to obtain the underlying event object. For wildcard selects, the underlying event is the event object that was sent into the engine via the sendEvent method.
For joins and select clauses with expressions, the underlying object implements java.util.Map.
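The listener contract described above can be sketched in plain Java. To keep the example compilable without the Esper jar, it declares local stand-in interfaces (EventBeanSketch, UpdateListenerSketch) that mirror only the get, getUnderlying and update methods named in the text. The Map-backed bean, the recording listener and the account and amount values are hypothetical and exist only for illustration; a real listener would implement com.espertech.esper.client.UpdateListener instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for com.espertech.esper.client.EventBean, reduced to the two
// methods discussed above so that the sketch compiles without the Esper jar.
interface EventBeanSketch {
    Object get(String propertyName);
    Object getUnderlying();
}

// Stand-in for com.espertech.esper.client.UpdateListener.
interface UpdateListenerSketch {
    void update(EventBeanSketch[] newEvents, EventBeanSketch[] oldEvents);
}

// Hypothetical Map-backed bean, mimicking a result row whose underlying
// object is a java.util.Map (joins, select clauses with expressions).
final class MapEventBeanSketch implements EventBeanSketch {
    private final Map<String, Object> row;

    MapEventBeanSketch(Map<String, Object> row) {
        this.row = row;
    }

    @Override
    public Object get(String propertyName) {
        return row.get(propertyName);
    }

    @Override
    public Object getUnderlying() {
        return row;
    }
}

// Listener that reads result columns by name and records one line per event.
final class RecordingListenerSketch implements UpdateListenerSketch {
    final StringBuilder log = new StringBuilder();

    @Override
    public void update(EventBeanSketch[] newEvents, EventBeanSketch[] oldEvents) {
        append("new", newEvents);
        append("old", oldEvents);
    }

    private void append(String label, EventBeanSketch[] events) {
        if (events == null) {
            return; // defensive: an array may be absent when there is nothing to report
        }
        for (EventBeanSketch event : events) {
            log.append(label)
               .append(" account=").append(event.get("account"))
               .append(" amount=").append(event.get("amount"))
               .append('\n');
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("account", "A1");   // hypothetical values
        row.put("amount", 250.0);
        RecordingListenerSketch listener = new RecordingListenerSketch();
        listener.update(new EventBeanSketch[] {new MapEventBeanSketch(row)}, null);
        System.out.print(listener.log);
    }
}
```

The listener only touches the result through get(name), so the same code would work whether the underlying object is a Map or an application event class.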
In this section we look at the output of a very simple EPL statement. The statement selects an event stream without using a data window and without applying any filtering, as follows:
select * from Withdrawal
This statement selects all Withdrawal events. Every time the engine processes an event of type Withdrawal or any sub-type of Withdrawal, it invokes all update listeners, handing the new event to each of the statement's listeners.
The term insert stream denotes the new events arriving, and entering a data window or aggregation. The insert stream in this example is the stream of arriving Withdrawal events, and is posted to listeners as new events.
The diagram below shows a series of Withdrawal events 1 to 6 arriving over time. The number in parentheses is the withdrawal amount, an event property that is used in the examples that discuss filtering.
The example statement above results in only new events and no old events posted by the engine to the statement's listeners.
A length window instructs the engine to only keep the last N events for a stream. The next statement applies a length window onto the Withdrawal event stream. The statement serves to illustrate the concept of data window and events entering and leaving a data window:
select * from Withdrawal.win:length(5)
The size of this statement's length window is five events. The engine enters all arriving Withdrawal events into the length window. When the length window is full, the oldest Withdrawal event is pushed out of the window. The engine indicates to listeners all events entering the window as new events, and all events leaving the window as old events.
While the term insert stream denotes new events arriving, the term remove stream denotes events leaving a data window, or changing aggregation values. In this example, the remove stream is the stream of Withdrawal events that leave the length window, and such events are posted to listeners as old events.
The next diagram illustrates how the length window contents change as events arrive and shows the events posted to an update listener.
As before, all arriving events are posted as new events to listeners. In addition, when event W1 leaves the length window on arrival of event W6, it is posted as an old event to listeners.
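The behaviour of the length window can be reproduced with a small plain-Java simulation. This is a sketch of the semantics described above, not Esper code: the class LengthWindowSketch and its methods are hypothetical, and events are represented by their names W1 to W6 only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation of: select * from Withdrawal.win:length(5)
final class LengthWindowSketch {
    private final int size;
    private final Deque<String> window = new ArrayDeque<>();
    // One entry per listener invocation, recording the new and the old event.
    final List<String> posted = new ArrayList<>();

    LengthWindowSketch(int size) {
        this.size = size;
    }

    void arrive(String event) {
        window.addLast(event);
        // Once the window holds more than `size` events, the oldest one leaves.
        String old = window.size() > size ? window.removeFirst() : null;
        posted.add("new=" + event + " old=" + (old == null ? "none" : old));
    }

    List<String> contents() {
        return new ArrayList<>(window);
    }

    public static void main(String[] args) {
        LengthWindowSketch w = new LengthWindowSketch(5);
        for (int i = 1; i <= 6; i++) {
            w.arrive("W" + i);
        }
        for (String line : w.posted) {
            System.out.println(line);
        }
        System.out.println("window=" + w.contents());
    }
}
```

Feeding W1 to W6 into a window of size five yields only new events for the first five arrivals. The sixth arrival posts W6 as the new event and W1 as the old event.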
Similar to a length window, a time window also keeps the most recent events up to a given time period. A time window of 5 seconds, for example, keeps the last 5 seconds of events. As seconds pass, the time window actively pushes the oldest events out of the window resulting in one or more old events posted to update listeners.
EPL provides the istream, irstream and rstream keywords on select-clauses and on insert-into clauses to control which stream to deliver; see Section 5.3.7, “Selecting insert and remove stream events”. There is also a related, engine-wide configuration setting described in Section 15.4.17, “Engine Settings related to Stream Selection”.
select * from Withdrawal(amount>=200).win:length(5)
With the filter, any Withdrawal events that have an amount of less than 200 do not enter the length window and are therefore not passed to update listeners. Filters are discussed in more detail in Section 5.4.1, “Filter-based Event Streams” and Section 6.4, “Filter Expressions In Patterns”.
The where-clause and having-clause in statements eliminate potential result rows at a later stage in processing, after events have been processed into a statement's data window or other views.
The next statement applies a where-clause to Withdrawal events. Where-clauses are discussed in more detail in Section 5.5, “Specifying Search Conditions: the Where Clause”.
select * from Withdrawal.win:length(5) where amount >= 200
The where-clause applies to both new events and old events. As the diagram below shows, arriving events enter the window; however, only events that pass the where-clause are handed to update listeners. Also, as events leave the data window, only those events that pass the conditions in the where-clause are posted to listeners as old events.
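The difference between a stream filter and a where-clause can be made concrete with a plain-Java simulation. This is a sketch only: the class FilterVsWhereSketch is hypothetical, the amounts are invented for illustration and are not those of the diagram, and the window size is reduced to two so that the effect shows after a few events.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class FilterVsWhereSketch {

    // Simulates: select * from Withdrawal(amount>=min).win:length(size)
    static List<String> withFilter(int[] amounts, int size, int min) {
        List<String> posted = new ArrayList<>();
        Deque<Integer> window = new ArrayDeque<>();
        for (int amount : amounts) {
            if (amount < min) {
                continue; // filtered out before the data window: never enters it
            }
            window.addLast(amount);
            Integer old = window.size() > size ? window.removeFirst() : null;
            post(posted, Integer.valueOf(amount), old);
        }
        return posted;
    }

    // Simulates: select * from Withdrawal.win:length(size) where amount >= min
    static List<String> withWhere(int[] amounts, int size, int min) {
        List<String> posted = new ArrayList<>();
        Deque<Integer> window = new ArrayDeque<>();
        for (int amount : amounts) {
            window.addLast(amount); // every event enters the window
            Integer old = window.size() > size ? window.removeFirst() : null;
            // The where-clause is applied to new and old events alike.
            Integer newRow = amount >= min ? Integer.valueOf(amount) : null;
            Integer oldRow = (old != null && old >= min) ? old : null;
            post(posted, newRow, oldRow);
        }
        return posted;
    }

    // Records one listener invocation; nothing is recorded when both rows are absent.
    private static void post(List<String> posted, Integer newRow, Integer oldRow) {
        if (newRow == null && oldRow == null) {
            return;
        }
        StringBuilder sb = new StringBuilder();
        if (newRow != null) {
            sb.append("new=").append(newRow);
        }
        if (oldRow != null) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append("old=").append(oldRow);
        }
        posted.add(sb.toString());
    }

    public static void main(String[] args) {
        int[] amounts = {500, 100, 200, 50, 400}; // hypothetical amounts
        System.out.println("filter: " + withFilter(amounts, 2, 200));
        System.out.println("where:  " + withWhere(amounts, 2, 200));
    }
}
```

With the filter, low-amount events never occupy a slot in the window, so older events stay in it longer. With the where-clause, low-amount events still push older events out of the window even though they are never handed to listeners themselves.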
The where-clause can contain complex conditions while event stream filters are more restrictive in the type of filters that can be specified. The where-clause of the next statement applies the ceil function of the java.lang.Math Java library class. The insert-into clause makes the results of the first statement available to the second statement:
insert into WithdrawalFiltered select * from Withdrawal where Math.ceil(amount) >= 200
select * from WithdrawalFiltered
A time window is a moving window extending to the specified time interval into the past based on the system time. Time windows enable us to limit the number of events considered by a query, as do length windows.
As a practical example, consider the need to determine all accounts where the average withdrawal amount per account for the last 4 seconds of withdrawals is greater than 1000. The statement to solve this problem is shown below.
select account, avg(amount) from Withdrawal.win:time(4 sec) group by account having avg(amount) > 1000
The next diagram serves to illustrate the functioning of a time window. For the diagram, we assume a query that simply selects the event itself and does not group or filter events.
select * from Withdrawal.win:time(4 sec)
The diagram starts at a given time t and displays the contents of the time window at t + 4 and t + 5 seconds and so on.
The activity as illustrated by the diagram:
At time t + 4 seconds an event W1 arrives and enters the time window. The engine reports the new event to update listeners.
At time t + 5 seconds an event W2 arrives and enters the time window. The engine reports the new event to update listeners.
At time t + 6.5 seconds an event W3 arrives and enters the time window. The engine reports the new event to update listeners.
At time t + 8 seconds event W1 leaves the time window. The engine reports the event as an old event to update listeners.
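The timeline above can be replayed with a plain-Java simulation of a 4-second time window. This is a sketch under stated assumptions, not Esper code: the class TimeWindowSketch is hypothetical, time is expressed in milliseconds relative to t, and the clock only moves when the caller invokes arrive or advanceTo.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation of: select * from Withdrawal.win:time(4 sec)
final class TimeWindowSketch {
    private static final class Entry {
        final String name;
        final long arrivalMillis;

        Entry(String name, long arrivalMillis) {
            this.name = name;
            this.arrivalMillis = arrivalMillis;
        }
    }

    private final long lengthMillis;
    private final Deque<Entry> window = new ArrayDeque<>();
    // One entry per event reported to listeners, in order.
    final List<String> posted = new ArrayList<>();

    TimeWindowSketch(long lengthMillis) {
        this.lengthMillis = lengthMillis;
    }

    // Moves the simulated clock forward and expires events that are now too old.
    void advanceTo(long nowMillis) {
        while (!window.isEmpty()
                && window.peekFirst().arrivalMillis + lengthMillis <= nowMillis) {
            posted.add("old=" + window.removeFirst().name);
        }
    }

    // An arriving event enters the window and is reported as a new event.
    void arrive(long nowMillis, String name) {
        advanceTo(nowMillis);
        window.addLast(new Entry(name, nowMillis));
        posted.add("new=" + name);
    }

    List<String> contents() {
        List<String> names = new ArrayList<>();
        for (Entry e : window) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        TimeWindowSketch w = new TimeWindowSketch(4000);
        w.arrive(4000, "W1");
        w.arrive(5000, "W2");
        w.arrive(6500, "W3");
        w.advanceTo(8000); // W1 has now been in the window for 4 seconds
        System.out.println(w.posted);
        System.out.println("window=" + w.contents());
    }
}
```

Unlike the length window, an event here leaves the window because time passes, not because another event arrives.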
The time batch view buffers events and releases them every specified time interval in one update. Time windows control the evaluation of events, as does the length batch window.
The next diagram serves to illustrate the functioning of a time batch view. For the diagram, we assume a simple query as below:
select * from Withdrawal.win:time_batch(4 sec)
The diagram starts at a given time t and displays the contents of the time window at t + 4 and t + 5 seconds and so on.
The activity as illustrated by the diagram:
At time t + 1 seconds an event W1 arrives and enters the batch. No call to inform update listeners occurs.
At time t + 3 seconds an event W2 arrives and enters the batch. No call to inform update listeners occurs.
At time t + 4 seconds the engine processes the batched events and starts a new batch. The engine reports events W1 and W2 to update listeners.
At time t + 6.5 seconds an event W3 arrives and enters the batch. No call to inform update listeners occurs.
At time t + 8 seconds the engine processes the batched events and starts a new batch. The engine reports the event W3 as new data to update listeners. The engine reports the events W1 and W2 as old data (prior batch) to update listeners.
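The batching steps above can be replayed with a plain-Java simulation. This is a sketch, not Esper code: the class TimeBatchSketch is hypothetical, and the end of each 4-second interval is signalled explicitly by the caller through endOfInterval rather than by a timer.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of: select * from Withdrawal.win:time_batch(4 sec)
final class TimeBatchSketch {
    private List<String> current = new ArrayList<>();
    private List<String> previous = new ArrayList<>();
    // One entry per listener invocation.
    final List<String> posted = new ArrayList<>();

    // Arriving events are buffered; listeners are not informed yet.
    void arrive(String event) {
        current.add(event);
    }

    // Called when the batch interval ends: the current batch is posted as
    // new data and the prior batch as old data, then a new batch starts.
    void endOfInterval() {
        if (!current.isEmpty() || !previous.isEmpty()) {
            posted.add("new=" + current + " old=" + previous);
        }
        previous = current;
        current = new ArrayList<>();
    }

    public static void main(String[] args) {
        TimeBatchSketch batch = new TimeBatchSketch();
        batch.arrive("W1");     // t + 1 seconds
        batch.arrive("W2");     // t + 3 seconds
        batch.endOfInterval();  // t + 4 seconds
        batch.arrive("W3");     // t + 6.5 seconds
        batch.endOfInterval();  // t + 8 seconds
        for (String line : batch.posted) {
            System.out.println(line);
        }
    }
}
```

Listeners are informed twice in this run, once per interval, no matter how many events arrived within each interval.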
The built-in data windows that act on batches of events are the win:time_batch and the win:length_batch views, among others. The win:time_batch data window collects events arriving during a given time interval and posts collected events as a batch to listeners at the end of the time interval. The win:length_batch data window collects a given number of events and posts collected events as a batch to listeners when the given number of events has been collected.
Related to batch data windows is output rate limiting. While batch data windows retain events, the output clause offered by output rate limiting can control or stabilize the rate at which events are output; see Section 5.7, “Stabilizing and Controlling Output: the Output Clause”.
Let's look at how a time batch window may be used:
select account, amount from Withdrawal.win:time_batch(1 sec)
The above statement collects events arriving during a one-second interval, at the end of which the engine posts the collected events as new events (insert stream) to each listener. The engine posts the events collected during the prior batch as old events (remove stream). The engine starts posting events to listeners one second after it receives the first event, and continues to do so every second thereafter.
For statements containing aggregation functions and/or a group by clause, the engine posts consolidated aggregation results for an event batch. For example, consider the following statement:
select sum(amount) as mysum from Withdrawal.win:time_batch(1 sec)
Note that output rate limiting also generates batches of events following the output model as discussed here.
Consider the following statement that alerts when 2 Withdrawal events have been received:
select count(*) as mycount from Withdrawal having count(*) = 2
select istream count(*) as mycount from Withdrawal having count(*) = 2
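The difference between the two statements above can be sketched in plain Java. The simulation assumes that the first statement is run with both the insert and remove stream selected (for example through the irstream keyword or the engine-wide stream selection setting), so that a changing count also produces a remove stream row carrying the prior value. The class CountHavingSketch and its run method are hypothetical and do not use the Esper API.

```java
import java.util.ArrayList;
import java.util.List;

final class CountHavingSketch {

    // Simulates: select [istream] count(*) as mycount from Withdrawal
    //            having count(*) = target
    // Returns one entry per row posted to listeners.
    static List<String> run(int arrivals, long target, boolean insertStreamOnly) {
        List<String> posted = new ArrayList<>();
        long count = 0;
        for (int i = 1; i <= arrivals; i++) {
            long previous = count;
            count++;
            // Insert stream row: the new aggregation value.
            if (count == target) {
                posted.add("event " + i + ": new mycount=" + count);
            }
            // Remove stream row: the prior aggregation value (assumption: both
            // streams are selected unless insertStreamOnly is set).
            if (!insertStreamOnly && previous == target) {
                posted.add("event " + i + ": old mycount=" + previous);
            }
        }
        return posted;
    }

    public static void main(String[] args) {
        System.out.println("both streams: " + run(3, 2, false));
        System.out.println("istream only: " + run(3, 2, true));
    }
}
```

Under that assumption, the second Withdrawal event produces a new row with mycount 2, and the third event produces an old row that also carries mycount 2. Selecting only the insert stream leaves a single notification, on the second event.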
The documentation provides output examples for query types in Appendix A, Output Reference and Samples, and the next sections outline each query type.
An example statement for the un-aggregated and un-grouped case is as follows:
select * from Withdrawal.win:time_batch(1 sec)
The appendix provides a complete example including input and output events over time at Section A.2, “Output for Un-aggregated and Un-grouped Queries”.
select sum(amount) from Withdrawal.win:time_batch(1 sec)
The appendix provides a complete example including input and output events over time at Section A.3, “Output for Fully-aggregated and Un-grouped Queries”.
select account, sum(amount) from Withdrawal.win:time_batch(1 sec)
The appendix provides a complete example including input and output events over time at Section A.4, “Output for Aggregated and Un-grouped Queries”.
select account, sum(amount) from Withdrawal.win:time_batch(1 sec) group by account
The appendix provides a complete example including input and output events over time at Section A.5, “Output for Fully-aggregated and Grouped Queries”.
An event sent by your application or generated by statements is visible to all other statements in the same engine instance. Similarly, current time (the time horizon) moves forward for all statements in the same engine instance. Please see Chapter 14, API Reference for how to send events, how time moves forward through system time or via simulated time, and the possible threading models.
Within an Esper engine instance you can additionally control event visibility and current time on a statement level, under the term isolated service as described in Section 14.10, “Service Isolation”.
An isolated service provides a dedicated execution environment for one or more statements. Events sent to an isolated service are visible only within that isolated service. In the isolated service you can move time forward at the pace and resolution desired without impacting other statements that reside in the engine runtime or other isolated services. You can move statements between the engine and an isolated service.