Package pgq :: Module ticker
[frames] | no frames]

Source Code for Module pgq.ticker

  1  """PgQ ticker. 
  2   
  3  It will also launch maintenance job. 
  4  """ 
  5   
  6  import sys, os, time, threading 
  7  import skytools 
  8   
  9  from maint import MaintenanceJob 
 10   
 11  __all__ = ['SmartTicker'] 
 12   
13 -def is_txid_sane(curs):
14 curs.execute("select get_current_txid()") 15 txid = curs.fetchone()[0] 16 17 # on 8.2 theres no such table 18 if not skytools.exists_table(curs, 'txid.epoch'): 19 return 1 20 21 curs.execute("select epoch, last_value from txid.epoch") 22 epoch, last_val = curs.fetchone() 23 stored_val = (epoch << 32) | last_val 24 25 if stored_val <= txid: 26 return 1 27 else: 28 return 0
29
30 -class QueueStatus(object):
31 - def __init__(self, name):
32 self.queue_name = name 33 self.seq_name = None 34 self.idle_period = 60 35 self.max_lag = 3 36 self.max_count = 200 37 self.last_tick_time = 0 38 self.last_count = 0 39 self.quiet_count = 0
40
41 - def set_data(self, row):
42 self.seq_name = row['queue_event_seq'] 43 self.idle_period = row['queue_ticker_idle_period'] 44 self.max_lag = row['queue_ticker_max_lag'] 45 self.max_count = row['queue_ticker_max_count']
46
47 - def need_tick(self, cur_count, cur_time):
48 # check if tick is needed 49 need_tick = 0 50 lag = cur_time - self.last_tick_time 51 52 if cur_count == self.last_count: 53 # totally idle database 54 55 # don't go immidiately to big delays, as seq grows before commit 56 if self.quiet_count < 5: 57 if lag >= self.max_lag: 58 need_tick = 1 59 self.quiet_count += 1 60 else: 61 if lag >= self.idle_period: 62 need_tick = 1 63 else: 64 self.quiet_count = 0 65 # somewhat loaded machine 66 if cur_count - self.last_count >= self.max_count: 67 need_tick = 1 68 elif lag >= self.max_lag: 69 need_tick = 1 70 if need_tick: 71 self.last_tick_time = cur_time 72 self.last_count = cur_count 73 return need_tick
74
75 -class SmartTicker(skytools.DBScript):
76 last_tick_event = 0 77 last_tick_time = 0 78 quiet_count = 0 79 tick_count = 0 80 maint_thread = None 81
82 - def __init__(self, args):
83 skytools.DBScript.__init__(self, 'pgqadm', args) 84 85 self.ticker_log_time = 0 86 self.ticker_log_delay = 5*60 87 self.queue_map = {} 88 self.refresh_time = 0
89
90 - def reload(self):
91 skytools.DBScript.reload(self) 92 self.ticker_log_delay = self.cf.getfloat("ticker_log_delay", 5*60)
93
94 - def startup(self):
95 if self.maint_thread: 96 return 97 98 db = self.get_database("db", autocommit = 1) 99 cx = db.cursor() 100 ok = is_txid_sane(cx) 101 if not ok: 102 self.log.error('txid in bad state') 103 sys.exit(1) 104 105 self.maint_thread = MaintenanceJob(self, [self.cf.filename]) 106 t = threading.Thread(name = 'maint_thread', 107 target = self.maint_thread.run) 108 t.setDaemon(1) 109 t.start()
110
111 - def refresh_queues(self, cx):
112 q = "select queue_name, queue_event_seq, queue_ticker_idle_period,"\ 113 " queue_ticker_max_lag, queue_ticker_max_count"\ 114 " from pgq.queue"\ 115 " where not queue_external_ticker" 116 cx.execute(q) 117 new_map = {} 118 data_list = [] 119 from_list = [] 120 for row in cx.dictfetchall(): 121 queue_name = row['queue_name'] 122 try: 123 que = self.queue_map[queue_name] 124 except KeyError, x: 125 que = QueueStatus(queue_name) 126 que.set_data(row) 127 new_map[queue_name] = que 128 129 p1 = "'%s', %s.last_value" % (queue_name, que.seq_name) 130 data_list.append(p1) 131 from_list.append(que.seq_name) 132 133 self.queue_map = new_map 134 self.seq_query = "select %s from %s" % ( 135 ", ".join(data_list), 136 ", ".join(from_list)) 137 138 if len(from_list) == 0: 139 self.seq_query = None 140 141 self.refresh_time = time.time()
142
143 - def work(self):
144 db = self.get_database("db", autocommit = 1) 145 cx = db.cursor() 146 147 cur_time = time.time() 148 149 if cur_time >= self.refresh_time + 30: 150 self.refresh_queues(cx) 151 152 if not self.seq_query: 153 return 154 155 # now check seqs 156 cx.execute(self.seq_query) 157 res = cx.fetchone() 158 pos = 0 159 while pos < len(res): 160 id = res[pos] 161 val = res[pos + 1] 162 pos += 2 163 que = self.queue_map[id] 164 if que.need_tick(val, cur_time): 165 cx.execute("select pgq.ticker(%s)", [que.queue_name]) 166 self.tick_count += 1 167 168 if cur_time > self.ticker_log_time + self.ticker_log_delay: 169 self.ticker_log_time = cur_time 170 self.stat_add('ticks', self.tick_count) 171 self.tick_count = 0
172