Class OpenWFE::ExpressionPool
In: lib/openwfe/expool/expressionpool.rb
Parent: Object

The ExpressionPool stores expressions (pieces of workflow instance). It‘s the core of the workflow engine. It relies on an expression storage for actual persistence of the expressions.

Methods

Included Modules

ServiceMixin OwfeServiceLocator OwfeObservable WorkqueueMixin FeiMixin MonitorMixin

Attributes

paused_instances  [R]  The hash containing the wfid of the process instances currently paused.

Public Class methods

The constructor for the expression pool.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 104
104:         def initialize (service_name, application_context)
105: 
106:             super()
107:             
108:             service_init(service_name, application_context)
109: 
110:             @paused_instances = {}
111: 
112:             @monitors = MonitorProvider.new(application_context)
113: 
114:             @observers = {}
115: 
116:             @stopped = false
117: 
118:             engine_environment_id
119:                 # makes sure it's called now
120: 
121:             start_workqueue
122:         end

Public Instance methods

Applies a given expression (id or expression)

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 347
347:         def apply (exp, workitem)
348: 
349:             queue_work :do_apply, exp, workitem
350:             #do_apply exp, workitem
351:         end

Cancels the given expression. The param might be an expression instance or a FlowExpressionId instance.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 367
367:         def cancel (exp)
368: 
369:             exp, fei = fetch exp
370: 
371:             unless exp
372:                 ldebug { "cancel() cannot cancel missing  #{fei.to_debug_s}" }
373:                 return nil
374:             end
375: 
376:             ldebug { "cancel() for  #{fei.to_debug_s}" }
377: 
378:             onotify :cancel, exp
379: 
380:             inflowitem = exp.cancel()
381:             remove exp
382: 
383:             inflowitem
384:         end

Cancels the given expression and makes sure to resume the flow if the expression or one of its children were active.

If the cancelled branch was not active, this method will take care of removing the cancelled expression from the parent expression.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 394
394:         def cancel_expression (exp)
395: 
396:             exp = fetch_expression exp
397: 
398:             wi = cancel exp
399: 
400:             if wi
401:                 reply_to_parent exp, wi, false
402:             else
403:                 parent_exp = fetch_expression exp.parent_id
404:                 parent_exp.remove_child(exp.fei) if parent_exp
405:             end
406:         end
cancel_flow(exp_or_wfid)

Alias for cancel_process

Given any expression of a process, cancels the complete process instance.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 412
412:         def cancel_process (exp_or_wfid)
413: 
414:             wfid = extract_wfid exp_or_wfid, false
415: 
416:             ldebug { "cancel_process() '#{wfid}'" }
417: 
418:             root = fetch_root wfid
419: 
420:             raise "no process to cancel '#{wfid}'" unless root
421: 
422:             cancel root
423:         end

Gets the process definition (if necessary) and turns into into an expression tree (for storing into a RawExpression).

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 768
768:         def determine_rep (param)
769: 
770:             param = read_uri(param) if param.is_a?(URI)
771: 
772:             DefParser.parse param
773:         end

Returns the unique engine_environment FlowExpressionId instance. There is only one such environment in an engine, hence this ‘singleton’ method.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 666
666:         def engine_environment_id
667:             #synchronize do 
668:                 # no need, it's been already called at initialization
669: 
670:             return @eei if @eei
671: 
672:             @eei = FlowExpressionId.new
673:             @eei.owfe_version = OPENWFERU_VERSION
674:             @eei.engine_id = get_engine.service_name
675:             @eei.initial_engine_id = @eei.engine_id
676:             @eei.workflow_definition_url = 'ee'
677:             @eei.workflow_definition_name = 'ee'
678:             @eei.workflow_definition_revision = '0'
679:             @eei.workflow_instance_id = '0'
680:             @eei.expression_name = EN_ENVIRONMENT
681:             @eei.expression_id = '0'
682:             @eei
683:             #end
684:         end

Fetches a FlowExpression from the pool. Returns a tuple : the FlowExpression plus its FlowExpressionId.

The param ‘exp’ may be a FlowExpressionId or a FlowExpression that has to be reloaded.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 539
539:         def fetch (exp)
540:             synchronize do
541: 
542:                 #ldebug { "fetch() exp is of kind #{exp.class}" }
543: 
544:                 fei = if exp.is_a?(FlowExpression)
545: 
546:                     exp.fei 
547: 
548:                 elsif not exp.is_a?(FlowExpressionId)
549: 
550:                     raise \
551:                         "Cannot fetch expression with key : "+
552:                         "'#{fei}' (#{fei.class})"
553: 
554:                 else
555: 
556:                     exp
557:                 end
558: 
559:                 #ldebug { "fetch() for  #{fei.to_debug_s}" }
560: 
561:                 [ get_expression_storage[fei], fei ]
562:             end
563:         end

Returns the engine environment (the top level environment)

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 580
580:         def fetch_engine_environment
581:             synchronize do
582:                 #
583:                 # synchronize to ensure that there's 1! engine env
584: 
585:                 eei = engine_environment_id
586:                 ee, fei = fetch eei
587: 
588:                 return ee if ee
589: 
590:                 ee = Environment.new_env(
591:                     eei, nil, nil, @application_context, nil)
592: 
593:                 ee.store_itself
594: 
595:                 ee
596:             end
597:         end

Fetches a FlowExpression (returns only the FlowExpression instance)

The param ‘exp’ may be a FlowExpressionId or a FlowExpression that has to be reloaded.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 571
571:         def fetch_expression (exp)
572: 
573:             exp, fei = fetch exp
574:             exp
575:         end

Fetches the root expression of a process (or a subprocess).

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 602
602:         def fetch_root (wfid)
603: 
604:             get_expression_storage.fetch_root wfid
605:         end

Forgets the given expression (makes sure to substitute its parent_id with the GONE_PARENT_ID constant)

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 430
430:         def forget (parent_exp, exp)
431: 
432:             exp, fei = fetch exp
433: 
434:             #ldebug { "forget() forgetting  #{fei}" }
435: 
436:             return if not exp
437: 
438:             onotify :forget, exp
439: 
440:             parent_exp.children.delete(fei)
441: 
442:             exp.parent_id = GONE_PARENT_ID
443:             exp.dup_environment
444:             exp.store_itself()
445: 
446:             ldebug { "forget() forgot      #{fei}" }
447:         end

Obtains a unique monitor for an expression. It avoids the need for the FlowExpression instances to include the monitor mixin by themselves

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 143
143:         def get_monitor (fei)
144: 
145:             @monitors[fei]
146:         end

Returns true if the process instance to which the expression belongs is currently paused.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 779
779:         def is_paused? (expression)
780: 
781:             (@paused_instances[expression.fei.parent_wfid] != nil)
782:         end

Instantiates a workflow definition and launches it.

This method call will return immediately, it could even return before the actual launch is completely over.

Returns the FlowExpressionId instance of the root expression of the newly launched flow.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 196
196:         def launch (launchitem, options={})
197: 
198:             #
199:             # prepare raw expression
200: 
201:             raw_expression = prepare_raw_expression launchitem
202:                 #
203:                 # will raise an exception if there are requirements
204:                 # and one of them is not met
205: 
206:             raw_expression.new_environment
207:                 #
208:                 # as this expression is the root of a new process instance,
209:                 # it has to have an environment for all the variables of 
210:                 # the process instance
211: 
212:             raw_expression = wrap_in_schedule(raw_expression, options) \
213:                 if options.size > 0
214: 
215:             fei = raw_expression.fei
216: 
217:             #
218:             # apply prepared raw expression
219: 
220:             wi = build_workitem launchitem
221: 
222:             onotify :launch, fei, launchitem
223: 
224:             apply raw_expression, wi
225: 
226:             fei
227:         end

launches a subprocess

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 307
307:         def launch_template (
308:             requesting_expression, 
309:             env_id, 
310:             sub_id, 
311:             template, 
312:             workitem, 
313:             params=nil)
314: 
315:             rawexp = prepare_from_template(
316:                 requesting_expression, env_id, sub_id, template, params)
317: 
318:             workitem.flow_expression_id = rawexp.fei
319: 
320:             onotify :launch_template, rawexp.fei, workitem
321: 
322:             apply rawexp, workitem
323: 
324:             rawexp.fei
325:         end

Lists all workflows (processes) currently in the expool (in the engine). This method will return a list of "process-definition" expressions (root of flows).

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 720
720:         def list_processes (options={})
721: 
722:             options[:include_classes] = DefineExpression
723:                 #
724:                 # Maybe it would be better to list root expressions instead
725:                 # so that expressions like 'sequence' can be used
726:                 # as root expressions. Later...
727: 
728:             get_expression_storage.find_expressions options
729:         end

This method is called when apply() or reply() failed for an expression. There are currently only two ‘users’, the ParticipantExpression class and the do_process_workelement method of this ExpressionPool class.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 738
738:         def notify_error (error, fei, message, workitem)
739: 
740:             fei = extract_fei fei
741:                 # densha requires that... :(
742: 
743:             se = OpenWFE::exception_to_s error
744: 
745:             onotify :error, fei, message, workitem, error.class.name, se
746: 
747:             #fei = extract_fei fei
748: 
749:             if error.is_a?(PausedError)
750:                 lwarn do
751:                     "#{self.service_name} " +
752:                     "operation :#{message.to_s} on #{fei.to_s} " +
753:                     "delayed because process '#{fei.wfid}' is in pause"
754:                 end
755:             else
756:                 lwarn do
757:                     "#{self.service_name} " +
758:                     "operation :#{message.to_s} on #{fei.to_s} " +
759:                     "failed with\n" + se
760:                 end
761:             end
762:         end

Prepares a raw expression from a template. Returns that raw expression.

Used in the concurrent-iterator when building up the children list and of course used by the launch_template() method.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 236
236:         def prepare_from_template (
237:             requesting_expression, env_id, sub_id, template, params=nil)
238: 
239:             rawexp = if template.is_a?(RawExpression)
240: 
241:                 template.application_context = @application_context
242:                 template.dup
243: 
244:             elsif template.is_a?(FlowExpressionId)
245: 
246:                 fetch_expression(template).dup
247: 
248:             else
249: 
250:                 build_raw_expression nil, template
251:             end
252: 
253:             #raise "did not find subprocess in : #{template.to_s}"              #    unless rawexp
254: 
255:             rawexp.fei = rawexp.fei.dup
256: 
257:             if requesting_expression == nil
258: 
259:                 # no parent, brand new process instance
260: 
261:                 rawexp.parent_id = nil
262:                 rawexp.fei.workflow_instance_id = get_wfid_generator.generate
263: 
264:             elsif requesting_expression.kind_of?(FlowExpressionId)
265: 
266:                 rawexp.parent_id = requesting_expression
267:                 rawexp.fei.workflow_instance_id = \
268:                     "#{requesting_expression.workflow_instance_id}.#{sub_id}"
269: 
270:             elsif requesting_expression.kind_of?(String)
271: 
272:                 rawexp.parent_id = nil
273:                 rawexp.fei.workflow_instance_id = \
274:                     "#{requesting_expression}.#{sub_id}"
275: 
276:             else # kind is FlowExpression
277: 
278:                 rawexp.parent_id = requesting_expression.fei
279:                 rawexp.fei.workflow_instance_id = \
280:                     "#{requesting_expression.fei.workflow_instance_id}.#{sub_id}"
281:             end
282: 
283:             #ldebug do
284:             #    "launch_template() spawning wfid " +
285:             #    "#{rawexp.fei.workflow_instance_id.to_s}"
286:             #end
287: 
288:             if env_id
289: 
290:                 rawexp.environment_id = env_id
291:             else
292:                 #
293:                 # the new scope gets its own environment
294:                 #
295:                 rawexp.new_environment params
296:             end
297: 
298:             rawexp.store_itself
299: 
300:             rawexp
301:         end

This method is called by the launch method. It‘s actually the first stage of that method. It may be interessant to use to ‘validate’ a launchitem and its process definition, as it will raise an exception in case of ‘parameter’ mismatch.

There is a ‘pre_launch_check’ alias for this method in the Engine class.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 158
158:         def prepare_raw_expression (launchitem)
159: 
160:             wfdurl = launchitem.workflow_definition_url
161: 
162:             raise "launchitem.workflow_definition_url not set, cannot launch" \
163:                 unless wfdurl
164: 
165:             definition = if wfdurl.match "^field:"
166: 
167:                 wfdfield = wfdurl[6..-1]
168:                 launchitem.attributes.delete wfdfield
169:             else
170: 
171:                 read_uri wfdurl
172:             end
173: 
174:             raise "didn't find process definition at '#{wfdurl}'" \
175:                 unless definition
176: 
177:             raw_expression = build_raw_expression launchitem, definition
178: 
179:             raw_expression.check_parameters launchitem
180:                 #
181:                 # will raise an exception if there are requirements
182:                 # and one of them is not met
183: 
184:             raw_expression
185:         end

Returns the list of applied expressions belonging to a given workflow instance.

If the unapplied optional parameter is set to true, all the expressions (even those not yet applied) that compose the process instance will be returned. Environments will be returned as well.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 694
694:         def process_stack (wfid, unapplied=false)
695: 
696:             #raise "please provide a non-nil workflow instance id"              #    unless wfid
697: 
698:             wfid = extract_wfid wfid, true
699: 
700:             params = {
701:                 #:exclude_classes => [ Environment, RawExpression ],
702:                 #:exclude_classes => [ Environment ],
703:                 :parent_wfid => wfid
704:             }
705:             params[:applied] = true if (not unapplied)
706: 
707:             get_expression_storage.find_expressions params
708: 
709:             # TODO : (maybe) add raw_representation method to result
710:             #        (only if unapplied set to true)
711:         end

Removes a flow expression from the pool (This method is mainly called from the pool itself)

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 611
611:         def remove (exp)
612: 
613:             exp, _fei = fetch(exp) \
614:                 if exp.is_a?(FlowExpressionId)
615: 
616:             return unless exp
617: 
618:             ldebug { "remove() fe  #{exp.fei.to_debug_s}" }
619: 
620:             onotify :remove, exp.fei
621: 
622:             synchronize do
623: 
624:                 @monitors.delete(exp.fei)
625: 
626:                 remove_environment(exp.environment_id) \
627:                     if exp.owns_its_environment?
628:             end
629:         end

Replies to a given expression

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 356
356:         def reply (exp, workitem)
357: 
358:             queue_work :do_reply, exp, workitem
359:             #do_reply exp, workitem
360:         end

Replies to the parent of the given expression.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 452
452:         def reply_to_parent (exp, workitem, remove=true)
453: 
454:             ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" }
455: 
456:             workitem.last_expression_id = exp.fei
457: 
458:             onotify :reply_to_parent, exp, workitem
459: 
460:             if remove
461: 
462:                 remove(exp)
463:                     #
464:                     # remove the expression itself
465: 
466:                 exp.clean_children()
467:                     #
468:                     # remove all the children of the expression
469:             end
470: 
471:             #
472:             # manage tag, have to remove it so it can get 'redone' or 'undone'
473:             # (preventing abuse)
474: 
475:             tagname = exp.attributes["tag"] if exp.attributes
476: 
477:             exp.delete_variable(tagname) if tagname
478: 
479:             #
480:             # flow terminated ?
481: 
482:             if not exp.parent_id
483: 
484:                 ldebug do 
485:                     "reply_to_parent() process " +
486:                     "#{exp.fei.workflow_instance_id} terminated"
487:                 end
488: 
489:                 onotify :terminate, exp, workitem
490: 
491:                 return
492:             end
493: 
494:             #
495:             # else, gone parent ?
496: 
497:             if exp.parent_id == GONE_PARENT_ID
498: 
499:                 ldebug do
500:                     "reply_to_parent() parent is gone for  " +
501:                     exp.fei.to_debug_s
502:                 end
503: 
504:                 return
505:             end
506: 
507:             #
508:             # parent still present, reply to it
509: 
510:             reply exp.parent_id, workitem
511:         end

This method is called at each expool (engine) [re]start. It roams through the previously saved (persisted) expressions to reschedule ones like ‘sleep’ or ‘cron’.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 636
636:         def reschedule
637: 
638:             return if @stopped
639: 
640:             synchronize do
641: 
642:                 t = OpenWFE::Timer.new
643: 
644:                 linfo { "reschedule() initiating..." }
645: 
646:                 options = { :include_classes => Rufus::Schedulable }
647: 
648:                 get_expression_storage.find_expressions(options).each do |fexp|
649: 
650:                     linfo { "reschedule() for  #{fexp.fei.to_s}..." }
651: 
652:                     onotify :reschedule, fexp.fei
653: 
654:                     fexp.reschedule get_scheduler
655:                 end
656: 
657:                 linfo { "reschedule() done. (took #{t.duration} ms)" }
658:             end
659:         end

Stops this expression pool (especially its workqueue).

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 127
127:         def stop
128: 
129:             @stopped = true
130: 
131:             stop_workqueue
132:                 #
133:                 # flushes the work queue
134: 
135:             onotify :stop
136:         end

Adds or updates a flow expression in this pool

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 516
516:         def update (flow_expression)
517: 
518:             ldebug { "update() for #{flow_expression.fei.to_debug_s}" }
519: 
520:             #t = Timer.new
521: 
522:             onotify :update, flow_expression.fei, flow_expression
523: 
524:             #ldebug do 
525:             #    "update() took #{t.duration} ms  " +
526:             #    "#{flow_expression.fei.to_debug_s}"
527:             #end
528: 
529:             flow_expression
530:         end

Protected Instance methods

Builds the RawExpression instance at the root of the flow being launched.

The param can be a template or a definition (anything accepted by the determine_representation() method).

[Source]

      # File lib/openwfe/expool/expressionpool.rb, line 1041
1041:             def build_raw_expression (launchitem, param)
1042: 
1043:                 procdef = determine_rep param
1044: 
1045:                 atts = procdef[1]
1046:                 flow_name = atts['name'] || "noname"
1047:                 flow_revision = atts['revision'] || "0"
1048:                 exp_name = procdef.first
1049: 
1050:                 fei = new_fei launchitem, flow_name, flow_revision, exp_name
1051: 
1052:                 RawExpression.new_raw(
1053:                     fei, nil, nil, @application_context, procdef)
1054:             end

Prepares a new instance of InFlowWorkItem from a LaunchItem instance.

[Source]

      # File lib/openwfe/expool/expressionpool.rb, line 998
 998:             def build_workitem (launchitem)
 999: 
1000:                 wi = InFlowWorkItem.new
1001: 
1002:                 wi.attributes = launchitem.attributes.dup
1003: 
1004:                 wi
1005:             end

Will raise an exception if the expression belongs to a paused process.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 887
887:             def check_if_paused (expression)
888: 
889:                 wfid = expression.fei.parent_wfid
890: 
891:                 raise PausedError.new(wfid) if @paused_instances[wfid]
892:             end

The real apply work.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 830
830:             def do_apply (exp, workitem)
831: 
832:                 exp, _fei = fetch(exp) if exp.is_a?(FlowExpressionId)
833: 
834:                 #ldebug { "apply()  '#{_fei}'" }
835: 
836:                 if not exp
837: 
838:                     #raise "apply() cannot apply missing  #{_fei.to_debug_s}"
839:                         # not very helpful anyway
840: 
841:                     lwarn do 
842:                         "do_apply() cannot apply missing  #{_fei.to_debug_s}"
843:                     end
844:                     return
845:                 end
846: 
847:                 check_if_paused exp
848: 
849:                 workitem.flow_expression_id = exp.fei
850: 
851:                 onotify :apply, exp, workitem
852: 
853:                 exp.apply workitem
854:             end

This method is called by the workqueue when processing the atomic work operations.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 813
813:             def do_process_workelement elt
814: 
815:                 message, fei, workitem = elt
816: 
817:                 begin
818: 
819:                     send message, fei, workitem
820: 
821:                 rescue Exception => e
822: 
823:                     notify_error e, fei, message, workitem
824:                 end
825:             end

The real reply work is done here

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 859
859:             def do_reply (exp, workitem)
860: 
861:                 exp, fei = fetch exp
862: 
863:                 ldebug { "reply() to   #{fei.to_debug_s}" }
864:                 ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" }
865: 
866:                 if not exp
867: 
868:                     #raise "cannot reply to missing  #{fei.to_debug_s}"
869: 
870:                     lwarn do 
871:                         "reply() cannot reply to missing  #{fei.to_debug_s}"
872:                     end
873:                     return
874:                 end
875: 
876:                 check_if_paused exp
877: 
878:                 onotify :reply, exp, workitem
879: 
880:                 exp.reply workitem 
881:             end

Builds a FlowExpressionId instance for a process being launched.

[Source]

      # File lib/openwfe/expool/expressionpool.rb, line 1011
1011:             def new_fei (launchitem, flow_name, flow_revision, exp_name)
1012: 
1013:                 url = if launchitem
1014:                     launchitem.workflow_definition_url
1015:                 else
1016:                     "no-url"
1017:                 end
1018: 
1019:                 fei = FlowExpressionId.new
1020: 
1021:                 fei.owfe_version = OPENWFERU_VERSION
1022:                 fei.engine_id = OpenWFE::stu get_engine.service_name
1023:                 fei.initial_engine_id = OpenWFE::stu fei.engine_id
1024:                 fei.workflow_definition_url = OpenWFE::stu url
1025:                 fei.workflow_definition_name = OpenWFE::stu flow_name
1026:                 fei.workflow_definition_revision = OpenWFE::stu flow_revision
1027:                 fei.wfid = get_wfid_generator.generate launchitem
1028:                 fei.expression_id = "0"
1029:                 fei.expression_name = exp_name
1030: 
1031:                 fei
1032:             end

This is the only point in the expression pool where an URI is read, so this is where the :remote_definitions_allowed security check is enforced.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 791
791:             def read_uri (uri)
792: 
793:                 uri = URI.parse uri.to_s
794: 
795:                 raise "loading remote definitions is not allowed" \
796:                     if (ac[:remote_definitions_allowed] != true and
797:                         uri.scheme and
798:                         uri.scheme != 'file')
799: 
800:                 #open(uri.to_s).read
801: 
802:                 f = Rufus::Verbs.fopen uri
803:                 result = f.read
804:                 f.close if f.respond_to?(:close)
805: 
806:                 result
807:             end

Removes an environment, especially takes care of unbinding any special value it may contain.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 977
977:             def remove_environment (environment_id)
978: 
979:                 ldebug { "remove_environment()  #{environment_id.to_debug_s}" }
980: 
981:                 env, fei = fetch(environment_id)
982: 
983:                 return unless env
984:                     #
985:                     # env already unbound and removed
986: 
987:                 env.unbind
988: 
989:                 #get_expression_storage().delete(environment_id)
990: 
991:                 onotify :remove, environment_id
992:             end

if the launch method is called with a schedule option (like :at, :in, :cron and :every), this method takes care of wrapping the process with a sleep or a cron.

[Source]

     # File lib/openwfe/expool/expressionpool.rb, line 899
899:             def wrap_in_schedule (raw_expression, options)
900: 
901:                 oat = options[:at]
902:                 oin = options[:in]
903:                 ocron = options[:cron]
904:                 oevery = options[:every]
905: 
906:                 fei = new_fei nil, "schedlaunch", "0", "sequence"
907: 
908:                 # not very happy with this code, it builds custom
909:                 # wrapping processes manually, maybe there is 
910:                 # a more elegant way, but for now, it's ok.
911: 
912:                 if oat or oin
913: 
914:                     seq = get_expression_map.get_class :sequence
915:                     seq = seq.new_exp fei, nil, nil, application_context, nil
916: 
917:                     att = if oat
918:                         { "until" => oat }
919:                     else #oin
920:                         { "for" => oin }
921:                     end
922:                     att["scheduler-tags"] = "scheduled-launch"
923: 
924:                     sle = get_expression_map.get_class :sleep
925: 
926:                     sle = sle.new_exp(
927:                         fei.dup, fei, nil, application_context, att)
928: 
929:                     sle.fei.expression_id = "0.0"
930:                     sle.fei.expression_name = "sleep"
931: 
932:                     seq.children << sle.fei
933:                     seq.children << raw_expression.fei
934: 
935:                     seq.new_environment
936:                     sle.environment_id = seq.environment_id
937: 
938:                     sle.store_itself
939:                     seq.store_itself
940: 
941:                     raw_expression.store_itself
942:                     raw_expression = seq
943: 
944:                 elsif ocron or oevery
945: 
946:                     fei.expression_name = "cron"
947: 
948:                     att = if ocron
949:                         { "tab" => ocron }
950:                     else #oevery
951:                         { "every" => oevery }
952:                     end
953:                     att["name"] = "//cron_launch__#{fei.wfid}"
954:                     att["scheduler-tags"] = "scheduled-launch"
955: 
956:                     cro = get_expression_map.get_class :cron
957:                     cro = cro.new_exp fei, nil, nil, application_context, att
958: 
959:                     cro.children << raw_expression.fei
960: 
961:                     cro.new_environment
962: 
963:                     cro.store_itself
964: 
965:                     raw_expression.store_itself
966:                     raw_expression = cro
967:                 end
968:                     # else, don't schedule at all
969: 
970:                 raw_expression
971:             end

[Validate]