Class OpenWFE::ListenExpression
In: lib/openwfe/expressions/fe_listen.rb
Parent: FlowExpression

The "listen" expression can be viewed in two ways :

1) It's a hook into the participant map to intercept apply or reply operations on participants.

2) It allows OpenWFE[ru] to be a bit closer to the ‘ideal’ process-calculus world (en.wikipedia.org/wiki/Process_calculi)

Anyway…

    <listen to="alice">
        <subprocess ref="notify_bob" />
    </listen>

Whenever a workitem is dispatched (applied) to the participant named "alice", the subprocess named "notify_bob" is triggered (once).

    listen :to => "^channel_.*", :upon => "reply" do
        sequence do
            participant :ref => "delta"
            participant :ref => "echo"
        end
    end

After the listen has been applied, the first workitem coming back from a participant whose name starts with "channel_" will trigger a sequence with the participants ‘delta’ and ‘echo’.

    listen :to => "alpha", :where => "${f:color} == red" do
        participant :ref => "echo"
    end

This sends a copy of the first workitem meant for participant "alpha" to participant "echo" if this workitem's color field is set to ‘red’.
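
As a rough illustration of how a condition such as "${f:color} == red" could be checked against the fields of the incoming workitem, here is a minimal sketch. The helpers `substitute_fields` and `where_matches?` are hypothetical and only understand a single "left == right" comparison; they are not the ConditionMixin used by OpenWFEru.

```ruby
# Hypothetical helper: replaces each ${f:name} with the matching field value.
def substitute_fields(text, fields)
  text.gsub(/\$\{f:([^}]+)\}/) { fields[$1].to_s }
end

# Hypothetical helper: only understands a single "left == right" comparison.
def where_matches?(condition, fields)
  left, right = substitute_fields(condition, fields).split("==", 2).map(&:strip)
  left == right
end

red_item  = { "color" => "red" }
blue_item = { "color" => "blue" }

puts where_matches?("${f:color} == red", red_item)
puts where_matches?("${f:color} == red", blue_item)
```

The point is only that the fields of the incoming workitem (not those present at apply time) feed the comparison.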

    listen :to => "alpha", :once => "false" do
        send_email_to_stakeholders
    end

This is some kind of a server: each time a workitem is dispatched to participant "alpha", the subprocess (or participant) named ‘send_email_to_stakeholders’ will receive a copy of that workitem. Use with care.

    listen :to => "alpha", :once => "false", :timeout => "1M2w" do
        send_email_to_stakeholders
    end

The ‘listen’ expression understands the ‘timeout’ attribute. It can thus be instructed to stop listening after a certain amount of time (here, after one month and two weeks).
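
To make the "1M2w" notation concrete, the following sketch converts such a string into seconds. The unit table (in particular a month counted as 30 days) and the function `duration_in_seconds` are assumptions made for illustration; the actual parsing is done by the scheduler used by OpenWFEru and may differ.

```ruby
# Assumed unit table, in seconds; a month is taken as 30 days here.
UNIT_SECONDS = {
  "s" => 1,
  "m" => 60,
  "h" => 3600,
  "d" => 86_400,
  "w" => 7 * 86_400,
  "M" => 30 * 86_400
}.freeze

# Hypothetical parser: sums every <number><unit> pair found in the string.
def duration_in_seconds(text)
  text.scan(/(\d+)([smhdwM])/).sum do |count, unit|
    count.to_i * UNIT_SECONDS.fetch(unit)
  end
end

# One month plus two weeks, i.e. 44 days under the assumptions above.
puts duration_in_seconds("1M2w")
```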

Since OpenWFEru 0.9.16, the listen expression is usable without a child expression. It blocks until a valid message comes in on the channel, at which point it resumes the process with the workitem that came as the message (not with the workitem at apply time). (no merge implemented for now).

    sequence do
        listen :to => "channel_z", :upon => :reply
        participant :the_rest_of_the_process
    end

In this example, the process will block until a workitem comes back from a participant named ‘channel_z’.

Note that since OpenWFEru 0.9.16, the engine accepts (in its reply() method) workitems that don't belong to a process instance (i.e. workitems that have a nil flow_expression_id). So it's entirely feasible to send ‘notifications only’ workitems to the OpenWFEru engine. (see openwferu.rubyforge.org/svn/trunk/openwfe-ruby/test/ft_54b_listen.rb)

Since OpenWFEru 0.9.16, this expression has been aliased ‘intercept’ and ‘receive’. It also accepts the ‘on’ parameter as an alias parameter to the ‘to’ parameter. Think "listen to" and "receive on".

A ‘merge’ attribute was added as well in 0.9.16. If set to true (the default value being false), the incoming workitem will be merged with a copy of the workitem that ‘applied’ (activated) the listen expression.
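
The effect of ‘merge’ can be pictured with plain hashes standing in for workitem fields. This is only a sketch: `merge_fields` is a hypothetical helper, and the rule that incoming values win on a key clash is an assumption, not a statement about how MergeMixin resolves conflicts.

```ruby
# Hypothetical stand-in for merging two workitems, using plain field hashes.
def merge_fields(applied_fields, incoming_fields)
  # Assumption: values from the incoming workitem win on key clashes.
  # Hash#merge returns a new hash, so the applied copy is left untouched.
  applied_fields.merge(incoming_fields)
end

applied  = { "customer" => "alice", "status" => "waiting" }
incoming = { "status" => "done", "color" => "red" }

merged = merge_fields(applied, incoming)
puts merged.inspect
```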

Included Modules

TimeoutMixin, ConditionMixin, MergeMixin

Attributes

applied_workitem  [RW]  ‘listen’ accepts a :merge attribute. When it is set to true, this field will contain a copy of the workitem that activated the listen activity. This copy will be merged with incoming (listened for) workitems when triggering the listen child.
call_count  [RW]  how many messages were received (can be more than 1 if ‘once’ is set to false).
once  [RW]  is set to true if the expression listens to exactly one workitem and then replies to its parent. When set to false, it listens until the process it belongs to terminates. The default value is true.
participant_regex  [RW]  the channel on which this expression ‘listens’
upon  [RW]  can take :apply or :reply as a value; apply() resolves any other value (including an unset attribute) to :apply.

Public Instance methods

    # File lib/openwfe/expressions/fe_listen.rb, line 174
    def apply (workitem)

        #if @children.size < 1
        #    reply_to_parent workitem
        #    return
        #end
            #
            # 'listen' now blocks if there are no children

        @participant_regex = lookup_string_attribute(:to, workitem)

        @participant_regex = lookup_string_attribute(:on, workitem) \
            unless @participant_regex

        raise "attribute 'to' is missing for expression 'listen'" \
            unless @participant_regex

        ldebug { "apply() listening to '#{@participant_regex}'" }

        #
        # once

        @once = lookup_boolean_attribute :once, workitem, true

        @once = true if @children.size < 1
            # a 'blocking listen' can only get triggered once.

        ldebug { "apply() @once is #{@once}" }

        #
        # merge

        merge = lookup_boolean_attribute :merge, workitem, false

        ldebug { "apply() merge is #{merge}" }

        @applied_workitem = workitem.dup if merge

        #
        # upon

        @upon = lookup_sym_attribute(
            :upon, workitem, :default => :apply)

        @upon = (@upon == :reply) ? :reply : :apply

        ldebug { "apply() @upon is #{@upon}" }

        @call_count = 0

        determine_timeout
        reschedule(get_scheduler)

        store_itself
    end
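
The attribute handling in apply() can be summarized as a small pure function. This is a simplified sketch, not the library code: `resolve_listen_options` is a hypothetical name, attributes are passed as a plain hash, and treating only false or "false" as disabling ‘once’ is an assumption about the boolean lookup.

```ruby
# Hypothetical summary of the option resolution performed by apply().
def resolve_listen_options(attributes, child_count)
  # :to takes precedence, :on is the fallback alias.
  channel = attributes[:to] || attributes[:on]
  raise ArgumentError, "attribute 'to' is missing for expression 'listen'" unless channel

  # Assumption: only false / "false" switches 'once' off; the default is true.
  once = attributes.fetch(:once, true).to_s != "false"
  # A blocking listen (no child expression) can only be triggered once.
  once = true if child_count < 1

  # Anything other than :reply / "reply" falls back to :apply.
  upon = attributes[:upon].to_s == "reply" ? :reply : :apply

  { :channel => channel, :once => once, :upon => upon }
end

puts resolve_listen_options({ :on => "alpha", :once => "false" }, 1).inspect
puts resolve_listen_options({ :to => "channel_z", :upon => :reply }, 0).inspect
```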

This is the method called when a ‘listenable’ workitem comes in.

    # File lib/openwfe/expressions/fe_listen.rb, line 252
    def call (channel, *args)
        synchronize do

            upon = args[0]

            ldebug { "call() channel : '#{channel}' upon '#{upon}'" }

            return if upon != @upon

            workitem = args[1].dup

            conditional = eval_condition(:where, workitem)
                #
                # note that the values of the incoming workitem (not the
                # workitem at apply time) are used for the evaluation
                # of the condition (if necessary).

            return if conditional == false

            return if @once and @call_count > 0

            #
            # workitem does match...

            ldebug do
                "call() "+
                "through for fei #{workitem.fei} / "+
                "'#{workitem.participant_name}'"
            end

            @call_count += 1
            store_itself()

            #ldebug { "call() @call_count is #{@call_count}" }

            #
            # eventual merge

            workitem = merge_workitems @applied_workitem.dup, workitem \
                if @applied_workitem

            #
            # reply or launch nested child expression

            if @children.size > 0
                #
                # listen with child

                parent = @once ? self : nil

                get_expression_pool.launch_template(
                    parent,
                    nil,
                    @call_count - 1,
                    @children[0],
                    workitem,
                    nil)
            else
                #
                # 'blocking listen'

                reply_to_parent workitem
            end
        end
    end
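
The gating logic at the top of call() (direction check, ‘where’ condition, ‘once’ guard) can be reproduced in isolation. The class below is a hypothetical stand-in written for illustration: it takes the condition as a Ruby block, uses plain hashes as workitems, and records deliveries instead of launching a child expression.

```ruby
# Hypothetical, stripped-down listener mirroring the guards found in call().
class MiniListener
  attr_reader :call_count, :delivered

  def initialize(upon, once, &condition)
    @upon = upon
    @once = once
    @condition = condition
    @call_count = 0
    @delivered = []
  end

  def call(upon, fields)
    return if upon != @upon                            # wrong direction
    return if @condition && !@condition.call(fields)   # 'where' not satisfied
    return if @once && @call_count > 0                 # already triggered
    @call_count += 1
    @delivered << fields.dup
  end
end

listener = MiniListener.new(:apply, true) { |fields| fields["color"] == "red" }

listener.call(:reply, { "color" => "red" })   # ignored: listening upon :apply
listener.call(:apply, { "color" => "blue" })  # ignored: condition is false
listener.call(:apply, { "color" => "red" })   # accepted
listener.call(:apply, { "color" => "red" })   # ignored: 'once' already used

puts listener.call_count
```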

    # File lib/openwfe/expressions/fe_listen.rb, line 230
    def cancel

        stop_observing
    end

    # File lib/openwfe/expressions/fe_listen.rb, line 235
    def reply_to_parent (workitem)

        stop_observing
        super
    end

Registers for timeout and starts observing the participant activity.

    # File lib/openwfe/expressions/fe_listen.rb, line 322
    def reschedule (scheduler)

        to_reschedule(scheduler)
        start_observing
    end

Only called in case of timeout.

    # File lib/openwfe/expressions/fe_listen.rb, line 244
    def trigger (params)

        reply_to_parent workitem
    end

Protected Instance methods

Start observing a [participant name] channel.

    # File lib/openwfe/expressions/fe_listen.rb, line 333
    def start_observing

        get_participant_map.add_observer @participant_regex, self
    end

Expression's job is over, deregister.

    # File lib/openwfe/expressions/fe_listen.rb, line 341
    def stop_observing

        get_participant_map.remove_observer self
    end
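
To picture what add_observer and remove_observer imply on the participant map side, here is a minimal sketch of a regex-keyed observer registry. `MiniParticipantMap` and its `dispatch` method are hypothetical; the only things borrowed from the source are the two method names and the call(channel, upon, workitem) shape of the notification.

```ruby
# Hypothetical registry: observers subscribe with a participant name regex.
class MiniParticipantMap
  def initialize
    @observers = []
  end

  def add_observer(channel_regex, observer)
    @observers << [Regexp.new(channel_regex), observer]
  end

  def remove_observer(observer)
    @observers.delete_if { |_regex, registered| registered.equal?(observer) }
  end

  # Notifies every observer whose regex matches the participant name.
  # Iterates over a copy so an observer may deregister itself while notified.
  def dispatch(participant_name, upon, fields)
    @observers.dup.each do |regex, observer|
      observer.call(participant_name, upon, fields) if regex.match(participant_name)
    end
  end
end

seen = []
observer = lambda { |channel, upon, _fields| seen << [channel, upon] }

map = MiniParticipantMap.new
map.add_observer("^channel_.*", observer)
map.dispatch("channel_z", :reply, {})   # matches the regex
map.dispatch("alpha", :apply, {})       # does not match
map.remove_observer(observer)
map.dispatch("channel_a", :apply, {})   # nobody is listening anymore

puts seen.inspect
```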