1
2 """Useful functions and classes for database scripts."""
3
4 import sys, os, signal, psycopg, optparse, traceback, time
5 import logging, logging.handlers, logging.config
6
7 from skytools.config import *
8 import skytools.skylog
9
10 __all__ = ['daemonize', 'run_single_process', 'DBScript',
11 'I_AUTOCOMMIT', 'I_READ_COMMITTED', 'I_SERIALIZABLE']
12
13
14
15
16
18 """Turn the process into daemon.
19
20 Goes background and disables all i/o.
21 """
22
23
24 pid = os.fork()
25 if pid != 0:
26 os._exit(0)
27
28
29 os.setsid()
30
31
32 fd = os.open("/dev/null", os.O_RDWR)
33 os.dup2(fd, 0)
34 os.dup2(fd, 1)
35 os.dup2(fd, 2)
36 if fd > 2:
37 os.close(fd)
38
39
40
41
42
44 pid = os.getpid()
45 f = open(pidfile, 'w')
46 f.write(str(pid))
47 f.close()
48
50 """Run runnable class, possibly daemonized, locked on pidfile."""
51
52
53 if pidfile and os.path.isfile(pidfile):
54 print "Pidfile exists, another process running?"
55 sys.exit(1)
56
57
58 if daemon:
59 daemonize()
60 if pidfile:
61 _write_pidfile(pidfile)
62
63
64 def sigterm_hook(signum, frame):
65 try:
66 os.remove(pidfile)
67 except: pass
68 sys.exit(0)
69
70 if pidfile:
71 signal.signal(signal.SIGTERM, sigterm_hook)
72
73
74 try:
75 runnable.run()
76 finally:
77
78 if pidfile:
79 try:
80 os.remove(pidfile)
81 except: pass
82
83
84
85
86
87 _log_config_done = 0
88 _log_init_done = {}
89
91 """Logging setup happens here."""
92 global _log_init_done, _log_config_done
93
94 got_skylog = 0
95 use_skylog = cf.getint("use_skylog", 0)
96
97
98 if use_skylog and not _log_config_done:
99
100
101 logging.skylog = skytools.skylog
102
103
104 list = ['skylog.ini', '~/.skylog.ini', '/etc/skylog.ini']
105 for fn in list:
106 fn = os.path.expanduser(fn)
107 if os.path.isfile(fn):
108 defs = {'job_name': job_name}
109 logging.config.fileConfig(fn, defs)
110 got_skylog = 1
111 break
112 _log_config_done = 1
113 if not got_skylog:
114 sys.stderr.write("skylog.ini not found!\n")
115 sys.exit(1)
116
117
118 log = logging.getLogger(job_name)
119 if job_name in _log_init_done:
120 return log
121 _log_init_done[job_name] = 1
122
123
124 logfile = cf.getfile("logfile", "")
125 if logfile:
126 fmt = logging.Formatter('%(asctime)s %(process)s %(levelname)s %(message)s')
127 size = cf.getint('log_size', 10*1024*1024)
128 num = cf.getint('log_count', 3)
129 hdlr = logging.handlers.RotatingFileHandler(
130 logfile, 'a', size, num)
131 hdlr.setFormatter(fmt)
132 log.addHandler(hdlr)
133
134
135 if not got_skylog:
136 hdlr = logging.StreamHandler()
137 fmt = logging.Formatter('%(asctime)s %(process)s %(levelname)s %(message)s')
138 hdlr.setFormatter(fmt)
139 log.addHandler(hdlr)
140
141 log.setLevel(log_level)
142
143 return log
144
145
146 DEF_CONN_AGE = 20*60
147
148
149 I_DEFAULT = -1
150
151
152 I_AUTOCOMMIT = 0
153
154 I_READ_COMMITTED = 1
155
156 I_SERIALIZABLE = 2
157
159 """Cache a db connection."""
161 self.name = name
162 self.loc = loc
163 self.conn = None
164 self.conn_time = 0
165 self.max_age = max_age
166 self.autocommit = -1
167 self.isolation_level = -1
168
170
171 if autocommit:
172 isolation_level = I_AUTOCOMMIT
173
174
175 if isolation_level < 0:
176 isolation_level = I_READ_COMMITTED
177
178
179 if not self.conn:
180 self.isolation_level = isolation_level
181 self.conn = psycopg.connect(self.loc)
182
183 self.conn.set_isolation_level(isolation_level)
184 self.conn_time = time.time()
185 else:
186 if self.isolation_level != isolation_level:
187 raise Exception("Conflict in isolation_level")
188
189
190 return self.conn
191
193 if not self.conn:
194 return
195
196
197
198
199 if not self.max_age:
200 return
201 if time.time() - self.conn_time >= self.max_age:
202 self.reset()
203
205 if not self.conn:
206 return
207
208
209 conn = self.conn
210 self.conn = None
211
212 if self.isolation_level == I_AUTOCOMMIT:
213 return
214
215
216 try:
217 conn.rollback()
218 except: pass
219 try:
220 conn.close()
221 except: pass
222
224 """Base class for database scripts.
225
226 Handles logging, daemonizing, config, errors.
227 """
228 service_name = None
229 job_name = None
230 cf = None
231 log = None
232
233 - def __init__(self, service_name, args):
234 """Script setup.
235
236 User class should override work() and optionally __init__(), startup(),
237 reload(), reset() and init_optparse().
238
239 NB: in case of daemon, the __init__() and startup()/work() will be
240 run in different processes. So nothing fancy should be done in __init__().
241
242 @param service_name: unique name for script.
243 It will be also default job_name, if not specified in config.
244 @param args: cmdline args (sys.argv[1:]), but can be overrided
245 """
246 self.service_name = service_name
247 self.db_cache = {}
248 self.go_daemon = 0
249 self.do_single_loop = 0
250 self.looping = 1
251 self.need_reload = 1
252 self.stat_dict = {}
253 self.log_level = logging.INFO
254 self.work_state = 1
255
256
257 parser = self.init_optparse()
258 self.options, self.args = parser.parse_args(args)
259
260
261 if self.options.daemon:
262 self.go_daemon = 1
263 if self.options.quiet:
264 self.log_level = logging.WARNING
265 if self.options.verbose:
266 self.log_level = logging.DEBUG
267 if len(self.args) < 1:
268 print "need config file"
269 sys.exit(1)
270 conf_file = self.args[0]
271
272
273 self.cf = Config(self.service_name, conf_file)
274 self.job_name = self.cf.get("job_name", self.service_name)
275 self.pidfile = self.cf.getfile("pidfile", '')
276
277 self.reload()
278
279
280 self.log = _init_log(self.job_name, self.cf, self.log_level)
281
282
283 if self.options.cmd == "kill":
284 self.send_signal(signal.SIGTERM)
285 elif self.options.cmd == "stop":
286 self.send_signal(signal.SIGINT)
287 elif self.options.cmd == "reload":
288 self.send_signal(signal.SIGHUP)
289
291 """Initialize a OptionParser() instance that will be used to
292 parse command line arguments.
293
294 Note that it can be overrided both directions - either DBScript
295 will initialize a instance and passes to user code or user can
296 initialize and then pass to DBScript.init_optparse().
297
298 @param parser: optional OptionParser() instance,
299 where DBScript should attachs its own arguments.
300 @return: initialized OptionParser() instance.
301 """
302 if parser:
303 p = parser
304 else:
305 p = optparse.OptionParser()
306 p.set_usage("%prog [options] INI")
307
308 p.add_option("-q", "--quiet", action="store_true",
309 help = "make program silent")
310 p.add_option("-v", "--verbose", action="store_true",
311 help = "make program verbose")
312 p.add_option("-d", "--daemon", action="store_true",
313 help = "go background")
314
315
316 g = optparse.OptionGroup(p, 'control running process')
317 g.add_option("-r", "--reload",
318 action="store_const", const="reload", dest="cmd",
319 help = "reload config (send SIGHUP)")
320 g.add_option("-s", "--stop",
321 action="store_const", const="stop", dest="cmd",
322 help = "stop program safely (send SIGINT)")
323 g.add_option("-k", "--kill",
324 action="store_const", const="kill", dest="cmd",
325 help = "kill program immidiately (send SIGTERM)")
326 p.add_option_group(g)
327
328 return p
329
331 if not self.pidfile:
332 self.log.warning("No pidfile in config, nothing todo")
333 sys.exit(0)
334 if not os.path.isfile(self.pidfile):
335 self.log.warning("No pidfile, process not running")
336 sys.exit(0)
337 pid = int(open(self.pidfile, "r").read())
338 os.kill(pid, sig)
339 sys.exit(0)
340
342 """Changes whether the script will loop or not."""
343 self.do_single_loop = do_single_loop
344
346 """This will launch main processing thread."""
347 if self.go_daemon:
348 if not self.pidfile:
349 self.log.error("Daemon needs pidfile")
350 sys.exit(1)
351 run_single_process(self, self.go_daemon, self.pidfile)
352
354 """Safely stops processing loop."""
355 self.looping = 0
356
358 "Reload config."
359 self.cf.reload()
360 self.loop_delay = self.cf.getfloat("loop_delay", 1.0)
361
363 "Internal SIGHUP handler. Minimal code here."
364 self.need_reload = 1
365
367 "Internal SIGINT handler. Minimal code here."
368 self.stop()
369
372
374 """Sets a stat value."""
375 self.stat_dict[key] = value
376
378 """Increases a stat value."""
379 if key in self.stat_dict:
380 self.stat_dict[key] += increase
381 else:
382 self.stat_dict[key] = increase
383
385 "Send statistics to log."
386
387 res = []
388 for k, v in self.stat_dict.items():
389 res.append("%s: %s" % (k, str(v)))
390
391 if len(res) == 0:
392 return
393
394 logmsg = "{%s}" % ", ".join(res)
395 self.log.info(logmsg)
396 self.stat_dict = {}
397
400 """Load cached database connection.
401
402 User must not store it permanently somewhere,
403 as all connections will be invalidated on reset.
404 """
405
406 if not cache:
407 cache = dbname
408 if cache in self.db_cache:
409 dbc = self.db_cache[cache]
410 else:
411 loc = self.cf.get(dbname)
412 dbc = DBCachedConn(cache, loc, max_age)
413 self.db_cache[cache] = dbc
414
415 return dbc.get_connection(autocommit, isolation_level)
416
418 """Explicitly close a cached connection.
419
420 Next call to get_database() will reconnect.
421 """
422 if dbname in self.db_cache:
423 dbc = self.db_cache[dbname]
424 dbc.reset()
425
427 "Something bad happened, reset all connections."
428 for dbc in self.db_cache.values():
429 dbc.reset()
430 self.db_cache = {}
431
433 "Thread main loop."
434
435
436 try:
437 self.startup()
438 except KeyboardInterrupt, det:
439 raise
440 except SystemExit, det:
441 raise
442 except Exception, det:
443 exc, msg, tb = sys.exc_info()
444 self.log.fatal("Job %s crashed: %s: '%s' (%s: %s)" % (
445 self.job_name, str(exc), str(msg).rstrip(),
446 str(tb), repr(traceback.format_tb(tb))))
447 del tb
448 self.reset()
449 sys.exit(1)
450
451 while self.looping:
452
453 if self.need_reload:
454 self.reload()
455 self.need_reload = 0
456
457
458 work = self.run_once()
459
460
461 self.send_stats()
462
463
464 for dbc in self.db_cache.values():
465 dbc.refresh()
466
467
468 if self.do_single_loop:
469 self.log.debug("Only single loop requested, exiting")
470 break
471
472
473 self.work_state = work
474
475 if not work:
476 try:
477 time.sleep(self.loop_delay)
478 except Exception, d:
479 self.log.debug("sleep failed: "+str(d))
480 sys.exit(0)
481
483 "Run users work function, safely."
484 try:
485 return self.work()
486 except SystemExit, d:
487 self.send_stats()
488 self.log.info("got SystemExit(%s), exiting" % str(d))
489 self.reset()
490 raise d
491 except KeyboardInterrupt, d:
492 self.send_stats()
493 self.log.info("got KeyboardInterrupt, exiting")
494 self.reset()
495 sys.exit(1)
496 except Exception, d:
497 self.send_stats()
498 exc, msg, tb = sys.exc_info()
499 self.log.fatal("Job %s crashed: %s: '%s' (%s: %s)" % (
500 self.job_name, str(exc), str(msg).rstrip(),
501 str(tb), repr(traceback.format_tb(tb))))
502 del tb
503 self.reset()
504 if self.looping:
505 time.sleep(20)
506 return 1
507
509 "Here should user's processing happen."
510 raise Exception("Nothing implemented?")
511
513 """Will be called just before entering main loop.
514
515 In case of daemon, if will be called in same process as work(),
516 unlike __init__().
517 """
518
519
520 signal.signal(signal.SIGHUP, self.hook_sighup)
521 signal.signal(signal.SIGINT, self.hook_sigint)
522