Package londiste :: Module table_copy
[frames] | no frames]

Source Code for Module londiste.table_copy

  1  #! /usr/bin/env python 
  2   
  3  """Do a full table copy. 
  4   
  5  For internal usage. 
  6  """ 
  7   
  8  import sys, os, skytools 
  9   
 10  from skytools.dbstruct import * 
 11  from playback import * 
 12   
 13  __all__ = ['CopyTable'] 
 14   
15 -class CopyTable(Replicator):
16 - def __init__(self, args, copy_thread = 1):
17 Replicator.__init__(self, args) 18 19 if copy_thread: 20 self.pidfile += ".copy" 21 self.consumer_id += "_copy" 22 self.copy_thread = 1
23
24 - def init_optparse(self, parser=None):
25 p = Replicator.init_optparse(self, parser) 26 p.add_option("--skip-truncate", action="store_true", dest="skip_truncate", 27 help = "avoid truncate", default=False) 28 return p
29
30 - def do_copy(self, tbl_stat):
31 src_db = self.get_database('provider_db') 32 dst_db = self.get_database('subscriber_db') 33 34 # it should not matter to pgq 35 src_db.commit() 36 dst_db.commit() 37 38 # change to SERIALIZABLE isolation level 39 src_db.set_isolation_level(2) 40 src_db.commit() 41 42 # initial sync copy 43 src_curs = src_db.cursor() 44 dst_curs = dst_db.cursor() 45 46 self.log.info("Starting full copy of %s" % tbl_stat.name) 47 48 # find dst struct 49 src_struct = TableStruct(src_curs, tbl_stat.name) 50 dst_struct = TableStruct(dst_curs, tbl_stat.name) 51 52 # check if columns match 53 dlist = dst_struct.get_column_list() 54 for c in src_struct.get_column_list(): 55 if c not in dlist: 56 raise Exception('Column %s does not exist on dest side' % c) 57 58 # drop unnecessary stuff 59 objs = T_CONSTRAINT | T_INDEX | T_TRIGGER | T_RULE 60 dst_struct.drop(dst_curs, objs, log = self.log) 61 62 # do truncate & copy 63 self.real_copy(src_curs, dst_curs, tbl_stat.name) 64 65 # get snapshot 66 src_curs.execute("select get_current_snapshot()") 67 snapshot = src_curs.fetchone()[0] 68 src_db.commit() 69 70 # restore READ COMMITTED behaviour 71 src_db.set_isolation_level(1) 72 src_db.commit() 73 74 # create previously dropped objects 75 dst_struct.create(dst_curs, objs, log = self.log) 76 77 # set state 78 tbl_stat.change_snapshot(snapshot) 79 if self.copy_thread: 80 tbl_stat.change_state(TABLE_CATCHING_UP) 81 else: 82 tbl_stat.change_state(TABLE_OK) 83 self.save_table_state(dst_curs) 84 dst_db.commit()
85
86 - def real_copy(self, srccurs, dstcurs, tablename):
87 "Main copy logic." 88 89 # drop data 90 if self.options.skip_truncate: 91 self.log.info("%s: skipping truncate" % tablename) 92 else: 93 self.log.info("%s: truncating" % tablename) 94 dstcurs.execute("truncate " + tablename) 95 96 # do copy 97 self.log.info("%s: start copy" % tablename) 98 col_list = skytools.get_table_columns(srccurs, tablename) 99 stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list) 100 if stats: 101 self.log.info("%s: copy finished: %d bytes, %d rows" % ( 102 tablename, stats[0], stats[1]))
103 104 if __name__ == '__main__': 105 script = CopyTable(sys.argv[1:]) 106 script.start() 107