Class | OpenWFE::ExpressionPool |
In: |
lib/openwfe/expool/expressionpool.rb
|
Parent: | Object |
The ExpressionPool stores expressions (pieces of workflow instance). It‘s the core of the workflow engine. It relies on an expression storage for actual persistence of the expressions.
paused_instances | [R] | The hash containing the wfid of the process instances currently paused. |
The constructor for the expression pool.
# File lib/openwfe/expool/expressionpool.rb, line 104 104: def initialize (service_name, application_context) 105: 106: super() 107: 108: service_init(service_name, application_context) 109: 110: @paused_instances = {} 111: 112: @monitors = MonitorProvider.new(application_context) 113: 114: @observers = {} 115: 116: @stopped = false 117: 118: engine_environment_id 119: # makes sure it's called now 120: 121: start_workqueue 122: end
Applies a given expression (id or expression)
# File lib/openwfe/expool/expressionpool.rb, line 347 347: def apply (exp, workitem) 348: 349: queue_work :do_apply, exp, workitem 350: #do_apply exp, workitem 351: end
Cancels the given expression. The param might be an expression instance or a FlowExpressionId instance.
# File lib/openwfe/expool/expressionpool.rb, line 367 367: def cancel (exp) 368: 369: exp, fei = fetch exp 370: 371: unless exp 372: ldebug { "cancel() cannot cancel missing #{fei.to_debug_s}" } 373: return nil 374: end 375: 376: ldebug { "cancel() for #{fei.to_debug_s}" } 377: 378: onotify :cancel, exp 379: 380: inflowitem = exp.cancel() 381: remove exp 382: 383: inflowitem 384: end
Cancels the given expression and makes sure to resume the flow if the expression or one of its children were active.
If the cancelled branch was not active, this method will take care of removing the cancelled expression from the parent expression.
# File lib/openwfe/expool/expressionpool.rb, line 394 394: def cancel_expression (exp) 395: 396: exp = fetch_expression exp 397: 398: wi = cancel exp 399: 400: if wi 401: reply_to_parent exp, wi, false 402: else 403: parent_exp = fetch_expression exp.parent_id 404: parent_exp.remove_child(exp.fei) if parent_exp 405: end 406: end
Given any expression of a process, cancels the complete process instance.
# File lib/openwfe/expool/expressionpool.rb, line 412 412: def cancel_process (exp_or_wfid) 413: 414: wfid = extract_wfid exp_or_wfid, false 415: 416: ldebug { "cancel_process() '#{wfid}'" } 417: 418: root = fetch_root wfid 419: 420: raise "no process to cancel '#{wfid}'" unless root 421: 422: cancel root 423: end
Gets the process definition (if necessary) and turns into into an expression tree (for storing into a RawExpression).
# File lib/openwfe/expool/expressionpool.rb, line 768 768: def determine_rep (param) 769: 770: param = read_uri(param) if param.is_a?(URI) 771: 772: DefParser.parse param 773: end
Returns the unique engine_environment FlowExpressionId instance. There is only one such environment in an engine, hence this ‘singleton’ method.
# File lib/openwfe/expool/expressionpool.rb, line 666 666: def engine_environment_id 667: #synchronize do 668: # no need, it's been already called at initialization 669: 670: return @eei if @eei 671: 672: @eei = FlowExpressionId.new 673: @eei.owfe_version = OPENWFERU_VERSION 674: @eei.engine_id = get_engine.service_name 675: @eei.initial_engine_id = @eei.engine_id 676: @eei.workflow_definition_url = 'ee' 677: @eei.workflow_definition_name = 'ee' 678: @eei.workflow_definition_revision = '0' 679: @eei.workflow_instance_id = '0' 680: @eei.expression_name = EN_ENVIRONMENT 681: @eei.expression_id = '0' 682: @eei 683: #end 684: end
Fetches a FlowExpression from the pool. Returns a tuple : the FlowExpression plus its FlowExpressionId.
The param ‘exp’ may be a FlowExpressionId or a FlowExpression that has to be reloaded.
# File lib/openwfe/expool/expressionpool.rb, line 539 539: def fetch (exp) 540: synchronize do 541: 542: #ldebug { "fetch() exp is of kind #{exp.class}" } 543: 544: fei = if exp.is_a?(FlowExpression) 545: 546: exp.fei 547: 548: elsif not exp.is_a?(FlowExpressionId) 549: 550: raise \ 551: "Cannot fetch expression with key : "+ 552: "'#{fei}' (#{fei.class})" 553: 554: else 555: 556: exp 557: end 558: 559: #ldebug { "fetch() for #{fei.to_debug_s}" } 560: 561: [ get_expression_storage[fei], fei ] 562: end 563: end
Returns the engine environment (the top level environment)
# File lib/openwfe/expool/expressionpool.rb, line 580 580: def fetch_engine_environment 581: synchronize do 582: # 583: # synchronize to ensure that there's 1! engine env 584: 585: eei = engine_environment_id 586: ee, fei = fetch eei 587: 588: return ee if ee 589: 590: ee = Environment.new_env( 591: eei, nil, nil, @application_context, nil) 592: 593: ee.store_itself 594: 595: ee 596: end 597: end
Fetches a FlowExpression (returns only the FlowExpression instance)
The param ‘exp’ may be a FlowExpressionId or a FlowExpression that has to be reloaded.
# File lib/openwfe/expool/expressionpool.rb, line 571 571: def fetch_expression (exp) 572: 573: exp, fei = fetch exp 574: exp 575: end
Fetches the root expression of a process (or a subprocess).
# File lib/openwfe/expool/expressionpool.rb, line 602 602: def fetch_root (wfid) 603: 604: get_expression_storage.fetch_root wfid 605: end
Forgets the given expression (makes sure to substitute its parent_id with the GONE_PARENT_ID constant)
# File lib/openwfe/expool/expressionpool.rb, line 430 430: def forget (parent_exp, exp) 431: 432: exp, fei = fetch exp 433: 434: #ldebug { "forget() forgetting #{fei}" } 435: 436: return if not exp 437: 438: onotify :forget, exp 439: 440: parent_exp.children.delete(fei) 441: 442: exp.parent_id = GONE_PARENT_ID 443: exp.dup_environment 444: exp.store_itself() 445: 446: ldebug { "forget() forgot #{fei}" } 447: end
Obtains a unique monitor for an expression. It avoids the need for the FlowExpression instances to include the monitor mixin by themselves
# File lib/openwfe/expool/expressionpool.rb, line 143 143: def get_monitor (fei) 144: 145: @monitors[fei] 146: end
Returns true if the process instance to which the expression belongs is currently paused.
# File lib/openwfe/expool/expressionpool.rb, line 779 779: def is_paused? (expression) 780: 781: (@paused_instances[expression.fei.parent_wfid] != nil) 782: end
Instantiates a workflow definition and launches it.
This method call will return immediately, it could even return before the actual launch is completely over.
Returns the FlowExpressionId instance of the root expression of the newly launched flow.
# File lib/openwfe/expool/expressionpool.rb, line 196 196: def launch (launchitem, options={}) 197: 198: # 199: # prepare raw expression 200: 201: raw_expression = prepare_raw_expression launchitem 202: # 203: # will raise an exception if there are requirements 204: # and one of them is not met 205: 206: raw_expression.new_environment 207: # 208: # as this expression is the root of a new process instance, 209: # it has to have an environment for all the variables of 210: # the process instance 211: 212: raw_expression = wrap_in_schedule(raw_expression, options) \ 213: if options.size > 0 214: 215: fei = raw_expression.fei 216: 217: # 218: # apply prepared raw expression 219: 220: wi = build_workitem launchitem 221: 222: onotify :launch, fei, launchitem 223: 224: apply raw_expression, wi 225: 226: fei 227: end
launches a subprocess
# File lib/openwfe/expool/expressionpool.rb, line 307 307: def launch_template ( 308: requesting_expression, 309: env_id, 310: sub_id, 311: template, 312: workitem, 313: params=nil) 314: 315: rawexp = prepare_from_template( 316: requesting_expression, env_id, sub_id, template, params) 317: 318: workitem.flow_expression_id = rawexp.fei 319: 320: onotify :launch_template, rawexp.fei, workitem 321: 322: apply rawexp, workitem 323: 324: rawexp.fei 325: end
Lists all workflows (processes) currently in the expool (in the engine). This method will return a list of "process-definition" expressions (root of flows).
# File lib/openwfe/expool/expressionpool.rb, line 720 720: def list_processes (options={}) 721: 722: options[:include_classes] = DefineExpression 723: # 724: # Maybe it would be better to list root expressions instead 725: # so that expressions like 'sequence' can be used 726: # as root expressions. Later... 727: 728: get_expression_storage.find_expressions options 729: end
This method is called when apply() or reply() failed for an expression. There are currently only two ‘users’, the ParticipantExpression class and the do_process_workelement method of this ExpressionPool class.
# File lib/openwfe/expool/expressionpool.rb, line 738 738: def notify_error (error, fei, message, workitem) 739: 740: fei = extract_fei fei 741: # densha requires that... :( 742: 743: se = OpenWFE::exception_to_s error 744: 745: onotify :error, fei, message, workitem, error.class.name, se 746: 747: #fei = extract_fei fei 748: 749: if error.is_a?(PausedError) 750: lwarn do 751: "#{self.service_name} " + 752: "operation :#{message.to_s} on #{fei.to_s} " + 753: "delayed because process '#{fei.wfid}' is in pause" 754: end 755: else 756: lwarn do 757: "#{self.service_name} " + 758: "operation :#{message.to_s} on #{fei.to_s} " + 759: "failed with\n" + se 760: end 761: end 762: end
Prepares a raw expression from a template. Returns that raw expression.
Used in the concurrent-iterator when building up the children list and of course used by the launch_template() method.
# File lib/openwfe/expool/expressionpool.rb, line 236 236: def prepare_from_template ( 237: requesting_expression, env_id, sub_id, template, params=nil) 238: 239: rawexp = if template.is_a?(RawExpression) 240: 241: template.application_context = @application_context 242: template.dup 243: 244: elsif template.is_a?(FlowExpressionId) 245: 246: fetch_expression(template).dup 247: 248: else 249: 250: build_raw_expression nil, template 251: end 252: 253: #raise "did not find subprocess in : #{template.to_s}" # unless rawexp 254: 255: rawexp.fei = rawexp.fei.dup 256: 257: if requesting_expression == nil 258: 259: # no parent, brand new process instance 260: 261: rawexp.parent_id = nil 262: rawexp.fei.workflow_instance_id = get_wfid_generator.generate 263: 264: elsif requesting_expression.kind_of?(FlowExpressionId) 265: 266: rawexp.parent_id = requesting_expression 267: rawexp.fei.workflow_instance_id = \ 268: "#{requesting_expression.workflow_instance_id}.#{sub_id}" 269: 270: elsif requesting_expression.kind_of?(String) 271: 272: rawexp.parent_id = nil 273: rawexp.fei.workflow_instance_id = \ 274: "#{requesting_expression}.#{sub_id}" 275: 276: else # kind is FlowExpression 277: 278: rawexp.parent_id = requesting_expression.fei 279: rawexp.fei.workflow_instance_id = \ 280: "#{requesting_expression.fei.workflow_instance_id}.#{sub_id}" 281: end 282: 283: #ldebug do 284: # "launch_template() spawning wfid " + 285: # "#{rawexp.fei.workflow_instance_id.to_s}" 286: #end 287: 288: if env_id 289: 290: rawexp.environment_id = env_id 291: else 292: # 293: # the new scope gets its own environment 294: # 295: rawexp.new_environment params 296: end 297: 298: rawexp.store_itself 299: 300: rawexp 301: end
This method is called by the launch method. It‘s actually the first stage of that method. It may be interessant to use to ‘validate’ a launchitem and its process definition, as it will raise an exception in case of ‘parameter’ mismatch.
There is a ‘pre_launch_check’ alias for this method in the Engine class.
# File lib/openwfe/expool/expressionpool.rb, line 158 158: def prepare_raw_expression (launchitem) 159: 160: wfdurl = launchitem.workflow_definition_url 161: 162: raise "launchitem.workflow_definition_url not set, cannot launch" \ 163: unless wfdurl 164: 165: definition = if wfdurl.match "^field:" 166: 167: wfdfield = wfdurl[6..-1] 168: launchitem.attributes.delete wfdfield 169: else 170: 171: read_uri wfdurl 172: end 173: 174: raise "didn't find process definition at '#{wfdurl}'" \ 175: unless definition 176: 177: raw_expression = build_raw_expression launchitem, definition 178: 179: raw_expression.check_parameters launchitem 180: # 181: # will raise an exception if there are requirements 182: # and one of them is not met 183: 184: raw_expression 185: end
Returns the list of applied expressions belonging to a given workflow instance.
If the unapplied optional parameter is set to true, all the expressions (even those not yet applied) that compose the process instance will be returned. Environments will be returned as well.
# File lib/openwfe/expool/expressionpool.rb, line 694 694: def process_stack (wfid, unapplied=false) 695: 696: #raise "please provide a non-nil workflow instance id" # unless wfid 697: 698: wfid = extract_wfid wfid, true 699: 700: params = { 701: #:exclude_classes => [ Environment, RawExpression ], 702: #:exclude_classes => [ Environment ], 703: :parent_wfid => wfid 704: } 705: params[:applied] = true if (not unapplied) 706: 707: get_expression_storage.find_expressions params 708: 709: # TODO : (maybe) add raw_representation method to result 710: # (only if unapplied set to true) 711: end
Removes a flow expression from the pool (This method is mainly called from the pool itself)
# File lib/openwfe/expool/expressionpool.rb, line 611 611: def remove (exp) 612: 613: exp, _fei = fetch(exp) \ 614: if exp.is_a?(FlowExpressionId) 615: 616: return unless exp 617: 618: ldebug { "remove() fe #{exp.fei.to_debug_s}" } 619: 620: onotify :remove, exp.fei 621: 622: synchronize do 623: 624: @monitors.delete(exp.fei) 625: 626: remove_environment(exp.environment_id) \ 627: if exp.owns_its_environment? 628: end 629: end
Replies to a given expression
# File lib/openwfe/expool/expressionpool.rb, line 356 356: def reply (exp, workitem) 357: 358: queue_work :do_reply, exp, workitem 359: #do_reply exp, workitem 360: end
Replies to the parent of the given expression.
# File lib/openwfe/expool/expressionpool.rb, line 452 452: def reply_to_parent (exp, workitem, remove=true) 453: 454: ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" } 455: 456: workitem.last_expression_id = exp.fei 457: 458: onotify :reply_to_parent, exp, workitem 459: 460: if remove 461: 462: remove(exp) 463: # 464: # remove the expression itself 465: 466: exp.clean_children() 467: # 468: # remove all the children of the expression 469: end 470: 471: # 472: # manage tag, have to remove it so it can get 'redone' or 'undone' 473: # (preventing abuse) 474: 475: tagname = exp.attributes["tag"] if exp.attributes 476: 477: exp.delete_variable(tagname) if tagname 478: 479: # 480: # flow terminated ? 481: 482: if not exp.parent_id 483: 484: ldebug do 485: "reply_to_parent() process " + 486: "#{exp.fei.workflow_instance_id} terminated" 487: end 488: 489: onotify :terminate, exp, workitem 490: 491: return 492: end 493: 494: # 495: # else, gone parent ? 496: 497: if exp.parent_id == GONE_PARENT_ID 498: 499: ldebug do 500: "reply_to_parent() parent is gone for " + 501: exp.fei.to_debug_s 502: end 503: 504: return 505: end 506: 507: # 508: # parent still present, reply to it 509: 510: reply exp.parent_id, workitem 511: end
This method is called at each expool (engine) [re]start. It roams through the previously saved (persisted) expressions to reschedule ones like ‘sleep’ or ‘cron’.
# File lib/openwfe/expool/expressionpool.rb, line 636 636: def reschedule 637: 638: return if @stopped 639: 640: synchronize do 641: 642: t = OpenWFE::Timer.new 643: 644: linfo { "reschedule() initiating..." } 645: 646: options = { :include_classes => Rufus::Schedulable } 647: 648: get_expression_storage.find_expressions(options).each do |fexp| 649: 650: linfo { "reschedule() for #{fexp.fei.to_s}..." } 651: 652: onotify :reschedule, fexp.fei 653: 654: fexp.reschedule get_scheduler 655: end 656: 657: linfo { "reschedule() done. (took #{t.duration} ms)" } 658: end 659: end
Stops this expression pool (especially its workqueue).
# File lib/openwfe/expool/expressionpool.rb, line 127 127: def stop 128: 129: @stopped = true 130: 131: stop_workqueue 132: # 133: # flushes the work queue 134: 135: onotify :stop 136: end
Adds or updates a flow expression in this pool
# File lib/openwfe/expool/expressionpool.rb, line 516 516: def update (flow_expression) 517: 518: ldebug { "update() for #{flow_expression.fei.to_debug_s}" } 519: 520: #t = Timer.new 521: 522: onotify :update, flow_expression.fei, flow_expression 523: 524: #ldebug do 525: # "update() took #{t.duration} ms " + 526: # "#{flow_expression.fei.to_debug_s}" 527: #end 528: 529: flow_expression 530: end
Builds the RawExpression instance at the root of the flow being launched.
The param can be a template or a definition (anything accepted by the determine_representation() method).
# File lib/openwfe/expool/expressionpool.rb, line 1041 1041: def build_raw_expression (launchitem, param) 1042: 1043: procdef = determine_rep param 1044: 1045: atts = procdef[1] 1046: flow_name = atts['name'] || "noname" 1047: flow_revision = atts['revision'] || "0" 1048: exp_name = procdef.first 1049: 1050: fei = new_fei launchitem, flow_name, flow_revision, exp_name 1051: 1052: RawExpression.new_raw( 1053: fei, nil, nil, @application_context, procdef) 1054: end
Prepares a new instance of InFlowWorkItem from a LaunchItem instance.
# File lib/openwfe/expool/expressionpool.rb, line 998 998: def build_workitem (launchitem) 999: 1000: wi = InFlowWorkItem.new 1001: 1002: wi.attributes = launchitem.attributes.dup 1003: 1004: wi 1005: end
Will raise an exception if the expression belongs to a paused process.
# File lib/openwfe/expool/expressionpool.rb, line 887 887: def check_if_paused (expression) 888: 889: wfid = expression.fei.parent_wfid 890: 891: raise PausedError.new(wfid) if @paused_instances[wfid] 892: end
The real apply work.
# File lib/openwfe/expool/expressionpool.rb, line 830 830: def do_apply (exp, workitem) 831: 832: exp, _fei = fetch(exp) if exp.is_a?(FlowExpressionId) 833: 834: #ldebug { "apply() '#{_fei}'" } 835: 836: if not exp 837: 838: #raise "apply() cannot apply missing #{_fei.to_debug_s}" 839: # not very helpful anyway 840: 841: lwarn do 842: "do_apply() cannot apply missing #{_fei.to_debug_s}" 843: end 844: return 845: end 846: 847: check_if_paused exp 848: 849: workitem.flow_expression_id = exp.fei 850: 851: onotify :apply, exp, workitem 852: 853: exp.apply workitem 854: end
This method is called by the workqueue when processing the atomic work operations.
# File lib/openwfe/expool/expressionpool.rb, line 813 813: def do_process_workelement elt 814: 815: message, fei, workitem = elt 816: 817: begin 818: 819: send message, fei, workitem 820: 821: rescue Exception => e 822: 823: notify_error e, fei, message, workitem 824: end 825: end
The real reply work is done here
# File lib/openwfe/expool/expressionpool.rb, line 859 859: def do_reply (exp, workitem) 860: 861: exp, fei = fetch exp 862: 863: ldebug { "reply() to #{fei.to_debug_s}" } 864: ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" } 865: 866: if not exp 867: 868: #raise "cannot reply to missing #{fei.to_debug_s}" 869: 870: lwarn do 871: "reply() cannot reply to missing #{fei.to_debug_s}" 872: end 873: return 874: end 875: 876: check_if_paused exp 877: 878: onotify :reply, exp, workitem 879: 880: exp.reply workitem 881: end
Builds a FlowExpressionId instance for a process being launched.
# File lib/openwfe/expool/expressionpool.rb, line 1011 1011: def new_fei (launchitem, flow_name, flow_revision, exp_name) 1012: 1013: url = if launchitem 1014: launchitem.workflow_definition_url 1015: else 1016: "no-url" 1017: end 1018: 1019: fei = FlowExpressionId.new 1020: 1021: fei.owfe_version = OPENWFERU_VERSION 1022: fei.engine_id = OpenWFE::stu get_engine.service_name 1023: fei.initial_engine_id = OpenWFE::stu fei.engine_id 1024: fei.workflow_definition_url = OpenWFE::stu url 1025: fei.workflow_definition_name = OpenWFE::stu flow_name 1026: fei.workflow_definition_revision = OpenWFE::stu flow_revision 1027: fei.wfid = get_wfid_generator.generate launchitem 1028: fei.expression_id = "0" 1029: fei.expression_name = exp_name 1030: 1031: fei 1032: end
This is the only point in the expression pool where an URI is read, so this is where the :remote_definitions_allowed security check is enforced.
# File lib/openwfe/expool/expressionpool.rb, line 791 791: def read_uri (uri) 792: 793: uri = URI.parse uri.to_s 794: 795: raise "loading remote definitions is not allowed" \ 796: if (ac[:remote_definitions_allowed] != true and 797: uri.scheme and 798: uri.scheme != 'file') 799: 800: #open(uri.to_s).read 801: 802: f = Rufus::Verbs.fopen uri 803: result = f.read 804: f.close if f.respond_to?(:close) 805: 806: result 807: end
Removes an environment, especially takes care of unbinding any special value it may contain.
# File lib/openwfe/expool/expressionpool.rb, line 977 977: def remove_environment (environment_id) 978: 979: ldebug { "remove_environment() #{environment_id.to_debug_s}" } 980: 981: env, fei = fetch(environment_id) 982: 983: return unless env 984: # 985: # env already unbound and removed 986: 987: env.unbind 988: 989: #get_expression_storage().delete(environment_id) 990: 991: onotify :remove, environment_id 992: end
if the launch method is called with a schedule option (like :at, :in, :cron and :every), this method takes care of wrapping the process with a sleep or a cron.
# File lib/openwfe/expool/expressionpool.rb, line 899 899: def wrap_in_schedule (raw_expression, options) 900: 901: oat = options[:at] 902: oin = options[:in] 903: ocron = options[:cron] 904: oevery = options[:every] 905: 906: fei = new_fei nil, "schedlaunch", "0", "sequence" 907: 908: # not very happy with this code, it builds custom 909: # wrapping processes manually, maybe there is 910: # a more elegant way, but for now, it's ok. 911: 912: if oat or oin 913: 914: seq = get_expression_map.get_class :sequence 915: seq = seq.new_exp fei, nil, nil, application_context, nil 916: 917: att = if oat 918: { "until" => oat } 919: else #oin 920: { "for" => oin } 921: end 922: att["scheduler-tags"] = "scheduled-launch" 923: 924: sle = get_expression_map.get_class :sleep 925: 926: sle = sle.new_exp( 927: fei.dup, fei, nil, application_context, att) 928: 929: sle.fei.expression_id = "0.0" 930: sle.fei.expression_name = "sleep" 931: 932: seq.children << sle.fei 933: seq.children << raw_expression.fei 934: 935: seq.new_environment 936: sle.environment_id = seq.environment_id 937: 938: sle.store_itself 939: seq.store_itself 940: 941: raw_expression.store_itself 942: raw_expression = seq 943: 944: elsif ocron or oevery 945: 946: fei.expression_name = "cron" 947: 948: att = if ocron 949: { "tab" => ocron } 950: else #oevery 951: { "every" => oevery } 952: end 953: att["name"] = "//cron_launch__#{fei.wfid}" 954: att["scheduler-tags"] = "scheduled-launch" 955: 956: cro = get_expression_map.get_class :cron 957: cro = cro.new_exp fei, nil, nil, application_context, att 958: 959: cro.children << raw_expression.fei 960: 961: cro.new_environment 962: 963: cro.store_itself 964: 965: raw_expression.store_itself 966: raw_expression = cro 967: end 968: # else, don't schedule at all 969: 970: raw_expression 971: end