Package skytools :: Module scripting
[frames] | [no frames]

Source Code for Module skytools.scripting

  1   
  2  """Useful functions and classes for database scripts.""" 
  3   
  4  import sys, os, signal, psycopg, optparse, traceback, time 
  5  import logging, logging.handlers, logging.config 
  6   
  7  from skytools.config import * 
  8  import skytools.skylog 
  9   
# Public API of this module.
__all__ = ['daemonize', 'run_single_process', 'DBScript',
    'I_AUTOCOMMIT', 'I_READ_COMMITTED', 'I_SERIALIZABLE']
 12   
 13  # 
 14  # daemon mode 
 15  # 
 16   
def daemonize():
    """Turn the process into daemon.

    Goes background and disables all i/o.

    Forks once, detaches from the controlling terminal with setsid()
    and redirects stdin/stdout/stderr to /dev/null.  Must run before
    any pidfile is written, as the pid changes.
    """

    # launch new process, kill parent
    pid = os.fork()
    if pid != 0:
        # parent: exit immediately without running cleanup handlers
        os._exit(0)

    # start new session, detaching from the controlling terminal
    os.setsid()

    # stop i/o: point fds 0/1/2 at /dev/null
    fd = os.open("/dev/null", os.O_RDWR)
    os.dup2(fd, 0)
    os.dup2(fd, 1)
    os.dup2(fd, 2)
    if fd > 2:
        # the extra descriptor was only needed as a dup2 source
        os.close(fd)
38 39 # 40 # Pidfile locking+cleanup & daemonization combined 41 # 42
def _write_pidfile(pidfile):
    """Write the current process id into `pidfile`.

    Creates or truncates the file.  The descriptor is closed even if
    the write fails (the original leaked it on error).
    """
    pid = os.getpid()
    f = open(pidfile, 'w')
    try:
        f.write(str(pid))
    finally:
        f.close()
48
def run_single_process(runnable, daemon, pidfile):
    """Run runnable class, possibly daemonized, locked on pidfile.

    If `pidfile` is given and already exists, another instance is assumed
    to be running and the process exits with code 1.  The pidfile is
    removed on SIGTERM and again when run() returns.

    @param runnable: object with a run() method.
    @param daemon: nonzero to daemonize before running.
    @param pidfile: lock/pid file path, or empty string for no locking.
    """

    # check if another process is running
    if pidfile and os.path.isfile(pidfile):
        # print() call form works identically on Python 2 and 3
        print("Pidfile exists, another process running?")
        sys.exit(1)

    # daemonize if needed and write pidfile
    if daemon:
        daemonize()
    if pidfile:
        _write_pidfile(pidfile)

    # catch SIGTERM to cleanup pidfile
    def sigterm_hook(signum, frame):
        try:
            os.remove(pidfile)
        except OSError:
            # best-effort cleanup; file may already be gone
            pass
        sys.exit(0)
    # attach it to signal
    if pidfile:
        signal.signal(signal.SIGTERM, sigterm_hook)

    # run
    try:
        runnable.run()
    finally:
        # another try of cleaning up
        if pidfile:
            try:
                os.remove(pidfile)
            except OSError:
                pass

#
# logging setup
#

# module-level state: whether logging config file was already loaded
_log_config_done = 0
# job names whose logger has already been initialized
_log_init_done = {}
def _init_log(job_name, cf, log_level):
    """Logging setup happens here.

    Returns the logger for `job_name`.  If the config enables skylog,
    loads the first skylog.ini found (once per process); otherwise
    attaches a rotating-file handler (if `logfile` is configured) and
    a stderr handler.  Repeated calls for the same job_name return the
    already-initialized logger.
    """
    global _log_init_done, _log_config_done

    got_skylog = 0
    use_skylog = cf.getint("use_skylog", 0)

    # load logging config if needed
    if use_skylog and not _log_config_done:
        # python logging.config braindamage:
        # cannot specify external classes without such hack
        logging.skylog = skytools.skylog

        # load general config (renamed from `list`, which shadowed the builtin)
        ini_list = ['skylog.ini', '~/.skylog.ini', '/etc/skylog.ini']
        for fn in ini_list:
            fn = os.path.expanduser(fn)
            if os.path.isfile(fn):
                defs = {'job_name': job_name}
                logging.config.fileConfig(fn, defs)
                got_skylog = 1
                break
        _log_config_done = 1
        if not got_skylog:
            sys.stderr.write("skylog.ini not found!\n")
            sys.exit(1)

    # avoid duplicate logging init for job_name
    log = logging.getLogger(job_name)
    if job_name in _log_init_done:
        return log
    _log_init_done[job_name] = 1

    # compatibility: specify ini file in script config
    logfile = cf.getfile("logfile", "")
    if logfile:
        fmt = logging.Formatter('%(asctime)s %(process)s %(levelname)s %(message)s')
        size = cf.getint('log_size', 10*1024*1024)
        num = cf.getint('log_count', 3)
        hdlr = logging.handlers.RotatingFileHandler(
                logfile, 'a', size, num)
        hdlr.setFormatter(fmt)
        log.addHandler(hdlr)

    # if skylog.ini is disabled or not available, log at least to stderr
    if not got_skylog:
        hdlr = logging.StreamHandler()
        fmt = logging.Formatter('%(asctime)s %(process)s %(levelname)s %(message)s')
        hdlr.setFormatter(fmt)
        log.addHandler(hdlr)

    log.setLevel(log_level)

    return log
#: how old connections need to be closed
DEF_CONN_AGE = 20*60        # 20 min

#: isolation level not set
I_DEFAULT = -1

#: isolation level constant for AUTOCOMMIT
I_AUTOCOMMIT = 0
#: isolation level constant for READ COMMITTED
I_READ_COMMITTED = 1
#: isolation level constant for SERIALIZABLE
I_SERIALIZABLE = 2

class DBCachedConn(object):
    """Cache a db connection."""

    def __init__(self, name, loc, max_age = DEF_CONN_AGE):
        """Remember connection info; the actual connect happens lazily.

        @param name: logical/cache name for the connection.
        @param loc: connect string passed to psycopg.connect().
        @param max_age: seconds after which refresh() drops the connection.
        """
        self.name = name
        self.loc = loc
        self.conn = None            # live connection, or None
        self.conn_time = 0          # time.time() when connected
        self.max_age = max_age
        self.autocommit = -1        # NOTE(review): written here but never read
        self.isolation_level = -1   # level the cached connection was opened with

    def get_connection(self, autocommit = 0, isolation_level = -1):
        """Return cached connection, connecting on first use.

        Raises Exception if the cached connection was opened with a
        different isolation level than requested.
        """
        # autocommit overrides isolation_level
        if autocommit:
            isolation_level = I_AUTOCOMMIT

        # default isolation_level is READ COMMITTED
        if isolation_level < 0:
            isolation_level = I_READ_COMMITTED

        # new conn?
        if not self.conn:
            self.isolation_level = isolation_level
            self.conn = psycopg.connect(self.loc)

            self.conn.set_isolation_level(isolation_level)
            self.conn_time = time.time()
        else:
            if self.isolation_level != isolation_level:
                raise Exception("Conflict in isolation_level")

        # done
        return self.conn

    def refresh(self):
        """Drop the connection if it has passed max_age (0 = never)."""
        if not self.conn:
            return
        if not self.max_age:
            return
        if time.time() - self.conn_time >= self.max_age:
            self.reset()

    def reset(self):
        """Close and forget the cached connection, ignoring close errors."""
        if not self.conn:
            return

        # drop reference
        conn = self.conn
        self.conn = None

        # NOTE(review): autocommit connections are only dropped, not
        # closed explicitly - preserved from original behavior.
        if self.isolation_level == I_AUTOCOMMIT:
            return

        # rollback & close; Exception (not bare except) so that
        # SystemExit/KeyboardInterrupt still propagate
        try:
            conn.rollback()
        except Exception:
            pass
        try:
            conn.close()
        except Exception:
            pass
222
class DBScript(object):
    """Base class for database scripts.

    Handles logging, daemonizing, config, errors.
    """
    # unique service name, set from constructor argument
    service_name = None
    # job name from config; defaults to service_name
    job_name = None
    # config object loaded in __init__ (skytools.config.Config)
    cf = None
    # logger, initialized in __init__ via _init_log()
    log = None
233 - def __init__(self, service_name, args):
234 """Script setup. 235 236 User class should override work() and optionally __init__(), startup(), 237 reload(), reset() and init_optparse(). 238 239 NB: in case of daemon, the __init__() and startup()/work() will be 240 run in different processes. So nothing fancy should be done in __init__(). 241 242 @param service_name: unique name for script. 243 It will be also default job_name, if not specified in config. 244 @param args: cmdline args (sys.argv[1:]), but can be overrided 245 """ 246 self.service_name = service_name 247 self.db_cache = {} 248 self.go_daemon = 0 249 self.do_single_loop = 0 250 self.looping = 1 251 self.need_reload = 1 252 self.stat_dict = {} 253 self.log_level = logging.INFO 254 self.work_state = 1 255 256 # parse command line 257 parser = self.init_optparse() 258 self.options, self.args = parser.parse_args(args) 259 260 # check args 261 if self.options.daemon: 262 self.go_daemon = 1 263 if self.options.quiet: 264 self.log_level = logging.WARNING 265 if self.options.verbose: 266 self.log_level = logging.DEBUG 267 if len(self.args) < 1: 268 print "need config file" 269 sys.exit(1) 270 conf_file = self.args[0] 271 272 # load config 273 self.cf = Config(self.service_name, conf_file) 274 self.job_name = self.cf.get("job_name", self.service_name) 275 self.pidfile = self.cf.getfile("pidfile", '') 276 277 self.reload() 278 279 # init logging 280 self.log = _init_log(self.job_name, self.cf, self.log_level) 281 282 # send signal, if needed 283 if self.options.cmd == "kill": 284 self.send_signal(signal.SIGTERM) 285 elif self.options.cmd == "stop": 286 self.send_signal(signal.SIGINT) 287 elif self.options.cmd == "reload": 288 self.send_signal(signal.SIGHUP)
289
290 - def init_optparse(self, parser = None):
291 """Initialize a OptionParser() instance that will be used to 292 parse command line arguments. 293 294 Note that it can be overrided both directions - either DBScript 295 will initialize a instance and passes to user code or user can 296 initialize and then pass to DBScript.init_optparse(). 297 298 @param parser: optional OptionParser() instance, 299 where DBScript should attachs its own arguments. 300 @return: initialized OptionParser() instance. 301 """ 302 if parser: 303 p = parser 304 else: 305 p = optparse.OptionParser() 306 p.set_usage("%prog [options] INI") 307 # generic options 308 p.add_option("-q", "--quiet", action="store_true", 309 help = "make program silent") 310 p.add_option("-v", "--verbose", action="store_true", 311 help = "make program verbose") 312 p.add_option("-d", "--daemon", action="store_true", 313 help = "go background") 314 315 # control options 316 g = optparse.OptionGroup(p, 'control running process') 317 g.add_option("-r", "--reload", 318 action="store_const", const="reload", dest="cmd", 319 help = "reload config (send SIGHUP)") 320 g.add_option("-s", "--stop", 321 action="store_const", const="stop", dest="cmd", 322 help = "stop program safely (send SIGINT)") 323 g.add_option("-k", "--kill", 324 action="store_const", const="kill", dest="cmd", 325 help = "kill program immidiately (send SIGTERM)") 326 p.add_option_group(g) 327 328 return p
329
330 - def send_signal(self, sig):
331 if not self.pidfile: 332 self.log.warning("No pidfile in config, nothing todo") 333 sys.exit(0) 334 if not os.path.isfile(self.pidfile): 335 self.log.warning("No pidfile, process not running") 336 sys.exit(0) 337 pid = int(open(self.pidfile, "r").read()) 338 os.kill(pid, sig) 339 sys.exit(0)
340
341 - def set_single_loop(self, do_single_loop):
342 """Changes whether the script will loop or not.""" 343 self.do_single_loop = do_single_loop
344
345 - def start(self):
346 """This will launch main processing thread.""" 347 if self.go_daemon: 348 if not self.pidfile: 349 self.log.error("Daemon needs pidfile") 350 sys.exit(1) 351 run_single_process(self, self.go_daemon, self.pidfile)
352
353 - def stop(self):
354 """Safely stops processing loop.""" 355 self.looping = 0
356
357 - def reload(self):
358 "Reload config." 359 self.cf.reload() 360 self.loop_delay = self.cf.getfloat("loop_delay", 1.0)
361
362 - def hook_sighup(self, sig, frame):
363 "Internal SIGHUP handler. Minimal code here." 364 self.need_reload = 1
365
366 - def hook_sigint(self, sig, frame):
367 "Internal SIGINT handler. Minimal code here." 368 self.stop()
369
370 - def stat_add(self, key, value):
371 self.stat_put(key, value)
372
373 - def stat_put(self, key, value):
374 """Sets a stat value.""" 375 self.stat_dict[key] = value
376
377 - def stat_increase(self, key, increase = 1):
378 """Increases a stat value.""" 379 if key in self.stat_dict: 380 self.stat_dict[key] += increase 381 else: 382 self.stat_dict[key] = increase
383
384 - def send_stats(self):
385 "Send statistics to log." 386 387 res = [] 388 for k, v in self.stat_dict.items(): 389 res.append("%s: %s" % (k, str(v))) 390 391 if len(res) == 0: 392 return 393 394 logmsg = "{%s}" % ", ".join(res) 395 self.log.info(logmsg) 396 self.stat_dict = {}
397
398 - def get_database(self, dbname, autocommit = 0, isolation_level = -1, 399 cache = None, max_age = DEF_CONN_AGE):
400 """Load cached database connection. 401 402 User must not store it permanently somewhere, 403 as all connections will be invalidated on reset. 404 """ 405 406 if not cache: 407 cache = dbname 408 if cache in self.db_cache: 409 dbc = self.db_cache[cache] 410 else: 411 loc = self.cf.get(dbname) 412 dbc = DBCachedConn(cache, loc, max_age) 413 self.db_cache[cache] = dbc 414 415 return dbc.get_connection(autocommit, isolation_level)
416
417 - def close_database(self, dbname):
418 """Explicitly close a cached connection. 419 420 Next call to get_database() will reconnect. 421 """ 422 if dbname in self.db_cache: 423 dbc = self.db_cache[dbname] 424 dbc.reset()
425
426 - def reset(self):
427 "Something bad happened, reset all connections." 428 for dbc in self.db_cache.values(): 429 dbc.reset() 430 self.db_cache = {}
431
432 - def run(self):
433 "Thread main loop." 434 435 # run startup, safely 436 try: 437 self.startup() 438 except KeyboardInterrupt, det: 439 raise 440 except SystemExit, det: 441 raise 442 except Exception, det: 443 exc, msg, tb = sys.exc_info() 444 self.log.fatal("Job %s crashed: %s: '%s' (%s: %s)" % ( 445 self.job_name, str(exc), str(msg).rstrip(), 446 str(tb), repr(traceback.format_tb(tb)))) 447 del tb 448 self.reset() 449 sys.exit(1) 450 451 while self.looping: 452 # reload config, if needed 453 if self.need_reload: 454 self.reload() 455 self.need_reload = 0 456 457 # do some work 458 work = self.run_once() 459 460 # send stats that was added 461 self.send_stats() 462 463 # reconnect if needed 464 for dbc in self.db_cache.values(): 465 dbc.refresh() 466 467 # exit if needed 468 if self.do_single_loop: 469 self.log.debug("Only single loop requested, exiting") 470 break 471 472 # remember work state 473 self.work_state = work 474 # should sleep? 475 if not work: 476 try: 477 time.sleep(self.loop_delay) 478 except Exception, d: 479 self.log.debug("sleep failed: "+str(d)) 480 sys.exit(0)
481
482 - def run_once(self):
483 "Run users work function, safely." 484 try: 485 return self.work() 486 except SystemExit, d: 487 self.send_stats() 488 self.log.info("got SystemExit(%s), exiting" % str(d)) 489 self.reset() 490 raise d 491 except KeyboardInterrupt, d: 492 self.send_stats() 493 self.log.info("got KeyboardInterrupt, exiting") 494 self.reset() 495 sys.exit(1) 496 except Exception, d: 497 self.send_stats() 498 exc, msg, tb = sys.exc_info() 499 self.log.fatal("Job %s crashed: %s: '%s' (%s: %s)" % ( 500 self.job_name, str(exc), str(msg).rstrip(), 501 str(tb), repr(traceback.format_tb(tb)))) 502 del tb 503 self.reset() 504 if self.looping: 505 time.sleep(20) 506 return 1
507
508 - def work(self):
509 "Here should user's processing happen." 510 raise Exception("Nothing implemented?")
511
512 - def startup(self):
513 """Will be called just before entering main loop. 514 515 In case of daemon, if will be called in same process as work(), 516 unlike __init__(). 517 """ 518 519 # set signals 520 signal.signal(signal.SIGHUP, self.hook_sighup) 521 signal.signal(signal.SIGINT, self.hook_sigint)
522