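"""Catch the moment when tables are in sync on master and slave.

Provides a base class that brings a provider (master) and a subscriber
(slave) connection to the same state of a table and then hands both
connections to process_sync().
"""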
4
5 import sys, time, skytools
6
class Syncer(skytools.DBScript):
    """Bring provider and subscriber to the same table state, then process it.

    For every requested table that is fully synced (merge_state 'ok'), the
    table is locked on the provider, the script waits until the consumer has
    caught up past the lock, and serializable transactions are then opened
    on both the provider and the subscriber connection.  Subclasses override
    process_sync() to do the actual work on those two connections (e.g. walk
    the tables in primary key order and check whether the data matches).

    NOTE(review): the methods below read self.pgq_queue_name,
    self.pgq_consumer_id, self.args and self.options.force; presumably these
    are set up by the constructor / option parser elsewhere - confirm.
    """

    # Seconds the consumer may lag behind the queue ticker before
    # check_consumer() refuses to proceed (ignored with --force).
    max_consumer_lag = 10

    # Seconds check_table() keeps the provider table locked while waiting
    # for the consumer to catch up before giving up (ignored with --force).
    lock_wait_timeout = 10

    def check_consumer(self, src_db):
        """Exit with status 1 unless the consumer is keeping up with the ticker.

        Reads the queue's ticker lag and the consumer's lag (both in seconds)
        from pgq on the provider.  Exits if the queue or the consumer does not
        exist, or if the consumer lag exceeds the ticker lag by more than
        max_consumer_lag seconds and --force was not given.
        """
        src_curs = src_db.cursor()

        # Before locking anything, check that the consumer is working ok.
        q = ("select extract(epoch from ticker_lag) from pgq.get_queue_list()"
             " where queue_name = %s")
        src_curs.execute(q, [self.pgq_queue_name])
        queue_row = src_curs.fetchone()
        q = ("select extract(epoch from lag)"
             " from pgq.get_consumer_list()"
             " where queue_name = %s"
             " and consumer_name = %s")
        src_curs.execute(q, [self.pgq_queue_name, self.pgq_consumer_id])
        res = src_curs.fetchall()
        src_db.commit()

        # A missing queue yields no row; indexing None would raise TypeError.
        if queue_row is None:
            self.log.error('No such queue: %s', self.pgq_queue_name)
            sys.exit(1)
        if not res:
            self.log.error('No such consumer')
            sys.exit(1)

        ticker_lag = queue_row[0]
        consumer_lag = res[0][0]
        if consumer_lag > ticker_lag + self.max_consumer_lag and not self.options.force:
            self.log.error('Consumer lagging too much, cannot proceed')
            sys.exit(1)

    def get_subscriber_table_state(self):
        """Return the subscriber's table list for this queue.

        Returns the rows of londiste.subscriber_get_table_list() as dicts;
        work() reads their 'table_name' and 'merge_state' keys.
        """
        dst_db = self.get_database('subscriber_db')
        dst_curs = dst_db.cursor()
        q = "select * from londiste.subscriber_get_table_list(%s)"
        dst_curs.execute(q, [self.pgq_queue_name])
        res = dst_curs.dictfetchall()
        dst_db.commit()
        return res

    # NOTE(review): assumed to be the skytools.DBScript main entry point
    # (work) - confirm the method name.
    def work(self):
        """Run check_table() for every requested, fully synced table.

        Tables are taken from self.args[2:] or, when that is empty, from the
        subscriber's whole table list.  Tables that are not subscribed, or
        whose merge_state is not 'ok', are skipped with a log message.  All
        three connections are committed after each table.
        """
        lock_db = self.get_database('provider_db', cache='lock_db')
        src_db = self.get_database('provider_db')
        dst_db = self.get_database('subscriber_db')

        self.check_consumer(src_db)

        state_list = self.get_subscriber_table_state()
        state_map = {ts['table_name']: ts for ts in state_list}
        full_list = [ts['table_name'] for ts in state_list]

        # Explicit table names on the command line override the full list.
        tlist = self.args[2:] or full_list

        for tbl in tlist:
            if tbl not in state_map:
                self.log.warning('Table not subscribed: %s', tbl)
                continue
            if state_map[tbl]['merge_state'] != 'ok':
                self.log.info('Table %s not synced yet, no point', tbl)
                continue
            self.check_table(tbl, lock_db, src_db, dst_db)
            lock_db.commit()
            src_db.commit()
            dst_db.commit()

    def check_table(self, tbl, lock_db, src_db, dst_db):
        """Get transactions to the same state for tbl, then process.

        Steps:
          1. return early (with a warning) if tbl is missing on either side;
          2. LOCK the table IN SHARE MODE via lock_db on the provider;
          3. tick the queue and wait until the consumer catches up;
          4. open serializable transactions on src_db and dst_db;
          5. release the lock and call process_sync().

        Exits with status 1 if the consumer does not catch up within
        lock_wait_timeout seconds (unless --force).  Raises Exception if the
        consumer disappears while waiting.  In both cases the lock is
        released first.
        """
        lock_curs = lock_db.cursor()
        src_curs = src_db.cursor()
        dst_curs = dst_db.cursor()

        if not skytools.exists_table(src_curs, tbl):
            self.log.warning("Table %s does not exist on provider side", tbl)
            return
        if not skytools.exists_table(dst_curs, tbl):
            self.log.warning("Table %s does not exist on subscriber side", tbl)
            return

        # Lock the table on the provider.  lock_db is committed first so the
        # LOCK runs in a fresh transaction; the lock lasts until lock_db ends.
        # NOTE(review): tbl is interpolated unquoted; mixed-case or special
        # identifiers would need quoting - confirm how names are stored.
        self.log.info('Locking %s', tbl)
        lock_db.commit()
        lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % tbl)
        lock_time = time.time()

        self.log.info('Syncing %s', tbl)

        # Generate ticks on the queue after the lock was taken, then record
        # the provider's current time as the position the consumer must pass.
        src_curs.execute("select pgq.ticker(%s)", [self.pgq_queue_name])
        src_db.commit()
        time.sleep(0.1)
        src_curs.execute("select pgq.ticker(%s)", [self.pgq_queue_name])
        src_db.commit()
        src_curs.execute("select to_char(now(), 'YYYY-MM-DD HH24:MI:SS.MS')")
        tpos = src_curs.fetchone()[0]
        src_db.commit()

        # Poll until now() - lag (presumably the time of the consumer's last
        # processed tick) is later than tpos.
        q = """select now() - lag > %s, now(), lag
                 from pgq.get_consumer_list()
                where consumer_name = %s
                  and queue_name = %s"""
        while True:
            time.sleep(0.2)

            src_curs.execute(q, [tpos, self.pgq_consumer_id, self.pgq_queue_name])
            res = src_curs.fetchall()
            src_db.commit()

            if not res:
                # Release the table lock before bailing out, as the timeout
                # path below does.
                lock_db.rollback()
                raise Exception('No such consumer')

            caught_up, db_now, lag = res[0]
            self.log.debug("tpos=%s now=%s lag=%s ok=%s", tpos, db_now, lag, caught_up)
            if caught_up:
                break

            # Do not keep the provider table locked forever.
            if time.time() > lock_time + self.lock_wait_timeout and not self.options.force:
                self.log.error('Consumer lagging too much, exiting')
                lock_db.rollback()
                sys.exit(1)

        # Provider snapshot: the isolation level must be set at the start of
        # the transaction, and the first query fixes the snapshot.
        src_curs.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        src_curs.execute("SELECT 1")

        # Subscriber snapshot, taken the same way in a fresh transaction.
        dst_db.commit()
        dst_curs.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        dst_curs.execute("SELECT 1")

        # Both snapshots are taken; release the provider lock.
        lock_db.commit()

        self.process_sync(tbl, src_db, dst_db)

        src_db.commit()
        dst_db.commit()

    def process_sync(self, tbl, src_db, dst_db):
        """Process tbl once both connections see it in the same state.

        Called by check_table() with src_db (provider) and dst_db
        (subscriber), each inside a serializable transaction.  Subclasses
        must override this; the base implementation raises
        NotImplementedError (a subclass of Exception).
        """
        raise NotImplementedError('process_sync not implemented')