Package pgq :: Module status
[frames] | no frames]

Source Code for Module pgq.status

 1   
 2  """Status display. 
 3  """ 
 4   
 5  import sys, os, skytools 
 6   
7 -def ival(data, as = None):
8 "Format interval for output" 9 if not as: 10 as = data.split('.')[-1] 11 numfmt = 'FM9999999' 12 expr = "coalesce(to_char(extract(epoch from %s), '%s') || 's', 'NULL') as %s" 13 return expr % (data, numfmt, as)
14
15 -class PGQStatus(skytools.DBScript):
16 - def __init__(self, args, check = 0):
17 skytools.DBScript.__init__(self, 'pgqadm', args) 18 19 self.show_status() 20 21 sys.exit(0)
22
23 - def show_status(self):
24 db = self.get_database("db", autocommit=1) 25 cx = db.cursor() 26 27 cx.execute("show server_version") 28 pgver = cx.fetchone()[0] 29 cx.execute("select pgq.version()") 30 qver = cx.fetchone()[0] 31 print "Postgres version: %s PgQ version: %s" % (pgver, qver) 32 33 q = """select f.queue_name, f.num_tables, %s, %s, %s, 34 q.queue_ticker_max_lag, q.queue_ticker_max_amount, 35 q.queue_ticker_idle_interval 36 from pgq.get_queue_info() f, pgq.queue q 37 where q.queue_name = f.queue_name""" % ( 38 ival('f.rotation_delay'), 39 ival('f.ticker_lag'), 40 ) 41 cx.execute(q) 42 event_rows = cx.dictfetchall() 43 44 q = """select queue_name, consumer_name, %s, %s, %s 45 from pgq.get_consumer_info()""" % ( 46 ival('lag'), 47 ival('last_seen'), 48 ) 49 cx.execute(q) 50 consumer_rows = cx.dictfetchall() 51 52 print "\n%-32s %s %9s %13s %6s" % ('Event queue', 53 'Rotation', 'Ticker', 'TLag') 54 print '-' * 78 55 for ev_row in event_rows: 56 tck = "%s/%ss/%ss" % (ev_row['queue_ticker_max_amount'], 57 ev_row['queue_ticker_max_lag'], 58 ev_row['queue_ticker_idle_interval']) 59 rot = "%s/%s" % (ev_row['queue_ntables'], ev_row['queue_rotation_period']) 60 print "%-39s%7s %9s %13s %6s" % ( 61 ev_row['queue_name'], 62 rot, 63 tck, 64 ev_row['ticker_lag'], 65 ) 66 print '-' * 78 67 print "\n%-42s %9s %9s" % ( 68 'Consumer', 'Lag', 'LastSeen') 69 print '-' * 78 70 for ev_row in event_rows: 71 cons = self.pick_consumers(ev_row, consumer_rows) 72 self.show_queue(ev_row, cons) 73 print '-' * 78 74 db.commit()
75
76 - def show_consumer(self, cons):
77 print " %-48s %9s %9s" % ( 78 cons['consumer_name'], 79 cons['lag'], cons['last_seen'])
80 - def show_queue(self, ev_row, consumer_rows):
81 print "%(queue_name)s:" % ev_row 82 for cons in consumer_rows: 83 self.show_consumer(cons)
84 85
86 - def pick_consumers(self, ev_row, consumer_rows):
87 res = [] 88 for con in consumer_rows: 89 if con['queue_name'] != ev_row['queue_name']: 90 continue 91 res.append(con) 92 return res
93