Module OpenWFE::JournalReplay
In: lib/openwfe/expool/journal_replay.rb

The code dedicated to replaying and reconstituting journals.

Methods

Classes and Modules

Class OpenWFE::JournalReplay::ExpoolState

Public Instance methods

Outputs a report of each of the main events that the journal traced.

The output goes to stdout.

The output can be used to determine an offset number for a replay() of the journal.

[Source]

     # File lib/openwfe/expool/journal_replay.rb, line 117
        def analyze (file_path)

            states = decompose(file_path)

            states.each do |state|

                next if state.dynamic.length < 1

                puts
                puts state.to_s
                puts
            end
        end

Decomposes the journal found at the given file_path into a list of states.

[Source]

     # File lib/openwfe/expool/journal_replay.rb, line 134
        def decompose (file_path)

            do_decompose(load_events(file_path), [], nil, 0)
        end

Loads a journal file and returns its content as a list of events. This method is public mainly for the benefit of unit tests; it is of little interest otherwise.

[Source]

     # File lib/openwfe/expool/journal_replay.rb, line 144
        def load_events (file_path)

            File.open(file_path) do |f|
                s = YAML.load_stream f
                s.documents
            end
        end
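The listing above calls documents on the value returned by YAML.load_stream. In current Ruby releases YAML.load_stream returns the Array of documents directly, so an equivalent loader could look like the sketch below. The method name load_events_sketch and the journal text are made up for illustration, and the events use plain strings rather than the richer objects a real journal stores.

```ruby
require 'yaml'

# Hypothetical journal content: one YAML document per event.
JOURNAL_TEXT = <<~JOURNAL
  ---
  - apply
  - "t0"
  - fei_0
  ---
  - update
  - "t1"
  - fei_0
JOURNAL

# Returns the events as an Array, one element per YAML document.
# (Sketch only: takes the YAML text instead of a file path.)
def load_events_sketch(yaml_text)
  YAML.load_stream(yaml_text)
end

events = load_events_sketch(JOURNAL_TEXT)
puts events.size
```

Each element of events is one journal entry, in the order in which it was written.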

Replays a given journal file.

The offset can be determined by running the analyze() method.

If ‘trigger_action’ is set to true, the apply or reply or cancel action found at the given offset will be triggered.

[Source]

     # File lib/openwfe/expool/journal_replay.rb, line 60
        def replay (file_path, offset, trigger_action=false)

            states = decompose(file_path)

            state = nil

            states.each do |s|
                state = s if s.offset == offset
            end

            raise "cannot replay offset #{offset}" unless state

            #puts "expstorage size 0 = #{get_expression_storage.size}"

            state.static.each do |update|
                flow_expression = update[3]
                flow_expression.application_context = @application_context
                get_expression_pool.update(flow_expression)
            end

            get_expression_pool.reschedule

            #puts "expstorage size 1 = #{get_expression_storage.size}"

            return unless trigger_action

            #puts "sds : #{state.dynamic.size}"

            state.dynamic.each do |ply|

                message = ply[0]
                fei = extract_fei(ply[2])
                wi = ply[3]

                if wi
                    #
                    # apply, reply, reply_to_parent
                    #
                    get_expression_pool.send message, fei, wi
                else
                    #
                    # cancel
                    #
                    get_expression_pool.send message, fei
                end
            end
        end
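The last part of replay() dispatches each dynamic event to the expression pool through send, passing the workitem only when the event carries one. The sketch below isolates that dispatch. RecordingPool and trigger_dynamic are hypothetical names, the pool merely records the calls it receives, and the fei is used as-is instead of going through extract_fei.

```ruby
# Hypothetical stand-in for the expression pool: it only records calls.
class RecordingPool
  attr_reader :calls

  def initialize
    @calls = []
  end

  def apply(fei, workitem)
    @calls << [:apply, fei, workitem]
  end

  def cancel(fei)
    @calls << [:cancel, fei]
  end
end

# Mirrors the dispatch loop of replay(): events with a workitem are sent
# with it, events without one (cancel) are sent with the fei only.
def trigger_dynamic(pool, dynamic_events)
  dynamic_events.each do |message, _date, fei, workitem|
    if workitem
      pool.send(message, fei, workitem)
    else
      pool.send(message, fei)
    end
  end
end

pool = RecordingPool.new
trigger_dynamic(pool, [
  [:apply, 't0', 'fei_0', { 'customer' => 'alice' }],
  [:cancel, 't1', 'fei_1', nil]
])
p pool.calls
```

Because the message name comes straight from the journal, the pool has to respond to every message type that was recorded.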

Takes an error event (as stored in the journal) and replays it. Usually the engine configuration has to be fixed before replaying the error trigger.

(Make sure to fix the cause of the error before triggering this method)

[Source]

     # File lib/openwfe/expool/journal_replay.rb, line 160
        def replay_at_error (error_source_event)

            get_expression_pool.queue_work \
                error_source_event[3], # message (:do_apply for example)
                error_source_event[2], # fei or exp
                error_source_event[4]  # workitem

            # 0 is :error and 1 is the date and time of the error

            linfo do
                fei = extract_fei(error_source_event[2])
                "replay_at_error() #{error_source_event[3]} #{fei}"
            end
        end
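The comments in the listing above imply a fixed layout for an error event: type, date, fei or expression, message, workitem. A small sketch of how those positions map onto the arguments handed to queue_work follows. The helper name queue_work_args and the sample values are hypothetical.

```ruby
# Field positions of an error event, as read off replay_at_error():
#   0 => :error, 1 => date and time of the error,
#   2 => fei or expression, 3 => message, 4 => workitem
def queue_work_args(error_event)
  _type, _date, fei_or_exp, message, workitem = error_event
  # Same order as in the call to queue_work: message, fei/exp, workitem.
  [message, fei_or_exp, workitem]
end

args = queue_work_args([:error, 't0', 'fei_0', :do_apply, { 'k' => 'v' }])
p args
```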

Detects the last error that occurred for a workflow instance and replays at that point (see replay_at_error).

(Make sure to fix the cause of the error before triggering this method)

[Source]

     # File lib/openwfe/expool/journal_replay.rb, line 182
        def replay_at_last_error (wfid)

            events = load_events(get_path(wfid))

            error_event = events.reverse.find do |evt|
                evt[0] == :error
            end

            replay_at_error(error_event)
        end
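The lookup used above can be tried on plain arrays. The sketch below uses a hypothetical helper, last_error_event, and made-up events. It also shows that the search yields nil when the journal holds no error, in which case the listing above would pass nil on to replay_at_error, so a caller may want to check for that case first.

```ruby
# Returns the most recent :error event, or nil when there is none.
def last_error_event(events)
  events.reverse.find { |evt| evt[0] == :error }
end

events = [
  [:apply, 't0', 'fei_0'],
  [:error, 't1', 'fei_0', :do_apply, nil],
  [:update, 't2', 'fei_0'],
  [:error, 't3', 'fei_1', :do_reply, nil]
]

latest = last_error_event(events)
none = last_error_event([[:apply, 't0', 'fei_0']])

p latest
p none
```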

Protected Instance methods

[Source]

     # File lib/openwfe/expool/journal_replay.rb, line 195
            def do_decompose (events, result, previous_state, offset)

                current_state = extract_state(events, offset)

                return result unless current_state

                result << current_state

                do_decompose(events, result, current_state, offset + 1)
            end
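Read together with extract_state, the recursion above produces one state per offset: offset 0 covers the whole journal, offset 1 drops the last event, and so on until no event is left. The sketch below reproduces only that offset arithmetic, iteratively and on plain strings; prefixes_by_offset is a hypothetical name and no ExpoolState is built.

```ruby
# For each offset, returns the slice of events that a state at that
# offset would be computed from.
def prefixes_by_offset(events)
  result = []
  offset = 0
  loop do
    last_index = -1 - offset
    # Same stop condition as extract_state(): nothing left to slice.
    break if events.size + last_index < 0
    result << [offset, events[0..last_index]]
    offset += 1
  end
  result
end

prefixes = prefixes_by_offset(%w[e0 e1 e2])
prefixes.each { |offset, slice| puts "#{offset}: #{slice.inspect}" }
```

A journal of n events therefore yields n states, with offsets 0 to n - 1. The recursive form above goes one call deeper per event, which may matter for very long journals.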

[Source]

     # File lib/openwfe/expool/journal_replay.rb, line 206
            def extract_state (file, offset)

                events = if file.is_a?(String)
                    load_events(file)
                else
                    file
                end

                #
                # what to do with Sleep and When ?

                off = -1
                off = off - offset if offset

                return nil if (events.size + off < 0)

                events = events[0..off]

                date = events[-1][1]

                participants = {}

                seen = {}
                static = []
                events.reverse.each do |e|

                    etype = e[0]
                    fei = e[2]

                    next if etype == :apply
                    next if etype == :reply
                    next if etype == :reply_to_parent
                    next if etype == :error
                    next if seen[fei]

                    seen[fei] = true

                    next if etype == :remove

                    static << e

                    participants[fei] = true \
                        if e[3].is_a? OpenWFE::ParticipantExpression
                end

                seen = {}
                dynamic = []
                events.reverse.each do |e|
                    etype = e[0]
                    fei = extract_fei e[2]
                    next if etype == :update
                    next if etype == :remove
                    next if etype == :error
                    #next if etype == :reply_to_parent
                    next if seen[fei]
                    next unless participants[fei]
                    seen[fei] = true
                    dynamic << e
                end

                ExpoolState.new(offset, date, static, dynamic, participants)
            end
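The two passes above can be summarised as follows: the static part keeps the latest update of each expression that has not been removed, and the dynamic part keeps the latest apply or reply addressed to a participant expression. The sketch below replays that logic on plain arrays. The name split_state is hypothetical, the fei is a plain string used without extract_fei, and an expression counts as a participant when its payload is the symbol :participant (the real code tests for OpenWFE::ParticipantExpression).

```ruby
STATIC_SKIPPED = [:apply, :reply, :reply_to_parent, :error].freeze
DYNAMIC_SKIPPED = [:update, :remove, :error].freeze

# Splits events (oldest first) into [static, dynamic], newest first.
def split_state(events)
  seen = {}
  participants = {}
  static = []
  events.reverse_each do |e|
    etype, _date, fei, payload = e
    next if STATIC_SKIPPED.include?(etype) || seen[fei]
    seen[fei] = true
    next if etype == :remove          # removed expressions are dropped
    static << e
    participants[fei] = true if payload == :participant  # assumption
  end

  seen = {}
  dynamic = []
  events.reverse_each do |e|
    etype, _date, fei = e
    next if DYNAMIC_SKIPPED.include?(etype) || seen[fei]
    next unless participants[fei]
    seen[fei] = true
    dynamic << e
  end

  [static, dynamic]
end

events = [
  [:update, 't0', 'seq', :sequence],
  [:update, 't1', 'alice', :participant],
  [:apply,  't2', 'alice', 'workitem_0'],
  [:update, 't3', 'gone', :participant],
  [:remove, 't4', 'gone']
]

static, dynamic = split_state(events)
p static
p dynamic
```

In this made-up journal the expression 'gone' is absent from both lists because its most recent event is a :remove.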
