Module OpenWFE::JournalReplay
In: lib/openwfe/expool/journal_replay.rb
The code dedicated to replaying and reconstituting a journal.
Outputs a report of each of the main events that the journal traced.
The output goes to stdout.
The output can be used to determine an offset number for a replay() of the journal.
    # File lib/openwfe/expool/journal_replay.rb, line 117
    def analyze (file_path)

      states = decompose(file_path)

      states.each do |state|

        next if state.dynamic.length < 1

        puts
        puts state.to_s
        puts
      end
    end
Decomposes the journal found at file_path into a list of states, one per offset.
    # File lib/openwfe/expool/journal_replay.rb, line 134
    def decompose (file_path)

      do_decompose(load_events(file_path), [], nil, 0)
    end
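To make the relation between offsets and states concrete, here is a minimal, self-contained sketch (not the library code). It assumes, as the private extract_state listing suggests, that the state at offset n is built from all journal events except the last n. The helper name `prefixes_by_offset` and the sample event tuples are hypothetical.

```ruby
# Hypothetical helper: returns [offset, events_visible_at_that_offset] pairs.
# Offset 0 sees the whole journal; each further offset drops one trailing event.
def prefixes_by_offset(events)
  (0...events.size).map do |offset|
    [offset, events[0..(-1 - offset)]]
  end
end

# Made-up events, shaped as [type, time, fei].
sample_events = [
  [:update, 't0', 'fei-a'],
  [:apply,  't1', 'fei-a'],
  [:reply,  't2', 'fei-a']
]

prefixes_by_offset(sample_events).each do |offset, visible|
  puts "offset #{offset}: #{visible.size} event(s), last is #{visible.last[0]}"
end
```

An empty journal yields no states at all, which matches the nil returned by extract_state when the offset runs past the first event.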
Loads a journal file and returns its content as a list of events. This method is public only so that unit tests can call it; it is of little interest otherwise.
    # File lib/openwfe/expool/journal_replay.rb, line 144
    def load_events (file_path)

      File.open(file_path) do |f|
        s = YAML.load_stream f
        s.documents
      end
    end
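The listing relies on the YAML library of its time, where load_stream returned an object exposing documents. On current Ruby versions (Psych), YAML.load_stream called without a block is expected to return an Array with one entry per YAML document, so the equivalent read could be sketched as follows. The journal text and the helper name `load_events_from_string` are hypothetical, and plain strings stand in for the real event fields, whose on-disk format is not shown on this page.

```ruby
require 'yaml'

# Hypothetical journal content: two YAML documents, one event per document.
JOURNAL = <<~YAML
  ---
  - update
  - t0
  - fei-a
  ---
  - apply
  - t1
  - fei-a
YAML

# Assumption: with Psych, YAML.load_stream without a block returns an Array
# holding one Ruby object per YAML document.
def load_events_from_string(yaml_text)
  YAML.load_stream(yaml_text)
end

p load_events_from_string(JOURNAL)
```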
Replays a given journal file.
The offset can be determined by running the analyze() method.
If trigger_action is set to true, the apply, reply or cancel action found at the given offset will be triggered.
    # File lib/openwfe/expool/journal_replay.rb, line 60
    def replay (file_path, offset, trigger_action=false)

      states = decompose(file_path)

      state = nil

      states.each do |s|
        state = s if s.offset == offset
      end

      raise "cannot replay offset #{offset}" unless state

      #puts "expstorage size 0 = #{get_expression_storage.size}"

      state.static.each do |update|
        flow_expression = update[3]
        flow_expression.application_context = @application_context
        get_expression_pool.update(flow_expression)
      end

      get_expression_pool.reschedule

      #puts "expstorage size 1 = #{get_expression_storage.size}"

      return unless trigger_action

      #puts "sds : #{state.dynamic.size}"

      state.dynamic.each do |ply|

        message = ply[0]
        fei = extract_fei(ply[2])
        wi = ply[3]

        if wi
          #
          # apply, reply, reply_to_parent
          #
          get_expression_pool.send message, fei, wi
        else
          #
          # cancel
          #
          get_expression_pool.send message, fei
        end
      end
    end
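The trigger step at the end of replay() dispatches each dynamic event by name, passing the workitem only when the event carries one. The following sketch isolates that dispatch. `RecordingPool` is a hypothetical stand-in for the expression pool, `trigger` is a hypothetical helper, and the call to extract_fei is left out for brevity.

```ruby
# Hypothetical stand-in for the expression pool; it only records calls.
class RecordingPool
  attr_reader :calls

  def initialize
    @calls = []
  end

  def apply(fei, workitem)
    @calls << [:apply, fei, workitem]
  end

  def cancel(fei)
    @calls << [:cancel, fei]
  end
end

# Mirrors the dispatch in replay(): events with a workitem are sent with it,
# events without one (cancel) are sent with the fei only.
def trigger(pool, dynamic_events)
  dynamic_events.each do |message, _time, fei, workitem|
    if workitem
      pool.send(message, fei, workitem)
    else
      pool.send(message, fei)
    end
  end
end

demo_pool = RecordingPool.new
trigger(demo_pool, [
  [:apply,  't1', 'fei-a', { 'customer' => 'alice' }],
  [:cancel, 't2', 'fei-b']
])
p demo_pool.calls
```

Because the message name comes straight from the journal, anything recorded there is invoked as a method on the pool; that is worth keeping in mind when replaying journals from an untrusted source.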
Takes an error event (as stored in the journal) and replays it.
(Make sure to fix the cause of the error, usually the engine configuration, before triggering this method.)
    # File lib/openwfe/expool/journal_replay.rb, line 160
    def replay_at_error (error_source_event)

      get_expression_pool.queue_work \
        error_source_event[3], # message (:do_apply for example)
        error_source_event[2], # fei or exp
        error_source_event[4] # workitem

      # 0 is :error and 1 is the date and time of the error

      linfo do
        fei = extract_fei(error_source_event[2])
        "replay_at_error() #{error_source_event[3]} #{fei}"
      end
    end
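The comments in replay_at_error imply a five-slot layout for an error event: the :error marker, the date and time, the fei (or expression), the message and the workitem. A small sketch naming those slots follows. The helper `error_event_parts` and the sample values are hypothetical.

```ruby
# Hypothetical helper: names the slots of an error event, following the
# index comments in replay_at_error (0 type, 1 time, 2 fei, 3 message, 4 workitem).
def error_event_parts(event)
  _type, at, fei, message, workitem = event
  { at: at, fei: fei, message: message, workitem: workitem }
end

# Made-up error event.
sample_error = [:error, 't3', 'fei-a', :do_apply, { 'customer' => 'alice' }]

p error_event_parts(sample_error)
```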
Detects the last error that occurred for a workflow instance and replays at that point (see replay_at_error).
(Make sure to fix the cause of the error before triggering this method)
    # File lib/openwfe/expool/journal_replay.rb, line 182
    def replay_at_last_error (wfid)

      events = load_events(get_path(wfid))

      error_event = events.reverse.find do |evt|
        evt[0] == :error
      end

      replay_at_error(error_event)
    end
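As listed, replay_at_last_error passes nil on to replay_at_error when the journal holds no :error event, which would surface as a NoMethodError on nil. A caller wanting a clearer failure could wrap the search, as in this sketch. The helper name `last_error_event` and the choice of ArgumentError are assumptions, not part of the library.

```ruby
# Hypothetical helper: returns the most recent :error event, or raises a
# descriptive error when the journal contains none.
def last_error_event(events)
  events.reverse.find { |evt| evt[0] == :error } ||
    raise(ArgumentError, 'no error event found in journal')
end

# Made-up journal with two errors; the later one (t2) is selected.
sample_journal = [
  [:apply, 't0', 'fei-a'],
  [:error, 't1', 'fei-a', :do_apply, nil],
  [:error, 't2', 'fei-b', :do_reply, nil],
  [:reply, 't3', 'fei-b']
]

p last_error_event(sample_journal)
```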
    # File lib/openwfe/expool/journal_replay.rb, line 195
    def do_decompose (events, result, previous_state, offset)

      current_state = extract_state(events, offset)

      return result unless current_state

      result << current_state

      do_decompose(events, result, current_state, offset + 1)
    end
    # File lib/openwfe/expool/journal_replay.rb, line 206
    def extract_state (file, offset)

      events = if file.is_a?(String)
        load_events(file)
      else
        file
      end

      #
      # what to do with Sleep and When ?

      off = -1
      off = off - offset if offset

      return nil if (events.size + off < 0)

      events = events[0..off]

      date = events[-1][1]

      participants = {}

      seen = {}
      static = []
      events.reverse.each do |e|

        etype = e[0]
        fei = e[2]

        next if etype == :apply
        next if etype == :reply
        next if etype == :reply_to_parent
        next if etype == :error
        next if seen[fei]

        seen[fei] = true

        next if etype == :remove

        static << e

        participants[fei] = true \
          if e[3].is_a? OpenWFE::ParticipantExpression
      end

      seen = {}
      dynamic = []
      events.reverse.each do |e|
        etype = e[0]
        fei = extract_fei e[2]
        next if etype == :update
        next if etype == :remove
        next if etype == :error
        #next if etype == :reply_to_parent
        next if seen[fei]
        next unless participants[fei]
        seen[fei] = true
        dynamic << e
      end

      ExpoolState.new(offset, date, static, dynamic, participants)
    end
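The first pass of extract_state walks the journal from newest to oldest and keeps, for each fei, only its most recent :update; an expression whose latest storage event is a :remove is dropped. A simplified, self-contained sketch of that pass follows. The helper name `latest_updates` and the sample events are hypothetical, and the participant bookkeeping is omitted.

```ruby
# Simplified sketch of the "static" pass: newest event per fei wins.
def latest_updates(events)
  seen = {}
  static = []
  events.reverse.each do |event|
    type, _time, fei = event
    # Flow events do not describe stored expressions, so they are skipped.
    next if %i[apply reply reply_to_parent error].include?(type)
    next if seen[fei]
    seen[fei] = true
    # A fei whose latest storage event is a removal is no longer in the pool.
    next if type == :remove
    static << event
  end
  static
end

# Made-up journal: 'a' is updated twice, 'b' is updated then removed.
sample = [
  [:update, 't0', 'a'],
  [:update, 't1', 'b'],
  [:apply,  't2', 'a'],
  [:update, 't3', 'a'],
  [:remove, 't4', 'b']
]

p latest_updates(sample)
```

With this sample, only the update of 'a' at t3 survives: the earlier update of 'a' is shadowed, and 'b' was removed.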