| Class | OpenWFE::Journal |
| In: |
lib/openwfe/expool/journal.rb
|
| Parent: | Service |
Keeping a replayable track of the events in an OpenWFEru engine
| FREQ | = | "1m" |
| donedir | [R] | |
| workdir | [R] |
once per minute, makes sure the buckets are flushed
# File lib/openwfe/expool/journal.rb, line 67
67: def initialize (service_name, application_context)
68:
69: super
70:
71: @buckets = {}
72:
73: @workdir = get_work_directory + "/journal"
74: @donedir = @workdir + "/done"
75:
76: FileUtils.makedirs(@donedir) unless File.exist?(@donedir)
77:
78: get_expression_pool.add_observer(:all) do |event, *args|
79: #ldebug { ":#{event} for #{args[0].class.name}" }
80: queue_event(event, *args)
81: end
82:
83: @thread_id = get_scheduler.schedule_every(FREQ) do
84: flush_buckets()
85: end
86: end
Will flush the journal of every open instance.
# File lib/openwfe/expool/journal.rb, line 91
91: def stop
92: get_scheduler.unschedule(@thread_id) if @thread_id
93: flush_buckets()
94: end
Makes sure that all the buckets are persisted to disk
# File lib/openwfe/expool/journal.rb, line 157
157: def flush_buckets
158:
159: count = 0
160:
161: synchronize do
162:
163: @buckets.each do |k, v|
164: v.flush
165: count += 1
166: end
167: @buckets.clear
168: end
169:
170: linfo { "flush_buckets() flushed #{count} buckets" } \
171: if count > 0
172: end
# File lib/openwfe/expool/journal.rb, line 174
174: def get_bucket (wfid)
175: @buckets[wfid] ||= Bucket.new(get_path(wfid))
176: end
# File lib/openwfe/expool/journal.rb, line 184
184: def get_path (wfid)
185: @workdir + "/" + wfid.to_s + ".journal"
186: end
Queues the events before a flush.
If the event is a :terminate, the individual bucket will get flushed.
# File lib/openwfe/expool/journal.rb, line 104
104: def queue_event (event, *args)
105:
106: #ldebug { "queue_event() :#{event}" }
107:
108: return if event == :stop
109: return if event == :launch
110: return if event == :reschedule
111:
112: wfid = extract_fei(args[0]).parent_wfid
113: #
114: # maybe args[0] could be a FlowExpression instead
115: # of a FlowExpressionId instance
116: #puts "___#{event}__wfid : #{wfid}"
117:
118: e = serialize_event(event, *args)
119:
120: bucket = nil
121:
122: synchronize do
123:
124: bucket = get_bucket(wfid)
125: bucket << e
126:
127: #ldebug { "queue_event() bucket : #{bucket.object_id}" }
128:
129: if event == :terminate
130:
131: bucket.flush
132: @buckets.delete(wfid)
133: end
134: end
135: #
136: # minimizing the sync block
137:
138: # TODO : spin that off this thread, to the
139: # flush thread...
140: #
141: if event == :terminate
142: if @application_context[:keep_journals] == true
143: #
144: # 'move' journal to the done/ subdir of journal/
145: #
146: FileUtils.cp(
147: bucket.path,
148: @donedir + "/" + File.basename(bucket.path))
149: end
150: FileUtils.rm bucket.path
151: end
152: end