Package pgq :: Module maint
[frames | no frames]

Source Code for Module pgq.maint

 1  """PgQ maintenance functions.""" 
 2   
 3  import skytools, time 
 4   
def get_pgq_api_version(curs):
    """Return the pgq version string reported by the database.

    curs - database cursor used to run the queries.

    The system catalogs are checked first for a function named
    'version' in the 'pgq' schema.  When no such function is found,
    '1.0.0' is returned; otherwise the result of pgq.version() is
    returned.
    """
    probe_sql = (
        "select count(1) from pg_proc p, pg_namespace n"
        " where n.oid = p.pronamespace and n.nspname='pgq'"
        " and p.proname='version';"
    )
    curs.execute(probe_sql)
    found = curs.fetchone()[0]
    if found:
        curs.execute("select pgq.version()")
        return curs.fetchone()[0]

    # no pgq.version() function in this database
    return '1.0.0'
15
def version_ge(curs, want_ver):
    """Check whether the database's pgq version is at least want_ver.

    Only the first two dot-separated components (major, minor) are
    compared; any further components, such as a patch level, are
    parsed but do not influence the result.

    curs     - database cursor, passed on to get_pgq_api_version()
    want_ver - dotted numeric version string, e.g. '2.1.5'

    Returns 1 when the database minor version is >= the wanted minor
    version, 0 otherwise.  Raises Exception('Wrong major version') when
    the major versions differ.
    """
    db_ver = get_pgq_api_version(curs)

    # Build real lists: on Python 3 map() returns an iterator, which
    # cannot be indexed, so the comparisons below would raise TypeError.
    want_parts = [int(part) for part in want_ver.split('.')]
    db_parts = [int(part) for part in db_ver.split('.')]

    if db_parts[0] != want_parts[0]:
        raise Exception('Wrong major version')
    if db_parts[1] >= want_parts[1]:
        return 1
    return 0
26
class MaintenanceJob(skytools.DBScript):
    """Periodic PgQ maintenance job.

    Every work() call checks whether maint_delay seconds have passed
    since the previous maintenance pass and, if so, runs a new one.
    """

    def __init__(self, ticker, args):
        """Initialise the DBScript under the 'pgqadm' name and keep the ticker."""
        skytools.DBScript.__init__(self, 'pgqadm', args)
        self.ticker = ticker
        self.maint_delay = 5 * 60
        self.clean_ticks = 1
        self.last_ticks = 0
        # zero means the first work() call starts maintenance immediately
        self.last_time = 0

    def startup(self):
        """Do nothing, which disables the regular DBScript startup()."""

    def reload(self):
        """Reload the config, then refresh loop_delay and maint_delay."""
        skytools.DBScript.reload(self)

        # loop_delay is always forced to this value
        self.loop_delay = 5

        # 'maint_delay_min' (multiplied by 60) provides the default,
        # which an explicit 'maint_delay' setting overrides
        delay_minutes = self.cf.getfloat('maint_delay_min', 5)
        self.maint_delay = 60 * delay_minutes
        self.maint_delay = self.cf.getfloat('maint_delay', self.maint_delay)

    def work(self):
        """Run maintenance when maint_delay has elapsed since the last run.

        The time spent is recorded under the 'maint_duration' stat.
        """
        started = time.time()
        if started < self.last_time + self.maint_delay:
            return

        self.do_maintenance()

        self.last_time = started
        self.stat_add('maint_duration', time.time() - started)

    def do_maintenance(self):
        """Run one maintenance pass against the 'db' connection.

        Steps, in order: table rotation step 1 (per queue when the
        one-argument function exists), rotation step 2, moving retry
        events until none are left, and vacuuming the tables listed by
        pgq.maint_tables_to_vacuum().
        """
        conn = self.get_database('db', autocommit=1)
        curs = conn.cursor()

        per_queue = skytools.exists_function(
            curs, "pgq.maint_rotate_tables_step1", 1)
        if not per_queue:
            curs.execute("select pgq.maint_rotate_tables_step1();")
        else:
            # rotate each queue in its own TX
            curs.execute("select queue_name from pgq.get_queue_info()")
            for queue_row in curs.fetchall():
                queue_name = queue_row[0]
                curs.execute("select pgq.maint_rotate_tables_step1(%s)",
                             [queue_name])
                rotated = curs.fetchone()[0]
                if rotated:
                    self.log.info('Rotating %s' % queue_name)

        # finish rotation
        curs.execute("select pgq.maint_rotate_tables_step2();")

        # move retry events to the main queue in small blocks,
        # stopping once a call reports zero events
        retry_total = 0
        while True:
            curs.execute('select pgq.maint_retry_events();')
            moved = curs.fetchone()[0]
            retry_total += moved
            if moved == 0:
                break
        if retry_total:
            self.log.info('Got %d events for retry' % retry_total)

        # vacuum the tables that need it
        curs.execute('set maintenance_work_mem = 32768')
        curs.execute('select * from pgq.maint_tables_to_vacuum()')
        for table_row in curs.fetchall():
            curs.execute('vacuum %s;' % table_row[0])
98