Package londiste :: Module playback
[frames] | [no frames]

Source Code for Module londiste.playback

  1  #! /usr/bin/env python 
  2   
  3  """Basic replication core.""" 
  4   
  5  import sys, os, time 
  6  import skytools, pgq 
  7   
__all__ = ['Replicator', 'TableState',
    'TABLE_MISSING', 'TABLE_IN_COPY', 'TABLE_CATCHING_UP',
    'TABLE_WANNA_SYNC', 'TABLE_DO_SYNC', 'TABLE_OK']

# Per-table replication states.  The "owner" column says which thread
# (main replay thread, copy thread, or setup code) is allowed to move
# a table into that state.
# state                 # owner - who is allowed to change
TABLE_MISSING      = 0  # main
TABLE_IN_COPY      = 1  # copy
TABLE_CATCHING_UP  = 2  # copy
TABLE_WANNA_SYNC   = 3  # main
TABLE_DO_SYNC      = 4  # copy
TABLE_OK           = 5  # setup

# Return codes of the sync_from_*_thread() handlers, consumed by
# Replicator.sync_tables().
SYNC_OK   = 0  # continue with batch
SYNC_LOOP = 1  # sleep, try again
SYNC_EXIT = 2  # nothing to do, exit script
 23   
class Counter(object):
    """Tallies how many tables are in each replication state."""

    missing = 0
    copy = 0
    catching_up = 0
    wanna_sync = 0
    do_sync = 0
    ok = 0

    def __init__(self, tables):
        """Count the given tables and sanity-check the totals."""
        # map each state constant to the counter attribute it bumps
        attr_map = {
            TABLE_MISSING: 'missing',
            TABLE_IN_COPY: 'copy',
            TABLE_CATCHING_UP: 'catching_up',
            TABLE_WANNA_SYNC: 'wanna_sync',
            TABLE_DO_SYNC: 'do_sync',
            TABLE_OK: 'ok',
        }
        for tbl in tables:
            attr = attr_map.get(tbl.state)
            if attr is not None:
                setattr(self, attr, getattr(self, attr) + 1)

        # only one table is allowed to have an in-progress copy
        busy = self.copy + self.catching_up + self.wanna_sync + self.do_sync
        if busy > 1:
            raise Exception('Bad table state')
52
class TableState(object):
    """Keeps replication state about one table.

    The `changed` flag marks state that still has to be written
    back to the database by the Replicator.
    """

    def __init__(self, name, log):
        """Create a table entry in the initial (missing) state."""
        self.name = name
        self.log = log
        self.forget()
        self.changed = 0

    def forget(self):
        """Reset everything back to the TABLE_MISSING state."""
        self.state = TABLE_MISSING
        self.str_snapshot = None
        self.from_snapshot = None
        self.sync_tick_id = None
        self.ok_batch_count = 0
        self.last_tick = 0
        self.changed = 1

    def change_snapshot(self, str_snapshot, tag_changed = 1):
        """Remember the snapshot the initial COPY was taken from.

        When tag_changed is set, batch tracking is reset and the
        change is marked for saving to the database.
        """
        if self.str_snapshot == str_snapshot:
            return
        self.log.debug("%s: change_snapshot to %s" % (self.name, str_snapshot))
        self.str_snapshot = str_snapshot
        if str_snapshot:
            self.from_snapshot = skytools.Snapshot(str_snapshot)
        else:
            self.from_snapshot = None

        if tag_changed:
            self.ok_batch_count = 0
            self.last_tick = None
            self.changed = 1

    def change_state(self, state, tick_id = None):
        """Set new state (and optional sync tick), mark as changed."""
        if self.state == state and self.sync_tick_id == tick_id:
            return
        self.state = state
        self.sync_tick_id = tick_id
        self.changed = 1
        self.log.debug("%s: change_state to %s" % (self.name,
                                self.render_state()))

    def render_state(self):
        """Make a string to be stored in db."""

        if self.state == TABLE_MISSING:
            return None
        elif self.state == TABLE_IN_COPY:
            return 'in-copy'
        elif self.state == TABLE_CATCHING_UP:
            return 'catching-up'
        elif self.state == TABLE_WANNA_SYNC:
            return 'wanna-sync:%d' % self.sync_tick_id
        elif self.state == TABLE_DO_SYNC:
            return 'do-sync:%d' % self.sync_tick_id
        elif self.state == TABLE_OK:
            return 'ok'

    def parse_state(self, merge_state):
        """Read state from string.  Inverse of render_state().

        Raises Exception on an unrecognized state string.
        """

        state = -1
        # FIX: compare to None with 'is', not '==' (PEP 8)
        if merge_state is None:
            state = TABLE_MISSING
        elif merge_state == "in-copy":
            state = TABLE_IN_COPY
        elif merge_state == "catching-up":
            state = TABLE_CATCHING_UP
        elif merge_state == "ok":
            state = TABLE_OK
        elif merge_state == "?":
            # unknown marker is treated as OK; loaded_state() will
            # tag it for re-saving
            state = TABLE_OK
        else:
            # "wanna-sync:<tick>" / "do-sync:<tick>" carry the tick id
            tmp = merge_state.split(':')
            if len(tmp) == 2:
                self.sync_tick_id = int(tmp[1])
                if tmp[0] == 'wanna-sync':
                    state = TABLE_WANNA_SYNC
                elif tmp[0] == 'do-sync':
                    state = TABLE_DO_SYNC

        if state < 0:
            raise Exception("Bad table state: %s" % merge_state)

        return state

    def loaded_state(self, merge_state, str_snapshot):
        """Install state freshly loaded from the database."""
        self.log.debug("loaded_state: %s: %s / %s" % (
                       self.name, merge_state, str_snapshot))
        self.change_snapshot(str_snapshot, 0)
        self.state = self.parse_state(merge_state)
        self.changed = 0
        if merge_state == "?":
            # force a rewrite of the placeholder state
            self.changed = 1

    def interesting(self, ev, tick_id, copy_thread):
        """Check if table wants this event."""

        if copy_thread:
            if self.state not in (TABLE_CATCHING_UP, TABLE_DO_SYNC):
                return False
        else:
            if self.state != TABLE_OK:
                return False

        # if no snapshot tracking, then accept always
        if not self.from_snapshot:
            return True

        # event already visible in the COPY snapshot? then skip it
        if self.from_snapshot.contains(ev.txid):
            return False

        # after a couple of interesting batches there is no need to
        # check the snapshot, as there can be only one partially
        # interesting batch
        if tick_id != self.last_tick:
            self.last_tick = tick_id
            self.ok_batch_count += 1

        # disable batch tracking
        if self.ok_batch_count > 3:
            self.change_snapshot(None)
        return True
class SeqCache(object):
    """Caches last-seen sequence values so resync() only issues
    setval() on the subscriber when a provider value actually moved."""

    def __init__(self):
        # list of fully-qualified sequence names to track
        self.seq_list = []
        # seq name -> last value pushed to subscriber
        self.val_cache = {}

    def set_seq_list(self, seq_list):
        """Replace tracked sequence list, keeping cached values
        for sequences that remain in the list."""
        self.seq_list = seq_list
        new_cache = {}
        for seq in seq_list:
            val = self.val_cache.get(seq)
            # FIX: was 'if val:', which dropped a cached value of 0
            # and forced a needless setval() on the next resync
            if val is not None:
                new_cache[seq] = val
        self.val_cache = new_cache

    def resync(self, src_curs, dst_curs):
        """Read current values on provider, push changed ones to
        subscriber via setval()."""
        if len(self.seq_list) == 0:
            return
        # one query fetches .last_value of every tracked sequence
        dat = ".last_value, ".join(self.seq_list)
        dat += ".last_value"
        q = "select %s from %s" % (dat, ",".join(self.seq_list))
        src_curs.execute(q)
        row = src_curs.fetchone()
        for i in range(len(self.seq_list)):
            seq = self.seq_list[i]
            cur = row[i]
            old = self.val_cache.get(seq)
            if old != cur:
                q = "select setval(%s, %s)"
                dst_curs.execute(q, [seq, cur])
            self.val_cache[seq] = cur
206
class Replicator(pgq.SerialConsumer):
    """Replication core."""

    # SQL templates per event type, filled with (table name, event data)
    sql_command = {
        'I': "insert into %s %s;",
        'U': "update only %s set %s;",
        'D': "delete from only %s where %s;",
    }

    # batch info
    cur_tick = 0   # tick id of the batch being processed
    prev_tick = 0  # tick id of the previous batch
220 - def __init__(self, args):
221 pgq.SerialConsumer.__init__(self, 'londiste', 'provider_db', 'subscriber_db', args) 222 223 # tick table in dst for SerialConsumer(). keep londiste stuff under one schema 224 self.dst_completed_table = "londiste.completed" 225 226 self.table_list = [] 227 self.table_map = {} 228 229 self.copy_thread = 0 230 self.maint_time = 0 231 self.seq_cache = SeqCache() 232 self.maint_delay = self.cf.getint('maint_delay', 600) 233 self.mirror_queue = self.cf.get('mirror_queue', '')
234
    def process_remote_batch(self, src_db, batch_id, ev_list, dst_db):
        "All work for a batch. Entry point from SerialConsumer."

        # this part can play freely with transactions

        dst_curs = dst_db.cursor()

        # remember this batch's tick range; sync logic compares against it
        self.cur_tick = self.cur_batch_info['tick_id']
        self.prev_tick = self.cur_batch_info['prev_tick_id']

        # refresh table registry, then negotiate main/copy thread handoff
        self.load_table_state(dst_curs)
        self.sync_tables(dst_db)

        # now the actual event processing happens.
        # they must be done all in one tx in dst side
        # and the transaction must be kept open so that
        # the SerialConsumer can save last tick and commit.

        self.handle_seqs(dst_curs)
        self.handle_events(dst_curs, ev_list)
        self.save_table_state(dst_curs)
256
257 - def handle_seqs(self, dst_curs):
258 if self.copy_thread: 259 return 260 261 q = "select * from londiste.subscriber_get_seq_list(%s)" 262 dst_curs.execute(q, [self.pgq_queue_name]) 263 seq_list = [] 264 for row in dst_curs.fetchall(): 265 seq_list.append(row[0]) 266 267 self.seq_cache.set_seq_list(seq_list) 268 269 src_curs = self.get_database('provider_db').cursor() 270 self.seq_cache.resync(src_curs, dst_curs)
271
272 - def sync_tables(self, dst_db):
273 """Table sync loop. 274 275 Calls appropriate handles, which is expected to 276 return one of SYNC_* constants.""" 277 278 self.log.debug('Sync tables') 279 while 1: 280 cnt = Counter(self.table_list) 281 if self.copy_thread: 282 res = self.sync_from_copy_thread(cnt, dst_db) 283 else: 284 res = self.sync_from_main_thread(cnt, dst_db) 285 286 if res == SYNC_EXIT: 287 self.log.debug('Sync tables: exit') 288 self.detach() 289 sys.exit(0) 290 elif res == SYNC_OK: 291 return 292 elif res != SYNC_LOOP: 293 raise Exception('Program error') 294 295 self.log.debug('Sync tables: sleeping') 296 time.sleep(3) 297 dst_db.commit() 298 self.load_table_state(dst_db.cursor()) 299 dst_db.commit()
300
301 - def sync_from_main_thread(self, cnt, dst_db):
302 "Main thread sync logic." 303 304 # 305 # decide what to do - order is imortant 306 # 307 if cnt.do_sync: 308 # wait for copy thread to catch up 309 return SYNC_LOOP 310 elif cnt.wanna_sync: 311 # copy thread wants sync, if not behind, do it 312 t = self.get_table_by_state(TABLE_WANNA_SYNC) 313 if self.cur_tick >= t.sync_tick_id: 314 self.change_table_state(dst_db, t, TABLE_DO_SYNC, self.cur_tick) 315 return SYNC_LOOP 316 else: 317 return SYNC_OK 318 elif cnt.catching_up: 319 # active copy, dont worry 320 return SYNC_OK 321 elif cnt.copy: 322 # active copy, dont worry 323 return SYNC_OK 324 elif cnt.missing: 325 # seems there is no active copy thread, launch new 326 t = self.get_table_by_state(TABLE_MISSING) 327 self.change_table_state(dst_db, t, TABLE_IN_COPY) 328 329 # the copy _may_ happen immidiately 330 self.launch_copy(t) 331 332 # there cannot be interesting events in current batch 333 # but maybe there's several tables, lets do them in one go 334 return SYNC_LOOP 335 else: 336 # seems everything is in sync 337 return SYNC_OK
338
339 - def sync_from_copy_thread(self, cnt, dst_db):
340 "Copy thread sync logic." 341 342 # 343 # decide what to do - order is imortant 344 # 345 if cnt.do_sync: 346 # main thread is waiting, catch up, then handle over 347 t = self.get_table_by_state(TABLE_DO_SYNC) 348 if self.cur_tick == t.sync_tick_id: 349 self.change_table_state(dst_db, t, TABLE_OK) 350 return SYNC_EXIT 351 elif self.cur_tick < t.sync_tick_id: 352 return SYNC_OK 353 else: 354 self.log.error("copy_sync: cur_tick=%d sync_tick=%d" % ( 355 self.cur_tick, t.sync_tick_id)) 356 raise Exception('Invalid table state') 357 elif cnt.wanna_sync: 358 # wait for main thread to react 359 return SYNC_LOOP 360 elif cnt.catching_up: 361 # is there more work? 362 if self.work_state: 363 return SYNC_OK 364 365 # seems we have catched up 366 t = self.get_table_by_state(TABLE_CATCHING_UP) 367 self.change_table_state(dst_db, t, TABLE_WANNA_SYNC, self.cur_tick) 368 return SYNC_LOOP 369 elif cnt.copy: 370 # table is not copied yet, do it 371 t = self.get_table_by_state(TABLE_IN_COPY) 372 self.do_copy(t) 373 374 # forget previous value 375 self.work_state = 1 376 377 return SYNC_LOOP 378 else: 379 # nothing to do 380 return SYNC_EXIT
381
382 - def handle_events(self, dst_curs, ev_list):
383 "Actual event processing happens here." 384 385 ignored_events = 0 386 self.sql_list = [] 387 mirror_list = [] 388 for ev in ev_list: 389 if not self.interesting(ev): 390 ignored_events += 1 391 ev.tag_done() 392 continue 393 394 if ev.type in ('I', 'U', 'D'): 395 self.handle_data_event(ev, dst_curs) 396 else: 397 self.handle_system_event(ev, dst_curs) 398 399 if self.mirror_queue: 400 mirror_list.append(ev) 401 402 # finalize table changes 403 self.flush_sql(dst_curs) 404 self.stat_add('ignored', ignored_events) 405 406 # put events into mirror queue if requested 407 if self.mirror_queue: 408 self.fill_mirror_queue(mirror_list, dst_curs)
409
410 - def handle_data_event(self, ev, dst_curs):
411 fmt = self.sql_command[ev.type] 412 sql = fmt % (ev.extra1, ev.data) 413 self.sql_list.append(sql) 414 if len(self.sql_list) > 200: 415 self.flush_sql(dst_curs) 416 ev.tag_done()
417
418 - def flush_sql(self, dst_curs):
419 if len(self.sql_list) == 0: 420 return 421 422 buf = "\n".join(self.sql_list) 423 self.sql_list = [] 424 425 dst_curs.execute(buf)
426
427 - def interesting(self, ev):
428 if ev.type not in ('I', 'U', 'D'): 429 return 1 430 t = self.get_table_by_name(ev.extra1) 431 if t: 432 return t.interesting(ev, self.cur_tick, self.copy_thread) 433 else: 434 return 0
435
436 - def handle_system_event(self, ev, dst_curs):
437 "System event." 438 439 if ev.type == "T": 440 self.log.info("got new table event: "+ev.data) 441 # check tables to be dropped 442 name_list = [] 443 for name in ev.data.split(','): 444 name_list.append(name.strip()) 445 446 del_list = [] 447 for tbl in self.table_list: 448 if tbl.name in name_list: 449 continue 450 del_list.append(tbl) 451 452 # separate loop to avoid changing while iterating 453 for tbl in del_list: 454 self.log.info("Removing table %s from set" % tbl.name) 455 self.remove_table(tbl, dst_curs) 456 457 ev.tag_done() 458 else: 459 self.log.warning("Unknows op %s" % ev.type) 460 ev.tag_failed("Unknown operation")
461
462 - def remove_table(self, tbl, dst_curs):
463 del self.table_map[tbl.name] 464 self.table_list.remove(tbl) 465 q = "select londiste.subscriber_remove_table(%s, %s)" 466 dst_curs.execute(q, [self.pgq_queue_name, tbl.name])
467
468 - def load_table_state(self, curs):
469 """Load table state from database. 470 471 Todo: if all tables are OK, there is no need 472 to load state on every batch. 473 """ 474 475 q = """select table_name, snapshot, merge_state 476 from londiste.subscriber_get_table_list(%s) 477 """ 478 curs.execute(q, [self.pgq_queue_name]) 479 480 new_list = [] 481 new_map = {} 482 for row in curs.dictfetchall(): 483 t = self.get_table_by_name(row['table_name']) 484 if not t: 485 t = TableState(row['table_name'], self.log) 486 t.loaded_state(row['merge_state'], row['snapshot']) 487 new_list.append(t) 488 new_map[t.name] = t 489 490 self.table_list = new_list 491 self.table_map = new_map
492
493 - def save_table_state(self, curs):
494 """Store changed table state in database.""" 495 496 for t in self.table_list: 497 if not t.changed: 498 continue 499 merge_state = t.render_state() 500 self.log.info("storing state of %s: copy:%d new_state:%s" % ( 501 t.name, self.copy_thread, merge_state)) 502 q = "select londiste.subscriber_set_table_state(%s, %s, %s, %s)" 503 curs.execute(q, [self.pgq_queue_name, 504 t.name, t.str_snapshot, merge_state]) 505 t.changed = 0
506
507 - def change_table_state(self, dst_db, tbl, state, tick_id = None):
508 tbl.change_state(state, tick_id) 509 self.save_table_state(dst_db.cursor()) 510 dst_db.commit() 511 512 self.log.info("Table %s status changed to '%s'" % ( 513 tbl.name, tbl.render_state()))
514
515 - def get_table_by_state(self, state):
516 "get first table with specific state" 517 518 for t in self.table_list: 519 if t.state == state: 520 return t 521 raise Exception('No table was found with state: %d' % state)
522
523 - def get_table_by_name(self, name):
524 if name.find('.') < 0: 525 name = "public.%s" % name 526 if name in self.table_map: 527 return self.table_map[name] 528 return None
529
530 - def fill_mirror_queue(self, ev_list, dst_curs):
531 # insert events 532 rows = [] 533 fields = ['ev_type', 'ev_data', 'ev_extra1'] 534 for ev in mirror_list: 535 rows.append((ev.type, ev.data, ev.extra1)) 536 pgq.bulk_insert_events(dst_curs, rows, fields, self.mirror_queue) 537 538 # create tick 539 q = "select pgq.ticker(%s, %s)" 540 dst_curs.execute(q, [self.mirror_queue, self.cur_tick])
541
542 - def launch_copy(self, tbl_stat):
543 self.log.info("Launching copy process") 544 script = sys.argv[0] 545 conf = self.cf.filename 546 if self.options.verbose: 547 cmd = "%s -d -v %s copy" 548 else: 549 cmd = "%s -d %s copy" 550 cmd = cmd % (script, conf) 551 self.log.debug("Launch args: "+repr(cmd)) 552 res = os.system(cmd) 553 self.log.debug("Launch result: "+repr(res))
if __name__ == '__main__':
    # run as a script: command-line args select config and mode
    repl = Replicator(sys.argv[1:])
    repl.start()