1
2 """Repair data on subscriber.
3
4 Walks tables by primary key and searches for
5 missing inserts/updates/deletes.
6 """
7
8 import sys, os, time, psycopg, skytools
9
10 from syncer import Syncer
11
12 __all__ = ['Repairer']
13
16
def get_pkey_list(curs, tbl):
    """Return the names of the primary key columns of a table.

    The table oid is resolved with skytools.get_table_oid() and the
    attribute names of the primary key index are read from the system
    catalogs, ordered by their attribute number.
    """
    tbl_oid = skytools.get_table_oid(curs, tbl)
    sql = """SELECT k.attname FROM pg_index i, pg_attribute k
            WHERE i.indrelid = %s AND k.attrelid = i.indexrelid
              AND i.indisprimary AND k.attnum > 0 AND NOT k.attisdropped
            ORDER BY k.attnum"""
    curs.execute(sql, [tbl_oid])
    # each result row holds exactly one value: the column name
    return [rec[0] for rec in curs.fetchall()]
30
def get_column_list(curs, tbl):
    """Return the names of all live columns of a table.

    Dropped columns and system columns (attnum <= 0) are excluded; the
    result is ordered by attribute number.
    """
    tbl_oid = skytools.get_table_oid(curs, tbl)
    sql = """SELECT a.attname FROM pg_attribute a
            WHERE a.attrelid = %s
              AND a.attnum > 0 AND NOT a.attisdropped
            ORDER BY a.attnum"""
    curs.execute(sql, [tbl_oid])
    # each result row holds exactly one value: the column name
    return [rec[0] for rec in curs.fetchall()]
44
46 """Walks tables in primary key order and checks if data matches."""
47
48
# NOTE(review): the `def` line of this method is missing from the reviewed
# listing; the name `process_sync` and its parameter order are assumed from
# the Syncer base class -- confirm before merging.
def process_sync(self, tbl, src_db, dst_db):
    """Dump a table from both databases, sort the dumps and compare them.

    The table is dumped to `<tbl>.src` and `<tbl>.dst` in the current
    directory, both dumps are sorted with the external `sort` program
    into `<file>.sorted`, and the sorted files are handed to
    dump_compare().  All four files are removed afterwards.

    Raises Exception when `sort` exits with a non-zero status.
    """
    import subprocess

    src_curs = src_db.cursor()
    dst_curs = dst_db.cursor()

    self.log.info('Checking %s' % tbl)

    self.common_fields = []
    self.pkey_list = []
    copy_tbl = self.gen_copy_tbl(tbl, src_curs, dst_curs)

    dump_src = tbl + ".src"
    dump_dst = tbl + ".dst"

    self.log.info("Dumping src table: %s" % tbl)
    self.dump_table(tbl, copy_tbl, src_curs, dump_src)
    src_db.commit()
    self.log.info("Dumping dst table: %s" % tbl)
    self.dump_table(tbl, copy_tbl, dst_curs, dump_dst)
    dst_db.commit()

    # Only the coreutils sort is given the "-S 30%" buffer size option,
    # so probe the version banner first (stderr is merged in, as before).
    probe = subprocess.Popen(["sort", "--version"],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             universal_newlines=True)
    s_ver = probe.communicate()[0]

    sort_cmd = ["sort"]
    if "coreutils" in s_ver:
        sort_cmd += ["-S", "30%"]
    sort_cmd += ["-T", "."]

    # cmp_keys() orders keys with plain string comparison, so the files
    # must be sorted the same way regardless of the user's locale.
    sort_env = os.environ.copy()
    sort_env['LC_ALL'] = 'C'
    sort_env['LANG'] = 'C'

    for label, fn in (("src", dump_src), ("dst", dump_dst)):
        self.log.info("Sorting %s table: %s" % (label, tbl))
        # argument list instead of a shell string: no quoting problems
        # with unusual table names
        rc = subprocess.call(sort_cmd + ["-o", fn + ".sorted", fn],
                             env=sort_env)
        if rc != 0:
            raise Exception("sort failed for %s (exit code %d)" % (fn, rc))

    self.dump_compare(tbl, dump_src + ".sorted", dump_dst + ".sorted")

    os.unlink(dump_src)
    os.unlink(dump_dst)
    os.unlink(dump_src + ".sorted")
    os.unlink(dump_dst + ".sorted")
90
def gen_copy_tbl(self, tbl, src_curs, dst_curs):
    """Build the "table (col,col,...)" expression used for COPY.

    Side effects: stores the source primary key in self.pkey_list and
    the dumped column list in self.common_fields.  The column list
    starts with the primary key columns, followed by the remaining
    source columns that also exist on the destination.

    Logs an error and exits the process with status 1 when the primary
    keys of the two sides differ.
    """
    src_pkey = get_pkey_list(src_curs, tbl)
    self.pkey_list = src_pkey
    if get_pkey_list(dst_curs, tbl) != src_pkey:
        self.log.error('pkeys do not match')
        sys.exit(1)

    src_cols = get_column_list(src_curs, tbl)
    dst_cols = get_column_list(dst_curs, tbl)

    # non-key columns present on both sides, in source column order
    shared = [c for c in src_cols if c not in src_pkey and c in dst_cols]
    fields = list(src_pkey) + shared
    self.common_fields = fields

    expr = "%s (%s)" % (tbl, ",".join(fields))
    self.log.debug("using copy expr: %s" % expr)
    return expr
116
def dump_table(self, tbl, copy_tbl, curs, fn):
    """Dump table contents into a file and log the resulting size.

    copy_tbl is the COPY target expression passed to curs.copy_to();
    fn is the name of the output file, which is created or truncated.
    tbl is not used here.
    """
    f = open(fn, "w", 64*1024)
    try:
        curs.copy_to(f, copy_tbl)
        size = f.tell()
    finally:
        # close the dump file even when the COPY fails
        f.close()
    self.log.info('Got %d bytes' % size)
123
def get_row(self, ln):
    """Convert one dump line into a dict keyed by column name.

    ln is a tab-separated line; its values are matched positionally
    against self.common_fields and kept as raw (still escaped) strings.

    Returns None for an empty string, which is what readline() yields
    at end of file; cmp_keys() treats a None row as "no more rows".
    Raises IndexError when the line has fewer values than there are
    common fields.
    """
    if not ln:
        return None
    # drop only a real line terminator, never a data character
    if ln.endswith('\n'):
        ln = ln[:-1]
    values = ln.split('\t')
    row = {}
    for pos, name in enumerate(self.common_fields):
        row[name] = values[pos]
    return row
130
def dump_compare(self, tbl, src_fn, dst_fn):
    """Walk two sorted dump files in parallel and report differences.

    Lines are paired up by primary key (cmp_keys).  A row present only
    in the source is reported via got_missed_insert(), a row present
    only in the destination via got_missed_delete(), and rows with the
    same key but different data (cmp_data) via got_missed_update().

    Resets the cnt_insert/cnt_update/cnt_delete and total_src/total_dst
    counters, removes an existing `fix.<tbl>.sql` file before comparing
    and logs a summary line when done.
    """
    self.log.info("Comparing dumps: %s" % tbl)
    self.cnt_insert = 0
    self.cnt_update = 0
    self.cnt_delete = 0
    self.total_src = 0
    self.total_dst = 0

    f1 = open(src_fn, "r", 64*1024)
    try:
        f2 = open(dst_fn, "r", 64*1024)
        try:
            src_ln = f1.readline()
            dst_ln = f2.readline()
            if src_ln:
                self.total_src += 1
            if dst_ln:
                self.total_dst += 1

            # start with a fresh fix file
            fix = "fix.%s.sql" % tbl
            if os.path.isfile(fix):
                os.unlink(fix)

            while src_ln or dst_ln:
                keep_src = keep_dst = False
                if src_ln != dst_ln:
                    # an exhausted file is represented by a None row,
                    # which cmp_keys() orders after every real row
                    src_row = dst_row = None
                    if src_ln:
                        src_row = self.get_row(src_ln)
                    if dst_ln:
                        dst_row = self.get_row(dst_ln)

                    diff = self.cmp_keys(src_row, dst_row)
                    if diff > 0:
                        # destination has a row the source lacks;
                        # stay on the current source row
                        self.got_missed_delete(tbl, dst_row)
                        keep_src = True
                    elif diff < 0:
                        # source has a row the destination lacks;
                        # stay on the current destination row
                        self.got_missed_insert(tbl, src_row)
                        keep_dst = True
                    elif self.cmp_data(src_row, dst_row) != 0:
                        self.got_missed_update(tbl, src_row, dst_row)

                if not keep_src:
                    src_ln = f1.readline()
                    if src_ln:
                        self.total_src += 1
                if not keep_dst:
                    dst_ln = f2.readline()
                    if dst_ln:
                        self.total_dst += 1
        finally:
            f2.close()
    finally:
        f1.close()

    self.log.info("finished %s: src: %d rows, dst: %d rows,"\
                " missed: %d inserts, %d updates, %d deletes" % (
            tbl, self.total_src, self.total_dst,
            self.cnt_insert, self.cnt_update, self.cnt_delete))
179
def got_missed_insert(self, tbl, src_row):
    """Record an INSERT statement for a row missing on the destination.

    Counts the miss in self.cnt_insert.  Every common field of src_row
    is passed through unescape() and skytools.quote_literal() and the
    resulting statement is handed to show_fix().
    """
    self.cnt_insert += 1
    columns = self.common_fields
    values = [skytools.quote_literal(unescape(src_row[col]))
              for col in columns]
    sql = "insert into %s (%s) values (%s);" % (
            tbl, ", ".join(columns), ", ".join(values))
    self.show_fix(tbl, sql, 'insert')
190
def got_missed_update(self, tbl, src_row, dst_row):
    """Record an UPDATE statement for a row whose data differs.

    Counts the miss in self.cnt_update.  The WHERE clause matches the
    primary key (taken from src_row) plus the current destination value
    of every differing column; the SET clause assigns the source value
    of those columns.  Columns that cmp_value() reports as equal are
    left out.  The statement is handed to show_fix().
    """
    self.cnt_update += 1
    assignments = []
    conditions = []

    for key in self.pkey_list:
        self.addcmp(conditions, key, unescape(src_row[key]))

    for col in self.common_fields:
        new_val = src_row[col]
        old_val = dst_row[col]
        if self.cmp_value(new_val, old_val) != 0:
            self.addeq(assignments, col, unescape(new_val))
            # also require the old value, so the update only applies to
            # rows that still hold the data seen in the dump
            self.addcmp(conditions, col, unescape(old_val))

    sql = "update only %s set %s where %s;" % (
            tbl, ", ".join(assignments), " and ".join(conditions))
    self.show_fix(tbl, sql, 'update')
210
def got_missed_delete(self, tbl, dst_row):
    """Record a DELETE statement for a row that exists only on the destination.

    Counts the miss in self.cnt_delete.  The WHERE clause is built from
    the primary key values of dst_row; the statement is handed to
    show_fix().
    """
    self.cnt_delete += 1
    conditions = []
    for key in self.pkey_list:
        self.addcmp(conditions, key, unescape(dst_row[key]))
    sql = "delete from only %s where %s;" % (tbl, " and ".join(conditions))
    self.show_fix(tbl, sql, 'delete')
218
220
# NOTE(review): the `def` line is missing from the reviewed listing; the
# third parameter is always passed positionally ('insert'/'update'/'delete')
# and its name is assumed -- confirm against the real file.
def show_fix(self, tbl, q, desc):
    """Append one fix statement to the `fix.<tbl>.sql` file.

    q is written followed by a newline; desc is not used here.
    """
    fn = "fix.%s.sql" % tbl
    f = open(fn, "a")
    try:
        f.write("%s\n" % q)
    finally:
        # close explicitly instead of relying on garbage collection
        f.close()
223
224 - def addeq(self, list, f, v):
228
def addcmp(self, list, f, v):
    """Append a WHERE condition for field f to the given list.

    A None value produces "f is null"; any other value is quoted with
    skytools.quote_literal() and produces "f = <literal>".
    """
    if v is not None:
        cond = "%s = %s" % (f, skytools.quote_literal(v))
    else:
        cond = "%s is null" % f
    list.append(cond)
236
def cmp_data(self, src_row, dst_row):
    """Compare all common fields of two rows.

    Returns 0 when cmp_value() reports every field in
    self.common_fields as equal, -1 as soon as one field differs.
    """
    for name in self.common_fields:
        if self.cmp_value(src_row[name], dst_row[name]):
            return -1
    return 0
244
def cmp_value(self, v1, v2):
    """Compare two dumped field values.

    Returns 0 when the values are equal, -1 otherwise.  Two values are
    also treated as equal when one of them is the other plus a
    3-character suffix starting with '+', provided the shorter value is
    at least 19 characters long.
    (presumably a "+NN" timezone offset on a timestamp -- confirm)
    """
    if v1 == v2:
        return 0

    len1, len2 = len(v1), len(v2)
    if len1 == len2 + 3:
        longer, shorter = v1, v2
    elif len1 + 3 == len2:
        longer, shorter = v2, v1
    else:
        return -1

    base_len = len(shorter)
    # the suffix must start with '+' right after the shorter value's length
    if base_len >= 19 and longer[base_len] == '+' and longer[:-3] == shorter:
        return 0
    return -1
262
def cmp_keys(self, src_row, dst_row):
    """Order two rows by their primary key values.

    Returns -1 when the source row sorts first, 1 when the destination
    row sorts first and 0 when the keys are equal.  A None row sorts
    after any real row; two None rows compare equal.
    """
    if src_row is None or dst_row is None:
        if src_row is dst_row:
            # both sides are None
            return 0
        if src_row is None:
            return 1
        return -1

    for name in self.pkey_list:
        src_val = src_row[name]
        dst_val = dst_row[name]
        if src_val < dst_val:
            return -1
        if src_val > dst_val:
            return 1
    return 0
284