| Class | OpenWFE::ConcurrenceExpression |
| In: | lib/openwfe/expressions/fe_concurrence.rb |
| Parent: | SequenceExpression |
The concurrence expression will execute each of its (direct) children in parallel threads.
Thus, in
<concurrence>
  <participant ref="pa" />
  <participant ref="pb" />
</concurrence>
participants pa and pb will be ‘treated’ in parallel (quasi-simultaneously).
The concurrence expression accepts several attributes, which can be combined. By default, the concurrence waits for all its children to reply and returns the workitem of the first child that replied. The attributes tune this behaviour.
count
<concurrence count="1">
  <participant ref="pa" />
  <participant ref="pb" />
</concurrence>
The concurrence will be over as soon as ‘pa’ or ‘pb’ has replied, i.e. as soon as "1" child has replied.
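The effect of ‘count’ can be sketched in plain Ruby, outside of OpenWFE. This is only an illustration under assumptions: run_concurrence and the lambdas standing in for participants are hypothetical names, not part of the library. Each child runs in its own thread and pushes its reply onto a queue; the caller stops waiting once ‘count’ replies have arrived.

```ruby
require 'thread'

# Hypothetical illustration, not OpenWFE code: each child runs in its
# own thread and pushes its reply onto a shared queue.
def run_concurrence(children, count: nil)
  count ||= children.size          # default: wait for every child
  replies = Queue.new

  threads = children.map do |name, work|
    Thread.new { replies << [name, work.call] }
  end

  # Block until `count` replies have arrived, in arrival order.
  collected = Array.new(count) { replies.pop }

  # Stop whatever is still running (a crude stand-in for cancelling
  # the remaining children).
  threads.each(&:kill)
  collected
end

children = {
  'pa' => -> { sleep 0.3; 'reply from pa' },   # slow participant
  'pb' => -> { 'reply from pb' }               # fast participant
}

first_only = run_concurrence(children, count: 1)
everyone   = run_concurrence(children)
```

With count set to 1 the sketch returns after the fast child alone has replied; without it, both replies are collected.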
remaining
The attribute ‘remaining’ determines what happens to the children that have not yet replied when the concurrence is over. It can take two values: ‘cancel’ (the default) and ‘forget’. Cancelled children are completely wiped away; forgotten ones continue to operate, but their replies are simply discarded.
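The difference between the two values can be approximated with plain Ruby threads. The sketch below is not OpenWFE code; first_reply and demo are hypothetical helpers. It assumes that ‘cancel’ behaves roughly like killing the remaining threads, while ‘forget’ lets them run to completion without anyone reading their reply.

```ruby
require 'thread'

# Hypothetical illustration, not OpenWFE code.
# Returns [winning_reply, threads_of_all_children].
def first_reply(children, remaining: :cancel)
  replies = Queue.new
  threads = children.map do |name, work|
    Thread.new { replies << [name, work.call] }
  end

  winner = replies.pop               # first child to reply

  case remaining
  when :cancel
    threads.each(&:kill)             # remaining children are stopped
  when :forget
    # remaining children keep running; nobody reads their reply
  end
  [winner, threads]
end

def demo(remaining)
  log = Queue.new                    # thread-safe record of finished children
  children = {
    'fast' => -> { log << 'fast'; :fast_reply },
    'slow' => -> { sleep 0.2; log << 'slow'; :slow_reply }
  }
  winner, threads = first_reply(children, remaining: remaining)
  threads.each(&:join)               # killed threads return immediately
  finished = []
  finished << log.pop until log.empty?
  [winner, finished]
end

cancel_result = demo(:cancel)
forget_result = demo(:forget)
```

In both runs the fast child wins. Only in the ‘forget’ run does the slow child get to finish its work, and its reply is never used.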
over-if
‘over-if’ accepts a ‘boolean expression’ (something replying ‘true’ or ‘false’). If the expression evaluates to true, the concurrence will be over and the remaining children will get cancelled (the default) or forgotten.
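As a rough sketch of this rule, consider replies arriving one after the other. The code below is plain Ruby, not OpenWFE code: collect_replies is a hypothetical helper, workitems are plain Hashes, and a lambda stands in for the boolean expression. It also assumes that the condition is checked each time a child replies, which the text above does not state explicitly.

```ruby
# Hypothetical illustration, not OpenWFE code. Workitems are plain
# Hashes and `over_if` is a lambda standing in for the boolean expression.
def collect_replies(replies_in_arrival_order, expected, over_if: nil)
  received = []
  replies_in_arrival_order.each do |workitem|
    received << workitem
    # Assumption: the condition is checked each time a child replies.
    break if over_if && over_if.call(workitem)
    break if received.size >= expected
  end
  received
end

arrivals = [
  { 'from' => 'pa', 'approved' => false },
  { 'from' => 'pb', 'approved' => true },
  { 'from' => 'pc', 'approved' => false }
]

stopped_early  = collect_replies(arrivals, 3, over_if: ->(wi) { wi['approved'] })
waited_for_all = collect_replies(arrivals, 3)
```

Here the concurrence is over once ‘pb’ has replied with an approval, so the reply of ‘pc’ is never awaited.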
merge
By default, the first child to reply to its parent ‘concurrence’ expression ‘wins’, i.e. its workitem is used for resuming the flow (after the concurrence).
The ‘merge’ attribute changes which child wins. In this example
<concurrence merge="lowest">
  <participant ref="pa" />
  <participant ref="pb" />
</concurrence>
when the concurrence is done, the workitem of ‘pb’ (the child listed lowest) is used to resume the flow after the concurrence.
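The choice of the winning workitem can be sketched as follows. This is plain Ruby, not OpenWFE code, and pick_winner is a hypothetical helper. It covers only the two behaviours described here: the default (first to reply) and ‘lowest’, which is assumed, based on the example, to mean the child defined last in the list.

```ruby
# Hypothetical illustration, not OpenWFE code.
# `replies` is in arrival order; `child_order` lists children as defined.
def pick_winner(replies, child_order, merge: :first)
  case merge
  when :first
    replies.first
  when :lowest
    # Assumption drawn from the example: the child defined lowest
    # in the list of children wins.
    replies.max_by { |r| child_order.index(r[:child]) }
  else
    raise ArgumentError, "merge value not covered by this sketch: #{merge}"
  end
end

child_order = %w[pa pb]
replies = [
  { child: 'pa', workitem: { 'note' => 'from pa' } },   # replied first
  { child: 'pb', workitem: { 'note' => 'from pb' } }
]

default_winner = pick_winner(replies, child_order)
lowest_winner  = pick_winner(replies, child_order, merge: :lowest)
```

With these replies the default picks ‘pa’ because it replied first, while ‘lowest’ picks ‘pb’ regardless of arrival order.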
merge-type
The merge occurs at the top level of workitem attributes.
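What a top-level merge implies can be shown with ordinary Ruby Hashes standing in for workitem attributes. This is an illustration only: it assumes that the winning workitem's values override the others, and it uses Hash#merge rather than any OpenWFE call.

```ruby
# Plain Hashes standing in for workitem attributes (hypothetical data).
winner = { 'customer' => { 'name' => 'Alice', 'tier' => 'gold' }, 'total' => 10 }
loser  = { 'customer' => { 'name' => 'Bob' }, 'comment' => 'checked' }

# Top-level merge: keys from the winner override those of the loser.
# Nested values are replaced as a whole, not merged recursively.
merged = loser.merge(winner)
```

The ‘customer’ entry of the result is the winner's nested Hash in its entirety; the loser's ‘comment’ survives because the winner has no such key.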
More complex merge behaviour can be obtained by extending the GenericSyncExpression class, but the default sync options are already numerous and powerful in combination.
| sync_expression | [RW] |
# File lib/openwfe/expressions/fe_concurrence.rb, line 143
def apply (workitem)

  sync = lookup_sym_attribute(
    :sync, workitem, :default => :generic)

  @sync_expression =
    get_expression_map.get_sync_class(sync).new(self, workitem)

  @children.each do |child|
    @sync_expression.add_child child
  end

  store_itself()

  concurrence = self

  @children.each_with_index do |child, index|
    Thread.new do
      begin
        #ldebug { "apply() child : #{child.to_debug_s}" }
        concurrence.synchronize do

          get_expression_pool().apply(
            child,
            #workitem.dup)
            get_workitem(workitem, index))
        end
      rescue Exception => e
        lwarn do
          "apply() " +
          "caught exception in concurrent child " +
          child.to_debug_s + "\n" +
          OpenWFE::exception_to_s(e)
        end
      end
    end
  end

  #@sync_expression.ready(self)
  #
  # this is insufficient, have to do that :

  synchronize do
    #
    # Making sure the freshest version of the concurrence
    # expression is used.
    # This is especially important when using pure persistence.
    #
    reloaded_self, _fei = get_expression_pool.fetch(@fei)
    reloaded_self.sync_expression.ready(reloaded_self)
  end
end
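The threading pattern used by apply (one thread per child, a lock around each child's application, and a rescue so that one failing child only produces a warning) can be reproduced in isolation. The class below is a hypothetical, stripped-down sketch and not OpenWFE code: MiniConcurrence is an invented name, MonitorMixin is assumed as the source of synchronize, and children are plain lambdas.

```ruby
require 'monitor'

# Hypothetical illustration, not OpenWFE code.
class MiniConcurrence
  include MonitorMixin

  attr_reader :applied, :warnings

  def initialize(children)
    super()                  # initialises the monitor
    @children = children
    @applied = []
    @warnings = []
  end

  def apply
    threads = @children.each_with_index.map do |child, index|
      Thread.new do
        begin
          # Only one child is applied at a time.
          synchronize { @applied << [index, child.call] }
        rescue StandardError => e
          # A failing child is recorded and does not affect its siblings.
          synchronize { @warnings << "child #{index} failed: #{e.message}" }
        end
      end
    end
    # The method above does not join its threads; joining is done here
    # only so that the results can be inspected.
    threads.each(&:join)
    self
  end
end

mini = MiniConcurrence.new([-> { :a }, -> { raise 'boom' }, -> { :c }]).apply
```

After the call, the two healthy children have been applied and the failure of the second child shows up only as a warning.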
# File lib/openwfe/expressions/fe_concurrence.rb, line 196
def reply (workitem)
  @sync_expression.reply(self, workitem)
end