1
2
3 """Do a full table copy.
4
5 For internal usage.
6 """
7
8 import sys, os, skytools
9
10 from skytools.dbstruct import *
11 from playback import *
12
13 __all__ = ['CopyTable']
14
16 - def __init__(self, args, copy_thread = 1):
17 Replicator.__init__(self, args)
18
19 if copy_thread:
20 self.pidfile += ".copy"
21 self.consumer_id += "_copy"
22 self.copy_thread = 1
23
25 p = Replicator.init_optparse(self, parser)
26 p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
27 help = "avoid truncate", default=False)
28 return p
29
31 src_db = self.get_database('provider_db')
32 dst_db = self.get_database('subscriber_db')
33
34
35 src_db.commit()
36 dst_db.commit()
37
38
39 src_db.set_isolation_level(2)
40 src_db.commit()
41
42
43 src_curs = src_db.cursor()
44 dst_curs = dst_db.cursor()
45
46 self.log.info("Starting full copy of %s" % tbl_stat.name)
47
48
49 src_struct = TableStruct(src_curs, tbl_stat.name)
50 dst_struct = TableStruct(dst_curs, tbl_stat.name)
51
52
53 dlist = dst_struct.get_column_list()
54 for c in src_struct.get_column_list():
55 if c not in dlist:
56 raise Exception('Column %s does not exist on dest side' % c)
57
58
59 objs = T_CONSTRAINT | T_INDEX | T_TRIGGER | T_RULE
60 dst_struct.drop(dst_curs, objs, log = self.log)
61
62
63 self.real_copy(src_curs, dst_curs, tbl_stat.name)
64
65
66 src_curs.execute("select get_current_snapshot()")
67 snapshot = src_curs.fetchone()[0]
68 src_db.commit()
69
70
71 src_db.set_isolation_level(1)
72 src_db.commit()
73
74
75 dst_struct.create(dst_curs, objs, log = self.log)
76
77
78 tbl_stat.change_snapshot(snapshot)
79 if self.copy_thread:
80 tbl_stat.change_state(TABLE_CATCHING_UP)
81 else:
82 tbl_stat.change_state(TABLE_OK)
83 self.save_table_state(dst_curs)
84 dst_db.commit()
85
86 - def real_copy(self, srccurs, dstcurs, tablename):
87 "Main copy logic."
88
89
90 if self.options.skip_truncate:
91 self.log.info("%s: skipping truncate" % tablename)
92 else:
93 self.log.info("%s: truncating" % tablename)
94 dstcurs.execute("truncate " + tablename)
95
96
97 self.log.info("%s: start copy" % tablename)
98 col_list = skytools.get_table_columns(srccurs, tablename)
99 stats = skytools.full_copy(tablename, srccurs, dstcurs, col_list)
100 if stats:
101 self.log.info("%s: copy finished: %d bytes, %d rows" % (
102 tablename, stats[0], stats[1]))
103
104 if __name__ == '__main__':
105 script = CopyTable(sys.argv[1:])
106 script.start()
107