Class | OpenWFE::Journal |
In: |
lib/openwfe/expool/journal.rb
|
Parent: | Service |
Keeps a replayable record of the events occurring in an OpenWFEru engine.
FREQ | = | "1m" |
donedir | [R] | |
workdir | [R] |
Sets up the journal and schedules a job that, once per minute (FREQ), makes sure the buckets are flushed.
# File lib/openwfe/expool/journal.rb, line 67
#
# Sets up the journal service.
#
# After delegating to the parent constructor (bare +super+, so both
# arguments are forwarded untouched), this:
#
# * starts with an empty bucket table,
# * derives the journal directory ("/journal" appended to the result of
#   get_work_directory) and its "/done" subdirectory, creating them on
#   disk,
# * registers a block on the expression pool via add_observer(:all)
#   that forwards every observed event to queue_event,
# * asks the scheduler to call flush_buckets every FREQ.
#
# Raises whatever FileUtils.makedirs raises when the "done" directory
# cannot be created (for instance when a regular file is in the way).
#
def initialize(service_name, application_context)

  super

  @buckets = {}

  @workdir = get_work_directory + "/journal"
  @donedir = @workdir + "/done"

  # makedirs behaves like `mkdir -p`: it is a no-op when the directory
  # already exists, so no File.exist? guard is needed. Dropping the
  # guard also removes the gap between checking and creating, and
  # surfaces the error right here when the path exists but is not a
  # directory.
  FileUtils.makedirs(@donedir)

  get_expression_pool.add_observer(:all) do |event, *args|
    #ldebug { ":#{event} for #{args[0].class.name}" }
    queue_event(event, *args)
  end

  # the value returned by the scheduler is kept so that the periodic
  # flush can be unscheduled later on
  @thread_id = get_scheduler.schedule_every(FREQ) do
    flush_buckets
  end
end
Will flush the journal of every open instance.
# File lib/openwfe/expool/journal.rb, line 91
#
# Stops the journal: unschedules the periodic flush job when an id for
# it was recorded in @thread_id, then flushes the buckets one last time.
#
def stop
  scheduled_job = @thread_id
  get_scheduler.unschedule(scheduled_job) if scheduled_job

  flush_buckets
end
Makes sure that all the buckets are persisted to disk
# File lib/openwfe/expool/journal.rb, line 157
#
# Flushes every bucket held in @buckets, then empties the table; both
# steps happen inside a single synchronize block.
#
# When at least one bucket got flushed, the number of flushed buckets
# is reported through linfo.
#
def flush_buckets

  flushed = 0

  synchronize do
    @buckets.each_value do |bucket|
      bucket.flush
      flushed += 1
    end

    @buckets.clear
  end

  # nothing to report when the table was empty
  return unless flushed > 0

  linfo { "flush_buckets() flushed #{flushed} buckets" }
end
# File lib/openwfe/expool/journal.rb, line 174
#
# Returns the bucket registered in @buckets for the given wfid. When
# there is none yet, a new Bucket is built on the path computed by
# get_path, stored in the table and returned.
#
def get_bucket(wfid)
  known = @buckets[wfid]
  return known if known

  @buckets[wfid] = Bucket.new(get_path(wfid))
end
# File lib/openwfe/expool/journal.rb, line 184
#
# Returns the path of the journal file for the given wfid:
# the work directory (@workdir), a "/", the wfid turned into a String
# and the ".journal" suffix.
#
def get_path(wfid)
  filename = wfid.to_s + ".journal"
  @workdir + "/" + filename
end
Queues the events before a flush.
If the event is a :terminate, the individual bucket will get flushed.
# File lib/openwfe/expool/journal.rb, line 104
#
# Queues an event in the bucket of the workflow instance it belongs to.
#
# :stop, :launch and :reschedule events are ignored. For any other
# event, the workflow instance id is read from the first argument
# (via extract_fei(...).parent_wfid), the event is serialized with
# serialize_event and appended to the instance's bucket.
#
# When the event is :terminate, the bucket is flushed and removed from
# @buckets, then its journal file is either moved to the "done"
# directory (only when @application_context[:keep_journals] is exactly
# +true+) or deleted.
#
# event:: the event name (a Symbol such as :terminate)
# args::  the event arguments; args[0] is handed to extract_fei
#
# The return value is not meaningful.
#
def queue_event(event, *args)

  #ldebug { "queue_event() :#{event}" }

  # these events are not journaled
  return if [:stop, :launch, :reschedule].include?(event)

  wfid = extract_fei(args[0]).parent_wfid
    #
    # maybe args[0] could be a FlowExpression instead
    # of a FlowExpressionId instance

  e = serialize_event(event, *args)

  terminated = (event == :terminate)

  bucket = nil

  synchronize do

    bucket = get_bucket(wfid)
    bucket << e

    #ldebug { "queue_event() bucket : #{bucket.object_id}" }

    if terminated
      bucket.flush
      @buckets.delete(wfid)
    end
  end
    #
    # minimizing the sync block : the file operations below are done
    # outside of it

  return unless terminated

  # TODO : spin that off this thread, to the
  #        flush thread...
  #
  if @application_context[:keep_journals] == true
    #
    # move the journal to the done/ subdir of journal/
    #
    # a single mv replaces the former cp followed by rm : the file is
    # never present in both places and, when both directories sit on
    # the same filesystem, the move is a plain rename.
    #
    # (the comparison to true is kept strict on purpose, any other
    # value means "do not keep")
    #
    FileUtils.mv(
      bucket.path,
      @donedir + "/" + File.basename(bucket.path))
  else
    FileUtils.rm(bucket.path)
  end
end