Package ZenEvents :: Module zenactions
[hide private]
[frames] | no frames]

Source Code for Module ZenEvents.zenactions

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  #! /usr/bin/env python  
 14   
 15  __doc__='''zenactions 
 16   
 17  Turn events into notifications (pages, emails). 
 18   
 19  $Id$ 
 20  ''' 
 21   
 22  __version__ = "$Revision$"[11:-2] 
 23   
 24   
 25  import socket 
 26  import time 
 27  from sets import Set 
 28  import Globals 
 29   
 30  from ZODB.POSException import POSError 
 31  from _mysql_exceptions import OperationalError, ProgrammingError  
 32   
 33  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 34  from Products.ZenUtils.ZenTales import talesCompile, getEngine 
 35  from ZenEventClasses import App_Start, App_Stop, Status_Heartbeat  
 36  from ZenEventClasses import Cmd_Fail 
 37  import Event 
 38  from Schedule import Schedule 
 39  from UpdateCheck import UpdateCheck 
 40  from Products.ZenUtils import Utils 
 41  from twisted.internet import reactor 
 42  from twisted.internet.protocol import ProcessProtocol 
 43  from email.Utils import formatdate 
 44   
 45  DEFAULT_MONITOR = "localhost" 
 46   
47 -def _capitalize(s):
48 return s[0:1].upper() + s[1:]
49
class EventCommandProtocol(ProcessProtocol):
    """Collect the output of one spawned event command and report its result.

    stdout and stderr are buffered. When the process ends, a Cmd_Fail event
    keyed on the command id is sent: severity Clear on exit code 0, Error
    otherwise. If the command is still running after cmd.defaultTimeout
    seconds, an Error event is sent and the process is killed.
    """

    def __init__(self, cmd, server):
        self.cmd = cmd          # event command being run (id, defaultTimeout)
        self.server = server    # owning daemon (log, options, sendEvent)
        self.data = ''          # accumulated stdout
        self.error = ''         # accumulated stderr
        # Set once the process has been killed because it timed out.
        self.killedOnTimeout = False
        self.timeout = reactor.callLater(cmd.defaultTimeout, self.timedOut)

    def _sendCmdEvent(self, severity, summary):
        """Send a Cmd_Fail event about this command from the monitor."""
        self.server.sendEvent(Event.Event(
            device=self.server.options.monitor,
            eventClass=Cmd_Fail,
            severity=severity,
            component="zenactions",
            eventKey=self.cmd.id,
            summary=summary,
            ))

    def _kill(self):
        """Send SIGKILL to the child process if it is still running."""
        from twisted.internet.error import ProcessExitedAlready
        transport = getattr(self, 'transport', None)
        if transport is None:
            return
        try:
            transport.signalProcess('KILL')
        except ProcessExitedAlready:
            # It finished on its own; processEnded reports the real result.
            return
        self.killedOnTimeout = True

    def timedOut(self):
        """Report that the command exceeded its timeout, then kill it."""
        self.timeout = None
        self.server.log.error("Command %s timed out" % self.cmd.id)
        self._sendCmdEvent(Event.Error, "Timeout running %s" % (self.cmd.id,))
        # Without this the runaway process would keep running indefinitely.
        self._kill()

    def processEnded(self, reason):
        """Cancel the timeout and send a Clear or Error event for the run."""
        self.server.log.debug("Command finished: %s" % reason.getErrorMessage())
        # Anything without an exitCode attribute is treated as a failure.
        code = 1
        try:
            code = reason.value.exitCode
        except AttributeError:
            pass

        # The process has ended. We can cancel the timeout now.
        if self.timeout:
            self.timeout.cancel()
            self.timeout = None

        if code == 0:
            cmdData = self.data or "<command produced no output>"
            self.server.log.debug("Command %s says: %s", self.cmd.id, cmdData)
            self._sendCmdEvent(Event.Clear,
                               "Command succeeded: %s: %s" % (
                                   self.cmd.id, cmdData))
        elif self.killedOnTimeout:
            # The timeout Error event has already been sent for this
            # command; don't replace it with a generic "Error running" one.
            self.server.log.debug("Command %s was killed after timing out",
                                  self.cmd.id)
        else:
            cmdError = self.error or "<command produced no output>"
            self.server.log.error("Command %s says %s", self.cmd.id, cmdError)
            self._sendCmdEvent(Event.Error,
                               "Error running: %s: %s" % (
                                   self.cmd.id, cmdError))

    def outReceived(self, text):
        """Buffer a chunk of the command's stdout."""
        self.data += text

    def errReceived(self, text):
        """Buffer a chunk of the command's stderr."""
        self.error += text
class ZenActions(ZCmdBase):
    """
    Take actions based on events in the event manager.

    Each pass matches the enabled action rules (pages, emails) and the
    enabled event commands against the status table. A matching event is
    acted on once per (user, rule) and recorded in alert_state. When a
    recorded event shows up in the history table, a clear notification is
    sent and the record is removed. Each pass also runs event-database
    maintenance, ages out old history, runs the update check and generates
    heartbeat failure/clear events.
    """

    # Last SQL statement issued; logged when processing a rule fails.
    lastCommand = None

    # Record that (evid, userid, rule) was notified; a repeat notification
    # only refreshes lastSent.
    addstate = ("INSERT INTO alert_state "
                "VALUES ('%s', '%s', '%s', NULL) "
                "ON DUPLICATE KEY UPDATE lastSent = now()")

    # Forget the notification record of (evid, userid, rule).
    clearstate = ("DELETE FROM alert_state "
                  " WHERE evid='%s' "
                  " AND userid='%s' "
                  " AND rule='%s'")

    #FIXME attempt to convert subquery to left join that doesn't work
    # newsel = """select %s, evid from status s left join alert_state a
    #              on s.evid=a.evid where a.evid is null and
    #              a.userid='%s' and a.rule='%s'"""

    # Status events matching a where clause that have no alert_state row for
    # (userid, rule). Parameters: field list, where clause, userid, rule id,
    # extra condition on the alert_state subquery.
    # NOTE(review): userid and rule id are interpolated without escaping; a
    # quote in either would break these statements.
    newsel = ("SELECT %s, evid FROM status WHERE "
              "%s AND evid NOT IN "
              " (SELECT evid FROM alert_state "
              " WHERE userid='%s' AND rule='%s' %s)")

    # History rows of events that have an alert_state row for (userid, rule),
    # i.e. already-notified events that are now in history.
    clearsel = ("SELECT %s, h.evid FROM history h, alert_state a "
                " WHERE h.evid=a.evid AND a.userid='%s' AND a.rule='%s'")

    # Columns of the history event referenced by the given event's clearid.
    clearEventSelect = ("SELECT %s "
                        " FROM history clear, history event "
                        " WHERE clear.evid = event.clearid "
                        " AND event.evid = '%s'")

    def __init__(self):
        ZCmdBase.__init__(self)
        self.schedule = Schedule(self.options, self.dmd)
        self.schedule.sendEvent = self.dmd.ZenEventManager.sendEvent
        self.schedule.monitor = self.options.monitor

        self.actions = []
        self.loadActionRules()
        self.updateCheck = UpdateCheck()
        self.sendEvent(Event.Event(device=self.options.monitor,
                                   eventClass=App_Start,
                                   summary="zenactions started",
                                   severity=0, component="zenactions"))

    def loadActionRules(self):
        """Reload self.actions with every enabled action rule of all users.
        """
        self.actions = []
        for ar in self.dmd.ZenUsers.getAllActionRules():
            if not ar.enabled:
                continue
            userid = ar.getUser().id
            self.actions.append(ar)
            self.log.debug("action:%s for:%s loaded", ar.getId(), userid)

    def execute(self, stmt):
        """Run a SQL statement on the event database.

        Returns the value of cursor.execute(). The connection is always
        handed back to the event manager.
        """
        result = None
        self.lastCommand = stmt
        self.log.debug(stmt)
        zem = self.dmd.ZenEventManager
        conn = zem.connect()
        try:
            curs = conn.cursor()
            result = curs.execute(stmt)
        finally:
            zem.close(conn)
        return result

    def query(self, stmt):
        """Run a SQL query on the event database and return all rows."""
        result = None
        self.lastCommand = stmt
        self.log.debug(stmt)
        zem = self.dmd.ZenEventManager
        conn = zem.connect()
        try:
            curs = conn.cursor()
            curs.execute(stmt)
            result = curs.fetchall()
        finally:
            zem.close(conn)
        return result

    def getUrl(self, evid):
        """URL of the detail page of event evid."""
        return '%s/zport/dmd/Events/eventFields?evid=%s' % (
            self.options.zopeurl, evid)

    def getEventsUrl(self, device):
        """URL of the events page of device."""
        return '%s%s/viewEvents' % (
            self.options.zopeurl, device.getPrimaryUrlPath())

    def getAckUrl(self, evid):
        """URL that acknowledges event evid."""
        return '%s/zport/dmd/Events/manage_ackEvents?evids=%s&zenScreenName=viewEvents' % (self.options.zopeurl, evid)

    def getDeleteUrl(self, evid):
        """URL that deletes event evid."""
        return '%s/zport/dmd/Events/manage_deleteEvents' % self.options.zopeurl + \
               '?evids=%s&zenScreenName=viewHistoryEvents' % evid

    def getUndeleteUrl(self, evid):
        """URL that undeletes (history) event evid."""
        return '%s/zport/dmd/Events/manage_undeleteEvents' % self.options.zopeurl + \
               '?evid=%s&zenScreenName=viewEvents' % evid

    def _eventsUrlFor(self, deviceName):
        """Return the events-page URL of the named device, or 'n/a'."""
        device = self.dmd.Devices.findDevice(deviceName)
        if device:
            return self.getEventsUrl(device)
        return 'n/a'

    def processRules(self, zem):
        """Run through all rules matching them against events.

        Each rule's action ("page" -> sendPage, "email" -> sendEmail, ...) is
        applied via processEvent. Errors in one rule are logged and the next
        rule is processed. Exit requests, MySQL OperationalError and ZODB
        POSError propagate to the caller.
        """
        for ar in self.actions:
            try:
                self.lastCommand = None
                # call sendPage or sendEmail
                actfunc = getattr(self, "send"+ar.action.title())
                self.processEvent(zem, ar, actfunc)
            except (SystemExit, KeyboardInterrupt, OperationalError, POSError):
                raise
            except:
                if self.lastCommand:
                    self.log.warning(self.lastCommand)
                self.log.exception("action:%s",ar.getId())

    def checkVersion(self, zem):
        """Run the update check and commit the ZODB transaction."""
        self.updateCheck.check(self.dmd, zem)
        import transaction
        transaction.commit()

    def processEvent(self, zem, context, action):
        """Apply one action rule or event command to the event database.

        context provides the event fields, userid, id, where clause, delay,
        repeatTime and (optionally) sendClear. action is called as
        action(context, data, clear), where data maps the context's event
        fields (plus derived URL and severity keys) to values. A true result
        means the notification was handled:
          - for new events, an alert_state row is recorded so the event is
            not acted on again (until repeatTime, if set, has elapsed);
          - for clears, the alert_state row is removed.
        """
        fields = context.getEventFields()
        userid = context.getUserid()
        ruleId = context.getId()

        # --- new events ---
        # The rule's where clause is combined with further AND terms here and
        # in newsel. Parenthesize it so an OR inside it cannot bind looser
        # than those terms. Otherwise the "evid NOT IN alert_state" guard
        # would not cover every branch, and the same events would be
        # re-notified on every cycle.
        nwhere = "(%s)" % (context.where.strip() or '1 = 1')
        if context.delay > 0:
            nwhere += " and firstTime + %s < UNIX_TIMESTAMP()" % context.delay
        awhere = ''
        if context.repeatTime:
            # Only rows sent within the last repeatTime seconds suppress the
            # event, so older notifications get repeated.
            awhere += ' and DATE_ADD(lastSent, INTERVAL %d SECOND) > now() ' % (
                context.repeatTime,)
        q = self.newsel % (",".join(fields), nwhere, userid, ruleId, awhere)
        for result in self.query(q):
            evid = result[-1]
            data = dict(zip(fields, map(zem.convert, fields, result[:-1])))
            data['eventUrl'] = self.getUrl(evid)
            data['eventsUrl'] = self._eventsUrlFor(data.get('device', None))
            data['device'] = data.get('device', None) or ''
            data['ackUrl'] = self.getAckUrl(evid)
            data['deleteUrl'] = self.getDeleteUrl(evid)
            severity = data.get('severity', -1)
            data['severityString'] = zem.getSeverityString(severity)
            if action(context, data, False):
                self.execute(self.addstate % (evid, userid, ruleId))

        # --- clears: notified events that are now in history ---
        historyFields = ','.join([("h.%s" % f) for f in fields])
        q = self.clearsel % (historyFields, userid, ruleId)
        # Columns of the clearing event, exposed as clearXxx keys.
        clearColumns = ",".join([('clear.%s' % x) for x in fields])
        cfields = [('clear%s' % _capitalize(x)) for x in fields]
        for result in self.query(q):
            evid = result[-1]
            data = dict(zip(fields, map(zem.convert, fields, result[:-1])))

            # there might not be a clear event, so set empty defaults
            data.update(dict.fromkeys(cfields, ""))
            for values in self.query(self.clearEventSelect % (clearColumns,
                                                              evid)):
                values = map(zem.convert, fields, values)
                data.update(dict(zip(cfields, values)))

            # .get(): 'summary' (and hence 'clearSummary') is only present if
            # it is one of the context's event fields.
            data['clearOrEventSummary'] = (
                data.get('clearSummary') or data.get('summary', ''))

            # Same link/device keys as new-event notifications, plus undelete.
            data['eventUrl'] = self.getUrl(evid)
            data['undeleteUrl'] = self.getUndeleteUrl(evid)
            data['eventsUrl'] = self._eventsUrlFor(data.get('device', None))
            data['device'] = data.get('device', None) or ''
            severity = data.get('severity', -1)
            data['severityString'] = zem.getSeverityString(severity)
            delcmd = self.clearstate % (evid, userid, ruleId)
            if getattr(context, 'sendClear', True):
                if action(context, data, True):
                    self.execute(delcmd)
            else:
                # Clears are not wanted: just forget the notification record.
                self.execute(delcmd)

    def maintenance(self, zem):
        """Run stored procedures that maintain the events database.

        Calls age_events with the event manager's aging hours and severity.
        A ProgrammingError from the procedure is logged, not raised.
        """
        sql = 'call age_events(%s, %s);' % (
                zem.eventAgingHours, zem.eventAgingSeverity)
        try:
            self.execute(sql)
        except ProgrammingError:
            self.log.exception("problem with proc: '%s'" % sql)

    def deleteHistoricalEvents(self, deferred=False, force=False):
        """
        Once per day delete events from history table.
        If force then run the deletion statement regardless of when it was
        last run (the deletion will still not run if the historyMaxAgeDays
        setting in the event manager is not greater than zero.)
        If deferred then we are running in a twisted reactor. Run the
        deletion script in a non-blocking manner (if it is to be run) and
        return a deferred (if the deletion script is run.)
        In all cases return None if the deletion script is not run.
        The script is also re-run whenever historyMaxAgeDays has changed
        since the last run. A non-zero exit status of the script is logged
        as an error in both modes.
        """
        import datetime
        import os
        import twisted.internet.utils
        import Products.ZenUtils.Utils as Utils
        import transaction
        import subprocess

        def onSuccess(unused, startTime):
            self.log.info('Done deleting historical events in %.2f seconds' %
                            (time.time() - startTime))
            return None

        def onError(error, startTime):
            self.log.error('Error deleting historical events after '
                            '%s seconds: %s' % (time.time()-startTime,
                                                error))
            return None

        def onExit(result, startTime):
            # getProcessOutputAndValue fires with (stdout, stderr, exitCode).
            # Checking the exit code is what lets a failing script be
            # reported as an error; getProcessOutput only returns output.
            out, err, code = result
            if code:
                return onError(out + err, startTime)
            return onSuccess(out + err, startTime)

        # d is the return value. It is a deferred if the deferred argument
        # is true and if we run the deletion script. Otherwise it is None
        d = None

        # Unless the event manager has a positive number of days for its
        # historyMaxAgeDays setting then there is never any point in
        # performing the deletion.
        try:
            maxDays = int(self.dmd.ZenEventManager.historyMaxAgeDays)
        except (ValueError, TypeError):
            # TypeError covers a non-numeric object such as None.
            maxDays = 0
        if maxDays > 0:
            # lastDeleteHistoricalEvents_datetime is when the deletion
            # script was last run
            lastRun = getattr(self.dmd,
                                'lastDeleteHistoricalEvents_datetime', None)
            # lastDeleteHistoricalEvents_days is the value of historyMaxAgeDays
            # the last time the deletion script was run.  If this value has
            # changed then we run the script again regardless of when it was
            # last run.
            lastAge = getattr(self.dmd,
                                'lastDeleteHistoricalEvents_days', None)
            now = datetime.datetime.now()
            if not lastRun \
                    or now - lastRun > datetime.timedelta(1) \
                    or lastAge != maxDays \
                    or force:
                self.log.info('Deleting historical events older than %s days' %
                                maxDays)
                startTime = time.time()
                cmd = Utils.zenPath('Products', 'ZenUtils',
                                    'ZenDeleteHistory.py')
                args = ['--numDays=%s' % maxDays]
                if deferred:
                    # We're in a twisted reactor, so make a twisty call
                    d = twisted.internet.utils.getProcessOutputAndValue(
                            cmd, args, os.environ)
                    d.addCallback(onExit, startTime)
                    d.addErrback(onError, startTime)
                else:
                    # Not in a reactor, so do this in a blocking manner
                    proc = subprocess.Popen(
                            [cmd]+args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, env=os.environ)
                    output, _ = proc.communicate()
                    code = proc.wait()
                    if code:
                        onError(output, startTime)
                    else:
                        onSuccess(output, startTime)
                # Record circumstances of this run
                self.dmd.lastDeleteHistoricalEvents_datetime = now
                self.dmd.lastDeleteHistoricalEvents_days = maxDays
                transaction.commit()
        return d

    def heartbeatEvents(self):
        """Create events for failed heartbeats.

        Sends an Error event for every heartbeat row whose lastTime plus
        timeout has passed. It also sends a Clear for every open
        Status_Heartbeat event whose (device, component) is not currently
        failing.
        """
        # build cache of existing heartbeat issues
        q = ("SELECT device, component "
             "FROM status WHERE eventClass = '%s'" % Status_Heartbeat)
        heartbeatState = set(self.query(q))

        # find current heartbeat failures
        sel = "SELECT device, component FROM heartbeat "
        sel += "WHERE DATE_ADD(lastTime, INTERVAL timeout SECOND) <= NOW();"
        for device, comp in self.query(sel):
            self.sendEvent(
                Event.Event(device=device, component=comp,
                            eventClass=Status_Heartbeat,
                            summary="%s %s heartbeat failure" % (device, comp),
                            severity=Event.Error))
            heartbeatState.discard((device, comp))

        # clear heartbeats
        for device, comp in heartbeatState:
            self.sendEvent(
                Event.Event(device=device, component=comp,
                            eventClass=Status_Heartbeat,
                            summary="%s %s heartbeat clear" % (device, comp),
                            severity=Event.Clear))

    def runEventCommand(self, cmd, data, clear = None):
        """Spawn the shell command of event command cmd for one event.

        The command (clearCommand when clear is true) is evaluated as a
        TALES "string:" expression with dev, component and evt (the event
        data) in scope. It is then run via /bin/sh -c, and
        EventCommandProtocol handles its completion. Errors are logged.
        Always returns True, so processEvent records the event as handled
        even if spawning failed.
        """
        try:
            command = cmd.command
            if clear:
                command = cmd.clearCommand
            device = self.dmd.Devices.findDevice(data.get('device', ''))
            component = None
            if device:
                componentName = data.get('component')
                for c in device.getMonitoredComponents():
                    if c.id == componentName:
                        component = c
                        break
            # NOTE(review): event fields (e.g. summary) are substituted into a
            # shell command line unescaped. If events can carry untrusted
            # text, this is a shell-injection risk.
            compiled = talesCompile('string:' + command)
            environ = {'dev':device, 'component':component, 'evt':data }
            res = compiled(getEngine().getContext(environ))
            if isinstance(res, Exception):
                raise res
            prot = EventCommandProtocol(cmd, self)
            self.log.info('Running %s' % res)
            reactor.spawnProcess(prot, '/bin/sh',
                                 ('/bin/sh', '-c', res),
                                 env=None)
        except Exception:
            self.log.exception('Error running command %s', cmd.id)
        return True

    def eventCommands(self, zem):
        """Run processEvent for every enabled event command."""
        now = time.time()
        count = 0
        for command in zem.commands():
            if command.enabled:
                count += 1
                self.processEvent(zem, command, self.runEventCommand)
        self.log.info("Processed %d commands in %f", count, time.time() - now)

    def mainbody(self):
        """Run one pass: reap children, event commands, action rules,
        update check, database maintenance, history aging, heartbeats.
        """
        from twisted.internet.process import reapAllProcesses
        reapAllProcesses()
        zem = self.dmd.ZenEventManager
        self.loadActionRules()
        self.eventCommands(zem)
        self.processRules(zem)
        self.checkVersion(zem)
        self.maintenance(zem)
        self.deleteHistoricalEvents(deferred=self.options.cycle)
        self.heartbeatEvents()

    def runCycle(self):
        """Run one pass and send a heartbeat.

        Any error is logged, and the next pass is then scheduled
        options.cycletime seconds later.
        """
        try:
            start = time.time()
            self.syncdb()
            self.mainbody()
            self.log.info("processed %s rules in %.2f secs",
                           len(self.actions), time.time()-start)
            self.sendHeartbeat()
        except:
            self.log.exception("unexpected exception")
        reactor.callLater(self.options.cycletime, self.runCycle)

    def run(self):
        """Entry point.

        When options.cycle is false: send a heartbeat, run the schedule and
        a single pass, then return. Otherwise: start the schedule and the
        periodic runCycle loop, and run the twisted reactor.
        """
        if not self.options.cycle:
            self.sendHeartbeat()
            self.schedule.run()
            return self.mainbody()
        self.schedule.start()
        self.runCycle()
        reactor.run()

    def sendEvent(self, evt):
        """Send event to the system.
        """
        self.dmd.ZenEventManager.sendEvent(evt)

    def sendHeartbeat(self):
        """Send a heartbeat event for this monitor.

        The heartbeat timeout is three cycle times. niceDoggie is presumably
        the watchdog keep-alive provided by ZCmdBase.
        """
        timeout = self.options.cycletime*3
        evt = Event.EventHeartbeat(self.options.monitor, "zenactions", timeout)
        self.sendEvent(evt)
        self.niceDoggie(self.options.cycletime)

    def stop(self):
        """Mark the daemon as stopping and send an App_Stop event."""
        self.running = False
        self.log.info("stopping")
        self.sendEvent(Event.Event(device=self.options.monitor,
                                   eventClass=App_Stop,
                                   summary="zenactions stopped",
                                   severity=3, component="zenactions"))

    def format(self, action, data, clear):
        """Return (message, body) for a notification.

        Uses the rule's format/body, or clearFormat/clearBody when clear is
        true, %-interpolated with data. The message is used as the page text
        or the email subject. A template that fails to interpolate is
        replaced by an error text instead of raising.
        """
        import sys
        fmt = action.format
        body = action.body
        if clear:
            fmt = action.clearFormat
            body = action.clearBody
        # sys.exc_info() avoids the version-specific "except X, e" syntax.
        try:
            fmt = fmt % data
        except Exception:
            fmt = "Error formatting event: %s" % (str(sys.exc_info()[1]),)
        try:
            body = body % data
        except Exception:
            body = "Error formatting event body: %s" % (
                str(sys.exc_info()[1]),)
        return fmt, body

    def stripTags(self, data):
        """A quick html => plaintext converter
        that retains and displays anchor hrefs

        Each <a href="URL">TEXT</a> becomes "TEXT: URL"; all other tags are
        removed.
        """
        import re
        tags = re.compile(r'<(.|\n)+?>', re.I|re.M)
        aattrs = re.compile(r'<a(.|\n)+?href=["\']([^"\']*)[^>]*?>([^<>]*?)</a>', re.I|re.M)
        for anchor in re.finditer(aattrs, data):
            data = data.replace(anchor.group(),
                                "%s: %s" % (anchor.group(3), anchor.group(2)))
        return re.sub(tags, '', data)

    def sendPage(self, action, data, clear = None):
        """Send and event to a pager. Return True if we think page was sent,
        False otherwise.

        A rule without addresses is logged and treated as handled (True).
        """
        fmt, body = self.format(action, data, clear)
        recipients = action.getAddresses()
        if not recipients:
            self.log.warning('failed to page %s on rule %s: %s',
                             action.getUser().id, action.id,
                             'Unspecified address.')
            return True

        result = False
        for recipient in recipients:
            success, errorMsg = Utils.sendPage(recipient,
                                               fmt,
                                               self.dmd.pageCommand)
            if success:
                self.log.info('sent page to %s: %s', recipient, fmt)
                # return True if anyone got the page
                result = result or success
            else:
                self.log.info('failed to send page to %s: %s %s',
                              recipient,
                              fmt,
                              errorMsg)
        return result

    def sendEmail(self, action, data, clear = None):
        """Send an event to an email address.
        Return True if we think the email was sent, False otherwise.

        A rule without addresses is logged and treated as handled (True).
        Unless action.plainText is set, a multipart message with plain text
        and HTML alternatives is sent.
        """
        from email.MIMEText import MIMEText
        from email.MIMEMultipart import MIMEMultipart
        addr = action.getAddresses()
        if not addr:
            self.log.warning('failed to email %s on rule %s: %s',
                action.getUser().id, action.id, 'Unspecified address.')
            return True

        fmt, htmlbody = self.format(action, data, clear)
        htmlbody = htmlbody.replace('\n','<br/>\n')
        body = self.stripTags(htmlbody)
        plaintext = MIMEText(body)

        if action.plainText:
            emsg = plaintext
        else:
            emsg = MIMEMultipart('related')
            emsgAlternative = MIMEMultipart('alternative')
            emsg.attach( emsgAlternative )
            html = MIMEText(htmlbody)
            html.set_type('text/html')
            emsgAlternative.attach(plaintext)
            emsgAlternative.attach(html)

        emsg['Subject'] = fmt
        emsg['From'] = self.dmd.getEmailFrom()
        emsg['To'] = ', '.join(addr)
        emsg['Date'] = formatdate(None, True)
        result, errorMsg = Utils.sendEmail(emsg, self.dmd.smtpHost,
                    self.dmd.smtpPort, self.dmd.smtpUseTLS, self.dmd.smtpUser,
                    self.dmd.smtpPass)
        if result:
            self.log.info("rule '%s' sent email:%s to:%s",
                action.id, fmt, addr)
        else:
            self.log.info("rule '%s' failed to send email to %s: %s %s",
                action.id, ','.join(addr), fmt, errorMsg)
        return result

    def buildOptions(self):
        """Add --cycletime, --zopeurl and --monitor to the base options."""
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--cycletime',
            dest='cycletime', default=60, type="int",
            help="check events every cycletime seconds")
        self.parser.add_option(
            '--zopeurl', dest='zopeurl',
            default='http://%s:%d' % (socket.getfqdn(), 8080),
            help="http path to the root of the zope server")
        self.parser.add_option("--monitor", dest="monitor",
            default=DEFAULT_MONITOR,
            help="Name of monitor instance to use for heartbeat "
                " events. Default is %s." % DEFAULT_MONITOR)

    def sigTerm(self, signum=None, frame=None):
        'controlled shutdown of main loop on interrupt'
        # If the base handler raises SystemExit, stop the reactor instead.
        try:
            ZCmdBase.sigTerm(self, signum, frame)
        except SystemExit:
            reactor.stop()
if __name__ == "__main__":
    import logging
    za = ZenActions()
    # Let zen.Events messages at INFO (20) and above through.
    logging.getLogger('zen.Events').setLevel(logging.INFO)
    za.run()