Class OpenWFE::Journal
In: lib/openwfe/expool/journal.rb
Parent: Service

Keeping a replayable track of the events in an OpenWFEru engine

Methods

Included Modules

MonitorMixin OwfeServiceLocator JournalReplay FeiMixin

Classes and Modules

Class OpenWFE::Journal::Bucket

Constants

FREQ = "1m"

Attributes

donedir  [R] 
workdir  [R] 

Public Class methods

once per minute, makes sure the buckets are flushed

[Source]

    # File lib/openwfe/expool/journal.rb, line 67
67:         def initialize (service_name, application_context)
68: 
69:             super
70: 
71:             @buckets = {}
72: 
73:             @workdir = get_work_directory + "/journal"
74:             @donedir = @workdir + "/done"
75: 
76:             FileUtils.makedirs(@donedir) unless File.exist?(@donedir)
77: 
78:             get_expression_pool.add_observer(:all) do |event, *args|
79:                 #ldebug { ":#{event}  for #{args[0].class.name}" }
80:                 queue_event(event, *args)
81:             end
82: 
83:             @thread_id = get_scheduler.schedule_every(FREQ) do
84:                 flush_buckets()
85:             end
86:         end

Public Instance methods

Will flush the journal of every open instance.

[Source]

    # File lib/openwfe/expool/journal.rb, line 91
91:         def stop
92:             get_scheduler.unschedule(@thread_id) if @thread_id
93:             flush_buckets()
94:         end

Protected Instance methods

Makes sure that all the buckets are persisted to disk

[Source]

     # File lib/openwfe/expool/journal.rb, line 157
157:             def flush_buckets
158: 
159:                 count = 0
160: 
161:                 synchronize do
162: 
163:                     @buckets.each do |k, v|
164:                         v.flush
165:                         count += 1
166:                     end
167:                     @buckets.clear
168:                 end
169: 
170:                 linfo { "flush_buckets() flushed #{count} buckets" } \
171:                     if count > 0
172:             end

[Source]

     # File lib/openwfe/expool/journal.rb, line 174
174:             def get_bucket (wfid)
175:                 @buckets[wfid] ||= Bucket.new(get_path(wfid))
176:             end

[Source]

     # File lib/openwfe/expool/journal.rb, line 184
184:             def get_path (wfid)
185:                 @workdir + "/" + wfid.to_s + ".journal"
186:             end

Queues the events before a flush.

If the event is a :terminate, the individual bucket will get flushed.

[Source]

     # File lib/openwfe/expool/journal.rb, line 104
104:             def queue_event (event, *args)
105: 
106:                 #ldebug { "queue_event() :#{event}" }
107: 
108:                 return if event == :stop
109:                 return if event == :launch
110:                 return if event == :reschedule
111: 
112:                 wfid = extract_fei(args[0]).parent_wfid
113:                     #
114:                     # maybe args[0] could be a FlowExpression instead
115:                     # of a FlowExpressionId instance
116:                 #puts "___#{event}__wfid : #{wfid}"
117: 
118:                 e = serialize_event(event, *args)
119: 
120:                 bucket = nil
121: 
122:                 synchronize do
123: 
124:                     bucket = get_bucket(wfid) 
125:                     bucket << e
126: 
127:                     #ldebug { "queue_event() bucket : #{bucket.object_id}" }
128: 
129:                     if event == :terminate
130: 
131:                         bucket.flush
132:                         @buckets.delete(wfid)
133:                     end
134:                 end
135:                     #
136:                     # minimizing the sync block
137: 
138:                 # TODO : spin that off this thread, to the
139:                 # flush thread...
140:                 #
141:                 if event == :terminate
142:                     if @application_context[:keep_journals] == true
143:                         #
144:                         # 'move' journal to the done/ subdir of journal/
145:                         #
146:                         FileUtils.cp(
147:                             bucket.path,
148:                             @donedir + "/" + File.basename(bucket.path))
149:                     end
150:                     FileUtils.rm bucket.path
151:                 end
152:             end

[Source]

     # File lib/openwfe/expool/journal.rb, line 178
178:             def serialize_event (event, *args)
179:                 args.insert(0, event)
180:                 args.insert(1, Time.now)
181:                 args.to_yaml
182:             end

[Validate]