Package londiste :: Module syncer
[frames] | [no frames]

Source Code for Module londiste.syncer

  1   
  2  """Catch moment when tables are in sync on master and slave. 
  3  """ 
  4   
  5  import sys, time, skytools 
  6   
class Syncer(skytools.DBScript):
    """Bring a table to the same state on provider and subscriber, then process it.

    For each requested table this class locks it on the provider and waits
    until the londiste consumer has caught up past the lock moment.  It then
    opens SERIALIZABLE transactions on both sides and hands the two
    connections to process_sync().  Subclasses implement process_sync() to
    do the actual work (presumably walking the table in primary key order and
    comparing rows -- TODO confirm against subclasses).
    """

    def __init__(self, args):
        skytools.DBScript.__init__(self, 'londiste', args)
        self.set_single_loop(1)

        self.pgq_queue_name = self.cf.get("pgq_queue_name")
        # defaults to the job name when not configured explicitly
        self.pgq_consumer_id = self.cf.get('pgq_consumer_id', self.job_name)

        # use a distinct pidfile; presumably so this one-shot run does not
        # collide with the replay daemon's pidfile (TODO confirm)
        if self.pidfile:
            self.pidfile += ".repair"

    def init_optparse(self, p=None):
        """Add the --force option (ignore consumer lag) to the base parser."""
        p = skytools.DBScript.init_optparse(self, p)
        p.add_option("--force", action="store_true", help="ignore lag")
        return p

    def check_consumer(self, src_db):
        """Exit the script unless the queue's consumer is keeping up.

        Compares the consumer lag against the queue's ticker lag, both in
        seconds.  Exits with status 1 in three cases: the queue is unknown,
        the consumer is not registered, or the consumer lags more than 10
        seconds behind the ticker.  The lag check is skipped with --force.
        """
        src_curs = src_db.cursor()

        # before locking anything check if consumer is working ok
        q = "select extract(epoch from ticker_lag) from pgq.get_queue_list()"\
            " where queue_name = %s"
        src_curs.execute(q, [self.pgq_queue_name])
        row = src_curs.fetchone()
        if row is None:
            # unknown queue: previously crashed with TypeError on None[0]
            src_db.commit()
            self.log.error('No such queue: %s' % self.pgq_queue_name)
            sys.exit(1)
        ticker_lag = row[0]

        q = "select extract(epoch from lag)"\
            " from pgq.get_consumer_list()"\
            " where queue_name = %s"\
            " and consumer_name = %s"
        src_curs.execute(q, [self.pgq_queue_name, self.pgq_consumer_id])
        res = src_curs.fetchall()
        src_db.commit()

        if not res:
            self.log.error('No such consumer')
            sys.exit(1)
        consumer_lag = res[0][0]

        if consumer_lag > ticker_lag + 10 and not self.options.force:
            self.log.error('Consumer lagging too much, cannot proceed')
            sys.exit(1)

    def get_subscriber_table_state(self):
        """Return the subscriber's table list for this queue as a list of dicts.

        Rows come from londiste.subscriber_get_table_list(); work() reads
        their 'table_name' and 'merge_state' keys.
        """
        dst_db = self.get_database('subscriber_db')
        dst_curs = dst_db.cursor()
        q = "select * from londiste.subscriber_get_table_list(%s)"
        dst_curs.execute(q, [self.pgq_queue_name])
        res = dst_curs.dictfetchall()
        dst_db.commit()
        return res

    def work(self):
        """Check the consumer, then run check_table() for each eligible table.

        Tables are taken from self.args[2:] when given (presumably the
        leading args are the config file and command -- TODO confirm),
        otherwise every table subscribed on the subscriber is used.  Tables
        that are not subscribed, or whose merge_state is not 'ok', are skipped.
        """
        # lock_db is a second, separately cached connection to the provider
        # so the table lock can be held while src_db commits freely
        lock_db = self.get_database('provider_db', cache='lock_db')
        src_db = self.get_database('provider_db')
        dst_db = self.get_database('subscriber_db')

        self.check_consumer(src_db)

        state_list = self.get_subscriber_table_state()
        state_map = {}
        full_list = []
        for ts in state_list:
            name = ts['table_name']
            full_list.append(name)
            state_map[name] = ts

        if len(self.args) > 2:
            tlist = self.args[2:]
        else:
            tlist = full_list

        for tbl in tlist:
            if tbl not in state_map:
                self.log.warning('Table not subscribed: %s' % tbl)
                continue
            st = state_map[tbl]
            if st['merge_state'] != 'ok':
                self.log.info('Table %s not synced yet, no point' % tbl)
                continue
            self.check_table(tbl, lock_db, src_db, dst_db)
            lock_db.commit()
            src_db.commit()
            dst_db.commit()

    def check_table(self, tbl, lock_db, src_db, dst_db):
        """Get transaction to same state, then process.

        Steps:
          1. Return early, with a warning, if tbl is missing on either side.
          2. Take a SHARE lock on tbl through lock_db.
          3. Wait until the consumer has applied events past the lock moment.
          4. Open SERIALIZABLE snapshots on src_db and dst_db.
          5. Release the lock, then call process_sync() with both connections.

        The lock is always released, including when waiting fails or the
        script exits because the consumer lags too much.
        """
        lock_curs = lock_db.cursor()
        src_curs = src_db.cursor()
        dst_curs = dst_db.cursor()

        if not skytools.exists_table(src_curs, tbl):
            self.log.warning("Table %s does not exist on provider side" % tbl)
            return
        if not skytools.exists_table(dst_curs, tbl):
            self.log.warning("Table %s does not exist on subscriber side" % tbl)
            return

        # lock table in separate connection
        # NOTE: tbl is interpolated unquoted; it has passed exists_table()
        # above, but quoting (e.g. skytools.quote_fqident) may be safer.
        self.log.info('Locking %s' % tbl)
        lock_db.commit()
        lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % tbl)
        lock_time = time.time()

        lock_released = False
        try:
            # now wait until consumer has updated target table until locking
            self.log.info('Syncing %s' % tbl)
            self._wait_for_consumer(src_db, src_curs, lock_time)

            # take snapshot on provider side
            src_curs.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            src_curs.execute("SELECT 1")

            # take snapshot on subscriber side
            dst_db.commit()
            dst_curs.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            dst_curs.execute("SELECT 1")

            # release lock
            lock_db.commit()
            lock_released = True
        finally:
            # on any error (including sys.exit on timeout) do not leave the
            # SHARE lock held by the open lock_db transaction
            if not lock_released:
                lock_db.rollback()

        # do work
        self.process_sync(tbl, src_db, dst_db)

        # done
        src_db.commit()
        dst_db.commit()

    def _wait_for_consumer(self, src_db, src_curs, lock_time):
        """Poll until the consumer has processed events up to the provider's 'now'.

        Inserts two ticks and records the provider's current time.  It then
        polls until the consumer's "now() - lag" passes that time.  The
        script exits with status 1 if this takes more than 10 seconds after
        lock_time, unless --force is set.  Raises Exception if the consumer
        is not registered.
        """
        # consumer must get further than this tick
        src_curs.execute("select pgq.ticker(%s)", [self.pgq_queue_name])
        src_db.commit()
        # avoid depending on ticker by inserting second tick also
        time.sleep(0.1)
        src_curs.execute("select pgq.ticker(%s)", [self.pgq_queue_name])
        src_db.commit()
        src_curs.execute("select to_char(now(), 'YYYY-MM-DD HH24:MI:SS.MS')")
        tpos = src_curs.fetchone()[0]
        src_db.commit()

        q = """select now() - lag > %s, now(), lag
             from pgq.get_consumer_list()
             where consumer_name = %s
             and queue_name = %s"""
        # now wait
        while True:
            time.sleep(0.2)

            src_curs.execute(q, [tpos, self.pgq_consumer_id, self.pgq_queue_name])
            res = src_curs.fetchall()
            src_db.commit()

            if not res:
                raise Exception('No such consumer')

            row = res[0]
            self.log.debug("tpos=%s now=%s lag=%s ok=%s" % (tpos, row[1], row[2], row[0]))
            if row[0]:
                break

            # loop max 10 secs
            if time.time() > lock_time + 10 and not self.options.force:
                self.log.error('Consumer lagging too much, exiting')
                sys.exit(1)

    def process_sync(self, tbl, src_db, dst_db):
        """Process tbl given two connections in which it should be in the same state.

        Called with src_db (provider) and dst_db (subscriber) each inside a
        SERIALIZABLE transaction opened after the consumer caught up.
        Subclasses must override this.
        """
        raise NotImplementedError('process_sync not implemented')