Package londiste :: Module setup
[frames] | no frames]

Source Code for Module londiste.setup

  1  #! /usr/bin/env python 
  2   
  3  """Londiste setup and sanity checker. 
  4   
  5  """ 
  6  import sys, os, skytools 
  7  from installer import * 
  8   
  9  __all__ = ['ProviderSetup', 'SubscriberSetup'] 
 10   
11 -def find_column_types(curs, table):
12 table_oid = skytools.get_table_oid(curs, table) 13 if table_oid == None: 14 return None 15 16 key_sql = """ 17 SELECT k.attname FROM pg_index i, pg_attribute k 18 WHERE i.indrelid = %d AND k.attrelid = i.indexrelid 19 AND i.indisprimary AND k.attnum > 0 AND NOT k.attisdropped 20 """ % table_oid 21 22 # find columns 23 q = """ 24 SELECT a.attname as name, 25 CASE WHEN k.attname IS NOT NULL 26 THEN 'k' ELSE 'v' END AS type 27 FROM pg_attribute a LEFT JOIN (%s) k ON (k.attname = a.attname) 28 WHERE a.attrelid = %d AND a.attnum > 0 AND NOT a.attisdropped 29 ORDER BY a.attnum 30 """ % (key_sql, table_oid) 31 curs.execute(q) 32 rows = curs.dictfetchall() 33 return rows
34
35 -def make_type_string(col_rows):
36 res = map(lambda x: x['type'], col_rows) 37 return "".join(res)
38
39 -class CommonSetup(skytools.DBScript):
40 - def __init__(self, args):
41 skytools.DBScript.__init__(self, 'londiste', args) 42 self.set_single_loop(1) 43 self.pidfile = self.pidfile + ".setup" 44 45 self.pgq_queue_name = self.cf.get("pgq_queue_name") 46 self.consumer_id = self.cf.get("pgq_consumer_id", self.job_name) 47 self.fake = self.cf.getint('fake', 0) 48 49 if len(self.args) < 3: 50 self.log.error("need subcommand") 51 sys.exit(1)
52
53 - def run(self):
54 self.admin()
55
56 - def fetch_provider_table_list(self, curs):
57 q = """select table_name, trigger_name 58 from londiste.provider_get_table_list(%s)""" 59 curs.execute(q, [self.pgq_queue_name]) 60 return curs.dictfetchall()
61
62 - def get_provider_table_list(self):
63 src_db = self.get_database('provider_db') 64 src_curs = src_db.cursor() 65 list = self.fetch_provider_table_list(src_curs) 66 src_db.commit() 67 res = [] 68 for row in list: 69 res.append(row['table_name']) 70 return res
71
72 - def get_provider_seqs(self, curs):
73 q = """SELECT * from londiste.provider_get_seq_list(%s)""" 74 curs.execute(q, [self.pgq_queue_name]) 75 res = [] 76 for row in curs.fetchall(): 77 res.append(row[0]) 78 return res
79
80 - def get_all_seqs(self, curs):
81 q = """SELECT n.nspname || '.'|| c.relname 82 from pg_class c, pg_namespace n 83 where n.oid = c.relnamespace 84 and c.relkind = 'S' 85 order by 1""" 86 curs.execute(q) 87 res = [] 88 for row in curs.fetchall(): 89 res.append(row[0]) 90 return res
91
92 - def check_provider_queue(self):
93 src_db = self.get_database('provider_db') 94 src_curs = src_db.cursor() 95 q = "select count(1) from pgq.get_queue_info(%s)" 96 src_curs.execute(q, [self.pgq_queue_name]) 97 ok = src_curs.fetchone()[0] 98 src_db.commit() 99 100 if not ok: 101 self.log.error('Event queue does not exist yet') 102 sys.exit(1)
103
104 - def fetch_subscriber_tables(self, curs):
105 q = "select * from londiste.subscriber_get_table_list(%s)" 106 curs.execute(q, [self.pgq_queue_name]) 107 return curs.dictfetchall()
108
110 dst_db = self.get_database('subscriber_db') 111 dst_curs = dst_db.cursor() 112 list = self.fetch_subscriber_tables(dst_curs) 113 dst_db.commit() 114 res = [] 115 for row in list: 116 res.append(row['table_name']) 117 return res
118
119 - def init_optparse(self, parser=None):
120 p = skytools.DBScript.init_optparse(self, parser) 121 p.add_option("--expect-sync", action="store_true", dest="expect_sync", 122 help = "no copy needed", default=False) 123 p.add_option("--force", action="store_true", 124 help="force", default=False) 125 return p
126 127 128 # 129 # Provider commands 130 # 131
132 -class ProviderSetup(CommonSetup):
133
134 - def admin(self):
135 cmd = self.args[2] 136 if cmd == "tables": 137 self.provider_show_tables() 138 elif cmd == "add": 139 self.provider_add_tables(self.args[3:]) 140 elif cmd == "remove": 141 self.provider_remove_tables(self.args[3:]) 142 elif cmd == "add-seq": 143 for seq in self.args[3:]: 144 self.provider_add_seq(seq) 145 self.provider_notify_change() 146 elif cmd == "remove-seq": 147 for seq in self.args[3:]: 148 self.provider_remove_seq(seq) 149 self.provider_notify_change() 150 elif cmd == "install": 151 self.provider_install() 152 elif cmd == "seqs": 153 self.provider_list_seqs() 154 else: 155 self.log.error('bad subcommand') 156 sys.exit(1)
157
158 - def provider_list_seqs(self):
159 src_db = self.get_database('provider_db') 160 src_curs = src_db.cursor() 161 list = self.get_provider_seqs(src_curs) 162 src_db.commit() 163 164 for seq in list: 165 print seq
166
167 - def provider_install(self):
168 src_db = self.get_database('provider_db') 169 src_curs = src_db.cursor() 170 install_provider(src_curs, self.log) 171 172 # create event queue 173 q = "select pgq.create_queue(%s)" 174 self.exec_provider(q, [self.pgq_queue_name])
175
176 - def provider_add_tables(self, table_list):
177 self.check_provider_queue() 178 179 cur_list = self.get_provider_table_list() 180 for tbl in table_list: 181 if tbl.find('.') < 0: 182 tbl = "public." + tbl 183 if tbl not in cur_list: 184 self.log.info('Adding %s' % tbl) 185 self.provider_add_table(tbl) 186 else: 187 self.log.info("Table %s already added" % tbl) 188 self.provider_notify_change()
189
190 - def provider_remove_tables(self, table_list):
191 self.check_provider_queue() 192 193 cur_list = self.get_provider_table_list() 194 for tbl in table_list: 195 if tbl.find('.') < 0: 196 tbl = "public." + tbl 197 if tbl not in cur_list: 198 self.log.info('%s already removed' % tbl) 199 else: 200 self.log.info("Removing %s" % tbl) 201 self.provider_remove_table(tbl) 202 self.provider_notify_change()
203
204 - def provider_add_table(self, tbl):
205 q = "select londiste.provider_add_table(%s, %s)" 206 self.exec_provider(q, [self.pgq_queue_name, tbl])
207
208 - def provider_remove_table(self, tbl):
209 q = "select londiste.provider_remove_table(%s, %s)" 210 self.exec_provider(q, [self.pgq_queue_name, tbl])
211
212 - def provider_show_tables(self):
213 self.check_provider_queue() 214 list = self.get_provider_table_list() 215 for tbl in list: 216 print tbl
217
218 - def provider_notify_change(self):
219 q = "select londiste.provider_notify_change(%s)" 220 self.exec_provider(q, [self.pgq_queue_name])
221
222 - def provider_add_seq(self, seq):
223 seq = skytools.fq_name(seq) 224 q = "select londiste.provider_add_seq(%s, %s)" 225 self.exec_provider(q, [self.pgq_queue_name, seq])
226
227 - def provider_remove_seq(self, seq):
228 seq = skytools.fq_name(seq) 229 q = "select londiste.provider_remove_seq(%s, %s)" 230 self.exec_provider(q, [self.pgq_queue_name, seq])
231
232 - def exec_provider(self, sql, args):
233 src_db = self.get_database('provider_db') 234 src_curs = src_db.cursor() 235 236 src_curs.execute(sql, args) 237 238 if self.fake: 239 src_db.rollback() 240 else: 241 src_db.commit()
242 243 # 244 # Subscriber commands 245 # 246
247 -class SubscriberSetup(CommonSetup):
248
249 - def admin(self):
250 cmd = self.args[2] 251 if cmd == "tables": 252 self.subscriber_show_tables() 253 elif cmd == "missing": 254 self.subscriber_missing_tables() 255 elif cmd == "add": 256 self.subscriber_add_tables(self.args[3:]) 257 elif cmd == "remove": 258 self.subscriber_remove_tables(self.args[3:]) 259 elif cmd == "resync": 260 self.subscriber_resync_tables(self.args[3:]) 261 elif cmd == "register": 262 self.subscriber_register() 263 elif cmd == "unregister": 264 self.subscriber_unregister() 265 elif cmd == "install": 266 self.subscriber_install() 267 elif cmd == "check": 268 self.check_tables(self.get_provider_table_list()) 269 elif cmd == "fkeys": 270 self.collect_fkeys(self.get_provider_table_list()) 271 elif cmd == "seqs": 272 self.subscriber_list_seqs() 273 elif cmd == "add-seq": 274 self.subscriber_add_seq(self.args[3:]) 275 elif cmd == "remove-seq": 276 self.subscriber_remove_seq(self.args[3:]) 277 else: 278 self.log.error('bad subcommand: ' + cmd) 279 sys.exit(1)
280
281 - def collect_fkeys(self, table_list):
282 dst_db = self.get_database('subscriber_db') 283 dst_curs = dst_db.cursor() 284 285 oid_list = [] 286 for tbl in table_list: 287 try: 288 oid = skytools.get_table_oid(dst_curs, tbl) 289 if oid: 290 oid_list.append(str(oid)) 291 except: 292 pass 293 if len(oid_list) == 0: 294 print "No tables" 295 return 296 oid_str = ",".join(oid_list) 297 298 q = "SELECT n.nspname || '.' || t.relname as tbl, c.conname as con,"\ 299 " pg_get_constraintdef(c.oid) as def"\ 300 " FROM pg_constraint c, pg_class t, pg_namespace n"\ 301 " WHERE c.contype = 'f' and c.conrelid in (%s)"\ 302 " AND t.oid = c.conrelid AND n.oid = t.relnamespace" % oid_str 303 dst_curs.execute(q) 304 res = dst_curs.dictfetchall() 305 dst_db.commit() 306 307 print "-- dropping" 308 for row in res: 309 q = "ALTER TABLE ONLY %(tbl)s DROP CONSTRAINT %(con)s;" 310 print q % row 311 print "-- creating" 312 for row in res: 313 q = "ALTER TABLE ONLY %(tbl)s ADD CONSTRAINT %(con)s %(def)s;" 314 print q % row
315
316 - def check_tables(self, table_list):
317 src_db = self.get_database('provider_db') 318 src_curs = src_db.cursor() 319 dst_db = self.get_database('subscriber_db') 320 dst_curs = dst_db.cursor() 321 322 failed = 0 323 for tbl in table_list: 324 self.log.info('Checking %s' % tbl) 325 if not skytools.exists_table(src_curs, tbl): 326 self.log.error('Table %s missing from provider side' % tbl) 327 failed += 1 328 elif not skytools.exists_table(dst_curs, tbl): 329 self.log.error('Table %s missing from subscriber side' % tbl) 330 failed += 1 331 else: 332 failed += self.check_table_columns(src_curs, dst_curs, tbl) 333 failed += self.check_table_triggers(dst_curs, tbl) 334 335 src_db.commit() 336 dst_db.commit() 337 338 return failed
339
340 - def check_table_triggers(self, dst_curs, tbl):
341 oid = skytools.get_table_oid(dst_curs, tbl) 342 if not oid: 343 self.log.error('Table %s not found' % tbl) 344 return 1 345 q = "select count(1) from pg_trigger where tgrelid = %s" 346 dst_curs.execute(q, [oid]) 347 got = dst_curs.fetchone()[0] 348 if got: 349 self.log.error('found trigger on table %s (%s)' % (tbl, str(oid))) 350 return 1 351 else: 352 return 0
353
354 - def check_table_columns(self, src_curs, dst_curs, tbl):
355 src_colrows = find_column_types(src_curs, tbl) 356 dst_colrows = find_column_types(dst_curs, tbl) 357 358 src_cols = make_type_string(src_colrows) 359 dst_cols = make_type_string(dst_colrows) 360 if src_cols.find('k') < 0: 361 self.log.error('provider table %s has no primary key (%s)' % ( 362 tbl, src_cols)) 363 return 1 364 if dst_cols.find('k') < 0: 365 self.log.error('subscriber table %s has no primary key (%s)' % ( 366 tbl, dst_cols)) 367 return 1 368 369 if src_cols != dst_cols: 370 self.log.warning('table %s structure is not same (%s/%s)'\ 371 ', trying to continue' % (tbl, src_cols, dst_cols)) 372 373 err = 0 374 for row in src_colrows: 375 found = 0 376 for row2 in dst_colrows: 377 if row2['name'] == row['name']: 378 found = 1 379 break 380 if not found: 381 err = 1 382 self.log.error('%s: column %s on provider not on subscriber' 383 % (tbl, row['name'])) 384 elif row['type'] != row2['type']: 385 err = 1 386 self.log.error('%s: pk different on column %s' 387 % (tbl, row['name'])) 388 389 return err
390
391 - def subscriber_install(self):
392 dst_db = self.get_database('subscriber_db') 393 dst_curs = dst_db.cursor() 394 395 install_subscriber(dst_curs, self.log) 396 397 if self.fake: 398 self.log.debug('rollback') 399 dst_db.rollback() 400 else: 401 self.log.debug('commit') 402 dst_db.commit()
403
404 - def subscriber_register(self):
405 src_db = self.get_database('provider_db') 406 src_curs = src_db.cursor() 407 src_curs.execute("select pgq.register_consumer(%s, %s)", 408 [self.pgq_queue_name, self.consumer_id]) 409 src_db.commit()
410
411 - def subscriber_unregister(self):
412 q = "select londiste.subscriber_set_table_state(%s, %s, NULL, NULL)" 413 414 dst_db = self.get_database('subscriber_db') 415 dst_curs = dst_db.cursor() 416 tbl_rows = self.fetch_subscriber_tables(dst_curs) 417 for row in tbl_rows: 418 dst_curs.execute(q, [self.pgq_queue_name, row['table_name']]) 419 dst_db.commit() 420 421 src_db = self.get_database('provider_db') 422 src_curs = src_db.cursor() 423 src_curs.execute("select pgq.unregister_consumer(%s, %s)", 424 [self.pgq_queue_name, self.consumer_id]) 425 src_db.commit()
426
427 - def subscriber_show_tables(self):
428 list = self.get_subscriber_table_list() 429 for tbl in list: 430 print tbl
431
433 provider_tables = self.get_provider_table_list() 434 subscriber_tables = self.get_subscriber_table_list() 435 for tbl in provider_tables: 436 if tbl not in subscriber_tables: 437 print tbl
438
439 - def subscriber_add_tables(self, table_list):
440 provider_tables = self.get_provider_table_list() 441 subscriber_tables = self.get_subscriber_table_list() 442 443 err = 0 444 for tbl in table_list: 445 tbl = skytools.fq_name(tbl) 446 if tbl not in provider_tables: 447 err = 1 448 self.log.error("Table %s not attached to queue" % tbl) 449 if err: 450 if self.options.force: 451 self.log.warning('--force used, ignoring errors') 452 else: 453 sys.exit(1) 454 455 err = self.check_tables(table_list) 456 if err: 457 if self.options.force: 458 self.log.warning('--force used, ignoring errors') 459 else: 460 sys.exit(1) 461 462 for tbl in table_list: 463 tbl = skytools.fq_name(tbl) 464 if tbl in subscriber_tables: 465 self.log.info("Table %s already added" % tbl) 466 else: 467 self.log.info("Adding %s" % tbl) 468 self.subscriber_add_one_table(tbl)
469
470 - def subscriber_remove_tables(self, table_list):
471 subscriber_tables = self.get_subscriber_table_list() 472 for tbl in table_list: 473 tbl = skytools.fq_name(tbl) 474 if tbl in subscriber_tables: 475 self.subscriber_remove_one_table(tbl) 476 else: 477 self.log.info("Table %s already removed" % tbl)
478
479 - def subscriber_resync_tables(self, table_list):
480 dst_db = self.get_database('subscriber_db') 481 dst_curs = dst_db.cursor() 482 list = self.fetch_subscriber_tables(dst_curs) 483 for tbl in table_list: 484 tbl = skytools.fq_name(tbl) 485 tbl_row = None 486 for row in list: 487 if row['table_name'] == tbl: 488 tbl_row = row 489 break 490 if not tbl_row: 491 self.log.warning("Table %s not found" % tbl) 492 elif tbl_row['merge_state'] != 'ok': 493 self.log.warning("Table %s is not in stable state" % tbl) 494 else: 495 self.log.info("Resyncing %s" % tbl) 496 q = "select londiste.subscriber_set_table_state(%s, %s, NULL, NULL)" 497 dst_curs.execute(q, [self.pgq_queue_name, tbl]) 498 dst_db.commit()
499
500 - def subscriber_add_one_table(self, tbl):
501 q = "select londiste.subscriber_add_table(%s, %s)" 502 503 dst_db = self.get_database('subscriber_db') 504 dst_curs = dst_db.cursor() 505 dst_curs.execute(q, [self.pgq_queue_name, tbl]) 506 if self.options.expect_sync: 507 q = "select londiste.subscriber_set_table_state(%s, %s, null, 'ok')" 508 dst_curs.execute(q, [self.pgq_queue_name, tbl]) 509 dst_db.commit()
510
511 - def subscriber_remove_one_table(self, tbl):
512 q = "select londiste.subscriber_remove_table(%s, %s)" 513 514 dst_db = self.get_database('subscriber_db') 515 dst_curs = dst_db.cursor() 516 dst_curs.execute(q, [self.pgq_queue_name, tbl]) 517 dst_db.commit()
518
519 - def get_subscriber_seq_list(self):
520 dst_db = self.get_database('subscriber_db') 521 dst_curs = dst_db.cursor() 522 q = "SELECT * from londiste.subscriber_get_seq_list(%s)" 523 dst_curs.execute(q, [self.pgq_queue_name]) 524 list = dst_curs.fetchall() 525 dst_db.commit() 526 res = [] 527 for row in list: 528 res.append(row[0]) 529 return res
530
531 - def subscriber_list_seqs(self):
532 list = self.get_subscriber_seq_list() 533 for seq in list: 534 print seq
535
536 - def subscriber_add_seq(self, seq_list):
537 src_db = self.get_database('provider_db') 538 src_curs = src_db.cursor() 539 dst_db = self.get_database('subscriber_db') 540 dst_curs = dst_db.cursor() 541 542 prov_list = self.get_provider_seqs(src_curs) 543 src_db.commit() 544 545 full_list = self.get_all_seqs(dst_curs) 546 cur_list = self.get_subscriber_seq_list() 547 548 for seq in seq_list: 549 seq = skytools.fq_name(seq) 550 if seq not in prov_list: 551 self.log.error('Seq %s does not exist on provider side' % seq) 552 continue 553 if seq not in full_list: 554 self.log.error('Seq %s does not exist on subscriber side' % seq) 555 continue 556 if seq in cur_list: 557 self.log.info('Seq %s already subscribed' % seq) 558 continue 559 560 self.log.info('Adding sequence: %s' % seq) 561 q = "select londiste.subscriber_add_seq(%s, %s)" 562 dst_curs.execute(q, [self.pgq_queue_name, seq]) 563 564 dst_db.commit()
565
566 - def subscriber_remove_seq(self, seq_list):
567 dst_db = self.get_database('subscriber_db') 568 dst_curs = dst_db.cursor() 569 cur_list = self.get_subscriber_seq_list() 570 571 for seq in seq_list: 572 seq = skytools.fq_name(seq) 573 if seq not in cur_list: 574 self.log.warning('Seq %s not subscribed') 575 else: 576 self.log.info('Removing sequence: %s' % seq) 577 q = "select londiste.subscriber_remove_seq(%s, %s)" 578 dst_curs.execute(q, [self.pgq_queue_name, seq]) 579 dst_db.commit()
580