1 """PgQ ticker.
2
3 It will also launch the maintenance job.
4 """
5
6 import sys, os, time, threading
7 import skytools
8
9 from maint import MaintenanceJob
10
11 __all__ = ['SmartTicker']
12
# NOTE(review): the `def` line for this body is not visible in this chunk and
# the indentation has been lost.  The statements read as a function body that
# takes a DB cursor `curs` and returns 1 or 0 -- confirm against the original.
#
# Ask the database for its current transaction id.
14 curs.execute("select get_current_txid()")
15 txid = curs.fetchone()[0]
16
17
# When the `txid.epoch` table does not exist, 1 is returned without any
# further comparison.
18 if not skytools.exists_table(curs, 'txid.epoch'):
19 return 1
20
# Combine the stored row into one integer: `epoch` shifted left by 32 bits,
# OR-ed with `last_value`.
21 curs.execute("select epoch, last_value from txid.epoch")
22 epoch, last_val = curs.fetchone()
23 stored_val = (epoch << 32) | last_val
24
# Result is 1 when the stored value does not exceed the current txid, else 0.
25 if stored_val <= txid:
26 return 1
27 else:
28 return 0
29
# NOTE(review): the `class`/`def` header lines are not visible in this chunk.
# These assignments look like the initializer of the per-queue status object
# (elsewhere in this file it is constructed as `QueueStatus(queue_name)`) --
# confirm against the original.
#
# Name of the queue this status object tracks.
32 self.queue_name = name
# Starts as None; a later fragment assigns it from a row's 'queue_event_seq'.
33 self.seq_name = None
# Initial thresholds.  idle_period and max_lag are compared against a
# difference of timestamps, so presumably seconds -- TODO confirm.
34 self.idle_period = 60
35 self.max_lag = 3
# Compared against the growth of the counter value since the last tick.
36 self.max_count = 200
# State remembered from the last requested tick.
37 self.last_tick_time = 0
38 self.last_count = 0
# Compared against 5 in the tick decision logic.
39 self.quiet_count = 0
40
# NOTE(review): the `def` header is not visible in this chunk; presumably this
# is the body of `set_data(self, row)`, which the refresh code calls with one
# row of its pgq.queue query -- confirm against the original.
#
# Copy the per-queue settings out of `row`, indexed by column name.
42 self.seq_name = row['queue_event_seq']
43 self.idle_period = row['queue_ticker_idle_period']
44 self.max_lag = row['queue_ticker_max_lag']
45 self.max_count = row['queue_ticker_max_count']
46
48
# Decide whether a tick is due; the flag computed here is returned at the end.
# NOTE(review): the `def` header and all indentation are missing from this
# chunk (presumably `need_tick(self, cur_count, cur_time)`, judging by the
# call site in this file).  The nesting of the if/else branches below cannot
# be confirmed from the text alone -- verify against the original file.
49 need_tick = 0
# Time elapsed since the last recorded tick.
50 lag = cur_time - self.last_tick_time
51
# Branch on whether the counter value is unchanged since the last recorded tick.
52 if cur_count == self.last_count:
53
54
55
# While quiet_count is below 5 the lag is checked against `max_lag`; the
# `idle_period` check appears under the following `else`.
# NOTE(review): whether `quiet_count += 1` sits inside the lag check or
# directly under the quiet_count test is not recoverable here.
56 if self.quiet_count < 5:
57 if lag >= self.max_lag:
58 need_tick = 1
59 self.quiet_count += 1
60 else:
61 if lag >= self.idle_period:
62 need_tick = 1
63 else:
64 self.quiet_count = 0
65
# A counter increase of at least `max_count`, or a lag of at least `max_lag`,
# requests a tick.
66 if cur_count - self.last_count >= self.max_count:
67 need_tick = 1
68 elif lag >= self.max_lag:
69 need_tick = 1
# Presumably both assignments belong to `if need_tick:` -- they remember the
# time and counter value at which the tick was requested.
70 if need_tick:
71 self.last_tick_time = cur_time
72 self.last_count = cur_count
73 return need_tick
74
# NOTE(review): the `class` header is not visible in this chunk; these look
# like class-level default attributes (presumably of `SmartTicker`, the only
# name exported via __all__) -- confirm against the original.
76 last_tick_event = 0
77 last_tick_time = 0
78 quiet_count = 0
# Incremented and reported through `self.tick_count` in the work code.
79 tick_count = 0
# No value other than None is assigned to this in the visible code.
80 maint_thread = None
81
# NOTE(review): the `def` header is not visible in this chunk; presumably this
# is the initializer body taking `args` -- confirm against the original.
#
# Initialize the skytools.DBScript base with the literal 'pgqadm' and `args`.
83 skytools.DBScript.__init__(self, 'pgqadm', args)
84
# Time of the last stats report and the minimum interval between reports;
# both are compared against time.time() values, so 5*60 is presumably
# seconds -- TODO confirm.
85 self.ticker_log_time = 0
86 self.ticker_log_delay = 5*60
# Maps queue name -> per-queue status object; rebuilt by the refresh code.
87 self.queue_map = {}
# Time of the last queue refresh; the work code refreshes once the current
# time reaches refresh_time + 30.
88 self.refresh_time = 0
89
93
110
# NOTE(review): the `def` header and indentation are missing from this chunk;
# presumably this is the body of `refresh_queues(self, cx)` (it is called as
# `self.refresh_queues(cx)` elsewhere in this file).  Loop and try/except
# boundaries below are inferred, not visible -- confirm against the original.
#
# Select the ticker settings of every row in pgq.queue for which
# queue_external_ticker is false.
112 q = "select queue_name, queue_event_seq, queue_ticker_idle_period,"\
113 " queue_ticker_max_lag, queue_ticker_max_count"\
114 " from pgq.queue"\
115 " where not queue_external_ticker"
116 cx.execute(q)
# new_map: queue name -> status object.  data_list and from_list collect the
# select-list items and sequence names for the combined query built below.
117 new_map = {}
118 data_list = []
119 from_list = []
120 for row in cx.dictfetchall():
121 queue_name = row['queue_name']
# Reuse the status object already tracked under this name, if there is one;
# otherwise create a fresh one.
122 try:
123 que = self.queue_map[queue_name]
# NOTE(review): `except KeyError, x` is Python 2-only syntax and `x` is unused.
124 except KeyError, x:
125 que = QueueStatus(queue_name)
126 que.set_data(row)
127 new_map[queue_name] = que
128
# Per queue: a string literal with its name followed by `<seq>.last_value`.
# NOTE(review): queue_name and seq_name are interpolated into the SQL text
# without quoting -- confirm they can never contain quote characters.
129 p1 = "'%s', %s.last_value" % (queue_name, que.seq_name)
130 data_list.append(p1)
131 from_list.append(que.seq_name)
132
# Replace the tracked map with the freshly built one.
133 self.queue_map = new_map
# One statement of the form: select 'q1', seq1.last_value, ... from seq1, ...
134 self.seq_query = "select %s from %s" % (
135 ", ".join(data_list),
136 ", ".join(from_list))
137
# With no sequences collected, seq_query is set to None instead.
138 if len(from_list) == 0:
139 self.seq_query = None
140
# Record when this refresh happened.
141 self.refresh_time = time.time()
142
# NOTE(review): the `def` header and indentation are missing from this chunk;
# this reads as the body of a periodically invoked method.  Which statements
# sit inside the `while` and `if` bodies is inferred, not visible -- confirm
# against the original.
#
# Obtain the connection registered as "db", requested with autocommit = 1.
144 db = self.get_database("db", autocommit = 1)
145 cx = db.cursor()
146
147 cur_time = time.time()
148
# Re-read the queue list once the current time reaches refresh_time + 30.
149 if cur_time >= self.refresh_time + 30:
150 self.refresh_queues(cx)
151
# seq_query is falsy when the refresh found no queues; return early then.
152 if not self.seq_query:
153 return
154
155
# The combined query yields a single row.  The refresh code builds it as
# alternating (queue name, sequence last_value) items, so it is walked two
# columns at a time.
156 cx.execute(self.seq_query)
157 res = cx.fetchone()
158 pos = 0
159 while pos < len(res):
# NOTE(review): the local name `id` shadows the builtin of the same name.
160 id = res[pos]
161 val = res[pos + 1]
162 pos += 2
163 que = self.queue_map[id]
# Ask the queue's status object whether a tick is due; if so call
# pgq.ticker() for that queue, passing the name as a query parameter.
164 if que.need_tick(val, cur_time):
165 cx.execute("select pgq.ticker(%s)", [que.queue_name])
166 self.tick_count += 1
167
# Once more than ticker_log_delay has passed since the last report, add the
# accumulated tick count to the 'ticks' stat and reset the counter.
168 if cur_time > self.ticker_log_time + self.ticker_log_delay:
169 self.ticker_log_time = cur_time
170 self.stat_add('ticks', self.tick_count)
171 self.tick_count = 0
172