Package pgq :: Module consumer
[frames | no frames]

Source Code for Module pgq.consumer

  1   
  2  """PgQ consumer framework for Python. 
  3   
  4  API problems(?): 
  5      - process_event() and process_batch() should have db as argument. 
  6      - should ev.tag*() update db immediately? 
  7   
  8  """ 
  9   
 10  import sys, time, skytools 
 11   
 12  from pgq.event import * 
 13   
 14  __all__ = ['Consumer', 'RemoteConsumer', 'SerialConsumer'] 
 15   
class Consumer(skytools.DBScript):
    """Consumer base class.

    Registers itself on one or more pgq queues, then repeatedly
    fetches batches of events and hands them to user code via
    process_batch() / process_event().
    """

    def __init__(self, service_name, db_name, args):
        """Initialize new consumer.

        @param service_name: service_name for DBScript
        @param db_name: name of database for get_database()
        @param args: cmdline args for DBScript
        """

        skytools.DBScript.__init__(self, service_name, args)

        self.db_name = db_name
        # queue names this consumer is currently registered on
        self.reg_list = []
        # consumer id defaults to the DBScript job name
        self.consumer_id = self.cf.get("pgq_consumer_id", self.job_name)
        self.pgq_queue_name = self.cf.get("pgq_queue_name")

    def attach(self):
        """Attach consumer to interesting queues.

        @return: whatever pgq.register_consumer() returns.
        """
        res = self.register_consumer(self.pgq_queue_name)
        return res

    def detach(self):
        """Detach consumer from all queues."""
        # iterate over a copy - unregister_consumer() mutates reg_list
        tmp = self.reg_list[:]
        for q in tmp:
            self.unregister_consumer(q)

    def process_event(self, db, event):
        """Process one event.

        Should be overridden by user code.

        Event should be tagged as done, retry or failed.
        If not, it will be tagged as for retry.
        """
        raise Exception("needs to be implemented")

    def process_batch(self, db, batch_id, event_list):
        """Process all events in batch.

        By default calls process_event for each.
        Can be overridden by user code.

        Events should be tagged as done, retry or failed.
        If not, they will be tagged as for retry.
        """
        for ev in event_list:
            self.process_event(db, ev)

    def work(self):
        """Do the work loop, once (internal).

        @return: 1 if any batch was available, else 0 - when false,
            DBScript sleeps before calling work() again.
        """

        if not self.reg_list:
            self.log.debug("Attaching")
            self.attach()

        db = self.get_database(self.db_name)
        curs = db.cursor()

        data_avail = 0
        for queue in self.reg_list:
            self.stat_start()

            # acquire batch
            batch_id = self._load_next_batch(curs, queue)
            db.commit()
            if batch_id is None:
                continue
            data_avail = 1

            # load events
            ev_list = self._load_batch_events(curs, batch_id, queue)
            db.commit()

            # process events
            self._launch_process_batch(db, batch_id, ev_list)

            # done
            self._finish_batch(curs, batch_id, ev_list)
            db.commit()
            self.stat_end(len(ev_list))

        # if false, script sleeps
        return data_avail

    def register_consumer(self, queue_name):
        """Register consumer on queue_name and remember it locally."""
        db = self.get_database(self.db_name)
        cx = db.cursor()
        cx.execute("select pgq.register_consumer(%s, %s)",
                   [queue_name, self.consumer_id])
        res = cx.fetchone()[0]
        db.commit()

        self.reg_list.append(queue_name)

        return res

    def unregister_consumer(self, queue_name):
        """Unregister consumer from queue_name and forget it locally."""
        db = self.get_database(self.db_name)
        cx = db.cursor()
        cx.execute("select pgq.unregister_consumer(%s, %s)",
                   [queue_name, self.consumer_id])
        db.commit()

        self.reg_list.remove(queue_name)

    def _launch_process_batch(self, db, batch_id, ev_list):
        self.process_batch(db, batch_id, ev_list)

    def _load_batch_events(self, curs, batch_id, queue_name):
        """Fetch all events for this batch.

        @return: list of Event objects.
        """

        # load events - use a bind parameter, not string interpolation
        sql = "select * from pgq.get_batch_events(%s)"
        curs.execute(sql, [batch_id])
        rows = curs.dictfetchall()

        # map them to python objects
        return [Event(queue_name, r) for r in rows]

    def _load_next_batch(self, curs, queue_name):
        """Allocate next batch. (internal)

        @return: batch id or None if no batch is available yet.
        """

        q = "select pgq.next_batch(%s, %s)"
        curs.execute(q, [queue_name, self.consumer_id])
        return curs.fetchone()[0]

    def _finish_batch(self, curs, batch_id, ev_list):
        """Tag events and notify that the batch is done."""

        retry = failed = 0
        for ev in ev_list:
            if ev.status == EV_FAILED:
                self._tag_failed(curs, batch_id, ev)
                failed += 1
            elif ev.status == EV_RETRY:
                self._tag_retry(curs, batch_id, ev)
                retry += 1
        curs.execute("select pgq.finish_batch(%s)", [batch_id])

    def _tag_failed(self, curs, batch_id, ev):
        """Tag event as failed. (internal)"""
        curs.execute("select pgq.event_failed(%s, %s, %s)",
                     [batch_id, ev.id, ev.fail_reason])

    def _tag_retry(self, cx, batch_id, ev):
        """Tag event for retry. (internal)"""
        cx.execute("select pgq.event_retry(%s, %s, %s)",
                   [batch_id, ev.id, ev.retry_time])

    def get_batch_info(self, batch_id):
        """Get info about batch.

        @return: Return value is a dict of:

          - queue_name: queue name
          - consumer_name: consumers name
          - batch_start: batch start time
          - batch_end: batch end time
          - tick_id: end tick id
          - prev_tick_id: start tick id
          - lag: how far is batch_end from current moment.
        """
        db = self.get_database(self.db_name)
        cx = db.cursor()
        q = "select queue_name, consumer_name, batch_start, batch_end,"\
            " prev_tick_id, tick_id, lag"\
            " from pgq.get_batch_info(%s)"
        cx.execute(q, [batch_id])
        row = cx.dictfetchone()
        db.commit()
        return row

    def stat_start(self):
        # remember batch start time for stat_end()
        self.stat_batch_start = time.time()

    def stat_end(self, count):
        # record event count and batch duration
        t = time.time()
        self.stat_add('count', count)
        self.stat_add('duration', t - self.stat_batch_start)
class RemoteConsumer(Consumer):
    """Helper for doing event processing in another database.

    Requires that whole batch is processed in one TX.
    """

    def __init__(self, service_name, db_name, remote_db, args):
        """Remember target database name, pass the rest to Consumer."""
        Consumer.__init__(self, service_name, db_name, args)
        self.remote_db = remote_db

    def process_batch(self, db, batch_id, event_list):
        """Process all events in batch.

        By default calls process_event for each.
        """
        dst_db = self.get_database(self.remote_db)
        dst_curs = dst_db.cursor()

        # already applied on target side? just mark everything done
        if self.is_last_batch(dst_curs, batch_id):
            for ev in event_list:
                ev.tag_done()
            return

        self.process_remote_batch(db, batch_id, event_list, dst_db)

        self.set_last_batch(dst_curs, batch_id)
        dst_db.commit()

    def is_last_batch(self, dst_curs, batch_id):
        """Helper function to keep track of last successful batch
        in external database.
        """
        dst_curs.execute("select pgq_ext.is_batch_done(%s, %s)",
                         [self.consumer_id, batch_id])
        (done,) = dst_curs.fetchone()
        return done

    def set_last_batch(self, dst_curs, batch_id):
        """Helper function to set last successful batch
        in external database.
        """
        dst_curs.execute("select pgq_ext.set_batch_done(%s, %s)",
                         [self.consumer_id, batch_id])

    def process_remote_batch(self, db, batch_id, event_list, dst_db):
        """Subclasses implement the actual work here."""
        raise Exception('process_remote_batch not implemented')
class SerialConsumer(Consumer):
    """Consumer that applies batches sequentially in second database.

    Requirements:
    - Whole batch in one TX.
    - Must not use retry queue.

    Features:
    - Can detect if several batches are already applied to dest db.
    - If some ticks are lost, allows to seek back on queue.
      Whether it succeeds, depends on pgq configuration.
    """

    def __init__(self, service_name, db_name, remote_db, args):
        """Initialize consumer; remote_db names the destination database."""
        Consumer.__init__(self, service_name, db_name, args)
        self.remote_db = remote_db
        # tracking table on destination side; holds last applied tick
        self.dst_completed_table = "pgq_ext.completed_tick"
        # batch info dict of the batch currently being processed
        self.cur_batch_info = None

    def startup(self):
        # one-shot admin modes: --rewind / --reset exit without
        # entering the normal work loop
        if self.options.rewind:
            self.rewind()
            sys.exit(0)
        if self.options.reset:
            self.dst_reset()
            sys.exit(0)
        return Consumer.startup(self)

    def init_optparse(self, parser = None):
        """Add --rewind / --reset options on top of Consumer's."""
        p = Consumer.init_optparse(self, parser)
        p.add_option("--rewind", action = "store_true",
                help = "change queue position according to destination")
        p.add_option("--reset", action = "store_true",
                help = "reset queue pos on destination side")
        return p

    def process_batch(self, db, batch_id, event_list):
        """Process all events in batch.
        """

        dst_db = self.get_database(self.remote_db)
        curs = dst_db.cursor()

        # tick info is needed by is_batch_done()/set_batch_done()
        self.cur_batch_info = self.get_batch_info(batch_id)

        # check if done
        if self.is_batch_done(curs):
            for ev in event_list:
                ev.tag_done()
            return

        # actual work
        self.process_remote_batch(db, batch_id, event_list, dst_db)

        # make sure no retry events
        for ev in event_list:
            if ev.status == EV_RETRY:
                raise Exception("SerialConsumer must not use retry queue")

        # finish work
        self.set_batch_done(curs)
        dst_db.commit()

    def is_batch_done(self, dst_curs):
        """Helper function to keep track of last successful batch
        in external database.

        Compares current batch's prev_tick_id with the last tick
        recorded on destination side.  Returns True if the batch was
        already applied, False if it should be applied now; raises
        if ticks were lost.
        """

        prev_tick = self.cur_batch_info['prev_tick_id']

        # table name is substituted in, consumer id is a bind parameter
        q = "select last_tick_id from %s where consumer_id = %%s" % (
                self.dst_completed_table ,)
        dst_curs.execute(q, [self.consumer_id])
        res = dst_curs.fetchone()

        if not res or not res[0]:
            # seems this consumer has not run yet against dst_db
            return False
        dst_tick = res[0]

        if prev_tick == dst_tick:
            # on track
            return False

        if prev_tick < dst_tick:
            # batch already applied earlier - skip it
            self.log.warning('Got tick %d, dst has %d - skipping' % (prev_tick, dst_tick))
            return True
        else:
            # destination is behind queue position - data would be missed
            self.log.error('Got tick %d, dst has %d - ticks lost' % (prev_tick, dst_tick))
            raise Exception('Lost ticks')

    def set_batch_done(self, dst_curs):
        """Helper function to set last successful batch
        in external database.
        """
        tick_id = self.cur_batch_info['tick_id']
        # delete + insert in one statement keeps exactly one row
        # per consumer in the tracking table
        q = "delete from %s where consumer_id = %%s; "\
            "insert into %s (consumer_id, last_tick_id) values (%%s, %%s)" % (
                self.dst_completed_table,
                self.dst_completed_table)
        dst_curs.execute(q, [ self.consumer_id,
                              self.consumer_id, tick_id ])

    def attach(self):
        # on fresh registration, drop stale tracking info on dest
        new = Consumer.attach(self)
        if new:
            self.clean_completed_tick()

    def detach(self):
        """If detaching, also clean completed tick table on dest."""

        Consumer.detach(self)
        self.clean_completed_tick()

    def clean_completed_tick(self):
        """Remove this consumer's row from the tracking table on dest."""
        self.log.info("removing completed tick from dst")
        dst_db = self.get_database(self.remote_db)
        dst_curs = dst_db.cursor()

        q = "delete from %s where consumer_id = %%s" % (
                self.dst_completed_table,)
        dst_curs.execute(q, [self.consumer_id])
        dst_db.commit()

    def process_remote_batch(self, db, batch_id, event_list, dst_db):
        # subclasses implement the actual work here
        raise Exception('process_remote_batch not implemented')

    def rewind(self):
        """Move queue position back to the last tick recorded on dest."""
        self.log.info("Rewinding queue")
        src_db = self.get_database(self.db_name)
        dst_db = self.get_database(self.remote_db)
        src_curs = src_db.cursor()
        dst_curs = dst_db.cursor()

        q = "select last_tick_id from %s where consumer_id = %%s" % (
                self.dst_completed_table,)
        dst_curs.execute(q, [self.consumer_id])
        row = dst_curs.fetchone()
        if row:
            dst_tick = row[0]
            # re-register at the destination's tick position
            q = "select pgq.register_consumer(%s, %s, %s)"
            src_curs.execute(q, [self.pgq_queue_name, self.consumer_id, dst_tick])
        else:
            self.log.warning('No tick found on dst side')

        dst_db.commit()
        src_db.commit()

    def dst_reset(self):
        """Drop queue-position tracking info on destination side."""
        self.log.info("Resetting queue tracking on dst side")
        dst_db = self.get_database(self.remote_db)
        dst_curs = dst_db.cursor()

        q = "delete from %s where consumer_id = %%s" % (
                self.dst_completed_table,)
        dst_curs.execute(q, [self.consumer_id])
        dst_db.commit()