1
2
3 """Londiste setup and sanity checker.
4
5 """
6 import sys, os, skytools
7 from installer import *
8
9 __all__ = ['ProviderSetup', 'SubscriberSetup']
10
12 table_oid = skytools.get_table_oid(curs, table)
13 if table_oid == None:
14 return None
15
16 key_sql = """
17 SELECT k.attname FROM pg_index i, pg_attribute k
18 WHERE i.indrelid = %d AND k.attrelid = i.indexrelid
19 AND i.indisprimary AND k.attnum > 0 AND NOT k.attisdropped
20 """ % table_oid
21
22
23 q = """
24 SELECT a.attname as name,
25 CASE WHEN k.attname IS NOT NULL
26 THEN 'k' ELSE 'v' END AS type
27 FROM pg_attribute a LEFT JOIN (%s) k ON (k.attname = a.attname)
28 WHERE a.attrelid = %d AND a.attnum > 0 AND NOT a.attisdropped
29 ORDER BY a.attnum
30 """ % (key_sql, table_oid)
31 curs.execute(q)
32 rows = curs.dictfetchall()
33 return rows
34
36 res = map(lambda x: x['type'], col_rows)
37 return "".join(res)
38
# Initialise the DBScript base under the 'londiste' name, run a single
# loop only, and use a pidfile with a ".setup" suffix.
skytools.DBScript.__init__(self, 'londiste', args)
self.set_single_loop(1)
self.pidfile += ".setup"

# Values taken from the script configuration; the consumer id falls
# back to the job name and 'fake' defaults to 0.
config = self.cf
self.pgq_queue_name = config.get("pgq_queue_name")
self.consumer_id = config.get("pgq_consumer_id", self.job_name)
self.fake = config.getint('fake', 0)

# Fewer than three arguments is reported as a missing subcommand.
if len(self.args) < 3:
    self.log.error("need subcommand")
    sys.exit(1)
52
55
57 q = """select table_name, trigger_name
58 from londiste.provider_get_table_list(%s)"""
59 curs.execute(q, [self.pgq_queue_name])
60 return curs.dictfetchall()
61
63 src_db = self.get_database('provider_db')
64 src_curs = src_db.cursor()
65 list = self.fetch_provider_table_list(src_curs)
66 src_db.commit()
67 res = []
68 for row in list:
69 res.append(row['table_name'])
70 return res
71
73 q = """SELECT * from londiste.provider_get_seq_list(%s)"""
74 curs.execute(q, [self.pgq_queue_name])
75 res = []
76 for row in curs.fetchall():
77 res.append(row[0])
78 return res
79
81 q = """SELECT n.nspname || '.'|| c.relname
82 from pg_class c, pg_namespace n
83 where n.oid = c.relnamespace
84 and c.relkind = 'S'
85 order by 1"""
86 curs.execute(q)
87 res = []
88 for row in curs.fetchall():
89 res.append(row[0])
90 return res
91
93 src_db = self.get_database('provider_db')
94 src_curs = src_db.cursor()
95 q = "select count(1) from pgq.get_queue_info(%s)"
96 src_curs.execute(q, [self.pgq_queue_name])
97 ok = src_curs.fetchone()[0]
98 src_db.commit()
99
100 if not ok:
101 self.log.error('Event queue does not exist yet')
102 sys.exit(1)
103
105 q = "select * from londiste.subscriber_get_table_list(%s)"
106 curs.execute(q, [self.pgq_queue_name])
107 return curs.dictfetchall()
108
110 dst_db = self.get_database('subscriber_db')
111 dst_curs = dst_db.cursor()
112 list = self.fetch_subscriber_tables(dst_curs)
113 dst_db.commit()
114 res = []
115 for row in list:
116 res.append(row['table_name'])
117 return res
118
120 p = skytools.DBScript.init_optparse(self, parser)
121 p.add_option("--expect-sync", action="store_true", dest="expect_sync",
122 help = "no copy needed", default=False)
123 p.add_option("--force", action="store_true",
124 help="force", default=False)
125 return p
126
127
128
129
130
131
133
157
# Print what get_provider_seqs returns for the provider database, one
# entry per line.
provider_db = self.get_database('provider_db')
provider_curs = provider_db.cursor()
seq_names = self.get_provider_seqs(provider_curs)
provider_db.commit()

for seq_name in seq_names:
    print(seq_name)
166
175
# Attach each table in table_list to the provider queue, skipping the
# ones get_provider_table_list already reports.
# check_provider_queue exits the process when the queue does not exist.
177 self.check_provider_queue()
178
179 cur_list = self.get_provider_table_list()
180 for tbl in table_list:
# Names without a dot get the "public." prefix before the lookup.
181 if tbl.find('.') < 0:
182 tbl = "public." + tbl
183 if tbl not in cur_list:
184 self.log.info('Adding %s' % tbl)
185 self.provider_add_table(tbl)
186 else:
187 self.log.info("Table %s already added" % tbl)
# NOTE(review): indentation is lost in this excerpt; presumably this runs
# once after the loop rather than per table - confirm.
188 self.provider_notify_change()
189
# Detach each table in table_list from the provider queue; tables that
# get_provider_table_list does not report are only logged.
# check_provider_queue exits the process when the queue does not exist.
191 self.check_provider_queue()
192
193 cur_list = self.get_provider_table_list()
194 for tbl in table_list:
# Names without a dot get the "public." prefix before the lookup.
195 if tbl.find('.') < 0:
196 tbl = "public." + tbl
197 if tbl not in cur_list:
198 self.log.info('%s already removed' % tbl)
199 else:
200 self.log.info("Removing %s" % tbl)
201 self.provider_remove_table(tbl)
# NOTE(review): indentation is lost in this excerpt; presumably this runs
# once after the loop rather than per table - confirm.
202 self.provider_notify_change()
203
# Call londiste.provider_add_table for this queue and table through
# exec_provider.
sql = "select londiste.provider_add_table(%s, %s)"
params = [self.pgq_queue_name, tbl]
self.exec_provider(sql, params)
207
# Call londiste.provider_remove_table for this queue and table through
# exec_provider.
sql = "select londiste.provider_remove_table(%s, %s)"
params = [self.pgq_queue_name, tbl]
self.exec_provider(sql, params)
211
# After verifying the queue exists, print every table name returned by
# get_provider_table_list, one per line.
self.check_provider_queue()
for table_name in self.get_provider_table_list():
    print(table_name)
217
# Call londiste.provider_notify_change for this queue through
# exec_provider.
sql = "select londiste.provider_notify_change(%s)"
params = [self.pgq_queue_name]
self.exec_provider(sql, params)
221
226
231
# Execute one statement on the provider database.  When self.fake is
# set the transaction is rolled back, otherwise it is committed.
provider_db = self.get_database('provider_db')
provider_curs = provider_db.cursor()
provider_curs.execute(sql, args)

if not self.fake:
    provider_db.commit()
else:
    provider_db.rollback()
242
243
244
245
246
248
280
282 dst_db = self.get_database('subscriber_db')
283 dst_curs = dst_db.cursor()
284
285 oid_list = []
286 for tbl in table_list:
287 try:
288 oid = skytools.get_table_oid(dst_curs, tbl)
289 if oid:
290 oid_list.append(str(oid))
291 except:
292 pass
293 if len(oid_list) == 0:
294 print "No tables"
295 return
296 oid_str = ",".join(oid_list)
297
298 q = "SELECT n.nspname || '.' || t.relname as tbl, c.conname as con,"\
299 " pg_get_constraintdef(c.oid) as def"\
300 " FROM pg_constraint c, pg_class t, pg_namespace n"\
301 " WHERE c.contype = 'f' and c.conrelid in (%s)"\
302 " AND t.oid = c.conrelid AND n.oid = t.relnamespace" % oid_str
303 dst_curs.execute(q)
304 res = dst_curs.dictfetchall()
305 dst_db.commit()
306
307 print "-- dropping"
308 for row in res:
309 q = "ALTER TABLE ONLY %(tbl)s DROP CONSTRAINT %(con)s;"
310 print q % row
311 print "-- creating"
312 for row in res:
313 q = "ALTER TABLE ONLY %(tbl)s ADD CONSTRAINT %(con)s %(def)s;"
314 print q % row
315
# Sanity-check each table in table_list on both databases and return the
# accumulated failure count (0 means nothing was reported).
317 src_db = self.get_database('provider_db')
318 src_curs = src_db.cursor()
319 dst_db = self.get_database('subscriber_db')
320 dst_curs = dst_db.cursor()
321
322 failed = 0
323 for tbl in table_list:
324 self.log.info('Checking %s' % tbl)
# A table missing on either side counts as one failure.
325 if not skytools.exists_table(src_curs, tbl):
326 self.log.error('Table %s missing from provider side' % tbl)
327 failed += 1
328 elif not skytools.exists_table(dst_curs, tbl):
329 self.log.error('Table %s missing from subscriber side' % tbl)
330 failed += 1
331 else:
# Both checks add their own return value to the failure count.
# NOTE(review): indentation is lost in this excerpt; presumably both
# calls belong to the else branch - confirm.
332 failed += self.check_table_columns(src_curs, dst_curs, tbl)
333 failed += self.check_table_triggers(dst_curs, tbl)
334
335 src_db.commit()
336 dst_db.commit()
337
338 return failed
339
341 oid = skytools.get_table_oid(dst_curs, tbl)
342 if not oid:
343 self.log.error('Table %s not found' % tbl)
344 return 1
345 q = "select count(1) from pg_trigger where tgrelid = %s"
346 dst_curs.execute(q, [oid])
347 got = dst_curs.fetchone()[0]
348 if got:
349 self.log.error('found trigger on table %s (%s)' % (tbl, str(oid)))
350 return 1
351 else:
352 return 0
353
355 src_colrows = find_column_types(src_curs, tbl)
356 dst_colrows = find_column_types(dst_curs, tbl)
357
358 src_cols = make_type_string(src_colrows)
359 dst_cols = make_type_string(dst_colrows)
360 if src_cols.find('k') < 0:
361 self.log.error('provider table %s has no primary key (%s)' % (
362 tbl, src_cols))
363 return 1
364 if dst_cols.find('k') < 0:
365 self.log.error('subscriber table %s has no primary key (%s)' % (
366 tbl, dst_cols))
367 return 1
368
369 if src_cols != dst_cols:
370 self.log.warning('table %s structure is not same (%s/%s)'\
371 ', trying to continue' % (tbl, src_cols, dst_cols))
372
373 err = 0
374 for row in src_colrows:
375 found = 0
376 for row2 in dst_colrows:
377 if row2['name'] == row['name']:
378 found = 1
379 break
380 if not found:
381 err = 1
382 self.log.error('%s: column %s on provider not on subscriber'
383 % (tbl, row['name']))
384 elif row['type'] != row2['type']:
385 err = 1
386 self.log.error('%s: pk different on column %s'
387 % (tbl, row['name']))
388
389 return err
390
# Run install_subscriber on the subscriber database.  When self.fake is
# set the work is rolled back, otherwise it is committed.
subscriber_db = self.get_database('subscriber_db')
subscriber_curs = subscriber_db.cursor()

install_subscriber(subscriber_curs, self.log)

if not self.fake:
    self.log.debug('commit')
    subscriber_db.commit()
else:
    self.log.debug('rollback')
    subscriber_db.rollback()
403
# Call pgq.register_consumer on the provider for this queue and
# consumer id, then commit.
provider_db = self.get_database('provider_db')
provider_curs = provider_db.cursor()
params = [self.pgq_queue_name, self.consumer_id]
provider_curs.execute("select pgq.register_consumer(%s, %s)", params)
provider_db.commit()
410
# Set the state of every subscribed table to (NULL, NULL) on the
# subscriber, then unregister this consumer from the provider queue.
412 q = "select londiste.subscriber_set_table_state(%s, %s, NULL, NULL)"
413
414 dst_db = self.get_database('subscriber_db')
415 dst_curs = dst_db.cursor()
416 tbl_rows = self.fetch_subscriber_tables(dst_curs)
417 for row in tbl_rows:
418 dst_curs.execute(q, [self.pgq_queue_name, row['table_name']])
# NOTE(review): indentation is lost in this excerpt; presumably a single
# commit after the loop - confirm.
419 dst_db.commit()
420
# Provider side: drop the consumer registration for this queue.
421 src_db = self.get_database('provider_db')
422 src_curs = src_db.cursor()
423 src_curs.execute("select pgq.unregister_consumer(%s, %s)",
424 [self.pgq_queue_name, self.consumer_id])
425 src_db.commit()
426
# Print every table name returned by get_subscriber_table_list, one
# per line.
for table_name in self.get_subscriber_table_list():
    print(table_name)
431
# Print each provider table that is absent from the subscriber's table
# list, one per line, in provider order.
provider_tables = self.get_provider_table_list()
subscriber_tables = self.get_subscriber_table_list()
for table_name in provider_tables:
    if table_name in subscriber_tables:
        continue
    print(table_name)
438
# Subscribe the tables in table_list: every table must be attached to the
# provider queue and pass check_tables; tables not yet on the subscriber
# are then added one by one.
# NOTE(review): indentation is lost in this excerpt; the "if err:" checks
# presumably run after their loop / call, not per table - confirm.
440 provider_tables = self.get_provider_table_list()
441 subscriber_tables = self.get_subscriber_table_list()
442
443 err = 0
444 for tbl in table_list:
# Membership is tested on the name returned by skytools.fq_name.
445 tbl = skytools.fq_name(tbl)
446 if tbl not in provider_tables:
447 err = 1
448 self.log.error("Table %s not attached to queue" % tbl)
449 if err:
# With --force the error only produces a warning; otherwise exit code 1.
450 if self.options.force:
451 self.log.warning('--force used, ignoring errors')
452 else:
453 sys.exit(1)
454
# check_tables receives the names exactly as given by the caller.
455 err = self.check_tables(table_list)
456 if err:
457 if self.options.force:
458 self.log.warning('--force used, ignoring errors')
459 else:
460 sys.exit(1)
461
462 for tbl in table_list:
463 tbl = skytools.fq_name(tbl)
464 if tbl in subscriber_tables:
465 self.log.info("Table %s already added" % tbl)
466 else:
467 self.log.info("Adding %s" % tbl)
468 self.subscriber_add_one_table(tbl)
469
# Remove each listed table from the subscriber; tables that are not in
# the subscriber's list are only logged.
subscribed = self.get_subscriber_table_list()
for raw_name in table_list:
    full_name = skytools.fq_name(raw_name)
    if full_name not in subscribed:
        self.log.info("Table %s already removed" % full_name)
        continue
    self.subscriber_remove_one_table(full_name)
478
# Reset the state of each listed table to (NULL, NULL) on the subscriber.
# Tables that are not subscribed, or whose merge_state is not 'ok', are
# skipped with a warning.
480 dst_db = self.get_database('subscriber_db')
481 dst_curs = dst_db.cursor()
482 list = self.fetch_subscriber_tables(dst_curs)
483 for tbl in table_list:
484 tbl = skytools.fq_name(tbl)
# Find the subscriber row for this table; tbl_row stays None if absent.
485 tbl_row = None
486 for row in list:
487 if row['table_name'] == tbl:
488 tbl_row = row
489 break
490 if not tbl_row:
491 self.log.warning("Table %s not found" % tbl)
492 elif tbl_row['merge_state'] != 'ok':
493 self.log.warning("Table %s is not in stable state" % tbl)
494 else:
495 self.log.info("Resyncing %s" % tbl)
496 q = "select londiste.subscriber_set_table_state(%s, %s, NULL, NULL)"
497 dst_curs.execute(q, [self.pgq_queue_name, tbl])
# NOTE(review): indentation is lost in this excerpt; presumably a single
# commit after the outer loop - confirm.
498 dst_db.commit()
499
# Register one table on the subscriber.  With --expect-sync the table
# state is additionally set to 'ok' in the same transaction.
add_sql = "select londiste.subscriber_add_table(%s, %s)"
state_sql = "select londiste.subscriber_set_table_state(%s, %s, null, 'ok')"

subscriber_db = self.get_database('subscriber_db')
subscriber_curs = subscriber_db.cursor()
subscriber_curs.execute(add_sql, [self.pgq_queue_name, tbl])
if self.options.expect_sync:
    subscriber_curs.execute(state_sql, [self.pgq_queue_name, tbl])
subscriber_db.commit()
510
# Call londiste.subscriber_remove_table for this queue and table on the
# subscriber, then commit.
subscriber_db = self.get_database('subscriber_db')
subscriber_curs = subscriber_db.cursor()
params = [self.pgq_queue_name, tbl]
subscriber_curs.execute("select londiste.subscriber_remove_table(%s, %s)",
                        params)
subscriber_db.commit()
518
520 dst_db = self.get_database('subscriber_db')
521 dst_curs = dst_db.cursor()
522 q = "SELECT * from londiste.subscriber_get_seq_list(%s)"
523 dst_curs.execute(q, [self.pgq_queue_name])
524 list = dst_curs.fetchall()
525 dst_db.commit()
526 res = []
527 for row in list:
528 res.append(row[0])
529 return res
530
535
# Subscribe each sequence in seq_list.  A sequence is skipped (with a log
# message) when the provider does not list it, when it does not exist on
# the subscriber, or when it is already subscribed.
537 src_db = self.get_database('provider_db')
538 src_curs = src_db.cursor()
539 dst_db = self.get_database('subscriber_db')
540 dst_curs = dst_db.cursor()
541
# Sequences registered on the provider side.
542 prov_list = self.get_provider_seqs(src_curs)
543 src_db.commit()
544
# All sequences present on the subscriber, and those already subscribed.
545 full_list = self.get_all_seqs(dst_curs)
546 cur_list = self.get_subscriber_seq_list()
547
548 for seq in seq_list:
549 seq = skytools.fq_name(seq)
550 if seq not in prov_list:
551 self.log.error('Seq %s does not exist on provider side' % seq)
552 continue
553 if seq not in full_list:
554 self.log.error('Seq %s does not exist on subscriber side' % seq)
555 continue
556 if seq in cur_list:
557 self.log.info('Seq %s already subscribed' % seq)
558 continue
559
560 self.log.info('Adding sequence: %s' % seq)
561 q = "select londiste.subscriber_add_seq(%s, %s)"
562 dst_curs.execute(q, [self.pgq_queue_name, seq])
563
# NOTE(review): indentation is lost in this excerpt; presumably a single
# commit after the loop - confirm.
564 dst_db.commit()
565
# Unsubscribe each sequence in seq_list on the subscriber; sequences that
# are not currently subscribed only produce a warning.
dst_db = self.get_database('subscriber_db')
dst_curs = dst_db.cursor()
cur_list = self.get_subscriber_seq_list()

for seq in seq_list:
    seq = skytools.fq_name(seq)
    if seq not in cur_list:
        # Fix: the message used to be logged without its format argument,
        # so the output contained a literal "%s" instead of the name.
        self.log.warning('Seq %s not subscribed' % seq)
    else:
        self.log.info('Removing sequence: %s' % seq)
        q = "select londiste.subscriber_remove_seq(%s, %s)"
        dst_curs.execute(q, [self.pgq_queue_name, seq])
# NOTE(review): the excerpt has lost its indentation; the commit is
# placed once after the loop - confirm against the original layout.
dst_db.commit()
580