Class | OpenWFE::ListenExpression |
In: |
lib/openwfe/expressions/fe_listen.rb
|
Parent: | FlowExpression |
The "listen" expression can be viewed in two ways :
1) It‘s a hook into the participant map to intercept apply or reply operations on participants.
2) It allows OpenWFE[ru] to be a bit closer to the ‘ideal’ process-calculus world (en.wikipedia.org/wiki/Process_calculi)
Anyway…
<listen to="alice"> <subprocess ref="notify_bob" /> </listen>
Whenever a workitem is dispatched (applied) to the participant named "alice", the subprocess named "notify_bob" is triggered (once).
listen :to => "^channel_.*", :upon => "reply" do sequence do participant :ref => "delta" participant :ref => "echo" end end
After the listen has been applied, the first workitem coming back from a participant whose named starts with "channel_" will trigger a sequence with the participants ‘delta’ and ‘echo’.
listen :to => "alpha", :where => "${f:color} == red" do participant :ref => "echo" end
Will send a copy of the first workitem meant for participant "alpha" to participant "echo" if this workitem‘s color field is set to ‘red’.
listen :to => "alpha", :once => "false" do send_email_to_stakeholders end
This is some kind of a server : each time a workitem is dispatched to participant "alpha", the subprocess (or participant) named ‘send_email_to_stakeholders’) will receive a copy of that workitem. Use with care.
listen :to => "alpha", :once => "false", :timeout => "1M2w" do send_email_to_stakeholders end
The ‘listen’ expression understands the ‘timeout’ attribute. It can thus be instructed to stop listening after a certain amount of time (here, after one month and two weeks).
Since OpenWFEru 0.9.16, the listen expression is usable without a child expression. It blocks until a valid messages comes in the channel, at which point it resumes the process, with workitem that came as the message (not with the workitem at apply time). (no merge implemented for now).
sequence do listen :to => "channel_z", :upon => :reply participant :the_rest_of_the_process end
In this example, the process will block until a workitem comes for a participant named ‘channel_z’.
Note that since OpenWFEru 0.9.16, the engine accept (in its reply() method) workitems that don‘t belong to a process intance (ie workitems that have a nil flow_expression_id). So it‘s entirely feasible to send ‘notifications only’ workitems to the OpenWFEru engine. (see openwferu.rubyforge.org/svn/trunk/openwfe-ruby/test/ft_54b_listen.rb)
Since OpenWFEru 0.9.16, this expression has been aliased ‘intercept’ and ‘receive’. It also accepts the ‘on’ parameter as an alias parameter to the ‘to’ parameter. Think "listen to" and "receive on".
A ‘merge’ attribute was added as well in 0.9.16, if set to true (the default value being false), the incoming workitem will be merged with a copy of the workitem that ‘applied’ (activated) the listen expression.
applied_workitem | [RW] | ‘listen’ accepts a :merge attribute, when set to true, this field will contain a copy of the workitem that activated the listen activity. This copy will be merged with incoming (listened for) workitems when triggering the listen child. |
call_count | [RW] | how many messages were received (can more than 0 or 1 if ‘once’ is set to false). |
once | [RW] | is set to true if the expression listen to 1! workitem and then replies to its parent. When set to true, it listens until the process it belongs to terminates. The default value is true. |
participant_regex | [RW] | the channel on which this expression ‘listens‘ |
upon | [RW] | can take :apply or :reply as a value, if not set (nil), will listen on both ‘directions’. |
# File lib/openwfe/expressions/fe_listen.rb, line 174 174: def apply (workitem) 175: 176: #if @children.size < 1 177: # reply_to_parent workitem 178: # return 179: #end 180: # 181: # 'listen' now blocks if there is no children 182: 183: @participant_regex = lookup_string_attribute(:to, workitem) 184: 185: @participant_regex = lookup_string_attribute(:on, workitem) \ 186: unless @participant_regex 187: 188: raise "attribute 'to' is missing for expression 'listen'" \ 189: unless @participant_regex 190: 191: ldebug { "apply() listening to '#{@participant_regex}'" } 192: 193: # 194: # once 195: 196: @once = lookup_boolean_attribute :once, workitem, true 197: 198: @once = true if @children.size < 1 199: # a 'blocking listen' can only get triggered once. 200: 201: ldebug { "apply() @once is #{@once}" } 202: 203: # 204: # merge 205: 206: merge = lookup_boolean_attribute :merge, workitem, false 207: 208: ldebug { "apply() merge is #{@merge}" } 209: 210: @applied_workitem = workitem.dup if merge 211: 212: # 213: # upon 214: 215: @upon = lookup_sym_attribute( 216: :upon, workitem, :default => :apply) 217: 218: @upon = (@upon == :reply) ? :reply : :apply 219: 220: ldebug { "apply() @upon is #{@upon}" } 221: 222: @call_count = 0 223: 224: determine_timeout 225: reschedule(get_scheduler) 226: 227: store_itself 228: end
This is the method called when a ‘listenable’ workitem comes in
# File lib/openwfe/expressions/fe_listen.rb, line 252 252: def call (channel, *args) 253: synchronize do 254: 255: upon = args[0] 256: 257: ldebug { "call() channel : '#{channel}' upon '#{upon}'" } 258: 259: return if upon != @upon 260: 261: workitem = args[1].dup 262: 263: conditional = eval_condition(:where, workitem) 264: # 265: # note that the values if the incoming workitem (not the 266: # workitem at apply time) are used for the evaluation 267: # of the condition (if necessary). 268: 269: return if conditional == false 270: 271: return if @once and @call_count > 0 272: 273: # 274: # workitem does match... 275: 276: ldebug do 277: "call() "+ 278: "through for fei #{workitem.fei} / "+ 279: "'#{workitem.participant_name}'" 280: end 281: 282: @call_count += 1 283: store_itself() 284: 285: #ldebug { "call() @call_count is #{@call_count}" } 286: 287: # 288: # eventual merge 289: 290: workitem = merge_workitems @applied_workitem.dup, workitem \ 291: if @applied_workitem 292: 293: # 294: # reply or launch nested child expression 295: 296: if @children.size > 0 297: # 298: # listen with child 299: 300: parent = @once ? self : nil 301: 302: get_expression_pool.launch_template( 303: parent, 304: nil, 305: @call_count - 1, 306: @children[0], 307: workitem, 308: nil) 309: else 310: # 311: # 'blocking listen' 312: 313: reply_to_parent workitem 314: end 315: end 316: end
# File lib/openwfe/expressions/fe_listen.rb, line 230 230: def cancel 231: 232: stop_observing 233: end
# File lib/openwfe/expressions/fe_listen.rb, line 235 235: def reply_to_parent (workitem) 236: 237: stop_observing 238: super 239: end
Registers for timeout and start observing the participant activity.
# File lib/openwfe/expressions/fe_listen.rb, line 322 322: def reschedule (scheduler) 323: 324: to_reschedule(scheduler) 325: start_observing 326: end
Only called in case of timeout.
# File lib/openwfe/expressions/fe_listen.rb, line 244 244: def trigger (params) 245: 246: reply_to_parent workitem 247: end
Start observing a [participant name] channel.
# File lib/openwfe/expressions/fe_listen.rb, line 333 333: def start_observing 334: 335: get_participant_map.add_observer @participant_regex, self 336: end
Expression‘s job is over, deregister.
# File lib/openwfe/expressions/fe_listen.rb, line 341 341: def stop_observing 342: 343: get_participant_map.remove_observer self 344: end