Class | OpenWFE::Engine |
In: |
lib/openwfe/engine/engine.rb
lib/openwfe/extras/participants/activeparticipants.rb |
Parent: | Service |
Opening engine to update its reply method to accept these active record workitems.
Builds an OpenWFEru engine.
Accepts an optional initial application_context (containing initialization params for services for example).
The engine itself uses one param :logger, used to define where all the log output for OpenWFEru should go. By default, this output goes to logs/openwferu.log
# File lib/openwfe/engine/engine.rb, line 88 88: def initialize (application_context={}) 89: 90: super S_ENGINE, application_context 91: 92: $OWFE_LOG = application_context[:logger] 93: 94: unless $OWFE_LOG 95: #puts "Creating logs in " + FileUtils.pwd 96: FileUtils.mkdir("logs") unless File.exist?("logs") 97: $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000 98: $OWFE_LOG.level = Logger::INFO 99: end 100: 101: # build order matters. 102: # 103: # especially for the expstorage which 'observes' the expression 104: # pool and thus needs to be instantiated after it. 105: 106: build_scheduler 107: # 108: # for delayed or repetitive executions (it's the engine's clock) 109: # see http://openwferu.rubyforge.org/scheduler.html 110: 111: build_expression_map 112: # 113: # mapping expression names ('sequence', 'if', 'concurrence', 114: # 'when'...) to their implementations (SequenceExpression, 115: # IfExpression, ConcurrenceExpression, ...) 116: 117: build_wfid_generator 118: # 119: # the workflow instance (process instance) id generator 120: # making sure each process instance has a unique identifier 121: 122: build_expression_pool 123: # 124: # the core (hairy ball) of the engine 125: 126: build_expression_storage 127: # 128: # the engine persistence (persisting the expression instances 129: # that make up process instances) 130: 131: build_participant_map 132: # 133: # building the services that maps participant names to 134: # participant implementations / instances. 135: 136: build_error_journal 137: # 138: # builds the error journal (keeping track of failures 139: # in business process executions, and an opportunity to 140: # fix and replay) 141: 142: linfo { "new() --- engine started --- #{self.object_id}" } 143: end
Adds a workitem listener to this engine.
The ‘freq’ parameters if present might indicate how frequently the resource should be polled for incoming workitems.
engine.add_workitem_listener(listener, "3m10s") # every 3 minutes and 10 seconds engine.add_workitem_listener(listener, "0 22 * * 1-5") # every weekday at 10pm
TODO : block handling…
# File lib/openwfe/engine/engine.rb, line 286 286: def add_workitem_listener (listener, freq=nil) 287: 288: name = nil 289: 290: if listener.kind_of? Class 291: 292: listener = init_service nil, listener 293: 294: name = listener.service_name 295: else 296: 297: name = listener.name if listener.respond_to? :name 298: name = "#{listener.class}::#{listener.object_id}" unless name 299: 300: @application_context[name] = listener 301: end 302: 303: result = nil 304: 305: if freq 306: 307: freq = freq.to_s.strip 308: 309: result = if Rufus::Scheduler.is_cron_string(freq) 310: 311: get_scheduler.schedule(freq, listener) 312: else 313: 314: get_scheduler.schedule_every(freq, listener) 315: end 316: end 317: 318: linfo { "add_workitem_listener() added '#{name}'" } 319: 320: result 321: end
Enabling the console means that hitting CTRL-C on the window / term / dos box / whatever does run the OpenWFEru engine will open an IRB interactive console for directly manipulating the engine instance.
Hit CTRL-D to get out of the console.
# File lib/openwfe/engine/engine.rb, line 363 363: def enable_irb_console 364: 365: OpenWFE::trap_int_irb(binding) 366: end
Returns the variables set for a process or an expression.
If a process (wfid) is given, variables of the process environment will be returned, else variables in the environment valid for the expression (fei) will be returned.
If nothing (or nil) is given, the variables set in the engine environment will be returned.
# File lib/openwfe/engine/engine.rb, line 541 541: def get_variables (fei_or_wfid=nil) 542: 543: return get_expression_pool.fetch_engine_environment.variables \ 544: unless fei_or_wfid 545: 546: fetch_exp(fei_or_wfid).get_environment.variables 547: end
Makes the current thread join the engine‘s scheduler thread
You can thus make an engine standalone with something like :
require 'openwfe/engine/engine' the_engine = OpenWFE::Engine.new the_engine.join
And you‘ll have to hit CTRL-C to make it stop.
# File lib/openwfe/engine/engine.rb, line 335 335: def join 336: 337: get_scheduler.join 338: end
Calling this method makes the control flow block until the workflow engine is inactive.
TODO : implement idle_for
# File lib/openwfe/engine/engine.rb, line 346 346: def join_until_idle 347: 348: storage = get_expression_storage 349: 350: while storage.size > 1 351: sleep 1 352: end 353: end
Launches a [business] process. The ‘launch_object’ param may contain either a LaunchItem instance, either a String containing the URL of the process definition to launch (with an empty LaunchItem created on the fly).
The launch object can also be a String containing the XML process definition or directly a class extending OpenWFE::ProcessDefinition (Ruby process definition).
Returns the FlowExpressionId instance of the expression at the root of the newly launched process.
Options for scheduled launches like :at, :in and :cron are accepted via the ‘options’ optional parameter. For example :
engine.launch(launch_item) # will launch immediately engine.launch(launch_item, :in => "1d20m") # will launch in one day and twenty minutes engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007") # will launch at that point in time engine.launch(launch_item, :cron => "0 5 * * *") # will launch that same process every day, # five minutes after midnight (see "man 5 crontab")
# File lib/openwfe/engine/engine.rb, line 204 204: def launch (launch_object, options={}) 205: 206: launchitem = extract_launchitem launch_object 207: 208: fei = get_expression_pool.launch launchitem, options 209: 210: fei.dup 211: # 212: # so that users of this launch() method can play with their 213: # fei without breaking things 214: end
Returns an array of wfid (workflow instance ids) whose root environment containes the given variable
If there are no matches, an empty array will be returned.
Regular expressions are accepted as values.
If no value is given, all processes with the given variable name set will be returned.
# File lib/openwfe/engine/engine.rb, line 560 560: def lookup_processes (var_name, value=nil) 561: 562: # TODO : maybe this would be better in the ExpressionPool 563: 564: regexp = if value 565: if value.is_a?(Regexp) 566: value 567: else 568: Regexp.compile(value.to_s) 569: end 570: else 571: nil 572: end 573: 574: envs = get_expression_storage.find_expressions( 575: :include_classes => Environment) 576: 577: envs = envs.find_all do |env| 578: val = env.variables[var_name] 579: (val and ((not regexp) or (regexp.match(val)))) 580: end 581: envs.collect do |env| 582: env.fei.wfid 583: end 584: 585: #envs.inject([]) do |r, env| 586: # val = env.variables[var_name] 587: # r << env.fei.wfid # if (val and ((not regexp) or (regexp.match(val)))) 588: # r 589: #end 590: # 591: # seems slower... 592: end
Looks up a process variable in a process. If fei_or_wfid is not given, will simply look in the ‘engine environment’ (where the top level variables ’//’ do reside).
# File lib/openwfe/engine/engine.rb, line 523 523: def lookup_variable (var_name, fei_or_wfid=nil) 524: 525: return get_expression_pool.fetch_engine_environment[var_name] \ 526: unless fei_or_wfid 527: 528: fetch_exp(fei_or_wfid).lookup_variable var_name 529: end
Pauses a process (sets its /paused variable to true).
# File lib/openwfe/engine/engine.rb, line 457 457: def pause_process (wfid) 458: 459: wfid = extract_wfid wfid 460: 461: root_expression = get_expression_pool.fetch_root wfid 462: 463: get_expression_pool.paused_instances[wfid] = true 464: root_expression.set_variable VAR_PAUSED, true 465: end
When ‘parameters’ are used at the top of a process definition, this method can be used to assert a launchitem before launch. An expression will be raised if the parameters do not match the requirements.
Note that the launch method will raise those exceptions as well. This method can be useful in some scenarii though.
# File lib/openwfe/engine/engine.rb, line 169 169: def pre_launch_check (launchitem) 170: 171: get_expression_pool.prepare_raw_expression(launchitem) 172: end
Takes care of removing an error from the error journal and they replays its process at that point.
# File lib/openwfe/engine/engine.rb, line 506 506: def replay_at_error (error) 507: 508: get_error_journal.remove_errors( 509: error.fei.parent_wfid, 510: error) 511: 512: get_expression_pool.queue_work( 513: error.message, 514: error.fei, 515: error.workitem) 516: end
# File lib/openwfe/extras/participants/activeparticipants.rb, line 328 328: def reply (workitem) 329: 330: if workitem.is_a?(Workitem) 331: 332: oldreply(workitem.as_owfe_workitem) 333: workitem.destroy 334: else 335: 336: oldreply(workitem) 337: end 338: end
This method is used to feed a workitem back to the engine (after it got sent to a worklist or wherever by a participant). Participant implementations themselves do call this method usually.
This method also accepts LaunchItem instances.
Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkitem that don‘t belong to a process instance (ie whose flow_expression_id is nil). It will simply notify the participant_map of the reply for the given participant_name. If there is no participant_name specified for this orphan workitem, an exception will be raised.
# File lib/openwfe/engine/engine.rb, line 229 229: def reply (workitem) 230: 231: if workitem.is_a?(InFlowWorkItem) 232: 233: if workitem.flow_expression_id 234: # 235: # vanilla case, workitem coming back 236: # (from listener probably) 237: 238: return get_expression_pool.reply( 239: workitem.flow_expression_id, workitem) 240: end 241: 242: if workitem.participant_name 243: # 244: # a workitem that doesn't belong to a process instance 245: # but bears a participant name. 246: # Notify, there may be something listening on 247: # this channel (see the 'listen' expression). 248: 249: return get_participant_map.onotify( 250: workitem.participant_name, :reply, workitem) 251: end 252: 253: raise \ 254: "InFlowWorkitem doesn't belong to a process instance" + 255: " nor to a participant" 256: end 257: 258: return get_expression_pool.launch(workitem) \ 259: if workitem.is_a?(LaunchItem) 260: # 261: # launchitem coming from listener 262: # let's attempt to launch a new process instance 263: 264: raise \ 265: "engine.reply() " + 266: "cannot handle instances of #{workitem.class}" 267: end
Call this method once the participants for a persisted engine have been [re]added.
If this method is called too soon, missing participants will cause trouble… Call this method after all the participants have been added.
# File lib/openwfe/engine/engine.rb, line 153 153: def reschedule 154: 155: get_expression_pool.reschedule() 156: end
Restarts a process : removes its ‘paused’ flag (variable) and makes sure to ‘replay’ events (replies) that came for it while it was in pause.
# File lib/openwfe/engine/engine.rb, line 472 472: def resume_process (wfid) 473: 474: wfid = extract_wfid wfid 475: 476: root_expression = get_expression_pool.fetch_root wfid 477: 478: # 479: # remove 'paused' flag 480: 481: get_expression_pool.paused_instances.delete wfid 482: root_expression.unset_variable VAR_PAUSED 483: 484: # 485: # replay 486: # 487: # select PausedError instances in separate list 488: 489: errors = get_error_journal.get_error_log wfid 490: error_class = PausedError.name 491: paused_errors = errors.select { |e| e.error_class == error_class } 492: 493: return if paused_errors.size < 1 494: 495: # replay select PausedError instances 496: 497: paused_errors.each do |e| 498: replay_at_error e 499: end 500: end
Stopping the engine will stop all the services in the application context.
# File lib/openwfe/engine/engine.rb, line 384 384: def stop 385: 386: linfo { "stop() stopping engine '#{@service_name}'" } 387: 388: @application_context.each do |sname, service| 389: 390: next if sname == self.service_name 391: 392: #if service.kind_of?(ServiceMixin) 393: if service.respond_to?(:stop) 394: 395: service.stop 396: 397: linfo do 398: "stop() stopped service '#{sname}' (#{service.class})" 399: end 400: end 401: end 402: 403: linfo { "stop() stopped engine '#{@service_name}'" } 404: 405: nil 406: end
Replaces an expression in the pool with a newer version of it.
(useful when fixing processes on the fly)
# File lib/openwfe/engine/engine.rb, line 646 646: def update_expression (fexp) 647: 648: fexp.application_context = application_context 649: 650: get_expression_pool.update fexp 651: end
Use only when doing "process gardening".
This method updates an expression, the ‘data’ parameter is expected to be a hash. If the expression is an Environment, the variables will be merged with the ones found in the data param. If the expression is not an Environment, the data will be merged into the ‘applied_workitem’ if any.
If the merge is not possible, an exception will be raised.
# File lib/openwfe/engine/engine.rb, line 606 606: def update_expression_data (fei, data) 607: 608: fexp = fetch_exp fei 609: 610: original = if fexp.is_a?(Environment) 611: 612: fexp.variables 613: else 614: 615: fexp.applied_workitem.attributes 616: end 617: 618: original.merge! data 619: 620: get_expression_pool.update fexp 621: end
A variant of update_expression() that directly replaces the raw representation stored within a RawExpression.
Useful for modifying [not yet reached] segments of processes.
# File lib/openwfe/engine/engine.rb, line 629 629: def update_raw_expression (fei, representation) 630: 631: fexp = fetch_exp fei 632: 633: raise "cannot update already applied expression" \ 634: unless fexp.is_a?(RawExpression) 635: 636: fexp.raw_representation = representation 637: 638: get_expression_pool.update fexp 639: end
Waits for a given process instance to terminate. The method only exits when the flow terminates, but beware : if the process already terminated, the method will never exit.
The parameter can be a FlowExpressionId instance, for example the one given back by a launch(), or directly a workflow instance id (String).
This method is mainly used in utests.
# File lib/openwfe/engine/engine.rb, line 419 419: def wait_for (fei_or_wfid) 420: 421: wfid = if fei_or_wfid.kind_of?(FlowExpressionId) 422: fei_or_wfid.workflow_instance_id 423: else 424: fei_or_wfid 425: end 426: 427: t = Thread.new { Thread.stop } 428: 429: to = get_expression_pool.add_observer(:terminate) do |c, fe, wi| 430: t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?) 431: end 432: te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e| 433: t.wakeup if (fei.parent_wfid == wfid and t.alive?) 434: end 435: #tc = get_expression_pool.add_observer(:cancel) do |c, fe| 436: # if (fe.fei.wfid == wfid and fe.fei.expid == "0" and t.alive?) 437: # sleep 0.500 438: # t.wakeup 439: # end 440: #end 441: 442: linfo { "wait_for() #{wfid}" } 443: 444: t.join 445: 446: get_expression_pool.remove_observer(to, :terminate) 447: get_expression_pool.remove_observer(te, :error) 448: #get_expression_pool.remove_observer(tc, :cancel) 449: # 450: # it would work as well without specifying the channel, 451: # but it's thus a little bit faster 452: end
The default implementation of this method uses an InMemoryErrorJournal (do not use in production).
# File lib/openwfe/engine/engine.rb, line 743 743: def build_error_journal 744: 745: init_service S_ERROR_JOURNAL, InMemoryErrorJournal 746: end
Builds the ExpressionMap (the mapping between expression names and expression implementations).
# File lib/openwfe/engine/engine.rb, line 664 664: def build_expression_map 665: 666: @application_context[S_EXPRESSION_MAP] = ExpressionMap.new 667: # 668: # the expression map is not a Service anymore, 669: # it's a simple instance (that will be reused in other 670: # OpenWFEru components) 671: end
Builds the OpenWFEru expression pool (the core of the engine) and binds it in the engine context. There is only one implementation of the expression pool, so this method is usually never overriden.
# File lib/openwfe/engine/engine.rb, line 699 699: def build_expression_pool 700: 701: init_service S_EXPRESSION_POOL, ExpressionPool 702: end
The implementation here builds an InMemoryExpressionStorage instance.
See FilePersistedEngine or CachedFilePersistedEngine for overrides of this method.
# File lib/openwfe/engine/engine.rb, line 711 711: def build_expression_storage 712: 713: init_service S_EXPRESSION_STORAGE, InMemoryExpressionStorage 714: end
The ParticipantMap is a mapping between participant names (well rather regular expressions) and participant implementations (see openwferu.rubyforge.org/participants.html)
# File lib/openwfe/engine/engine.rb, line 721 721: def build_participant_map 722: 723: init_service S_PARTICIPANT_MAP, ParticipantMap 724: end
There is only one Scheduler implementation, that‘s the one built and bound here.
# File lib/openwfe/engine/engine.rb, line 730 730: def build_scheduler 731: 732: scheduler = Rufus::Scheduler.new 733: 734: @application_context[S_SCHEDULER] = scheduler 735: 736: scheduler.start 737: end
This implementation builds a KotobaWfidGenerator instance and binds it in the engine context. There are other WfidGeneration implementations available, like UuidWfidGenerator or FieldWfidGenerator.
# File lib/openwfe/engine/engine.rb, line 679 679: def build_wfid_generator 680: 681: #init_service S_WFID_GENERATOR, DefaultWfidGenerator 682: #init_service S_WFID_GENERATOR, UuidWfidGenerator 683: init_service S_WFID_GENERATOR, KotobaWfidGenerator 684: 685: #g = FieldWfidGenerator.new( 686: # S_WFID_GENERATOR, @application_context, "wfid") 687: # 688: # showing how to initialize a FieldWfidGenerator that 689: # will take as workflow instance id the value found in 690: # the field "wfid" of the LaunchItem. 691: end
Turns the raw launch request info into a LaunchItem instance.
# File lib/openwfe/engine/engine.rb, line 751 751: def extract_launchitem (launch_object) 752: 753: if launch_object.kind_of?(OpenWFE::LaunchItem) 754: 755: launch_object 756: 757: elsif launch_object.kind_of?(Class) 758: 759: LaunchItem.new launch_object 760: 761: elsif launch_object.kind_of?(String) 762: 763: li = OpenWFE::LaunchItem.new 764: 765: if launch_object[0, 1] == '<' or launch_object.index("\n") 766: 767: li.workflow_definition_url = "field:__definition" 768: li['__definition'] = launch_object 769: 770: else 771: 772: li.workflow_definition_url = launch_object 773: end 774: 775: li 776: end 777: end