#! /usr/bin/env python

"""Basic replication core."""

import sys, os, time
import skytools, pgq

__all__ = ['Replicator', 'TableState',
    'TABLE_MISSING', 'TABLE_IN_COPY', 'TABLE_CATCHING_UP',
    'TABLE_WANNA_SYNC', 'TABLE_DO_SYNC', 'TABLE_OK']

TABLE_MISSING = 0
TABLE_IN_COPY = 1
TABLE_CATCHING_UP = 2
TABLE_WANNA_SYNC = 3
TABLE_DO_SYNC = 4
TABLE_OK = 5

SYNC_OK = 0
SYNC_LOOP = 1
SYNC_EXIT = 2
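
# Table life-cycle, as driven by the sync logic below: a table starts out
# TABLE_MISSING, is handed to a separate copy process (TABLE_IN_COPY), works
# through the event backlog (TABLE_CATCHING_UP), then negotiates a hand-over
# tick with the main thread (TABLE_WANNA_SYNC / TABLE_DO_SYNC) and is finally
# replicated normally (TABLE_OK).
#
# The SYNC_* codes are what sync_from_main_thread() / sync_from_copy_thread()
# return to the sync_tables() loop: SYNC_OK continues with the current batch,
# SYNC_LOOP sleeps and re-checks, SYNC_EXIT detaches and exits the process.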

class Counter(object):
    """Counts table statuses."""

    missing = 0
    copy = 0
    catching_up = 0
    wanna_sync = 0
    do_sync = 0
    ok = 0

35 """Counts and sanity checks."""
36 for t in tables:
37 if t.state == TABLE_MISSING:
38 self.missing += 1
39 elif t.state == TABLE_IN_COPY:
40 self.copy += 1
41 elif t.state == TABLE_CATCHING_UP:
42 self.catching_up += 1
43 elif t.state == TABLE_WANNA_SYNC:
44 self.wanna_sync += 1
45 elif t.state == TABLE_DO_SYNC:
46 self.do_sync += 1
47 elif t.state == TABLE_OK:
48 self.ok += 1
49
50 if self.copy + self.catching_up + self.wanna_sync + self.do_sync > 1:
51 raise Exception('Bad table state')
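
# A fresh Counter is built from table_list on every pass of sync_tables();
# the counts decide whether the main thread keeps processing, waits for the
# copy process, or launches a copy for a missing table.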

class TableState(object):
    """Keeps state about one table."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

        self.state = TABLE_MISSING
        self.str_snapshot = None
        self.from_snapshot = None
        self.sync_tick_id = None
        self.ok_batch_count = 0
        self.last_tick = 0
        self.changed = 1

    def change_snapshot(self, str_snapshot, tag_changed = 1):
        if self.str_snapshot == str_snapshot:
            return
        self.log.debug("%s: change_snapshot to %s" % (self.name, str_snapshot))
        self.str_snapshot = str_snapshot
        if str_snapshot:
            self.from_snapshot = skytools.Snapshot(str_snapshot)
        else:
            self.from_snapshot = None

        if tag_changed:
            self.ok_batch_count = 0
            self.last_tick = None
            self.changed = 1

    def change_state(self, state, tick_id = None):
        if self.state == state and self.sync_tick_id == tick_id:
            return
        self.state = state
        self.sync_tick_id = tick_id
        self.changed = 1
        self.log.debug("%s: change_state to %s" % (self.name,
                                                   self.render_state()))

    def render_state(self):
        """Make a string to be stored in db."""

        if self.state == TABLE_MISSING:
            return None
        elif self.state == TABLE_IN_COPY:
            return 'in-copy'
        elif self.state == TABLE_CATCHING_UP:
            return 'catching-up'
        elif self.state == TABLE_WANNA_SYNC:
            return 'wanna-sync:%d' % self.sync_tick_id
        elif self.state == TABLE_DO_SYNC:
            return 'do-sync:%d' % self.sync_tick_id
        elif self.state == TABLE_OK:
            return 'ok'

    def parse_state(self, merge_state):
        """Read state from string."""

        state = -1
        if merge_state is None:
            state = TABLE_MISSING
        elif merge_state == "in-copy":
            state = TABLE_IN_COPY
        elif merge_state == "catching-up":
            state = TABLE_CATCHING_UP
        elif merge_state == "ok":
            state = TABLE_OK
        elif merge_state == "?":
            state = TABLE_OK
        else:
            tmp = merge_state.split(':')
            if len(tmp) == 2:
                self.sync_tick_id = int(tmp[1])
                if tmp[0] == 'wanna-sync':
                    state = TABLE_WANNA_SYNC
                elif tmp[0] == 'do-sync':
                    state = TABLE_DO_SYNC

        if state < 0:
            raise Exception("Bad table state: %s" % merge_state)

        return state

    def loaded_state(self, merge_state, str_snapshot):
        self.log.debug("loaded_state: %s: %s / %s" % (
                       self.name, merge_state, str_snapshot))
        self.change_snapshot(str_snapshot, 0)
        self.state = self.parse_state(merge_state)
        self.changed = 0
        if merge_state == "?":
            self.changed = 1

    def interesting(self, ev, tick_id, copy_thread):
        """Check if table wants this event."""

        if copy_thread:
            if self.state not in (TABLE_CATCHING_UP, TABLE_DO_SYNC):
                return False
        else:
            if self.state != TABLE_OK:
                return False

        # if no snapshot is tracked, the event is always wanted
        if not self.from_snapshot:
            return True

        # skip events whose transaction was already visible to the COPY snapshot
        if self.from_snapshot.contains(ev.txid):
            return False

        # the snapshot check is only needed for the first few batches after
        # the copy; count batches and drop the snapshot once enough have passed
        if tick_id != self.last_tick:
            self.last_tick = tick_id
            self.ok_batch_count += 1

            if self.ok_batch_count > 3:
                self.change_snapshot(None)
        return True

class SeqCache(object):
    def __init__(self):
        self.seq_list = []
        self.val_cache = {}

    def set_seq_list(self, seq_list):
        self.seq_list = seq_list
        new_cache = {}
        for seq in seq_list:
            val = self.val_cache.get(seq)
            if val:
                new_cache[seq] = val
        self.val_cache = new_cache

    def resync(self, src_curs, dst_curs):
        if len(self.seq_list) == 0:
            return
        dat = ".last_value, ".join(self.seq_list)
        dat += ".last_value"
        q = "select %s from %s" % (dat, ",".join(self.seq_list))
        src_curs.execute(q)
        row = src_curs.fetchone()
        for i in range(len(self.seq_list)):
            seq = self.seq_list[i]
            cur = row[i]
            old = self.val_cache.get(seq)
            if old != cur:
                q = "select setval(%s, %s)"
                dst_curs.execute(q, [seq, cur])
                self.val_cache[seq] = cur
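
# For a seq_list of, say, ['public.a_seq', 'public.b_seq'] (illustrative
# names), resync() above builds a single provider-side query:
#   select public.a_seq.last_value, public.b_seq.last_value
#     from public.a_seq,public.b_seq
# and calls setval() on the subscriber only for sequences whose last_value
# changed since the cached value.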

class Replicator(pgq.SerialConsumer):
    """Replication core."""

    sql_command = {
        'I': "insert into %s %s;",
        'U': "update only %s set %s;",
        'D': "delete from only %s where %s;",
    }

    # batch info
    cur_tick = 0
    prev_tick = 0

    def __init__(self, args):
        pgq.SerialConsumer.__init__(self, 'londiste', 'provider_db', 'subscriber_db', args)

        # keep SerialConsumer's completed-batch tracking under the londiste schema
        self.dst_completed_table = "londiste.completed"

        self.table_list = []
        self.table_map = {}

        self.copy_thread = 0
        self.maint_time = 0
        self.seq_cache = SeqCache()
        # tunables: maintenance interval (seconds) and optional mirror queue
        self.maint_delay = self.cf.getint('maint_delay', 600)
        self.mirror_queue = self.cf.get('mirror_queue', '')
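
    # The same Replicator code runs in two roles: the main worker process
    # (copy_thread stays 0) and a separate table-copy process started by
    # launch_copy() with the "copy" command.  The handlers below check
    # self.copy_thread to tell the two roles apart.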

    def handle_seqs(self, dst_curs):
        # only the main thread syncs sequences
        if self.copy_thread:
            return

        q = "select * from londiste.subscriber_get_seq_list(%s)"
        dst_curs.execute(q, [self.pgq_queue_name])
        seq_list = []
        for row in dst_curs.fetchall():
            seq_list.append(row[0])

        self.seq_cache.set_seq_list(seq_list)

        src_curs = self.get_database('provider_db').cursor()
        self.seq_cache.resync(src_curs, dst_curs)
273 """Table sync loop.
274
275 Calls appropriate handles, which is expected to
276 return one of SYNC_* constants."""
277
278 self.log.debug('Sync tables')
279 while 1:
280 cnt = Counter(self.table_list)
281 if self.copy_thread:
282 res = self.sync_from_copy_thread(cnt, dst_db)
283 else:
284 res = self.sync_from_main_thread(cnt, dst_db)
285
286 if res == SYNC_EXIT:
287 self.log.debug('Sync tables: exit')
288 self.detach()
289 sys.exit(0)
290 elif res == SYNC_OK:
291 return
292 elif res != SYNC_LOOP:
293 raise Exception('Program error')
294
295 self.log.debug('Sync tables: sleeping')
296 time.sleep(3)
297 dst_db.commit()
298 self.load_table_state(dst_db.cursor())
299 dst_db.commit()
300
    def sync_from_main_thread(self, cnt, dst_db):
        "Main thread sync logic."

        # decide what to do based on the counts collected in Counter
        if cnt.do_sync:
            # the copy thread is syncing, wait for it to finish
            return SYNC_LOOP
        elif cnt.wanna_sync:
            # copy thread wants to sync at a specific tick;
            # grant it once the main thread has reached that tick
            t = self.get_table_by_state(TABLE_WANNA_SYNC)
            if self.cur_tick >= t.sync_tick_id:
                self.change_table_state(dst_db, t, TABLE_DO_SYNC, self.cur_tick)
                return SYNC_LOOP
            else:
                return SYNC_OK
        elif cnt.catching_up:
            # copy thread is still catching up, main thread can proceed
            return SYNC_OK
        elif cnt.copy:
            # copy thread is busy copying, main thread can proceed
            return SYNC_OK
        elif cnt.missing:
            # pick a missing table and launch a copy process for it
            t = self.get_table_by_state(TABLE_MISSING)
            self.change_table_state(dst_db, t, TABLE_IN_COPY)

            self.launch_copy(t)

            # sleep and reload state before continuing
            return SYNC_LOOP
        else:
            # everything is OK
            return SYNC_OK
383 "Actual event processing happens here."
384
385 ignored_events = 0
386 self.sql_list = []
387 mirror_list = []
388 for ev in ev_list:
389 if not self.interesting(ev):
390 ignored_events += 1
391 ev.tag_done()
392 continue
393
394 if ev.type in ('I', 'U', 'D'):
395 self.handle_data_event(ev, dst_curs)
396 else:
397 self.handle_system_event(ev, dst_curs)
398
399 if self.mirror_queue:
400 mirror_list.append(ev)
401
402
403 self.flush_sql(dst_curs)
404 self.stat_add('ignored', ignored_events)
405
406
407 if self.mirror_queue:
408 self.fill_mirror_queue(mirror_list, dst_curs)
409
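
    # Data events carry the target table name in ev.extra1 and a ready-made
    # SQL fragment in ev.data, so an 'I' event for table public.foo with
    # data "(id) values (1)" (names and values here are only illustrative)
    # renders as:  insert into public.foo (id) values (1);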

    def handle_data_event(self, ev, dst_curs):
        fmt = self.sql_command[ev.type]
        sql = fmt % (ev.extra1, ev.data)
        self.sql_list.append(sql)
        if len(self.sql_list) > 200:
            self.flush_sql(dst_curs)
        ev.tag_done()

    def flush_sql(self, dst_curs):
        if len(self.sql_list) == 0:
            return

        buf = "\n".join(self.sql_list)
        self.sql_list = []

        dst_curs.execute(buf)
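
        # handle_data_event() queues statements into sql_list and flushes
        # every 200 entries; joining them into one execute() call applies the
        # whole chunk in a single round-trip to the subscriber database.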

    def interesting(self, ev):
        if ev.type not in ('I', 'U', 'D'):
            return 1
        t = self.get_table_by_name(ev.extra1)
        if t:
            return t.interesting(ev, self.cur_tick, self.copy_thread)
        else:
            return 0
437 "System event."
438
439 if ev.type == "T":
440 self.log.info("got new table event: "+ev.data)
441
442 name_list = []
443 for name in ev.data.split(','):
444 name_list.append(name.strip())
445
446 del_list = []
447 for tbl in self.table_list:
448 if tbl.name in name_list:
449 continue
450 del_list.append(tbl)
451
452
453 for tbl in del_list:
454 self.log.info("Removing table %s from set" % tbl.name)
455 self.remove_table(tbl, dst_curs)
456
457 ev.tag_done()
458 else:
459 self.log.warning("Unknows op %s" % ev.type)
460 ev.tag_failed("Unknown operation")
461
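
    # A 'T' event carries the provider's current table registration as a
    # comma-separated list in ev.data; any table known locally but missing
    # from that list is dropped from the subscriber set via remove_table().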

    def remove_table(self, tbl, dst_curs):
        del self.table_map[tbl.name]
        self.table_list.remove(tbl)
        q = "select londiste.subscriber_remove_table(%s, %s)"
        dst_curs.execute(q, [self.pgq_queue_name, tbl.name])
469 """Load table state from database.
470
471 Todo: if all tables are OK, there is no need
472 to load state on every batch.
473 """
474
475 q = """select table_name, snapshot, merge_state
476 from londiste.subscriber_get_table_list(%s)
477 """
478 curs.execute(q, [self.pgq_queue_name])
479
480 new_list = []
481 new_map = {}
482 for row in curs.dictfetchall():
483 t = self.get_table_by_name(row['table_name'])
484 if not t:
485 t = TableState(row['table_name'], self.log)
486 t.loaded_state(row['merge_state'], row['snapshot'])
487 new_list.append(t)
488 new_map[t.name] = t
489
490 self.table_list = new_list
491 self.table_map = new_map
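
        # existing TableState objects are reused here, so per-table runtime
        # state such as snapshot tracking and ok_batch_count survives the
        # reload; only genuinely new tables get a fresh TableState.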

    def save_table_state(self, curs):
        """Store changed table state in database."""

        for t in self.table_list:
            if not t.changed:
                continue
            merge_state = t.render_state()
            self.log.info("storing state of %s: copy:%d new_state:%s" % (
                          t.name, self.copy_thread, merge_state))
            q = "select londiste.subscriber_set_table_state(%s, %s, %s, %s)"
            curs.execute(q, [self.pgq_queue_name,
                             t.name, t.str_snapshot, merge_state])
            t.changed = 0

    def change_table_state(self, dst_db, tbl, state, tick_id = None):
        # signature and body inferred from the call sites in
        # sync_from_main_thread(): flip the state and persist it right away
        tbl.change_state(state, tick_id)
        self.save_table_state(dst_db.cursor())
        dst_db.commit()
516 "get first table with specific state"
517
518 for t in self.table_list:
519 if t.state == state:
520 return t
521 raise Exception('No table was found with state: %d' % state)
522

    def get_table_by_name(self, name):
        # table names are stored schema-qualified; default to public
        if name.find('.') < 0:
            name = "public.%s" % name
        if name in self.table_map:
            return self.table_map[name]
        return None

    def fill_mirror_queue(self, mirror_list, dst_curs):
        # insert the events into the mirror queue
        rows = []
        fields = ['ev_type', 'ev_data', 'ev_extra1']
        for ev in mirror_list:
            rows.append((ev.type, ev.data, ev.extra1))
        pgq.bulk_insert_events(dst_curs, rows, fields, self.mirror_queue)

        # create a tick on the mirror queue as well
        q = "select pgq.ticker(%s, %s)"
        dst_curs.execute(q, [self.mirror_queue, self.cur_tick])
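
    # The mirror queue lives on the subscriber side: each event applied in
    # this batch is re-inserted there and pgq.ticker() is called with the
    # current tick id, so mirror-queue batches line up with the source ticks.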

    def launch_copy(self, tbl_stat):
        self.log.info("Launching copy process")
        script = sys.argv[0]
        conf = self.cf.filename
        if self.options.verbose:
            cmd = "%s -d -v %s copy"
        else:
            cmd = "%s -d %s copy"
        cmd = cmd % (script, conf)
        self.log.debug("Launch args: "+repr(cmd))
        res = os.system(cmd)
        self.log.debug("Launch result: "+repr(res))
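
    # launch_copy() simply re-executes this same script against the same
    # config file, with the "copy" command and the -d (daemon) switch; that
    # copy process then works the table through the copy/catch-up states.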

if __name__ == '__main__':
    script = Replicator(sys.argv[1:])
    script.start()