1
2 """PgQ consumer framework for Python.
3
4 API problems(?):
5 - process_event() and process_batch() should have db as argument.
- should ev.tag*() update db immediately?
7
8 """
9
10 import sys, time, skytools
11
12 from pgq.event import *
13
14 __all__ = ['Consumer', 'RemoteConsumer', 'SerialConsumer']
15
class Consumer(skytools.DBScript):
    """Consumer base class.

    Pulls batches of events from PgQ queues and hands them to user code
    via process_event()/process_batch().
    """

    def __init__(self, service_name, db_name, args):
        """Initialize new consumer.

        @param service_name: service_name for DBScript
        @param db_name: name of database for get_database()
        @param args: cmdline args for DBScript
        """
        skytools.DBScript.__init__(self, service_name, args)

        self.db_name = db_name
        # queues this consumer is currently registered on
        self.reg_list = []
        # consumer id defaults to the job name
        self.consumer_id = self.cf.get("pgq_consumer_id", self.job_name)
        self.pgq_queue_name = self.cf.get("pgq_queue_name")
36 """Attach consumer to interesting queues."""
37 res = self.register_consumer(self.pgq_queue_name)
38 return res
39
41 """Detach consumer from all queues."""
42 tmp = self.reg_list[:]
43 for q in tmp:
44 self.unregister_consumer(q)
45
47 """Process one event.
48
49 Should be overrided by user code.
50
51 Event should be tagged as done, retry or failed.
52 If not, it will be tagged as for retry.
53 """
54 raise Exception("needs to be implemented")
55
57 """Process all events in batch.
58
59 By default calls process_event for each.
60 Can be overrided by user code.
61
62 Events should be tagged as done, retry or failed.
63 If not, they will be tagged as for retry.
64 """
65 for ev in event_list:
66 self.process_event(db, ev)
67
69 """Do the work loop, once (internal)."""
70
71 if len(self.reg_list) == 0:
72 self.log.debug("Attaching")
73 self.attach()
74
75 db = self.get_database(self.db_name)
76 curs = db.cursor()
77
78 data_avail = 0
79 for queue in self.reg_list:
80 self.stat_start()
81
82
83 batch_id = self._load_next_batch(curs, queue)
84 db.commit()
85 if batch_id == None:
86 continue
87 data_avail = 1
88
89
90 list = self._load_batch_events(curs, batch_id, queue)
91 db.commit()
92
93
94 self._launch_process_batch(db, batch_id, list)
95
96
97 self._finish_batch(curs, batch_id, list)
98 db.commit()
99 self.stat_end(len(list))
100
101
102 return data_avail
103
105 db = self.get_database(self.db_name)
106 cx = db.cursor()
107 cx.execute("select pgq.register_consumer(%s, %s)",
108 [queue_name, self.consumer_id])
109 res = cx.fetchone()[0]
110 db.commit()
111
112 self.reg_list.append(queue_name)
113
114 return res
115
117 db = self.get_database(self.db_name)
118 cx = db.cursor()
119 cx.execute("select pgq.unregister_consumer(%s, %s)",
120 [queue_name, self.consumer_id])
121 db.commit()
122
123 self.reg_list.remove(queue_name)
124
127
129 """Fetch all events for this batch."""
130
131
132 sql = "select * from pgq.get_batch_events(%d)" % batch_id
133 curs.execute(sql)
134 rows = curs.dictfetchall()
135
136
137 list = []
138 for r in rows:
139 ev = Event(queue_name, r)
140 list.append(ev)
141
142 return list
143
145 """Allocate next batch. (internal)"""
146
147 q = "select pgq.next_batch(%s, %s)"
148 curs.execute(q, [queue_name, self.consumer_id])
149 return curs.fetchone()[0]
150
152 """Tag events and notify that the batch is done."""
153
154 retry = failed = 0
155 for ev in list:
156 if ev.status == EV_FAILED:
157 self._tag_failed(curs, batch_id, ev)
158 failed += 1
159 elif ev.status == EV_RETRY:
160 self._tag_retry(curs, batch_id, ev)
161 retry += 1
162 curs.execute("select pgq.finish_batch(%s)", [batch_id])
163
165 """Tag event as failed. (internal)"""
166 curs.execute("select pgq.event_failed(%s, %s, %s)",
167 [batch_id, ev.id, ev.fail_reason])
168
170 """Tag event for retry. (internal)"""
171 cx.execute("select pgq.event_retry(%s, %s, %s)",
172 [batch_id, ev.id, ev.retry_time])
173
175 """Get info about batch.
176
177 @return: Return value is a dict of:
178
179 - queue_name: queue name
180 - consumer_name: consumers name
181 - batch_start: batch start time
182 - batch_end: batch end time
183 - tick_id: end tick id
184 - prev_tick_id: start tick id
185 - lag: how far is batch_end from current moment.
186 """
187 db = self.get_database(self.db_name)
188 cx = db.cursor()
189 q = "select queue_name, consumer_name, batch_start, batch_end,"\
190 " prev_tick_id, tick_id, lag"\
191 " from pgq.get_batch_info(%s)"
192 cx.execute(q, [batch_id])
193 row = cx.dictfetchone()
194 db.commit()
195 return row
196
198 self.stat_batch_start = time.time()
199
201 t = time.time()
202 self.stat_add('count', count)
203 self.stat_add('duration', t - self.stat_batch_start)
204
205
class RemoteConsumer(Consumer):
    """Helper for doing event processing in another database.

    Requires that whole batch is processed in one TX.
    """

    def __init__(self, service_name, db_name, remote_db, args):
        Consumer.__init__(self, service_name, db_name, args)
        # name of the target database; reconstructed from usage in
        # process_batch() -- original init body was lost in source mangling
        self.remote_db = remote_db
217 """Process all events in batch.
218
219 By default calls process_event for each.
220 """
221 dst_db = self.get_database(self.remote_db)
222 curs = dst_db.cursor()
223
224 if self.is_last_batch(curs, batch_id):
225 for ev in event_list:
226 ev.tag_done()
227 return
228
229 self.process_remote_batch(db, batch_id, event_list, dst_db)
230
231 self.set_last_batch(curs, batch_id)
232 dst_db.commit()
233
235 """Helper function to keep track of last successful batch
236 in external database.
237 """
238 q = "select pgq_ext.is_batch_done(%s, %s)"
239 dst_curs.execute(q, [ self.consumer_id, batch_id ])
240 return dst_curs.fetchone()[0]
241
243 """Helper function to set last successful batch
244 in external database.
245 """
246 q = "select pgq_ext.set_batch_done(%s, %s)"
247 dst_curs.execute(q, [ self.consumer_id, batch_id ])
248
250 raise Exception('process_remote_batch not implemented')
251
class SerialConsumer(Consumer):
    """Consumer that applies batches sequentially in second database.

    Requirements:
    - Whole batch in one TX.
    - Must not use retry queue.

    Features:
    - Can detect if several batches are already applied to dest db.
    - If some ticks are lost, allows to seek back on queue.
      Whether it succeeds, depends on pgq configuration.
    """

    def __init__(self, service_name, db_name, remote_db, args):
        Consumer.__init__(self, service_name, db_name, args)
        self.remote_db = remote_db
        # table on dst side that tracks last applied tick per consumer
        self.dst_completed_table = "pgq_ext.completed_tick"
        # filled by process_batch() from get_batch_info()
        self.cur_batch_info = None
270
279
281 p = Consumer.init_optparse(self, parser)
282 p.add_option("--rewind", action = "store_true",
283 help = "change queue position according to destination")
284 p.add_option("--reset", action = "store_true",
285 help = "reset queue pos on destination side")
286 return p
287
289 """Process all events in batch.
290 """
291
292 dst_db = self.get_database(self.remote_db)
293 curs = dst_db.cursor()
294
295 self.cur_batch_info = self.get_batch_info(batch_id)
296
297
298 if self.is_batch_done(curs):
299 for ev in event_list:
300 ev.tag_done()
301 return
302
303
304 self.process_remote_batch(db, batch_id, event_list, dst_db)
305
306
307 for ev in event_list:
308 if ev.status == EV_RETRY:
309 raise Exception("SerialConsumer must not use retry queue")
310
311
312 self.set_batch_done(curs)
313 dst_db.commit()
314
316 """Helper function to keep track of last successful batch
317 in external database.
318 """
319
320 prev_tick = self.cur_batch_info['prev_tick_id']
321
322 q = "select last_tick_id from %s where consumer_id = %%s" % (
323 self.dst_completed_table ,)
324 dst_curs.execute(q, [self.consumer_id])
325 res = dst_curs.fetchone()
326
327 if not res or not res[0]:
328
329 return False
330 dst_tick = res[0]
331
332 if prev_tick == dst_tick:
333
334 return False
335
336 if prev_tick < dst_tick:
337 self.log.warning('Got tick %d, dst has %d - skipping' % (prev_tick, dst_tick))
338 return True
339 else:
340 self.log.error('Got tick %d, dst has %d - ticks lost' % (prev_tick, dst_tick))
341 raise Exception('Lost ticks')
342
344 """Helper function to set last successful batch
345 in external database.
346 """
347 tick_id = self.cur_batch_info['tick_id']
348 q = "delete from %s where consumer_id = %%s; "\
349 "insert into %s (consumer_id, last_tick_id) values (%%s, %%s)" % (
350 self.dst_completed_table,
351 self.dst_completed_table)
352 dst_curs.execute(q, [ self.consumer_id,
353 self.consumer_id, tick_id ])
354
359
365
367 self.log.info("removing completed tick from dst")
368 dst_db = self.get_database(self.remote_db)
369 dst_curs = dst_db.cursor()
370
371 q = "delete from %s where consumer_id = %%s" % (
372 self.dst_completed_table,)
373 dst_curs.execute(q, [self.consumer_id])
374 dst_db.commit()
375
377 raise Exception('process_remote_batch not implemented')
378
380 self.log.info("Rewinding queue")
381 src_db = self.get_database(self.db_name)
382 dst_db = self.get_database(self.remote_db)
383 src_curs = src_db.cursor()
384 dst_curs = dst_db.cursor()
385
386 q = "select last_tick_id from %s where consumer_id = %%s" % (
387 self.dst_completed_table,)
388 dst_curs.execute(q, [self.consumer_id])
389 row = dst_curs.fetchone()
390 if row:
391 dst_tick = row[0]
392 q = "select pgq.register_consumer(%s, %s, %s)"
393 src_curs.execute(q, [self.pgq_queue_name, self.consumer_id, dst_tick])
394 else:
395 self.log.warning('No tick found on dst side')
396
397 dst_db.commit()
398 src_db.commit()
399
401 self.log.info("Resetting queue tracking on dst side")
402 dst_db = self.get_database(self.remote_db)
403 dst_curs = dst_db.cursor()
404
405 q = "delete from %s where consumer_id = %%s" % (
406 self.dst_completed_table,)
407 dst_curs.execute(q, [self.consumer_id])
408 dst_db.commit()
409