Class OpenWFE::GenericSyncExpression
In: lib/openwfe/expressions/fe_concurrence.rb
Parent: SyncExpression

The classical OpenWFE sync expression. Used by ‘concurrence’ and ‘concurrent-iterator’.

Methods

Classes and Modules

Class OpenWFE::GenericSyncExpression::MergeArray

Attributes

cancel_remaining  [RW] 
count  [RW] 
remaining_children  [RW] 
reply_count  [RW] 
unready_queue  [RW] 

Public Class methods

[Source]

     # File lib/openwfe/expressions/fe_concurrence.rb, line 319
     #
     # Builds the synchronization state for one run of a synchable expression.
     #
     # synchable:: the expression being synchronized; it is asked for its
     #             attributes (lookup_sym_attribute), its fei and its logger
     #             (ldebug).
     # workitem::  the workitem used when looking up attribute values.
     #
     # State set up here:
     # * @remaining_children starts empty (filled later through add_child).
     # * @reply_count starts at 0.
     # * @count comes from determine_count (-1 when no usable 'count' is set).
     # * @cancel_remaining comes from cancel_remaining?.
     # * @merge_array is a MergeArray built from the synchable's fei and the
     #   'merge' (default :first) and 'merge_type' (default :mix) attributes.
     # * @unready_queue starts as an empty array; reply() queues workitems in
     #   it for as long as it is non-nil, i.e. until ready() is called.
319:         def initialize (synchable, workitem)
320: 
321:             super()
322: 
323:             @remaining_children = []
324:             @reply_count = 0
325: 
326:             @count = determine_count(synchable, workitem)
327:             @cancel_remaining = cancel_remaining?(synchable, workitem)
328: 
329:             merge = synchable.lookup_sym_attribute(
330:                 :merge, workitem, :default => :first)
331: 
332:             merge_type = synchable.lookup_sym_attribute(
333:                 :merge_type, workitem, :default => :mix)
334: 
335:             synchable.ldebug { "new() merge_type is '#{merge_type}'" }
336: 
337:             @merge_array = MergeArray.new synchable.fei, merge, merge_type
338: 
339:             @unready_queue = []
340:         end

Public Instance methods

[Source]

     # File lib/openwfe/expressions/fe_concurrence.rb, line 369
     #
     # Registers a child whose reply is still expected by appending it to
     # @remaining_children.
     #
     # child:: the entry to track. do_reply() removes entries by comparing
     #         them with workitem.last_expression_id, so this is presumably
     #         the child's expression id -- confirm against the caller.
369:         def add_child (child)
370:             @remaining_children << child
371:         end

When all the children have been applied concurrently, the concurrence calls this method to notify the sync expression that replies can be processed.

[Source]

     # File lib/openwfe/expressions/fe_concurrence.rb, line 347
     #
     # Switches the sync expression from "queueing" to "processing" mode and
     # replays the replies that were queued in the meantime.
     #
     # synchable:: the expression being synchronized; the whole body runs
     #             inside synchable.synchronize.
     #
     # Steps: the current @unready_queue is captured in a local, the instance
     # variable is set to nil (reply() then calls do_reply directly instead
     # of queueing), the synchable is stored, and each queued workitem is
     # passed to do_reply. Iteration stops at the first do_reply that
     # returns true.
     #
     # NOTE(review): after the first call @unready_queue is nil, so a second
     # call would fail on queue.each (and on @unready_queue.length if the
     # debug block is evaluated). This appears to assume ready() is called
     # exactly once -- confirm against the callers.
347:         def ready (synchable)
348:             synchable.synchronize do
349: 
350:                 synchable.ldebug do 
351:                     "ready() called by  #{synchable.fei.to_debug_s}  " +
352:                     "#{@unready_queue.length} wi waiting"
353:                 end
354: 
355:                 queue = @unready_queue
356:                 @unready_queue = nil
357:                 synchable.store_itself()
358: 
359:                 queue.each do |workitem|
360:                     break if do_reply(synchable, workitem)
361:                         #
362:                         # do_reply() will return 'true' as soon as the 
363:                         # concurrence is over, if this is the case, the
364:                         # queue should not be treated anymore
365:                 end
366:             end
367:         end

[Source]

     # File lib/openwfe/expressions/fe_concurrence.rb, line 373
     #
     # Receives the reply (workitem) of a child.
     #
     # synchable:: the expression being synchronized; the whole body runs
     #             inside synchable.synchronize.
     # workitem::  the workitem coming back from a child.
     #
     # While @unready_queue is non-nil (ready() has not been called yet) the
     # workitem is only appended to that queue and the synchable is stored.
     # Once the queue has been set to nil by ready(), the workitem is handed
     # to do_reply immediately.
373:         def reply (synchable, workitem)
374:             synchable.synchronize do
375: 
376:                 if @unready_queue
377: 
378:                     @unready_queue << workitem
379: 
380:                     synchable.store_itself()
381: 
382:                     synchable.ldebug do 
383:                         "#{self.class}.reply() "+
384:                         "#{@unready_queue.length} wi waiting..."
385:                     end
386: 
387:                 else
388:                     do_reply(synchable, workitem)
389:                 end
390:             end
391:         end

Protected Instance methods

[Source]

     # File lib/openwfe/expressions/fe_concurrence.rb, line 476
     #
     # Tells whether children that have not replied yet must be cancelled
     # once the synchronization is over.
     #
     # Looks up the 'remaining' attribute as a symbol (default :cancel) on
     # synchable_expression and returns true only when it equals :cancel.
     # treat_remaining_children() forgets the children instead when this is
     # false.
476:             def cancel_remaining? (synchable_expression, workitem)
477: 
478:                 s = synchable_expression.lookup_sym_attribute(
479:                     :remaining, workitem, :default => :cancel)
480: 
481:                 (s == :cancel)
482:             end

[Source]

     # File lib/openwfe/expressions/fe_concurrence.rb, line 484
     #
     # Reads the 'count' attribute of synchable_expression and turns it into
     # the number of replies after which the synchronization is over.
     #
     # Returns -1 when the attribute is missing (nil/false) or when its
     # to_i value is lower than 1; otherwise returns that integer.
     # do_reply() only applies the limit when @count > 0, so -1 effectively
     # disables it.
484:             def determine_count (synchable_expression, workitem)
485: 
486:                 c = synchable_expression.lookup_attribute :count, workitem
487:                 return -1 if not c
488:                 i = c.to_i
489:                 return -1 if i < 1
490:                 i
491:             end

[Source]

     # File lib/openwfe/expressions/fe_concurrence.rb, line 395
     #
     # Processes one child reply and decides whether the synchronization is
     # over.
     #
     # synchable:: the expression being synchronized.
     # workitem::  the workitem replied by a child; its last_expression_id
     #             identifies the entry to drop from @remaining_children.
     #
     # Returns true when the parent has been replied to (synchronization
     # over), false otherwise.
     #
     # Order of operations:
     # 1. the workitem is pushed into @merge_array, @reply_count is
     #    incremented and the replying child is removed from
     #    @remaining_children;
     # 2. if no child remains, reply to the parent;
     # 3. else if a positive @count has been reached, treat the remaining
     #    children (cancel or forget) and reply to the parent;
     # 4. else if the 'over-if' / 'over-unless' condition evaluated by
     #    synchable.eval_condition holds, do the same as in 3;
     # 5. otherwise store the synchable (state changed) and keep waiting.
395:             def do_reply (synchable, workitem)
396: 
397:                 synchable.ldebug do 
398:                     "#{self.class}.do_reply() from " +
399:                     "#{workitem.last_expression_id.to_debug_s}"
400:                 end
401: 
402:                 @merge_array.push(synchable, workitem)
403: 
404:                 @reply_count = @reply_count + 1
405: 
406:                 @remaining_children.delete(workitem.last_expression_id)
407: 
408:                 #synchable.ldebug do
409:                 #    "#{self.class}.do_reply()  "+
410:                 #    "remaining children : #{@remaining_children.length}"
411:                 #end
412: 
413:                 if @remaining_children.length <= 0
414:                     reply_to_parent(synchable)
415:                     return true
416:                 end
417: 
418:                 if @count > 0 and @reply_count >= @count
419:                     treat_remaining_children(synchable)
420:                     reply_to_parent(synchable)
421:                     return true
422:                 end
423: 
424:                 #
425:                 # over-if
426: 
427:                 conditional = 
428:                     synchable.eval_condition("over-if", workitem, "over-unless")
429: 
430:                 if conditional
431:                     treat_remaining_children(synchable)
432:                     reply_to_parent(synchable)
433:                     return true
434:                 end
435: 
436:                 #
437:                 # not over, resuming
438: 
439:                 synchable.store_itself()
440: 
441:                 #synchable.ldebug do
442:                 #    "#{self.class}.do_reply() not replying to parent "+
443:                 #    "#{workitem.last_expression_id.to_debug_s}"
444:                 #end
445: 
446:                 false
447:             end

[Source]

     # File lib/openwfe/expressions/fe_concurrence.rb, line 449
     #
     # Ends the synchronization: asks @merge_array to merge the workitems it
     # collected (do_merge) and hands the resulting workitem to
     # synchable.reply_to_parent.
449:             def reply_to_parent (synchable)
450: 
451:                 workitem = @merge_array.do_merge
452: 
453:                 synchable.reply_to_parent workitem
454:             end

[Source]

     # File lib/openwfe/expressions/fe_concurrence.rb, line 456
     #
     # Disposes of the children that have not replied when the
     # synchronization ends early (count reached or over-if/over-unless
     # condition met).
     #
     # For each entry of @remaining_children, the expression pool obtained
     # from synchable.get_expression_pool is asked to cancel the child when
     # @cancel_remaining is true, or to forget it (expool.forget(synchable,
     # child)) otherwise.
     #
     # NOTE(review): the debug message below spells the method name
     # "treat_remainining_children" (extra "in"); it is a runtime string and
     # is left as found in the source file.
456:             def treat_remaining_children (synchable)
457: 
458:                 expool = synchable.get_expression_pool
459: 
460:                 @remaining_children.each do |child|
461: 
462:                     synchable.ldebug do 
463:                         "#{self.class}.treat_remainining_children() " +
464:                         "#{child.to_debug_s} " +
465:                         "(cancel ? #{@cancel_remaining})"
466:                     end
467: 
468:                     if @cancel_remaining
469:                         expool.cancel(child)
470:                     else
471:                         expool.forget(synchable, child)
472:                     end
473:                 end
474:             end

[Validate]