Class OpenWFE::Engine
In: lib/openwfe/engine/engine.rb
lib/openwfe/extras/participants/activeparticipants.rb
Parent: Service

Opening engine to update its reply method to accept these active record workitems.

Methods

Included Modules

OwfeServiceLocator FeiMixin ExpoolMethods StatusMethods ParticipantMethods

Public Class methods

Builds an OpenWFEru engine.

Accepts an optional initial application_context (containing initialization params for services for example).

The engine itself uses one param :logger, used to define where all the log output for OpenWFEru should go. By default, this output goes to logs/openwferu.log

[Source]

     # File lib/openwfe/engine/engine.rb, line 88
 88:         def initialize (application_context={})
 89: 
 90:             super S_ENGINE, application_context
 91: 
 92:             $OWFE_LOG = application_context[:logger]
 93: 
 94:             unless $OWFE_LOG
 95:                 #puts "Creating logs in " + FileUtils.pwd
 96:                 FileUtils.mkdir("logs") unless File.exist?("logs")
 97:                 $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000
 98:                 $OWFE_LOG.level = Logger::INFO
 99:             end
100: 
101:             # build order matters.
102:             #
103:             # especially for the expstorage which 'observes' the expression
104:             # pool and thus needs to be instantiated after it.
105: 
106:             build_scheduler
107:                 #
108:                 # for delayed or repetitive executions (it's the engine's clock)
109:                 # see http://openwferu.rubyforge.org/scheduler.html
110: 
111:             build_expression_map
112:                 #
113:                 # mapping expression names ('sequence', 'if', 'concurrence', 
114:                 # 'when'...) to their implementations (SequenceExpression,
115:                 # IfExpression, ConcurrenceExpression, ...)
116: 
117:             build_wfid_generator
118:                 #
119:                 # the workflow instance (process instance) id generator
120:                 # making sure each process instance has a unique identifier
121: 
122:             build_expression_pool
123:                 #
124:                 # the core (hairy ball) of the engine
125: 
126:             build_expression_storage
127:                 #
128:                 # the engine persistence (persisting the expression instances
129:                 # that make up process instances)
130: 
131:             build_participant_map
132:                 #
133:                 # building the services that maps participant names to 
134:                 # participant implementations / instances.
135: 
136:             build_error_journal
137:                 #
138:                 # builds the error journal (keeping track of failures
139:                 # in business process executions, and an opportunity to
140:                 # fix and replay)
141: 
142:             linfo { "new() --- engine started --- #{self.object_id}" }
143:         end

Public Instance methods

Adds a workitem listener to this engine.

The ‘freq’ parameters if present might indicate how frequently the resource should be polled for incoming workitems.

    engine.add_workitem_listener(listener, "3m10s")
       # every 3 minutes and 10 seconds

    engine.add_workitem_listener(listener, "0 22 * * 1-5")
       # every weekday at 10pm

TODO : block handling…

[Source]

     # File lib/openwfe/engine/engine.rb, line 286
286:         def add_workitem_listener (listener, freq=nil)
287: 
288:             name = nil
289: 
290:             if listener.kind_of? Class
291: 
292:                 listener = init_service nil, listener
293: 
294:                 name = listener.service_name
295:             else
296: 
297:                 name = listener.name if listener.respond_to? :name
298:                 name = "#{listener.class}::#{listener.object_id}" unless name
299: 
300:                 @application_context[name] = listener
301:             end
302: 
303:             result = nil
304: 
305:             if freq
306: 
307:                 freq = freq.to_s.strip
308: 
309:                 result = if Rufus::Scheduler.is_cron_string(freq)
310: 
311:                     get_scheduler.schedule(freq, listener)
312:                 else
313: 
314:                     get_scheduler.schedule_every(freq, listener)
315:                 end
316:             end
317: 
318:             linfo { "add_workitem_listener() added '#{name}'" }
319: 
320:             result
321:         end

Enabling the console means that hitting CTRL-C on the window / term / dos box / whatever does run the OpenWFEru engine will open an IRB interactive console for directly manipulating the engine instance.

Hit CTRL-D to get out of the console.

[Source]

     # File lib/openwfe/engine/engine.rb, line 363
363:         def enable_irb_console
364: 
365:             OpenWFE::trap_int_irb(binding)
366:         end
forward(workitem)

Alias for reply

forward(workitem)

Alias for reply

Returns the variables set for a process or an expression.

If a process (wfid) is given, variables of the process environment will be returned, else variables in the environment valid for the expression (fei) will be returned.

If nothing (or nil) is given, the variables set in the engine environment will be returned.

[Source]

     # File lib/openwfe/engine/engine.rb, line 541
541:         def get_variables (fei_or_wfid=nil)
542: 
543:             return get_expression_pool.fetch_engine_environment.variables \
544:                 unless fei_or_wfid
545: 
546:             fetch_exp(fei_or_wfid).get_environment.variables
547:         end

Makes the current thread join the engine‘s scheduler thread

You can thus make an engine standalone with something like :

    require 'openwfe/engine/engine'

    the_engine = OpenWFE::Engine.new
    the_engine.join

And you‘ll have to hit CTRL-C to make it stop.

[Source]

     # File lib/openwfe/engine/engine.rb, line 335
335:         def join
336: 
337:             get_scheduler.join
338:         end

Calling this method makes the control flow block until the workflow engine is inactive.

TODO : implement idle_for

[Source]

     # File lib/openwfe/engine/engine.rb, line 346
346:         def join_until_idle
347: 
348:             storage = get_expression_storage
349: 
350:             while storage.size > 1
351:                 sleep 1
352:             end
353:         end

Launches a [business] process. The ‘launch_object’ param may contain either a LaunchItem instance, either a String containing the URL of the process definition to launch (with an empty LaunchItem created on the fly).

The launch object can also be a String containing the XML process definition or directly a class extending OpenWFE::ProcessDefinition (Ruby process definition).

Returns the FlowExpressionId instance of the expression at the root of the newly launched process.

Options for scheduled launches like :at, :in and :cron are accepted via the ‘options’ optional parameter. For example :

    engine.launch(launch_item)
        # will launch immediately

    engine.launch(launch_item, :in => "1d20m")
        # will launch in one day and twenty minutes

    engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007")
        # will launch at that point in time

    engine.launch(launch_item, :cron => "0 5 * * *")
        # will launch that same process every day,
        # five minutes after midnight (see "man 5 crontab")

[Source]

     # File lib/openwfe/engine/engine.rb, line 204
204:         def launch (launch_object, options={})
205: 
206:             launchitem = extract_launchitem launch_object
207: 
208:             fei = get_expression_pool.launch launchitem, options
209: 
210:             fei.dup
211:                 #
212:                 # so that users of this launch() method can play with their
213:                 # fei without breaking things
214:         end

Returns an array of wfid (workflow instance ids) whose root environment containes the given variable

If there are no matches, an empty array will be returned.

Regular expressions are accepted as values.

If no value is given, all processes with the given variable name set will be returned.

[Source]

     # File lib/openwfe/engine/engine.rb, line 560
560:         def lookup_processes (var_name, value=nil)
561: 
562:             # TODO : maybe this would be better in the ExpressionPool
563: 
564:             regexp = if value
565:                 if value.is_a?(Regexp)
566:                     value
567:                 else
568:                     Regexp.compile(value.to_s)
569:                 end
570:             else
571:                 nil
572:             end
573: 
574:             envs = get_expression_storage.find_expressions(
575:                 :include_classes => Environment)
576: 
577:             envs = envs.find_all do |env|
578:                 val = env.variables[var_name]
579:                 (val and ((not regexp) or (regexp.match(val))))
580:             end
581:             envs.collect do |env|
582:                 env.fei.wfid
583:             end
584: 
585:             #envs.inject([]) do |r, env|
586:             #    val = env.variables[var_name]
587:             #    r << env.fei.wfid              #        if (val and ((not regexp) or (regexp.match(val))))
588:             #    r
589:             #end
590:                 #
591:                 # seems slower...
592:         end

Looks up a process variable in a process. If fei_or_wfid is not given, will simply look in the ‘engine environment’ (where the top level variables ’//’ do reside).

[Source]

     # File lib/openwfe/engine/engine.rb, line 523
523:         def lookup_variable (var_name, fei_or_wfid=nil)
524: 
525:             return get_expression_pool.fetch_engine_environment[var_name] \
526:                 unless fei_or_wfid
527: 
528:             fetch_exp(fei_or_wfid).lookup_variable var_name
529:         end
oldreply(workitem)

Alias for reply

Pauses a process (sets its /paused variable to true).

[Source]

     # File lib/openwfe/engine/engine.rb, line 457
457:         def pause_process (wfid)
458: 
459:             wfid = extract_wfid wfid
460: 
461:             root_expression = get_expression_pool.fetch_root wfid
462: 
463:             get_expression_pool.paused_instances[wfid] = true
464:             root_expression.set_variable VAR_PAUSED, true
465:         end

When ‘parameters’ are used at the top of a process definition, this method can be used to assert a launchitem before launch. An expression will be raised if the parameters do not match the requirements.

Note that the launch method will raise those exceptions as well. This method can be useful in some scenarii though.

[Source]

     # File lib/openwfe/engine/engine.rb, line 169
169:         def pre_launch_check (launchitem)
170: 
171:             get_expression_pool.prepare_raw_expression(launchitem)
172:         end
proceed(workitem)

Alias for reply

proceed(workitem)

Alias for reply

reload()

Alias for reschedule

Takes care of removing an error from the error journal and they replays its process at that point.

[Source]

     # File lib/openwfe/engine/engine.rb, line 506
506:         def replay_at_error (error)
507: 
508:             get_error_journal.remove_errors(
509:                 error.fei.parent_wfid, 
510:                 error)
511: 
512:             get_expression_pool.queue_work(
513:                 error.message,
514:                 error.fei,
515:                 error.workitem)
516:         end

[Source]

     # File lib/openwfe/extras/participants/activeparticipants.rb, line 328
328:             def reply (workitem)
329: 
330:                 if workitem.is_a?(Workitem)
331: 
332:                     oldreply(workitem.as_owfe_workitem)
333:                     workitem.destroy
334:                 else
335: 
336:                     oldreply(workitem)
337:                 end
338:             end

This method is used to feed a workitem back to the engine (after it got sent to a worklist or wherever by a participant). Participant implementations themselves do call this method usually.

This method also accepts LaunchItem instances.

Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkitem that don‘t belong to a process instance (ie whose flow_expression_id is nil). It will simply notify the participant_map of the reply for the given participant_name. If there is no participant_name specified for this orphan workitem, an exception will be raised.

[Source]

     # File lib/openwfe/engine/engine.rb, line 229
229:         def reply (workitem)
230: 
231:             if workitem.is_a?(InFlowWorkItem)
232: 
233:                 if workitem.flow_expression_id
234:                     #
235:                     # vanilla case, workitem coming back
236:                     # (from listener probably)
237: 
238:                     return get_expression_pool.reply(
239:                         workitem.flow_expression_id, workitem)
240:                 end
241: 
242:                 if workitem.participant_name
243:                     #
244:                     # a workitem that doesn't belong to a process instance
245:                     # but bears a participant name.
246:                     # Notify, there may be something listening on
247:                     # this channel (see the 'listen' expression).
248: 
249:                     return get_participant_map.onotify(
250:                         workitem.participant_name, :reply, workitem)
251:                 end
252: 
253:                 raise \
254:                     "InFlowWorkitem doesn't belong to a process instance" +
255:                     " nor to a participant"
256:             end
257: 
258:             return get_expression_pool.launch(workitem) \
259:                 if workitem.is_a?(LaunchItem)
260:                     #
261:                     # launchitem coming from listener
262:                     # let's attempt to launch a new process instance
263: 
264:             raise \
265:                 "engine.reply() " +
266:                 "cannot handle instances of #{workitem.class}"
267:         end

Call this method once the participants for a persisted engine have been [re]added.

If this method is called too soon, missing participants will cause trouble… Call this method after all the participants have been added.

[Source]

     # File lib/openwfe/engine/engine.rb, line 153
153:         def reschedule
154: 
155:             get_expression_pool.reschedule()
156:         end

Restarts a process : removes its ‘paused’ flag (variable) and makes sure to ‘replay’ events (replies) that came for it while it was in pause.

[Source]

     # File lib/openwfe/engine/engine.rb, line 472
472:         def resume_process (wfid)
473: 
474:             wfid = extract_wfid wfid
475: 
476:             root_expression = get_expression_pool.fetch_root wfid
477: 
478:             #
479:             # remove 'paused' flag
480: 
481:             get_expression_pool.paused_instances.delete wfid
482:             root_expression.unset_variable VAR_PAUSED 
483: 
484:             #
485:             # replay
486:             #
487:             # select PausedError instances in separate list
488: 
489:             errors = get_error_journal.get_error_log wfid
490:             error_class = PausedError.name
491:             paused_errors = errors.select { |e| e.error_class == error_class }
492: 
493:             return if paused_errors.size < 1
494: 
495:             # replay select PausedError instances
496: 
497:             paused_errors.each do |e|
498:                 replay_at_error e
499:             end
500:         end

Stopping the engine will stop all the services in the application context.

[Source]

     # File lib/openwfe/engine/engine.rb, line 384
384:         def stop
385: 
386:             linfo { "stop() stopping engine '#{@service_name}'" }
387: 
388:             @application_context.each do |sname, service|
389: 
390:                 next if sname == self.service_name
391: 
392:                 #if service.kind_of?(ServiceMixin)
393:                 if service.respond_to?(:stop)
394: 
395:                     service.stop
396: 
397:                     linfo do 
398:                         "stop() stopped service '#{sname}' (#{service.class})"
399:                     end
400:                 end
401:             end
402: 
403:             linfo { "stop() stopped engine '#{@service_name}'" }
404: 
405:             nil
406:         end

Replaces an expression in the pool with a newer version of it.

(useful when fixing processes on the fly)

[Source]

     # File lib/openwfe/engine/engine.rb, line 646
646:         def update_expression (fexp)
647: 
648:             fexp.application_context = application_context
649: 
650:             get_expression_pool.update fexp
651:         end

Use only when doing "process gardening".

This method updates an expression, the ‘data’ parameter is expected to be a hash. If the expression is an Environment, the variables will be merged with the ones found in the data param. If the expression is not an Environment, the data will be merged into the ‘applied_workitem’ if any.

If the merge is not possible, an exception will be raised.

[Source]

     # File lib/openwfe/engine/engine.rb, line 606
606:         def update_expression_data (fei, data)
607: 
608:             fexp = fetch_exp fei
609: 
610:             original = if fexp.is_a?(Environment)
611: 
612:                 fexp.variables
613:             else
614: 
615:                 fexp.applied_workitem.attributes
616:             end
617: 
618:             original.merge! data
619: 
620:             get_expression_pool.update fexp
621:         end

A variant of update_expression() that directly replaces the raw representation stored within a RawExpression.

Useful for modifying [not yet reached] segments of processes.

[Source]

     # File lib/openwfe/engine/engine.rb, line 629
629:         def update_raw_expression (fei, representation)
630: 
631:             fexp = fetch_exp fei
632: 
633:             raise "cannot update already applied expression" \
634:                 unless fexp.is_a?(RawExpression)
635: 
636:             fexp.raw_representation = representation
637: 
638:             get_expression_pool.update fexp
639:         end

Waits for a given process instance to terminate. The method only exits when the flow terminates, but beware : if the process already terminated, the method will never exit.

The parameter can be a FlowExpressionId instance, for example the one given back by a launch(), or directly a workflow instance id (String).

This method is mainly used in utests.

[Source]

     # File lib/openwfe/engine/engine.rb, line 419
419:         def wait_for (fei_or_wfid)
420: 
421:             wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
422:                 fei_or_wfid.workflow_instance_id
423:             else
424:                 fei_or_wfid
425:             end
426: 
427:             t = Thread.new { Thread.stop }
428: 
429:             to = get_expression_pool.add_observer(:terminate) do |c, fe, wi|
430:                 t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?)
431:             end
432:             te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e|
433:                 t.wakeup if (fei.parent_wfid == wfid and t.alive?)
434:             end
435:             #tc = get_expression_pool.add_observer(:cancel) do |c, fe|
436:             #    if (fe.fei.wfid == wfid and fe.fei.expid == "0" and t.alive?)
437:             #        sleep 0.500
438:             #        t.wakeup 
439:             #    end
440:             #end
441: 
442:             linfo { "wait_for() #{wfid}" }
443: 
444:             t.join
445: 
446:             get_expression_pool.remove_observer(to, :terminate)
447:             get_expression_pool.remove_observer(te, :error)
448:             #get_expression_pool.remove_observer(tc, :cancel)
449:                 #
450:                 # it would work as well without specifying the channel,
451:                 # but it's thus a little bit faster
452:         end

Protected Instance methods

The default implementation of this method uses an InMemoryErrorJournal (do not use in production).

[Source]

     # File lib/openwfe/engine/engine.rb, line 743
743:             def build_error_journal
744: 
745:                 init_service S_ERROR_JOURNAL, InMemoryErrorJournal
746:             end

Builds the ExpressionMap (the mapping between expression names and expression implementations).

[Source]

     # File lib/openwfe/engine/engine.rb, line 664
664:             def build_expression_map
665: 
666:                 @application_context[S_EXPRESSION_MAP] = ExpressionMap.new
667:                     #
668:                     # the expression map is not a Service anymore,
669:                     # it's a simple instance (that will be reused in other
670:                     # OpenWFEru components)
671:             end

Builds the OpenWFEru expression pool (the core of the engine) and binds it in the engine context. There is only one implementation of the expression pool, so this method is usually never overriden.

[Source]

     # File lib/openwfe/engine/engine.rb, line 699
699:             def build_expression_pool
700: 
701:                 init_service S_EXPRESSION_POOL, ExpressionPool
702:             end

The implementation here builds an InMemoryExpressionStorage instance.

See FilePersistedEngine or CachedFilePersistedEngine for overrides of this method.

[Source]

     # File lib/openwfe/engine/engine.rb, line 711
711:             def build_expression_storage
712: 
713:                 init_service S_EXPRESSION_STORAGE, InMemoryExpressionStorage
714:             end

The ParticipantMap is a mapping between participant names (well rather regular expressions) and participant implementations (see openwferu.rubyforge.org/participants.html)

[Source]

     # File lib/openwfe/engine/engine.rb, line 721
721:             def build_participant_map
722: 
723:                 init_service S_PARTICIPANT_MAP, ParticipantMap
724:             end

There is only one Scheduler implementation, that‘s the one built and bound here.

[Source]

     # File lib/openwfe/engine/engine.rb, line 730
730:             def build_scheduler
731: 
732:                 scheduler = Rufus::Scheduler.new
733: 
734:                 @application_context[S_SCHEDULER] = scheduler
735: 
736:                 scheduler.start
737:             end

This implementation builds a KotobaWfidGenerator instance and binds it in the engine context. There are other WfidGeneration implementations available, like UuidWfidGenerator or FieldWfidGenerator.

[Source]

     # File lib/openwfe/engine/engine.rb, line 679
679:             def build_wfid_generator
680: 
681:                 #init_service S_WFID_GENERATOR, DefaultWfidGenerator
682:                 #init_service S_WFID_GENERATOR, UuidWfidGenerator
683:                 init_service S_WFID_GENERATOR, KotobaWfidGenerator
684: 
685:                 #g = FieldWfidGenerator.new(
686:                 #    S_WFID_GENERATOR, @application_context, "wfid")
687:                     #
688:                     # showing how to initialize a FieldWfidGenerator that
689:                     # will take as workflow instance id the value found in
690:                     # the field "wfid" of the LaunchItem.
691:             end

Turns the raw launch request info into a LaunchItem instance.

[Source]

     # File lib/openwfe/engine/engine.rb, line 751
751:             def extract_launchitem (launch_object)
752: 
753:                 if launch_object.kind_of?(OpenWFE::LaunchItem)
754: 
755:                     launch_object
756: 
757:                 elsif launch_object.kind_of?(Class)
758: 
759:                     LaunchItem.new launch_object
760: 
761:                 elsif launch_object.kind_of?(String)
762: 
763:                     li = OpenWFE::LaunchItem.new
764: 
765:                     if launch_object[0, 1] == '<' or launch_object.index("\n")
766: 
767:                         li.workflow_definition_url = "field:__definition"
768:                         li['__definition'] = launch_object
769: 
770:                     else
771: 
772:                         li.workflow_definition_url = launch_object
773:                     end
774: 
775:                     li
776:                 end
777:             end

[Validate]