Package londiste :: Module repair
[frames | no frames]

Source Code for Module londiste.repair

  1   
  2  """Repair data on subscriber. 
  3   
  4  Walks tables by primary key and searches for
  5  missing inserts/updates/deletes. 
  6  """ 
  7   
  8  import sys, os, time, psycopg, skytools 
  9   
 10  from syncer import Syncer 
 11   
 12  __all__ = ['Repairer'] 
 13   
def unescape(s):
    """Return *s* passed through ``skytools.unescape_copy``.

    Used on field values read from the tab-separated dump files before
    they are quoted into fix-up SQL.  Callers in this module treat a
    ``None`` result as SQL NULL.
    """
    value = skytools.unescape_copy(s)
    return value
16
def get_pkey_list(curs, tbl):
    """Return the primary key column names of *tbl*, in index order.

    curs -- open database cursor used to run the catalog query
    tbl  -- table name, resolved to an oid via skytools.get_table_oid()
    """
    tbl_oid = skytools.get_table_oid(curs, tbl)
    q = """SELECT k.attname FROM pg_index i, pg_attribute k
            WHERE i.indrelid = %s AND k.attrelid = i.indexrelid
              AND i.indisprimary AND k.attnum > 0 AND NOT k.attisdropped
            ORDER BY k.attnum"""
    curs.execute(q, [tbl_oid])
    # each result row holds a single value: the attribute name
    return [rec[0] for rec in curs.fetchall()]
30
def get_column_list(curs, tbl):
    """Return the names of all live columns of *tbl*, ordered by attnum.

    curs -- open database cursor used to run the catalog query
    tbl  -- table name, resolved to an oid via skytools.get_table_oid()
    """
    tbl_oid = skytools.get_table_oid(curs, tbl)
    q = """SELECT a.attname FROM pg_attribute a
            WHERE a.attrelid = %s
              AND a.attnum > 0 AND NOT a.attisdropped
            ORDER BY a.attnum"""
    curs.execute(q, [tbl_oid])
    # each result row holds a single value: the attribute name
    return [rec[0] for rec in curs.fetchall()]
44
class Repairer(Syncer):
    """Walks tables in primary key order and checks if data matches.

    Both tables are dumped with COPY into local files, the files are
    sorted with the external ``sort`` utility and then merged line by
    line.  For every difference a SQL statement is appended to
    ``fix.<table>.sql`` in the current directory.

    Attributes set while processing a table:
      common_fields -- columns present on both sides, pkey columns first
      pkey_list     -- primary key column names
      cnt_insert / cnt_update / cnt_delete -- number of fixes generated
      total_src / total_dst -- number of rows read from each dump

    ``self.log`` is not defined here; it is presumably provided by the
    Syncer base class.
    """

    def process_sync(self, tbl, src_db, dst_db):
        """Actual comparison.

        tbl    -- table name; also used as prefix for the dump files
        src_db -- connection to the source (provider) database
        dst_db -- connection to the destination (subscriber) database

        Raises Exception if the external sort fails.
        """
        src_curs = src_db.cursor()
        dst_curs = dst_db.cursor()

        self.log.info('Checking %s' % tbl)

        self.common_fields = []
        self.pkey_list = []
        copy_tbl = self.gen_copy_tbl(tbl, src_curs, dst_curs)

        dump_src = tbl + ".src"
        dump_dst = tbl + ".dst"

        self.log.info("Dumping src table: %s" % tbl)
        self.dump_table(tbl, copy_tbl, src_curs, dump_src)
        src_db.commit()
        self.log.info("Dumping dst table: %s" % tbl)
        self.dump_table(tbl, copy_tbl, dst_curs, dump_dst)
        dst_db.commit()

        self.log.info("Sorting src table: %s" % tbl)
        sort_args = self._sort_args()
        self._sort_dump(sort_args, dump_src)
        self.log.info("Sorting dst table: %s" % tbl)
        self._sort_dump(sort_args, dump_dst)

        self.dump_compare(tbl, dump_src + ".sorted", dump_dst + ".sorted")

        os.unlink(dump_src)
        os.unlink(dump_dst)
        os.unlink(dump_src + ".sorted")
        os.unlink(dump_dst + ".sorted")

    def _sort_args(self):
        """Return extra command line arguments for the local sort utility."""
        # os.popen + '2>&1' replaces the deprecated os.popen4; stderr is
        # still merged into the text that gets inspected.
        pipe = os.popen("sort --version 2>&1")
        try:
            version = pipe.read()
        finally:
            pipe.close()
        # only pass the memory-size option when the version banner
        # mentions coreutils
        if version.find("coreutils") > 0:
            return "-S 30%"
        return ""

    def _sort_dump(self, sort_args, fn):
        """Sort dump file fn into fn + '.sorted' using bytewise ordering."""
        # cmp_keys() compares key strings bytewise, so the sort must not
        # use locale collation or the merge sees rows out of order.
        cmd = "LC_ALL=C sort %s -T . -o %s.sorted %s" % (sort_args, fn, fn)
        if os.system(cmd) != 0:
            # comparing unsorted dumps would generate bogus fix statements
            raise Exception('sort failed: %s' % fn)

    def gen_copy_tbl(self, tbl, src_curs, dst_curs):
        """Build the COPY target expression "tbl (col, ...)".

        Stores the primary key columns in self.pkey_list and the columns
        common to both sides (pkey columns first) in self.common_fields.
        Exits the process with status 1 if the primary keys differ.
        """
        self.pkey_list = get_pkey_list(src_curs, tbl)
        dst_pkey = get_pkey_list(dst_curs, tbl)
        if dst_pkey != self.pkey_list:
            self.log.error('pkeys do not match')
            sys.exit(1)

        src_cols = get_column_list(src_curs, tbl)
        dst_cols = get_column_list(dst_curs, tbl)

        # pkey columns go first so that sorting whole lines orders by key
        field_list = [f for f in self.pkey_list]
        for f in src_cols:
            if f in self.pkey_list:
                continue
            if f in dst_cols:
                field_list.append(f)

        self.common_fields = field_list

        tbl_expr = "%s (%s)" % (tbl, ",".join(field_list))

        self.log.debug("using copy expr: %s" % tbl_expr)

        return tbl_expr

    def dump_table(self, tbl, copy_tbl, curs, fn):
        """Dump table contents to file fn via curs.copy_to() and log the size."""
        f = open(fn, "w", 64*1024)
        try:
            curs.copy_to(f, copy_tbl)
            size = f.tell()
        finally:
            # close the handle even when the copy fails
            f.close()
        self.log.info('Got %d bytes' % size)

    def get_row(self, ln):
        """Parse one dump line into a dict keyed by column name.

        Returns None for an empty string (end of file), which is the
        "table is done" marker that cmp_keys() expects.  Values are left
        in their escaped dump form.
        """
        if not ln:
            return None
        if ln.endswith('\n'):
            ln = ln[:-1]
        values = ln.split('\t')
        row = {}
        for i, name in enumerate(self.common_fields):
            row[name] = values[i]
        return row

    def dump_compare(self, tbl, src_fn, dst_fn):
        """Merge-compare two sorted dump files and write fix statements.

        Resets the counters, removes a stale fix.<tbl>.sql and then
        walks both files in parallel, reporting missing inserts, updates
        and deletes.  A summary line is logged at the end.
        """
        self.log.info("Comparing dumps: %s" % tbl)
        self.cnt_insert = 0
        self.cnt_update = 0
        self.cnt_delete = 0
        self.total_src = 0
        self.total_dst = 0
        f1 = open(src_fn, "r", 64*1024)
        try:
            f2 = open(dst_fn, "r", 64*1024)
            try:
                src_ln = f1.readline()
                dst_ln = f2.readline()
                if src_ln:
                    self.total_src += 1
                if dst_ln:
                    self.total_dst += 1

                # start with a fresh fix file, show_fix() appends to it
                fix = "fix.%s.sql" % tbl
                if os.path.isfile(fix):
                    os.unlink(fix)

                while src_ln or dst_ln:
                    keep_src = keep_dst = 0
                    if src_ln != dst_ln:
                        src_row = self.get_row(src_ln)
                        dst_row = self.get_row(dst_ln)

                        key_order = self.cmp_keys(src_row, dst_row)
                        if key_order > 0:
                            # src > dst: row exists only on dst
                            self.got_missed_delete(tbl, dst_row)
                            keep_src = 1
                        elif key_order < 0:
                            # src < dst: row exists only on src
                            self.got_missed_insert(tbl, src_row)
                            keep_dst = 1
                        else:
                            if self.cmp_data(src_row, dst_row) != 0:
                                self.got_missed_update(tbl, src_row, dst_row)

                    # advance only the side(s) whose row was consumed
                    if not keep_src:
                        src_ln = f1.readline()
                        if src_ln:
                            self.total_src += 1
                    if not keep_dst:
                        dst_ln = f2.readline()
                        if dst_ln:
                            self.total_dst += 1
            finally:
                f2.close()
        finally:
            f1.close()

        self.log.info("finished %s: src: %d rows, dst: %d rows,"\
                      " missed: %d inserts, %d updates, %d deletes" % (
                tbl, self.total_src, self.total_dst,
                self.cnt_insert, self.cnt_update, self.cnt_delete))

    def got_missed_insert(self, tbl, src_row):
        """Record an INSERT for a row that exists only on the source."""
        self.cnt_insert += 1
        fld_list = self.common_fields
        val_list = []
        for f in fld_list:
            v = unescape(src_row[f])
            val_list.append(skytools.quote_literal(v))
        q = "insert into %s (%s) values (%s);" % (
                tbl, ", ".join(fld_list), ", ".join(val_list))
        self.show_fix(tbl, q, 'insert')

    def got_missed_update(self, tbl, src_row, dst_row):
        """Record an UPDATE for a row whose non-key data differs.

        The WHERE clause matches the primary key plus the current
        destination value of every changed column.
        """
        self.cnt_update += 1
        fld_list = self.common_fields
        set_list = []
        whe_list = []
        for f in self.pkey_list:
            self.addcmp(whe_list, f, unescape(src_row[f]))
        for f in fld_list:
            v1 = src_row[f]
            v2 = dst_row[f]
            if self.cmp_value(v1, v2) == 0:
                continue

            self.addeq(set_list, f, unescape(v1))
            self.addcmp(whe_list, f, unescape(v2))

        q = "update only %s set %s where %s;" % (
                tbl, ", ".join(set_list), " and ".join(whe_list))
        self.show_fix(tbl, q, 'update')

    def got_missed_delete(self, tbl, dst_row, pkey_list=None):
        """Record a DELETE for a row that exists only on the destination.

        pkey_list is accepted for backward compatibility only and is not
        used; the key columns are always taken from self.pkey_list.  It
        is optional because dump_compare() calls this method without it.
        """
        self.cnt_delete += 1
        whe_list = []
        for f in self.pkey_list:
            self.addcmp(whe_list, f, unescape(dst_row[f]))
        q = "delete from only %s where %s;" % (tbl, " and ".join(whe_list))
        self.show_fix(tbl, q, 'delete')

    def show_fix(self, tbl, q, desc):
        """Append statement q to fix.<tbl>.sql.  desc is currently unused."""
        fn = "fix.%s.sql" % tbl
        f = open(fn, "a")
        try:
            f.write("%s\n" % q)
        finally:
            f.close()

    def addeq(self, list, f, v):
        """Append "f = <quoted v>" to list (used for SET clauses)."""
        vq = skytools.quote_literal(v)
        s = "%s = %s" % (f, vq)
        list.append(s)

    def addcmp(self, list, f, v):
        """Append a WHERE condition for f to list; None becomes "is null"."""
        if v is None:
            s = "%s is null" % f
        else:
            vq = skytools.quote_literal(v)
            s = "%s = %s" % (f, vq)
        list.append(s)

    def cmp_data(self, src_row, dst_row):
        """Return 0 if all common fields match per cmp_value(), else -1."""
        for k in self.common_fields:
            v1 = src_row[k]
            v2 = dst_row[k]
            if self.cmp_value(v1, v2) != 0:
                return -1
        return 0

    def cmp_value(self, v1, v2):
        """Compare two dumped field values.  Returns 0 if equal, else -1."""
        if v1 == v2:
            return 0

        # try to work around tz vs. notz: if one value is the other plus
        # a 3-character suffix starting with '+', compare without it
        z1 = len(v1)
        z2 = len(v2)
        if z1 == z2 + 3 and z2 >= 19 and v1[z2] == '+':
            v1 = v1[:-3]
            if v1 == v2:
                return 0
        elif z1 + 3 == z2 and z1 >= 19 and v2[z1] == '+':
            v2 = v2[:-3]
            if v1 == v2:
                return 0

        return -1

    def cmp_keys(self, src_row, dst_row):
        """Compare primary keys of the rows.

        Returns 1 if src > dst, -1 if src < dst and 0 if src == dst"""

        # None means table is done. tag it larger than any existing row.
        if src_row is None:
            if dst_row is None:
                return 0
            return 1
        elif dst_row is None:
            return -1

        for k in self.pkey_list:
            v1 = src_row[k]
            v2 = dst_row[k]
            if v1 < v2:
                return -1
            elif v1 > v2:
                return 1
        return 0
284