"""PgQ maintenance functions."""
2
import skytools, time
4
def get_pgq_api_version(curs):
    """Return the version string of the pgq schema installed in the database.

    The system catalogs are searched for a function named ``version`` in
    the ``pgq`` schema.  If none is found the fixed string ``'1.0.0'`` is
    returned; otherwise the result of ``select pgq.version()`` is returned.

    curs -- open database cursor; two queries at most are executed on it.
    """
    # Probe the catalogs first: calling pgq.version() directly would fail
    # with an SQL error on an installation that lacks the function.
    q = ("select count(1) from pg_proc p, pg_namespace n"
         " where n.oid = p.pronamespace and n.nspname='pgq'"
         " and p.proname='version';")
    curs.execute(q)
    if not curs.fetchone()[0]:
        return '1.0.0'

    curs.execute("select pgq.version()")
    return curs.fetchone()[0]
15
# NOTE(review): the ``def`` line was absent from the reviewed text; the name
# and parameter list were restored from the body (``curs``, ``want_ver``) --
# confirm against callers.
def version_ge(curs, want_ver):
    """Check whether the db version of pgq is at least want_ver.

    Both versions are dotted number strings ('major.minor[.patch]').  Only
    the first two components are examined: the major numbers must match
    exactly and the database's minor number must be >= the wanted one.
    Any further components are ignored.

    curs -- database cursor, passed on to get_pgq_api_version().
    want_ver -- minimum version wanted, e.g. '2.1.5'.

    Returns 1 when the database version is sufficient, 0 otherwise.
    Raises Exception('Wrong major version') when the major numbers differ,
    and ValueError when a component is not an integer.
    """
    db_ver = get_pgq_api_version(curs)
    # Build real lists: on Python 3 map() returns an iterator, which cannot
    # be indexed with [0] / [1] below.
    want_tuple = [int(part) for part in want_ver.split('.')]
    db_tuple = [int(part) for part in db_ver.split('.')]
    if db_tuple[0] != want_tuple[0]:
        raise Exception('Wrong major version')
    if db_tuple[1] >= want_tuple[1]:
        return 1
    return 0
26
class MaintenanceJob(skytools.DBScript):
    """Periodic maintenance.

    Runs the pgq maintenance SQL functions (table rotation, retry-event
    processing, vacuum) at most once per ``maint_delay`` seconds.
    """

    def __init__(self, ticker, args):
        """Initialise the job under the 'pgqadm' script name.

        ticker -- stored as ``self.ticker``; not used by the methods here.
        args -- passed through to skytools.DBScript.__init__.
        """
        skytools.DBScript.__init__(self, 'pgqadm', args)
        self.ticker = ticker
        # Start time of the last completed maintenance run; 0 makes the
        # first work() call run maintenance immediately.
        self.last_time = 0
        # Not read by any method of this class.
        self.last_ticks = 0
        self.clean_ticks = 1
        # Minimum number of seconds between maintenance runs.
        self.maint_delay = 5*60

    def reload(self):
        """Reload configuration and recompute the delay settings."""
        skytools.DBScript.reload(self)

        # Assigned after the base reload, so this fixed value is the one
        # left in effect.
        self.loop_delay = 5

        # 'maint_delay_min' (minutes, default 5) supplies the fallback;
        # 'maint_delay' (seconds) takes precedence when configured.
        self.maint_delay = 60 * self.cf.getfloat('maint_delay_min', 5)
        self.maint_delay = self.cf.getfloat('maint_delay', self.maint_delay)

    # NOTE(review): the ``def`` line of this method was absent from the
    # reviewed text; ``work`` is assumed to be the per-loop hook of
    # skytools.DBScript -- confirm against the base class.
    def work(self):
        """Run maintenance if maint_delay seconds passed since the last run.

        Records the run's duration under the 'maint_duration' stat.
        """
        t = time.time()
        if self.last_time + self.maint_delay > t:
            return

        self.do_maintenance()

        # Only updated after a run that did not raise, so a failed run is
        # retried on the next call.
        self.last_time = t
        duration = time.time() - t
        self.stat_add('maint_duration', duration)

    def do_maintenance(self):
        """Helper function for running maintenance.

        Executes, in order: table rotation, retry-event processing and
        vacuuming, all on one cursor of the 'db' connection.
        """
        # NOTE(review): autocommit is presumably required because VACUUM
        # cannot run inside a transaction block -- confirm.
        db = self.get_database('db', autocommit=1)
        cx = db.cursor()

        self._rotate_tables(cx)
        self._retry_events(cx)
        self._vacuum_tables(cx)

    def _rotate_tables(self, cx):
        """Run both table-rotation steps on cursor cx."""
        # The last argument (1) presumably asks for the one-argument form
        # of the function -- verify against skytools.exists_function.
        if skytools.exists_function(cx, "pgq.maint_rotate_tables_step1", 1):
            # Per-queue form: call step1 once for every known queue.
            q = "select queue_name from pgq.get_queue_info()"
            cx.execute(q)
            for row in cx.fetchall():
                cx.execute("select pgq.maint_rotate_tables_step1(%s)", [row[0]])
                res = cx.fetchone()[0]
                if res:
                    self.log.info('Rotating %s', row[0])
        else:
            # Fallback: argument-less form of step1.
            cx.execute("select pgq.maint_rotate_tables_step1();")

        # Step2 runs once, whichever step1 form was used.
        cx.execute("select pgq.maint_rotate_tables_step2();")

    def _retry_events(self, cx):
        """Call pgq.maint_retry_events() until it reports 0; log the total."""
        rcount = 0
        while True:
            cx.execute('select pgq.maint_retry_events();')
            res = cx.fetchone()[0]
            rcount += res
            if res == 0:
                break
        if rcount:
            self.log.info('Got %d events for retry', rcount)

    def _vacuum_tables(self, cx):
        """Vacuum every table listed by pgq.maint_tables_to_vacuum()."""
        cx.execute('set maintenance_work_mem = 32768')
        cx.execute('select * from pgq.maint_tables_to_vacuum()')
        for row in cx.fetchall():
            # The name is interpolated as-is; it comes from the pgq
            # function above, not from user input.
            cx.execute('vacuum %s;' % row[0])