Package ZenEvents :: Module zenactions
[hide private]
[frames] | [no frames]

Source Code for Module ZenEvents.zenactions

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  #! /usr/bin/env python  
 14   
 # Module documentation and version string.
__doc__ = '''zenactions

Turn events into notifications (pages, emails).

$Id$
'''

# Keyword expansion turns "$Revision$" into "$Revision: NNN $"; dropping the
# 11-character "$Revision: " prefix and the 2-character " $" suffix leaves
# just NNN (an unexpanded keyword yields the empty string).
__version__ = "$Revision$"[11:-2]
 23   
 24   
 25  import socket 
 26  import time 
 27  from sets import Set 
 28  import Globals 
 29   
 30  from ZODB.POSException import POSError 
 31  from _mysql_exceptions import OperationalError, ProgrammingError  
 32   
 33  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 34  from Products.ZenUtils.ZenTales import talesCompile, getEngine 
 35  from ZenEventClasses import App_Start, App_Stop, Status_Heartbeat  
 36  from ZenEventClasses import Cmd_Fail 
 37  import Event 
 38  from Schedule import Schedule 
 39  from UpdateCheck import UpdateCheck 
 40  from Products.ZenUtils import Utils 
 41  from twisted.internet import reactor 
 42  from twisted.internet.protocol import ProcessProtocol 
 43  from email.Utils import formatdate 
 44   
 # Default value of the --monitor option. zenactions uses the monitor name
# as the device of the events it sends about itself (start/stop, heartbeat).
DEFAULT_MONITOR = 'localhost'
 46   
47 -def _capitalize(s):
48 return s[0:1].upper() + s[1:]
49
class EventCommandProtocol(ProcessProtocol):
    """Collect an event command's output and report how it ended.

    An instance is passed to ``reactor.spawnProcess``. It buffers the
    command's stdout/stderr, arms a ``cmd.defaultTimeout`` timer and sends a
    ``Cmd_Fail`` event through ``server.sendEvent``: Clear when the command
    exits with code 0, Error on any other exit or on timeout.
    """

    def __init__(self, cmd, server):
        """
        @param cmd: the event command being run (its ``id`` and
            ``defaultTimeout`` attributes are used)
        @param server: the owning daemon (its ``log``, ``options.monitor``
            and ``sendEvent`` are used)
        """
        self.cmd = cmd
        self.server = server
        self.data = ''
        self.error = ''
        self.timeout = reactor.callLater(cmd.defaultTimeout, self.timedOut)

    def timedOut(self):
        """Report the timeout and kill the overrunning command.

        Previously the process was left running after the timeout fired,
        so hung commands were never cleaned up.
        """
        from twisted.internet.error import ProcessExitedAlready
        self.timeout = None
        self.server.log.error("Command %s timed out" % self.cmd.id)
        self.server.sendEvent(Event.Event(
            device=self.server.options.monitor,
            eventClass=Cmd_Fail,
            severity=Event.Error,
            component="zenactions",
            eventKey=self.cmd.id,
            summary="Timeout running %s" % (self.cmd.id,),
            ))
        # transport is None if the process was never spawned.
        if self.transport is not None:
            try:
                self.transport.signalProcess('KILL')
            except ProcessExitedAlready:
                # It finished on its own in the meantime; nothing to kill.
                pass

    def processEnded(self, reason):
        """Cancel the timeout and send a Clear or Error event for the result.

        @param reason: twisted Failure describing how the process ended; a
            missing ``exitCode`` on its value is treated as failure (1).
        """
        self.server.log.debug("Command finished: %s" % reason.getErrorMessage())
        code = 1
        try:
            code = reason.value.exitCode
        except AttributeError:
            pass

        # The process has ended. We can cancel the timeout now.
        if self.timeout:
            self.timeout.cancel()
            self.timeout = None

        if code == 0:
            cmdData = self.data or "<command produced no output>"
            self.server.log.debug("Command %s says: %s", self.cmd.id, cmdData)
            self.server.sendEvent(Event.Event(
                device=self.server.options.monitor,
                eventClass=Cmd_Fail,
                severity=Event.Clear,
                component="zenactions",
                eventKey=self.cmd.id,
                summary="Command succeeded: %s: %s" % (
                    self.cmd.id, cmdData),
                ))
        else:
            cmdError = self.error or "<command produced no output>"
            self.server.log.error("Command %s says %s", self.cmd.id, cmdError)
            self.server.sendEvent(Event.Event(
                device=self.server.options.monitor,
                eventClass=Cmd_Fail,
                severity=Event.Error,
                component="zenactions",
                eventKey=self.cmd.id,
                summary="Error running: %s: %s" % (
                    self.cmd.id, cmdError),
                ))

    def outReceived(self, text):
        """Accumulate the command's stdout."""
        self.data += text

    def errReceived(self, text):
        """Accumulate the command's stderr."""
        self.error += text
116 -class ZenActions(ZCmdBase):
117 """ 118 Take actions based on events in the event manager. 119 Start off by sending emails and pages. 120 """ 121 122 lastCommand = None 123 124 addstate = ("INSERT INTO alert_state " 125 "VALUES ('%s', '%s', '%s', NULL) " 126 "ON DUPLICATE KEY UPDATE lastSent = now()") 127 128 129 clearstate = ("DELETE FROM alert_state " 130 " WHERE evid='%s' " 131 " AND userid='%s' " 132 " AND rule='%s'") 133 134 #FIXME attempt to convert subquery to left join that doesn't work 135 # newsel = """select %s, evid from status s left join alert_state a 136 # on s.evid=a.evid where a.evid is null and 137 # a.userid='%s' and a.rule='%s'""" 138 139 newsel = ("SELECT %s, evid FROM status WHERE " 140 "%s AND evid NOT IN " 141 " (SELECT evid FROM alert_state " 142 " WHERE userid='%s' AND rule='%s' %s)") 143 144 clearsel = ("SELECT %s, h.evid FROM history h, alert_state a " 145 " WHERE h.evid=a.evid AND a.userid='%s' AND a.rule='%s'") 146 147 clearEventSelect = ("SELECT %s " 148 " FROM history clear, history event " 149 " WHERE clear.evid = event.clearid " 150 " AND event.evid = '%s'") 151 152
153 - def __init__(self):
154 ZCmdBase.__init__(self) 155 self.schedule = Schedule(self.options, self.dmd) 156 self.schedule.sendEvent = self.dmd.ZenEventManager.sendEvent 157 self.schedule.monitor = self.options.monitor 158 159 self.actions = [] 160 self.loadActionRules() 161 self.updateCheck = UpdateCheck() 162 self.sendEvent(Event.Event(device=self.options.monitor, 163 eventClass=App_Start, 164 summary="zenactions started", 165 severity=0, component="zenactions"))
166
167 - def loadActionRules(self):
168 """Load the ActionRules into the system. 169 """ 170 self.actions = [] 171 for ar in self.dmd.ZenUsers.getAllActionRules(): 172 if not ar.enabled: continue 173 userid = ar.getUser().id 174 self.actions.append(ar) 175 self.log.debug("action:%s for:%s loaded", ar.getId(), userid)
176 177
178 - def execute(self, stmt):
179 result = None 180 self.lastCommand = stmt 181 self.log.debug(stmt) 182 zem = self.dmd.ZenEventManager 183 conn = zem.connect() 184 try: 185 curs = conn.cursor() 186 result = curs.execute(stmt) 187 finally: zem.close(conn) 188 return result
189 190
191 - def query(self, stmt):
192 result = None 193 self.lastCommand = stmt 194 self.log.debug(stmt) 195 zem = self.dmd.ZenEventManager 196 conn = zem.connect() 197 try: 198 curs = conn.cursor() 199 curs.execute(stmt) 200 result = curs.fetchall() 201 finally: zem.close(conn) 202 return result
203 204
205 - def getBaseUrl(self, device=None):
206 url = self.options.zopeurl 207 if device: 208 return "%s%s" % (url, device.getPrimaryUrlPath()) 209 else: 210 return "%s/zport/dmd/Events" % (url)
211 212
213 - def getEventUrl(self, evid, device=None):
214 return "%s/eventFields?evid=%s" % (self.getBaseUrl(device), evid)
215 216
217 - def getEventsUrl(self, device=None):
218 return "%s/viewEvents" % self.getBaseUrl(device)
219 220
221 - def getAckUrl(self, evid, device=None):
222 return "%s/manage_ackEvents?evids=%s&zenScreenName=viewEvents" % ( 223 self.getBaseUrl(device), evid)
224 225
226 - def getDeleteUrl(self, evid, device=None):
227 return "%s/manage_deleteEvents?evids=%s" % ( 228 self.getBaseUrl(device), evid) + \ 229 "&zenScreenName=viewHistoryEvents"
230 231
232 - def getUndeleteUrl(self, evid, device=None):
233 return "%s/manage_undeleteEvents?evids=%s" % ( 234 self.getBaseUrl(device), evid) + \ 235 "&zenScreenName=viewEvents"
236 237
238 - def processRules(self, zem):
239 """Run through all rules matching them against events. 240 """ 241 for ar in self.actions: 242 try: 243 self.lastCommand = None 244 # call sendPage or sendEmail 245 actfunc = getattr(self, "send"+ar.action.title()) 246 self.processEvent(zem, ar, actfunc) 247 except (SystemExit, KeyboardInterrupt, OperationalError, POSError): 248 raise 249 except: 250 if self.lastCommand: 251 self.log.warning(self.lastCommand) 252 self.log.exception("action:%s",ar.getId())
253
254 - def checkVersion(self, zem):
255 self.updateCheck.check(self.dmd, zem) 256 import transaction 257 transaction.commit()
258
259 - def processEvent(self, zem, context, action):
260 fields = context.getEventFields() 261 userid = context.getUserid() 262 # get new events 263 nwhere = context.where.strip() or '1 = 1' 264 if context.delay > 0: 265 nwhere += " and firstTime + %s < UNIX_TIMESTAMP()" % context.delay 266 awhere = '' 267 if context.repeatTime: 268 awhere += ' and DATE_ADD(lastSent, INTERVAL %d SECOND) > now() ' % ( 269 context.repeatTime,) 270 q = self.newsel % (",".join(fields), nwhere, userid, context.getId(), 271 awhere) 272 for result in self.query(q): 273 evid = result[-1] 274 data = dict(zip(fields, map(zem.convert, fields, result[:-1]))) 275 276 # Make details available to event commands. zem.getEventDetail 277 # uses the status table (which is where this event came from 278 try: 279 details = dict( zem.getEventDetail(evid).getEventDetails() ) 280 data.update( details ) 281 except ZenEventNotFound: 282 pass 283 284 device = self.dmd.Devices.findDevice(data.get('device', None)) 285 data['eventUrl'] = self.getEventUrl(evid, device) 286 if device: 287 data['eventsUrl'] = self.getEventsUrl(device) 288 else: 289 data['eventsUrl'] = 'n/a' 290 data['device'] = data.get('device', None) or '' 291 data['ackUrl'] = self.getAckUrl(evid, device) 292 data['deleteUrl'] = self.getDeleteUrl(evid, device) 293 severity = data.get('severity', -1) 294 data['severityString'] = zem.getSeverityString(severity) 295 if action(context, data, False): 296 addcmd = self.addstate % (evid, userid, context.getId()) 297 self.execute(addcmd) 298 299 # get clear events 300 historyFields = [("h.%s" % f) for f in fields] 301 historyFields = ','.join(historyFields) 302 q = self.clearsel % (historyFields, userid, context.getId()) 303 for result in self.query(q): 304 evid = result[-1] 305 data = dict(zip(fields, map(zem.convert, fields, result[:-1]))) 306 307 # For clear events we are using the history table, so get the event details 308 # using the history table. 
309 try: 310 details = dict( zem.getEventDetailFromStatusOrHistory(evid).getEventDetails() ) 311 data.update( details ) 312 except ZenEventNotFound: 313 pass 314 315 # get clear columns 316 cfields = [('clear.%s' % x) for x in fields] 317 q = self.clearEventSelect % (",".join(cfields), evid) 318 319 # convert clear columns to clear names 320 cfields = [('clear%s' % _capitalize(x)) for x in fields] 321 322 # there might not be a clear event, so set empty defaults 323 data.update({}.fromkeys(cfields, "")) 324 325 # pull in the clear event data 326 for values in self.query(q): 327 values = map(zem.convert, fields, values) 328 data.update(dict(zip(cfields, values))) 329 330 self.log.debug( "Executing clear action with data %s", data ) 331 332 # If our event has a clearid, but we have no clear data it means 333 # that we're in a small delay before it is inserted. We'll wait 334 # until next time to deal with the clear. 335 if data.get('clearid', None) and not data.get('clearEvid', None): 336 continue 337 338 data['clearOrEventSummary'] = ( 339 data['clearSummary'] or data['summary']) 340 341 # add in the link to the url 342 device = self.dmd.Devices.findDevice(data.get('device', None)) 343 data['eventUrl'] = self.getEventUrl(evid, device) 344 data['undeleteUrl'] = self.getUndeleteUrl(evid, device) 345 severity = data.get('severity', -1) 346 data['severityString'] = zem.getSeverityString(severity) 347 delcmd = self.clearstate % (evid, userid, context.getId()) 348 if getattr(context, 'sendClear', True): 349 if action(context, data, True): 350 self.execute(delcmd) 351 else: 352 self.execute(delcmd)
353 354
355 - def maintenance(self, zem):
356 """Run stored procedures that maintain the events database. 357 """ 358 sql = 'call age_events(%s, %s);' % ( 359 zem.eventAgingHours, zem.eventAgingSeverity) 360 try: 361 self.execute(sql) 362 except ProgrammingError: 363 self.log.exception("problem with proc: '%s'" % sql)
364 365
366 - def deleteHistoricalEvents(self, deferred=False, force=False):
367 """ 368 Once per day delete events from history table. 369 If force then run the deletion statement regardless of when it was 370 last run (the deletion will still not run if the historyMaxAgeDays 371 setting in the event manager is not greater than zero.) 372 If deferred then we are running in a twisted reactor. Run the 373 deletion script in a non-blocking manner (if it is to be run) and 374 return a deferred (if the deletion script is run.) 375 In all cases return None if the deletion script is not run. 376 """ 377 import datetime 378 import os 379 import twisted.internet.utils 380 import Products.ZenUtils.Utils as Utils 381 import transaction 382 import subprocess 383 384 def onSuccess(unused, startTime): 385 self.log.info('Done deleting historical events in %.2f seconds' % 386 (time.time() - startTime)) 387 return None
388 def onError(error, startTime): 389 self.log.error('Error deleting historical events after ' 390 '%s seconds: %s' % (time.time()-startTime, 391 error)) 392 return None
393 394 # d is the return value. It is a deferred if the deferred argument 395 # is true and if we run the deletion script. Otherwise it is None 396 d = None 397 398 # Unless the event manager has a positive number of days for its 399 # historyMaxAgeDays setting then there is never any point in 400 # performing the deletion. 401 try: 402 maxDays = int(self.dmd.ZenEventManager.historyMaxAgeDays) 403 except ValueError: 404 maxDays = 0 405 if maxDays > 0: 406 # lastDeleteHistoricalEvents_datetime is when the deletion 407 # script was last run 408 lastRun = getattr(self.dmd, 409 'lastDeleteHistoricalEvents_datetime', None) 410 # lastDeleteHistoricalEvents_days is the value of historyMaxAgeDays 411 # the last time the deletion script was run. If this value has 412 # changed then we run the script again regardless of when it was 413 # last run. 414 lastAge = getattr(self.dmd, 415 'lastDeleteHistoricalEvents_days', None) 416 now = datetime.datetime.now() 417 if not lastRun \ 418 or now - lastRun > datetime.timedelta(1) \ 419 or lastAge != maxDays \ 420 or force: 421 self.log.info('Deleting historical events older than %s days' % 422 maxDays) 423 startTime = time.time() 424 cmd = Utils.zenPath('Products', 'ZenUtils', 425 'ZenDeleteHistory.py') 426 args = ['--numDays=%s' % maxDays] 427 if deferred: 428 # We're in a twisted reactor, so make a twisty call 429 d = twisted.internet.utils.getProcessOutput( 430 cmd, args, os.environ, errortoo=True) 431 d.addCallback(onSuccess, startTime) 432 d.addErrback(onError, startTime) 433 else: 434 # Not in a reactor, so do this in a blocking manner 435 proc = subprocess.Popen( 436 [cmd]+args, stdout=subprocess.PIPE, 437 stderr=subprocess.STDOUT, env=os.environ) 438 # Trying to mimic how twisted returns results to us 439 # sort of. 
440 output, _ = proc.communicate() 441 code = proc.wait() 442 if code: 443 onError(output, startTime) 444 else: 445 onSuccess(output, startTime) 446 # Record circumstances of this run 447 self.dmd.lastDeleteHistoricalEvents_datetime = now 448 self.dmd.lastDeleteHistoricalEvents_days = maxDays 449 transaction.commit() 450 return d 451 452
453 - def heartbeatEvents(self):
454 """Create events for failed heartbeats. 455 """ 456 # build cache of existing heartbeat issues 457 q = ("SELECT device, component " 458 "FROM status WHERE eventClass = '%s'" % Status_Heartbeat) 459 heartbeatState = Set(self.query(q)) 460 461 # find current heartbeat failures 462 sel = "SELECT device, component FROM heartbeat " 463 sel += "WHERE DATE_ADD(lastTime, INTERVAL timeout SECOND) <= NOW();" 464 for device, comp in self.query(sel): 465 self.sendEvent( 466 Event.Event(device=device, component=comp, 467 eventClass=Status_Heartbeat, 468 summary="%s %s heartbeat failure" % (device, comp), 469 severity=Event.Error)) 470 heartbeatState.discard((device, comp)) 471 472 # clear heartbeats 473 for device, comp in heartbeatState: 474 self.sendEvent( 475 Event.Event(device=device, component=comp, 476 eventClass=Status_Heartbeat, 477 summary="%s %s heartbeat clear" % (device, comp), 478 severity=Event.Clear))
479
480 - def runEventCommand(self, cmd, data, clear = None):
481 try: 482 command = cmd.command 483 if clear: 484 command = cmd.clearCommand 485 device = self.dmd.Devices.findDevice(data.get('device', '')) 486 component = None 487 if device: 488 componentName = data.get('component') 489 for c in device.getMonitoredComponents(): 490 if c.id == componentName: 491 component = c 492 break 493 compiled = talesCompile('string:' + command) 494 environ = {'dev':device, 'component':component, 'evt':data } 495 res = compiled(getEngine().getContext(environ)) 496 if isinstance(res, Exception): 497 raise res 498 prot = EventCommandProtocol(cmd, self) 499 self.log.info('Running %s' % res) 500 reactor.spawnProcess(prot, '/bin/sh', 501 ('/bin/sh', '-c', res), 502 env=None) 503 except Exception: 504 self.log.exception('Error running command %s', cmd.id) 505 return True
506 507
508 - def eventCommands(self, zem):
509 now = time.time() 510 count = 0 511 for command in zem.commands(): 512 if command.enabled: 513 count += 1 514 self.processEvent(zem, command, self.runEventCommand) 515 self.log.info("Processed %d commands in %f", count, time.time() - now)
516 517
518 - def mainbody(self):
519 """main loop to run actions. 520 """ 521 from twisted.internet.process import reapAllProcesses 522 reapAllProcesses() 523 zem = self.dmd.ZenEventManager 524 self.loadActionRules() 525 self.eventCommands(zem) 526 self.processRules(zem) 527 self.checkVersion(zem) 528 self.maintenance(zem) 529 self.deleteHistoricalEvents(deferred=self.options.cycle) 530 self.heartbeatEvents()
531 532
533 - def runCycle(self):
534 try: 535 start = time.time() 536 self.syncdb() 537 self.mainbody() 538 self.log.info("processed %s rules in %.2f secs", 539 len(self.actions), time.time()-start) 540 self.sendHeartbeat() 541 except: 542 self.log.exception("unexpected exception") 543 reactor.callLater(self.options.cycletime, self.runCycle)
544 545
546 - def run(self):
547 if not self.options.cycle: 548 self.sendHeartbeat() 549 self.schedule.run() 550 return self.mainbody() 551 self.schedule.start() 552 self.runCycle() 553 reactor.run()
554 555
556 - def sendEvent(self, evt):
557 """Send event to the system. 558 """ 559 self.dmd.ZenEventManager.sendEvent(evt)
560 561
562 - def sendHeartbeat(self):
563 """Send a heartbeat event for this monitor. 564 """ 565 timeout = self.options.cycletime*3 566 evt = Event.EventHeartbeat(self.options.monitor, "zenactions", timeout) 567 self.sendEvent(evt) 568 self.niceDoggie(self.options.cycletime)
569 570
571 - def stop(self):
572 self.running = False 573 self.log.info("stopping") 574 self.sendEvent(Event.Event(device=self.options.monitor, 575 eventClass=App_Stop, 576 summary="zenactions stopped", 577 severity=3, component="zenactions"))
578
579 - def format(self, action, data, clear):
580 fmt = action.format 581 body = action.body 582 if clear: 583 fmt = action.clearFormat 584 body = action.clearBody 585 try: 586 fmt = fmt % data 587 except Exception, ex: 588 fmt = "Error formatting event: %s" % (str(ex),) 589 try: 590 body = body % data 591 except Exception, ex: 592 body = "Error formatting event body: %s" % (str(ex),) 593 return fmt, body
594
595 - def stripTags(self, data):
596 """A quick html => plaintext converter 597 that retains and displays anchor hrefs 598 """ 599 import re 600 tags = re.compile(r'<(.|\n)+?>', re.I|re.M) 601 aattrs = re.compile(r'<a(.|\n)+?href=["\']([^"\']*)[^>]*?>([^<>]*?)</a>', re.I|re.M) 602 anchors = re.finditer(aattrs, data) 603 for x in anchors: data = data.replace(x.group(), "%s: %s" % (x.groups()[2], x.groups()[1])) 604 data = re.sub(tags, '', data) 605 return data
606
607 - def sendPage(self, action, data, clear = None):
608 """Send and event to a pager. Return True if we think page was sent, 609 False otherwise. 610 """ 611 fmt, body = self.format(action, data, clear) 612 recipients = action.getAddresses() 613 if not recipients: 614 self.log.warning('failed to page %s on rule %s: %s', 615 action.getUser().id, action.id, 616 'Unspecified address.') 617 return True 618 619 result = False 620 for recipient in recipients: 621 success, errorMsg = Utils.sendPage(recipient, 622 fmt, 623 self.dmd.pageCommand) 624 if success: 625 self.log.info('sent page to %s: %s', recipient, fmt) 626 # return True if anyone got the page 627 result = result or success 628 else: 629 self.log.info('failed to send page to %s: %s %s', 630 recipient, 631 fmt, 632 errorMsg) 633 return result
634 635 636
637 - def sendEmail(self, action, data, clear = None):
638 """Send an event to an email address. 639 Return True if we think the email was sent, False otherwise. 640 """ 641 from email.MIMEText import MIMEText 642 from email.MIMEMultipart import MIMEMultipart 643 addr = action.getAddresses() 644 if not addr: 645 self.log.warning('failed to email %s on rule %s: %s', 646 action.getUser().id, action.id, 'Unspecified address.') 647 return True 648 649 fmt, htmlbody = self.format(action, data, clear) 650 htmlbody = htmlbody.replace('\n','<br/>\n') 651 body = self.stripTags(htmlbody) 652 plaintext = MIMEText(body) 653 654 emsg = None 655 if action.plainText: 656 emsg = plaintext 657 else: 658 emsg = MIMEMultipart('related') 659 emsgAlternative = MIMEMultipart('alternative') 660 emsg.attach( emsgAlternative ) 661 html = MIMEText(htmlbody) 662 html.set_type('text/html') 663 emsgAlternative.attach(plaintext) 664 emsgAlternative.attach(html) 665 666 emsg['Subject'] = fmt 667 emsg['From'] = self.dmd.getEmailFrom() 668 emsg['To'] = ', '.join(addr) 669 emsg['Date'] = formatdate(None, True) 670 result, errorMsg = Utils.sendEmail(emsg, self.dmd.smtpHost, 671 self.dmd.smtpPort, self.dmd.smtpUseTLS, self.dmd.smtpUser, 672 self.dmd.smtpPass) 673 if result: 674 self.log.info("rule '%s' sent email:%s to:%s", 675 action.id, fmt, addr) 676 else: 677 self.log.info("rule '%s' failed to send email to %s: %s %s", 678 action.id, ','.join(addr), fmt, errorMsg) 679 return result
680 681
682 - def buildOptions(self):
683 ZCmdBase.buildOptions(self) 684 self.parser.add_option('--cycletime', 685 dest='cycletime', default=60, type="int", 686 help="check events every cycletime seconds") 687 self.parser.add_option( 688 '--zopeurl', dest='zopeurl', 689 default='http://%s:%d' % (socket.getfqdn(), 8080), 690 help="http path to the root of the zope server") 691 self.parser.add_option("--monitor", dest="monitor", 692 default=DEFAULT_MONITOR, 693 help="Name of monitor instance to use for heartbeat " 694 " events. Default is %s." % DEFAULT_MONITOR)
695 696
697 - def sigTerm(self, signum=None, frame=None):
698 'controlled shutdown of main loop on interrupt' 699 try: 700 ZCmdBase.sigTerm(self, signum, frame) 701 except SystemExit: 702 reactor.stop()
if __name__ == "__main__":
    import logging
    daemon = ZenActions()
    # Show INFO-level messages from the event subsystem (INFO == 20).
    logging.getLogger('zen.Events').setLevel(logging.INFO)
    daemon.run()