Module | OpenWFE::ThreadedStorageMixin |
In: |
lib/openwfe/expool/threadedexpstorage.rb
|
This mixin gathers all the logic for a threaded expression storage, one that doesn‘t immediately stores workitems (removes overriding operations). Using this threaded storage brings a very important perf benefit.
THREADED_FREQ | = | "427" |
calls process_queue() before the call the super class each() method.
# File lib/openwfe/expool/threadedexpstorage.rb, line 73 73: def find_expressions (options) 74: 75: process_queue 76: super 77: end
Adds the queue() method as an observer to the update and remove events of the expression pool. :update and :remove mean changes to expressions in the persistence that‘s why they are observed.
# File lib/openwfe/expool/threadedexpstorage.rb, line 166 166: def observe_expool 167: 168: get_expression_pool.add_observer(:update) do |event, fei, fe| 169: ldebug { ":update for #{fei.to_debug_s}" } 170: queue event, fei, fe 171: end 172: get_expression_pool.add_observer(:remove) do |event, fei| 173: ldebug { ":remove for #{fei.to_debug_s}" } 174: queue event, fei 175: end 176: end
the actual "do persist" order
# File lib/openwfe/expool/threadedexpstorage.rb, line 115 115: def process_queue 116: 117: return unless @events.size > 0 118: # 119: # trying to exit as quickly as possible 120: 121: ldebug do 122: "process_queue() #{@events.size} events #{@op_count} ops" 123: end 124: 125: synchronize do 126: @events.each_value do |v| 127: event = v[0] 128: begin 129: if event == :update 130: self[v[1]] = v[2] 131: else 132: safe_delete(v[1]) 133: end 134: rescue Exception => e 135: lwarn do 136: "process_queue() ':#{event}' exception\n" + 137: OpenWFE::exception_to_s(e) 138: end 139: end 140: end 141: @op_count = 0 142: @events.clear 143: end 144: end
queues an event for later (well within a second) persistence
# File lib/openwfe/expool/threadedexpstorage.rb, line 97 97: def queue (event, fei, fe=nil) 98: synchronize do 99: 100: old_size = @events.size 101: @op_count += 1 102: 103: @events[fei] = [ event, fei, fe ] 104: 105: ldebug do 106: "queue() ops #{@op_count} "+ 107: "size #{old_size} -> #{@events.size}" 108: end 109: end 110: end
a call to delete that tolerates missing .yaml files
# File lib/openwfe/expool/threadedexpstorage.rb, line 149 149: def safe_delete (fei) 150: begin 151: self.delete(fei) 152: rescue Exception => e 153: # lwarn do 154: # "safe_delete() exception\n" + 155: # OpenWFE::exception_to_s(e) 156: # end 157: end 158: end