Class | OpenWFE::GenericSyncExpression |
In: |
lib/openwfe/expressions/fe_concurrence.rb
|
Parent: | SyncExpression |
The classical OpenWFE sync expression. Used by ‘concurrence’ and ‘concurrent-iterator‘
cancel_remaining | [RW] | |
count | [RW] | |
remaining_children | [RW] | |
reply_count | [RW] | |
unready_queue | [RW] |
# File lib/openwfe/expressions/fe_concurrence.rb, line 319 319: def initialize (synchable, workitem) 320: 321: super() 322: 323: @remaining_children = [] 324: @reply_count = 0 325: 326: @count = determine_count(synchable, workitem) 327: @cancel_remaining = cancel_remaining?(synchable, workitem) 328: 329: merge = synchable.lookup_sym_attribute( 330: :merge, workitem, :default => :first) 331: 332: merge_type = synchable.lookup_sym_attribute( 333: :merge_type, workitem, :default => :mix) 334: 335: synchable.ldebug { "new() merge_type is '#{merge_type}'" } 336: 337: @merge_array = MergeArray.new synchable.fei, merge, merge_type 338: 339: @unready_queue = [] 340: end
# File lib/openwfe/expressions/fe_concurrence.rb, line 369 369: def add_child (child) 370: @remaining_children << child 371: end
when all the children got applied concurrently, the concurrence calls this method to notify the sync expression that replies can be processed
# File lib/openwfe/expressions/fe_concurrence.rb, line 347 347: def ready (synchable) 348: synchable.synchronize do 349: 350: synchable.ldebug do 351: "ready() called by #{synchable.fei.to_debug_s} " + 352: "#{@unready_queue.length} wi waiting" 353: end 354: 355: queue = @unready_queue 356: @unready_queue = nil 357: synchable.store_itself() 358: 359: queue.each do |workitem| 360: break if do_reply(synchable, workitem) 361: # 362: # do_reply() will return 'true' as soon as the 363: # concurrence is over, if this is the case, the 364: # queue should not be treated anymore 365: end 366: end 367: end
# File lib/openwfe/expressions/fe_concurrence.rb, line 373 373: def reply (synchable, workitem) 374: synchable.synchronize do 375: 376: if @unready_queue 377: 378: @unready_queue << workitem 379: 380: synchable.store_itself() 381: 382: synchable.ldebug do 383: "#{self.class}.reply() "+ 384: "#{@unready_queue.length} wi waiting..." 385: end 386: 387: else 388: do_reply(synchable, workitem) 389: end 390: end 391: end
# File lib/openwfe/expressions/fe_concurrence.rb, line 476 476: def cancel_remaining? (synchable_expression, workitem) 477: 478: s = synchable_expression.lookup_sym_attribute( 479: :remaining, workitem, :default => :cancel) 480: 481: (s == :cancel) 482: end
# File lib/openwfe/expressions/fe_concurrence.rb, line 484 484: def determine_count (synchable_expression, workitem) 485: 486: c = synchable_expression.lookup_attribute :count, workitem 487: return -1 if not c 488: i = c.to_i 489: return -1 if i < 1 490: i 491: end
# File lib/openwfe/expressions/fe_concurrence.rb, line 395 395: def do_reply (synchable, workitem) 396: 397: synchable.ldebug do 398: "#{self.class}.do_reply() from " + 399: "#{workitem.last_expression_id.to_debug_s}" 400: end 401: 402: @merge_array.push(synchable, workitem) 403: 404: @reply_count = @reply_count + 1 405: 406: @remaining_children.delete(workitem.last_expression_id) 407: 408: #synchable.ldebug do 409: # "#{self.class}.do_reply() "+ 410: # "remaining children : #{@remaining_children.length}" 411: #end 412: 413: if @remaining_children.length <= 0 414: reply_to_parent(synchable) 415: return true 416: end 417: 418: if @count > 0 and @reply_count >= @count 419: treat_remaining_children(synchable) 420: reply_to_parent(synchable) 421: return true 422: end 423: 424: # 425: # over-if 426: 427: conditional = 428: synchable.eval_condition("over-if", workitem, "over-unless") 429: 430: if conditional 431: treat_remaining_children(synchable) 432: reply_to_parent(synchable) 433: return true 434: end 435: 436: # 437: # not over, resuming 438: 439: synchable.store_itself() 440: 441: #synchable.ldebug do 442: # "#{self.class}.do_reply() not replying to parent "+ 443: # "#{workitem.last_expression_id.to_debug_s}" 444: #end 445: 446: false 447: end
# File lib/openwfe/expressions/fe_concurrence.rb, line 449 449: def reply_to_parent (synchable) 450: 451: workitem = @merge_array.do_merge 452: 453: synchable.reply_to_parent workitem 454: end
# File lib/openwfe/expressions/fe_concurrence.rb, line 456 456: def treat_remaining_children (synchable) 457: 458: expool = synchable.get_expression_pool 459: 460: @remaining_children.each do |child| 461: 462: synchable.ldebug do 463: "#{self.class}.treat_remainining_children() " + 464: "#{child.to_debug_s} " + 465: "(cancel ? #{@cancel_remaining})" 466: end 467: 468: if @cancel_remaining 469: expool.cancel(child) 470: else 471: expool.forget(synchable, child) 472: end 473: end 474: end