Module OpenWFE::ThreadedStorageMixin
In: lib/openwfe/expool/threadedexpstorage.rb

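This mixin gathers all the logic for a threaded expression storage, one that doesn't immediately store workitems: changes are queued and persisted a moment later, and a newer queued operation on an expression overrides the older one, so superseded operations are never written. Using this threaded storage brings a very important performance benefit.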

Methods

Constants

THREADED_FREQ = "427"

Public Instance methods

Calls process_queue() before calling the superclass find_expressions() method.

[Source]

    # File lib/openwfe/expool/threadedexpstorage.rb, line 73
# Flushes the pending event queue, then delegates the lookup to the
# superclass implementation of find_expressions.
#
# options:: lookup options, handed unchanged to the superclass.
#
# Returns whatever the superclass find_expressions returns.
def find_expressions(options)
    # presumably flushed first so the lookup does not miss queued changes
    process_queue
    super(options)
end

Takes care of stopping the 'queue processing' thread, then flushes any events still waiting in the queue.

[Source]

    # File lib/openwfe/expool/threadedexpstorage.rb, line 60
# Cancels the scheduled queue-processing job (when one was registered in
# @thread_id) and then drains the queue one last time.
def stop
    if @thread_id
        get_scheduler.unschedule(@thread_id)
    end

    # flush every remaining event (especially the :delete ones)
    process_queue
end

Protected Instance methods

Adds the queue() method as an observer of the :update and :remove events of the expression pool. These two events mean that an expression changed in the persistence, which is why they are observed.

[Source]

     # File lib/openwfe/expool/threadedexpstorage.rb, line 166
# Registers two observers on the expression pool, one for :update and one
# for :remove. Each observer logs the event at debug level and forwards it
# to queue().
def observe_expool
    # :update events carry the expression (fe) to store
    get_expression_pool.add_observer(:update) do |event, fei, fe|
        ldebug do
            ":update  for #{fei.to_debug_s}"
        end
        queue(event, fei, fe)
    end

    # :remove events only carry the identifier (fei)
    get_expression_pool.add_observer(:remove) do |event, fei|
        ldebug do
            ":remove  for #{fei.to_debug_s}"
        end
        queue(event, fei)
    end
end

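Carries out the actual "do persist" order: applies every queued event to the storage (store on :update, delete otherwise), then empties the queue.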

[Source]

     # File lib/openwfe/expool/threadedexpstorage.rb, line 115
# Persists every queued event, then empties the queue.
#
# Each queued entry is an [ event, fei, fe ] triple. An :update event stores
# fe under fei (self[fei] = fe); any other event removes fei via
# safe_delete. A StandardError raised while handling one entry is logged
# with lwarn and does not prevent the remaining entries from being handled.
# Exceptions outside of StandardError (Interrupt, SystemExit, ...) are not
# swallowed and propagate to the caller.
#
# Returns nil immediately when there is nothing to do.
def process_queue
    # exit as quickly as possible when nothing is queued; the nil check
    # covers a call (through stop, for instance) made before
    # start_processing_thread has created the queue
    return if @events.nil? || @events.empty?

    ldebug do
        "process_queue() #{@events.size} events #{@op_count} ops"
    end

    synchronize do
        @events.each_value do |event, fei, fe|
            begin
                if event == :update
                    self[fei] = fe
                else
                    safe_delete(fei)
                end
            rescue StandardError => e
                # log and carry on with the next entry
                lwarn do
                    "process_queue() ':#{event}' exception\n" +
                    OpenWFE::exception_to_s(e)
                end
            end
        end
        @op_count = 0
        @events.clear
    end
end

queues an event for later (well within a second) persistence

[Source]

     # File lib/openwfe/expool/threadedexpstorage.rb, line 97
 # Records an event for later persistence by process_queue.
 #
 # event:: the event symbol received from the expression pool
 # fei::   identifier of the expression; used as the key in @events, so a
 #         newer event for the same fei replaces the one already queued
 # fe::    the expression itself (nil when not supplied)
 def queue(event, fei, fe=nil)
     synchronize do
         # size taken before the insert so the debug line shows the change
         size_before = @events.size
         @op_count += 1

         @events[fei] = [ event, fei, fe ]

         ldebug { "queue() ops #{@op_count} size #{size_before} -> #{@events.size}" }
     end
 end

A call to delete() that tolerates missing .yaml files: an error raised by delete() is silently ignored.

[Source]

     # File lib/openwfe/expool/threadedexpstorage.rb, line 149
# Calls delete(fei) and silently ignores any StandardError it raises (a
# missing .yaml file, for instance).
#
# Only StandardError is rescued, so Interrupt, SystemExit and other
# non-standard exceptions still reach the caller.
#
# fei:: the key handed to delete
#
# Returns the result of delete, or nil when delete raised.
def safe_delete(fei)
    self.delete(fei)
rescue StandardError
    # deliberately silent: a failed delete is tolerated
    nil
end

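Initializes the (empty) event queue and the operation counter, then starts the thread that does the actual persistence: process_queue() is scheduled to run every THREADED_FREQ.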

[Source]

    # File lib/openwfe/expool/threadedexpstorage.rb, line 84
# Resets the operation counter and the event queue, then asks the scheduler
# to run process_queue repeatedly at the THREADED_FREQ interval. The value
# returned by schedule_every is kept in @thread_id.
def start_processing_thread
    @op_count = 0
    @events = {}

    @thread_id = get_scheduler.schedule_every(THREADED_FREQ) { process_queue }
end

[Validate]