| Class | OpenWFE::GenericSyncExpression |
| In: |
lib/openwfe/expressions/fe_concurrence.rb
|
| Parent: | SyncExpression |
The classical OpenWFE sync expression. Used by ‘concurrence’ and ‘concurrent-iterator’.
| cancel_remaining | [RW] | |
| count | [RW] | |
| remaining_children | [RW] | |
| reply_count | [RW] | |
| unready_queue | [RW] |
# File lib/openwfe/expressions/fe_concurrence.rb, line 319
# Sets up the synchronization state for +synchable+.
#
# The reply threshold (@count) and the cancel-or-forget policy
# (@cancel_remaining) are computed by determine_count and
# cancel_remaining?. The :merge and :merge_type attributes are looked up
# on the synchable (defaulting to :first and :mix) and handed to a new
# MergeArray together with the synchable's fei.
#
# @unready_queue starts as an empty array; reply() stores workitems in it
# for as long as it is not nil.
def initialize(synchable, workitem)
  super()

  # plain bookkeeping, nothing looked up yet
  @remaining_children = []
  @unready_queue = []
  @reply_count = 0

  @count = determine_count(synchable, workitem)
  @cancel_remaining = cancel_remaining?(synchable, workitem)

  merge_policy =
    synchable.lookup_sym_attribute(:merge, workitem, :default => :first)
  merge_kind =
    synchable.lookup_sym_attribute(:merge_type, workitem, :default => :mix)

  synchable.ldebug { "new() merge_type is '#{merge_kind}'" }

  @merge_array = MergeArray.new(synchable.fei, merge_policy, merge_kind)
end
# File lib/openwfe/expressions/fe_concurrence.rb, line 369
# Registers +child+ in the list of children whose reply is still awaited.
# Returns the updated list.
def add_child(child)
  @remaining_children.push(child)
end
When all the children have been applied concurrently, the concurrence calls this method to notify the sync expression that replies can be processed.
# File lib/openwfe/expressions/fe_concurrence.rb, line 347
# Signals that queued replies may now be processed.
#
# Inside the synchable's synchronize block, the queue of waiting
# workitems is detached (@unready_queue becomes nil, so reply() stops
# queueing), the synchable is stored, and each waiting workitem is passed
# to do_reply.
def ready(synchable)
  synchable.synchronize do
    synchable.ldebug do
      "ready() called by #{synchable.fei.to_debug_s} #{@unready_queue.length} wi waiting"
    end

    # detach the queue before draining it
    waiting, @unready_queue = @unready_queue, nil
    synchable.store_itself

    waiting.each do |queued_workitem|
      # do_reply() returns true as soon as the concurrence is over;
      # from that point on the rest of the queue must be left alone
      break if do_reply(synchable, queued_workitem)
    end
  end
end
# File lib/openwfe/expressions/fe_concurrence.rb, line 373
# Receives a reply +workitem+ on behalf of +synchable+.
#
# When @unready_queue is nil the workitem goes straight to do_reply.
# Otherwise it is appended to the queue and the synchable is stored.
# Everything runs inside the synchable's synchronize block.
def reply(synchable, workitem)
  synchable.synchronize do
    # no queue left: replies are processed immediately
    next do_reply(synchable, workitem) unless @unready_queue

    @unready_queue.push(workitem)
    synchable.store_itself

    synchable.ldebug do
      "#{self.class}.reply() #{@unready_queue.length} wi waiting..."
    end
  end
end
# File lib/openwfe/expressions/fe_concurrence.rb, line 476
# Tells whether the :remaining attribute, looked up on the synchable
# expression with a default of :cancel, equals :cancel.
def cancel_remaining?(synchable_expression, workitem)
  remaining_policy = synchable_expression.lookup_sym_attribute(
    :remaining, workitem, :default => :cancel)

  remaining_policy == :cancel
end
# File lib/openwfe/expressions/fe_concurrence.rb, line 484
# Reads the :count attribute through the synchable expression and returns
# it as an integer. Returns -1 when the attribute is missing or when its
# to_i value is lower than 1.
def determine_count(synchable_expression, workitem)
  raw_count = synchable_expression.lookup_attribute(:count, workitem)
  return -1 unless raw_count

  wanted = raw_count.to_i
  wanted < 1 ? -1 : wanted
end
# File lib/openwfe/expressions/fe_concurrence.rb, line 395
# Processes one reply +workitem+.
#
# The workitem is pushed into the merge array, the reply counter is
# incremented and the replying child (workitem.last_expression_id) is
# removed from the remaining children.
#
# Returns true when the parent has been replied to, which happens when
# * no child remains, or
# * @count is positive and has been reached, or
# * the "over-if" / "over-unless" condition evaluates to true
# (in the last two cases the remaining children are treated first).
# Otherwise the synchable is stored and false is returned.
def do_reply(synchable, workitem)
  synchable.ldebug do
    "#{self.class}.do_reply() from #{workitem.last_expression_id.to_debug_s}"
  end

  @merge_array.push(synchable, workitem)
  @reply_count += 1
  @remaining_children.delete(workitem.last_expression_id)

  # every child has replied: nothing left to cancel or forget
  if @remaining_children.empty?
    reply_to_parent(synchable)
    return true
  end

  count_reached = @count > 0 && @reply_count >= @count

  # the over-if condition is only evaluated when the count did not
  # already end the concurrence
  if count_reached ||
     synchable.eval_condition("over-if", workitem, "over-unless")
    treat_remaining_children(synchable)
    reply_to_parent(synchable)
    return true
  end

  # not over, resuming
  synchable.store_itself
  false
end
# File lib/openwfe/expressions/fe_concurrence.rb, line 449
# Merges the collected workitems (via @merge_array.do_merge) and hands the
# resulting workitem to the synchable's own reply_to_parent.
def reply_to_parent(synchable)
  synchable.reply_to_parent(@merge_array.do_merge)
end
# File lib/openwfe/expressions/fe_concurrence.rb, line 456
# Disposes of every child that has not replied yet.
#
# Each remaining child is either cancelled (when @cancel_remaining is
# set) or forgotten through the synchable's expression pool. A debug
# message is emitted per child.
def treat_remaining_children(synchable)
  expool = synchable.get_expression_pool

  @remaining_children.each do |child|
    # the message carries this method's actual name so that log searches
    # for "treat_remaining_children" find it
    synchable.ldebug do
      "#{self.class}.treat_remaining_children() " +
        "#{child.to_debug_s} " +
        "(cancel ? #{@cancel_remaining})"
    end

    if @cancel_remaining
      expool.cancel(child)
    else
      expool.forget(synchable, child)
    end
  end
end