
Source Code for Module skytools.skylog

  1  """Our log handlers for Python's logging package. 
  2  """ 
  3   
  4  import sys, os, time, socket, psycopg 
  5  import logging, logging.handlers 
  6   
  7  from quoting import quote_json 
  8   
  9  # configurable file logger 
class EasyRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """Easier setup for RotatingFileHandler."""
    def __init__(self, filename, maxBytes = 10*1024*1024, backupCount = 3):
        """Args same as for RotatingFileHandler, but in filename '~' is expanded."""
        fn = os.path.expanduser(filename)
        logging.handlers.RotatingFileHandler.__init__(self, fn, maxBytes=maxBytes, backupCount=backupCount)

# send JSON message over UDP
class UdpLogServerHandler(logging.handlers.DatagramHandler):
    """Sends log records over UDP to logserver in JSON format."""

    # map logging levels to logserver levels
    _level_map = {
        logging.DEBUG   : 'DEBUG',
        logging.INFO    : 'INFO',
        logging.WARNING : 'WARN',
        logging.ERROR   : 'ERROR',
        logging.CRITICAL: 'FATAL',
    }

    # JSON message template
    _log_template = '{\n\t'\
        '"logger": "skytools.UdpLogServer",\n\t'\
        '"timestamp": %.0f,\n\t'\
        '"level": "%s",\n\t'\
        '"thread": null,\n\t'\
        '"message": %s,\n\t'\
        '"properties": {"application":"%s", "hostname":"%s"}\n'\
        '}'

    # cut longer msgs
    MAXMSG = 1024

    def makePickle(self, record):
        """Create message in JSON format."""
        # get & cut msg
        msg = self.format(record)
        if len(msg) > self.MAXMSG:
            msg = msg[:self.MAXMSG]
        txt_level = self._level_map.get(record.levelno, "ERROR")
        pkt = self._log_template % (time.time()*1000, txt_level,
                quote_json(msg), record.name, socket.gethostname())
        return pkt
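
Because makePickle() is overridden, the datagram that DatagramHandler sends is simply the JSON text built from the template above. A sketch of what one record turns into; the host, port, and the values shown in the commented output (timestamp, hostname) are placeholders, not real output:

    import logging
    from skytools.skylog import UdpLogServerHandler

    hnd = UdpLogServerHandler("logserver.example.com", 6666)   # host/port are made up
    rec = logging.LogRecord("myscript", logging.WARNING, __file__, 0,
                            "disk almost full", None, None)
    print(hnd.makePickle(rec))
    # {
    #     "logger": "skytools.UdpLogServer",
    #     "timestamp": 1234567890123,
    #     "level": "WARN",
    #     "thread": null,
    #     "message": "disk almost full",
    #     "properties": {"application":"myscript", "hostname":"examplehost"}
    # }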

class LogDBHandler(logging.handlers.SocketHandler):
    """Sends log records into PostgreSQL server.

    Additionally, does some statistics aggregating,
    to avoid overloading the log server.

    It subclasses SocketHandler to get throttling for
    failed connections.
    """

    # map codes to string
    _level_map = {
        logging.DEBUG   : 'DEBUG',
        logging.INFO    : 'INFO',
        logging.WARNING : 'WARNING',
        logging.ERROR   : 'ERROR',
        logging.CRITICAL: 'FATAL',
    }

    def __init__(self, connect_string):
        """
        Initializes the handler with a specific connection string.
        """

        logging.handlers.SocketHandler.__init__(self, None, None)
        self.closeOnError = 1

        self.connect_string = connect_string

        self.stat_cache = {}
        self.stat_flush_period = 60
        # send first stat line immediately
        self.last_stat_flush = 0

    def createSocket(self):
        try:
            logging.handlers.SocketHandler.createSocket(self)
        except:
            self.sock = self.makeSocket()

    def makeSocket(self):
        """Create server connection.
        In this case it's not a socket but a psycopg connection."""

        db = psycopg.connect(self.connect_string)
        db.autocommit(1)
        return db

    def emit(self, record):
        """Process log record."""

        # we do not want to log debug messages
        if record.levelno < logging.INFO:
            return

        try:
            self.process_rec(record)
        except (SystemExit, KeyboardInterrupt):
            raise
        except:
            self.handleError(record)

    def process_rec(self, record):
        """Aggregate stats if needed, and send to logdb."""
        # render msg
        msg = self.format(record)

        # don't want to send stats too often
        if record.levelno == logging.INFO and msg and msg[0] == "{":
            self.aggregate_stats(msg)
            if time.time() - self.last_stat_flush >= self.stat_flush_period:
                self.flush_stats(record.name)
            return

        if record.levelno < logging.INFO:
            self.flush_stats(record.name)

        # don't send more than one line
        ln = msg.find('\n')
        if ln > 0:
            msg = msg[:ln]

        txt_level = self._level_map.get(record.levelno, "ERROR")
        self.send_to_logdb(record.name, txt_level, msg)

    def aggregate_stats(self, msg):
        """Sum stats together, to lessen load on logdb."""

        msg = msg[1:-1]
        for rec in msg.split(", "):
            k, v = rec.split(": ")
            agg = self.stat_cache.get(k, 0)
            if v.find('.') >= 0:
                agg += float(v)
            else:
                agg += int(v)
            self.stat_cache[k] = agg
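
aggregate_stats() expects the rendered message to be a one-line, dict-like stat string such as "{count: 100, duration: 0.5}" (the key names here are invented). Values containing a dot are summed as floats, everything else as ints, so two such records arriving within one flush period end up merged in stat_cache:

    hnd = LogDBHandler("dbname=logdb")   # placeholder connect string, nothing connects yet
    hnd.aggregate_stats("{count: 100, duration: 0.5}")
    hnd.aggregate_stats("{count: 40, duration: 1.25}")
    print(hnd.stat_cache)                # {'count': 140, 'duration': 1.75}  (key order may vary)
    # flush_stats() would later render this back as "{count: 140, duration: 1.75}"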

    def flush_stats(self, service):
        """Send acquired stats to logdb."""
        res = []
        for k, v in self.stat_cache.items():
            res.append("%s: %s" % (k, str(v)))
        if len(res) > 0:
            logmsg = "{%s}" % ", ".join(res)
            self.send_to_logdb(service, "INFO", logmsg)
        self.stat_cache = {}
        self.last_stat_flush = time.time()

    def send_to_logdb(self, service, type, msg):
        """Actual sending is done here."""

        if self.sock is None:
            self.createSocket()

        if self.sock:
            logcur = self.sock.cursor()
            query = "select * from log.add(%s, %s, %s)"
            logcur.execute(query, [type, service, msg])
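
Finally, a sketch of wiring LogDBHandler into a logger. The connect string is a placeholder, and send_to_logdb() assumes a log.add(level, service, message) function already exists in the target database; both are deployment-specific.

    import logging
    from skytools.skylog import LogDBHandler

    log = logging.getLogger("myservice")                       # service name is made up
    dbh = LogDBHandler("host=logdb.example.com dbname=logs")   # placeholder connect string
    log.addHandler(dbh)
    log.setLevel(logging.DEBUG)

    log.error("backup failed")                # sent right away via log.add()
    log.info("{count: 100, duration: 0.5}")   # stat line: aggregated, flushed periodically
    log.debug("noise")                        # dropped by emit(), which skips records below INFO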