1 """Our log handlers for Python's logging package.
2 """
3
4 import sys, os, time, socket, psycopg
5 import logging, logging.handlers
6
7 from quoting import quote_json
8
9
11 """Easier setup for RotatingFileHandler."""
def __init__(self, filename, maxBytes=10 * 1024 * 1024, backupCount=3):
    """Set up the rotating file handler, expanding '~' in *filename*.

    Args:
        filename: Path of the log file.  A leading '~' or '~user' is
            expanded with os.path.expanduser() before the file is opened.
        maxBytes: Passed through to RotatingFileHandler (default 10 MiB).
        backupCount: Passed through to RotatingFileHandler (default 3).

    NOTE(review): the enclosing ``class`` line is not visible in this
    excerpt; the explicit base-class call below shows the base class is
    logging.handlers.RotatingFileHandler.
    """
    fn = os.path.expanduser(filename)
    logging.handlers.RotatingFileHandler.__init__(
        self, fn, maxBytes=maxBytes, backupCount=backupCount)
16
17
19 """Sends log records over UDP to logserver in JSON format."""
20
21
# Settings of the UDP/JSON log handler.
# NOTE(review): these are read through ``self.`` by the packet builder, so
# they belong in the handler's class body; the ``class`` line is not visible
# in this excerpt.

# Python logging level -> level name written into the JSON packet.
_level_map = {
    logging.DEBUG: 'DEBUG',
    logging.INFO: 'INFO',
    logging.WARNING: 'WARN',
    logging.ERROR: 'ERROR',
    logging.CRITICAL: 'FATAL',
}

# %-template of one JSON packet.  Placeholders, in order: timestamp
# (rendered without decimals), level name, message (must already be a
# JSON-quoted value, it is inserted verbatim), application, hostname.
_log_template = (
    '{\n\t'
    '"logger": "skytools.UdpLogServer",\n\t'
    '"timestamp": %.0f,\n\t'
    '"level": "%s",\n\t'
    '"thread": null,\n\t'
    '"message": %s,\n\t'
    '"properties": {"application":"%s", "hostname":"%s"}\n'
    '}'
)

# Longest message text (in characters) that is put into a packet.
MAXMSG = 1024
42
def makePickle(self, record):
    """Create message in JSON format.

    Formats *record*, cuts the text to at most ``self.MAXMSG`` characters
    and fills ``self._log_template`` with it.

    Args:
        record: The logging.LogRecord to serialise.

    Returns:
        The packet text built from ``self._log_template``.

    NOTE(review): the ``def`` line is missing from this excerpt.  Name and
    signature follow the serialisation hook of
    logging.handlers.DatagramHandler -- confirm against the real file.
    NOTE(review): record.name and the hostname are inserted without JSON
    escaping; only the message goes through quote_json().
    """
    # Slicing is a no-op for texts that are already short enough.
    msg = self.format(record)[:self.MAXMSG]
    # Levels without a mapping are reported as errors.
    txt_level = self._level_map.get(record.levelno, "ERROR")
    # The timestamp is the serialisation time in milliseconds.
    return self._log_template % (
        time.time() * 1000, txt_level, quote_json(msg),
        record.name, socket.gethostname())
53
55 """Sends log records into PostgreSQL server.
56
57 Additionally, does some statistics aggregating,
58 to avoid overloading log server.
59
60 It subclasses SocketHandler to get throttling for
61 failed connections.
62 """
63
64
# Python logging level -> level name stored in the log database.
# NOTE(review): read through ``self._level_map`` when a record is sent, so
# it belongs in the database handler's class body; the ``class`` line is not
# visible in this excerpt.
_level_map = {
    logging.DEBUG: 'DEBUG',
    logging.INFO: 'INFO',
    logging.WARNING: 'WARNING',
    logging.ERROR: 'ERROR',
    logging.CRITICAL: 'FATAL',
}
72
def __init__(self, connect_string):
    """Initialize the handler with a specific connection string.

    Args:
        connect_string: Connection string that is later handed to
            psycopg.connect() when the database connection is opened.

    NOTE(review): the ``def`` line is missing from this excerpt; the
    signature is reconstructed from the use of ``connect_string`` below.
    """
    # No host/port: this handler talks to a database, not to a TCP peer.
    logging.handlers.SocketHandler.__init__(self, None, None)
    self.closeOnError = 1

    self.connect_string = connect_string

    # Statistics summed up since the last flush, keyed by counter name.
    self.stat_cache = {}
    # Minimum number of seconds between two statistics flushes.
    self.stat_flush_period = 60
    # Zero makes the first statistics line go out immediately.
    self.last_stat_flush = 0
87
93
def makeSocket(self):
    """Create server connection.

    In this case it is not a socket but a psycopg connection.

    Returns:
        The connection opened with ``self.connect_string``.

    NOTE(review): the ``def`` line is missing from this excerpt.  The name
    follows the factory hook that logging.handlers.SocketHandler's
    createSocket() calls -- confirm against the real file.
    """
    db = psycopg.connect(self.connect_string)
    # NOTE(review): autocommit is invoked as a method here.  psycopg2 and
    # psycopg 3 expose it as an attribute instead -- confirm which driver
    # this module is meant to run against.
    db.autocommit(1)
    return db
101
def emit(self, record):
    """Process log record.

    Records below INFO are dropped.  Everything else is handed to
    ``self.process_rec()``; an ordinary exception raised there is passed to
    ``self.handleError()`` instead of propagating into the caller.

    Args:
        record: The logging.LogRecord to process.
    """
    # Debug output is never sent on.
    if record.levelno < logging.INFO:
        return

    try:
        self.process_rec(record)
    except Exception:
        # SystemExit and KeyboardInterrupt are not Exception subclasses,
        # so they keep propagating.
        self.handleError(record)
115
def process_rec(self, record):
    """Aggregate stats if needed, and send to logdb.

    An INFO record whose formatted text starts with "{" is treated as a
    statistics line: it is added to the local cache and the cache is written
    out once ``self.stat_flush_period`` seconds have passed since the last
    flush.  Any other record is cut to its first line and sent at once.

    Args:
        record: The logging.LogRecord to process.

    NOTE(review): the ``def`` line and all indentation are missing from this
    excerpt; the name comes from the ``self.process_rec(record)`` call and
    the nesting is reconstructed -- confirm against the real file.
    """
    msg = self.format(record)

    # Statistics must not hit the database on every record.
    if record.levelno == logging.INFO and msg.startswith("{"):
        self.aggregate_stats(msg)
        if time.time() - self.last_stat_flush >= self.stat_flush_period:
            self.flush_stats(record.name)
        return

    if record.levelno < logging.INFO:
        self.flush_stats(record.name)

    # Send one line only.  A text that starts with a newline (ln == 0) is
    # left as it is.
    ln = msg.find('\n')
    if ln > 0:
        msg = msg[:ln]

    # Levels without a mapping are stored as errors.
    txt_level = self._level_map.get(record.levelno, "ERROR")
    self.send_to_logdb(record.name, txt_level, msg)
138
def aggregate_stats(self, msg):
    """Sum stats together, to lessen load on logdb.

    Args:
        msg: Statistics text of the form "{key: value, key: value}".  The
            first and last character are dropped and every value is added
            to ``self.stat_cache[key]``.

    Raises:
        ValueError: If an entry is not "key: value" or the value is not a
            number.

    NOTE(review): the ``def`` line is missing from this excerpt; the name
    comes from the ``self.aggregate_stats(msg)`` call.
    """
    for rec in msg[1:-1].split(", "):
        if not rec:
            # "{}" carries no counters at all.
            continue
        k, v = rec.split(": ")
        # Integers stay integers.  Everything else is parsed as float,
        # which also covers the exponent form ("1e-05") that str() uses
        # for very small or very large floats.
        try:
            num = int(v)
        except ValueError:
            num = float(v)
        self.stat_cache[k] = self.stat_cache.get(k, 0) + num
151
def flush_stats(self, service):
    """Send acquired stats to logdb.

    The cached counters are rendered as "{key: value, key: value}" and sent
    as one INFO line.  Nothing is sent when the cache is empty.  After that
    the cache is emptied and the flush time is set to now.

    Args:
        service: Service name that is passed on to ``send_to_logdb()``.

    NOTE(review): the ``def`` line and all indentation are missing from this
    excerpt.  The name comes from the ``self.flush_stats(...)`` calls; the
    last two statements are placed outside the ``if`` -- confirm against the
    real file.
    """
    if self.stat_cache:
        pairs = ["%s: %s" % (k, str(v)) for k, v in self.stat_cache.items()]
        self.send_to_logdb(service, "INFO", "{%s}" % ", ".join(pairs))
    self.stat_cache = {}
    self.last_stat_flush = time.time()
162
def send_to_logdb(self, service, type, msg):
    """Actual sending is done here.

    Opens the connection on demand through ``self.createSocket()`` and calls
    the database function ``log.add(type, service, msg)``.  When there is
    still no connection afterwards the message is dropped.

    Args:
        service: Name of the sending service (the logger name).
        type: Level name, e.g. "INFO".  It shadows the builtin, but the
            name is kept so that existing callers keep working.
        msg: Text of the log line.

    NOTE(review): the ``def`` line is missing from this excerpt; name and
    parameter order are reconstructed from the ``self.send_to_logdb(...)``
    calls -- confirm against the real file.
    """
    if self.sock is None:
        self.createSocket()

    if self.sock:
        logcur = self.sock.cursor()
        try:
            query = "select * from log.add(%s, %s, %s)"
            logcur.execute(query, [type, service, msg])
        finally:
            # Do not leave one open cursor behind per log line.
            logcur.close()
173