Package ZenEvents :: Module zenactions
[hide private]
[frames] | [no frames]

Source Code for Module ZenEvents.zenactions

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  #! /usr/bin/env python  
 14   
 15  __doc__='''zenactions 
 16   
 17  Turn events into notifications (pages, emails). 
 18   
 19  $Id$ 
 20  ''' 
 21   
 22  __version__ = "$Revision$"[11:-2] 
 23   
 24   
 25  import os 
 26  import socket 
 27  import time 
 28  from sets import Set 
 29  import Globals 
 30   
 31  from ZODB.POSException import POSError 
 32  from _mysql_exceptions import OperationalError, ProgrammingError  
 33   
 34  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 35  from Products.ZenUtils.ZenTales import talesCompile, getEngine 
 36  from ZenEventClasses import App_Start, App_Stop, Heartbeat, Status_Heartbeat  
 37  from ZenEventClasses import Cmd_Ok, Cmd_Fail 
 38  import Event 
 39  from Schedule import Schedule 
 40  from UpdateCheck import UpdateCheck 
 41  import Products.ZenUtils.Utils as Utils 
 42  from twisted.internet import reactor 
 43  from twisted.internet.protocol import ProcessProtocol 
 44  from DateTime import DateTime 
 45   
46 -def _capitalize(s):
47 return s[0:1].upper() + s[1:]
48
class EventCommandProtocol(ProcessProtocol):
    """Twisted process protocol for one spawned event command.

    Collects the command's stdout/stderr. When the process ends, it sends a
    Cmd_Ok clear event (exit code 0) or a Cmd_Fail error event through
    server.sendEvent. A watchdog scheduled for cmd.defaultTimeout seconds
    kills a command that is still running, so a hung command is reported
    as a timeout instead of lingering forever.
    """

    def __init__(self, cmd, server):
        self.cmd = cmd
        self.server = server
        self.data = ''          # accumulated stdout
        self.error = ''         # accumulated stderr
        self.timedout = False   # set once the watchdog has fired
        self.timeout = reactor.callLater(cmd.defaultTimeout, self.timedOut)

    def timedOut(self):
        """Watchdog callback: log the timeout and kill the child process."""
        self.server.log.error("Command %s timed out" % self.cmd.id)
        self.timeout = None
        self.timedout = True
        # Kill the child so that processEnded() fires and reports the
        # timeout. The process may already be gone, or the transport may be
        # unavailable, so a failure here is only logged.
        try:
            self.transport.signalProcess('KILL')
        except Exception:
            self.server.log.debug("Unable to kill command %s", self.cmd.id)

    def processEnded(self, reason):
        """Report how the command ended with a Cmd_Ok or Cmd_Fail event."""
        self.server.log.debug("Command finished: %s" % reason)
        # The command is finished, so the watchdog must not fire afterwards
        # (previously it stayed armed on the success path).
        if self.timeout:
            self.timeout.cancel()
            self.timeout = None
        code = 1
        try:
            # exitCode may be None when the process was killed by a signal.
            code = reason.value.exitCode
        except AttributeError:
            pass
        if code == 0:
            output = self.data or "<command produced no output>"
            self.server.log.debug("Command %s says: %s", self.cmd.id, output)
            self.server.sendEvent(Event.Event(device=socket.getfqdn(),
                                              eventClass=Cmd_Ok,
                                              summary=output,
                                              severity=Event.Clear,
                                              component="zenactions"))
            return
        if self.timedout:
            summary = "Timeout running: %s: %s" % (self.cmd.id,
                                                   'command timed out')
        else:
            summary = "Error running: %s: exit code %s" % (self.cmd.id, code)
        if self.error:
            # %s rather than %d: code may be None (signal termination).
            self.server.log.error("Command %s, exit code %s: %s",
                                  self.cmd.id, code, self.error)
            if not self.timedout:
                summary = "Error running: %s: %s" % (self.cmd.id, self.error)
        self.server.sendEvent(Event.Event(device=socket.getfqdn(),
                                          eventClass=Cmd_Fail,
                                          summary=summary,
                                          severity=Event.Error,
                                          component="zenactions"))

    def outReceived(self, text):
        """Accumulate the command's standard output."""
        self.data += text

    def errReceived(self, text):
        """Accumulate the command's standard error."""
        self.error += text
class ZenActions(ZCmdBase):
    """
    Take actions based on events in the event manager.
    Start off by sending emails and pages.

    Each pass of mainbody() reloads the enabled action rules, then does the
    following in order:
      - runs event commands;
      - matches rules against new and cleared events;
      - records sent alerts in the alert_state table;
      - runs the version check;
      - ages the events database;
      - emits heartbeat events.
    """

    # Most recent SQL statement passed to execute()/query(). It is logged
    # when a rule or command fails, so the offending statement is visible.
    lastCommand = None

    # Record that (evid, userid, rule) was alerted. A repeat send only
    # refreshes lastSent.
    addstate = ("INSERT INTO alert_state "
                "VALUES ('%s', '%s', '%s', NULL) "
                "ON DUPLICATE KEY UPDATE lastSent = now()")

    # Forget an alert_state row once its clear has been handled.
    clearstate = ("DELETE FROM alert_state "
                  " WHERE evid='%s' "
                  " AND userid='%s' "
                  " AND rule='%s'")

    # NOTE: a LEFT JOIN ... WHERE a.evid IS NULL rewrite of newsel was tried
    # and did not work. Filtering on a.userid/a.rule in the WHERE clause can
    # never match the NULL rows an anti-join selects; those conditions would
    # have to move into the ON clause. The NOT IN subquery is kept.

    # Requested columns (plus evid) of status events that match the rule's
    # WHERE clause and have not yet been sent for this user/rule. The last
    # %s is an optional extra alert_state condition (see repeatTime).
    newsel = ("SELECT %s, evid FROM status WHERE "
              "%s AND evid NOT IN "
              " (SELECT evid FROM alert_state "
              "   WHERE userid='%s' AND rule='%s' %s)")

    # History events that were alerted to this user/rule (the "clear" pass).
    clearsel = ("SELECT %s, h.evid FROM history h, alert_state a "
                " WHERE h.evid=a.evid AND a.userid='%s' AND a.rule='%s'")

    # Columns of the event whose evid is referenced by event.clearid.
    clearEventSelect = ("SELECT %s "
                        "  FROM history clear, history event "
                        " WHERE clear.evid = event.clearid "
                        "   AND event.evid = '%s'")

    def __init__(self):
        ZCmdBase.__init__(self)
        self.schedule = Schedule(self.options, self.dmd)
        self.actions = []
        self.loadActionRules()
        self.updateCheck = UpdateCheck()
        self.sendEvent(Event.Event(device=socket.getfqdn(),
                                   eventClass=App_Start,
                                   summary="zenactions started",
                                   severity=0, component="zenactions"))

    def loadActionRules(self):
        """Load the enabled ActionRules into self.actions."""
        self.actions = []
        for ar in self.dmd.ZenUsers.getAllActionRules():
            if not ar.enabled:
                continue
            userid = ar.getUser().id
            self.actions.append(ar)
            self.log.debug("action:%s for:%s loaded", ar.getId(), userid)

    def _runStatement(self, stmt, fetch):
        """Run stmt on a fresh event-database connection.

        Return curs.fetchall() when fetch is true; otherwise return whatever
        curs.execute() returned. The connection is always closed.
        """
        self.lastCommand = stmt
        self.log.debug(stmt)
        zem = self.dmd.ZenEventManager
        conn = zem.connect()
        try:
            curs = conn.cursor()
            result = curs.execute(stmt)
            if fetch:
                result = curs.fetchall()
        finally:
            zem.close(conn)
        return result

    def execute(self, stmt):
        """Run a statement and return the cursor's execute() result."""
        return self._runStatement(stmt, False)

    def query(self, stmt):
        """Run a SELECT statement and return all fetched rows."""
        return self._runStatement(stmt, True)

    def getUrl(self, evid):
        """URL of the detail page for event evid."""
        return '%s/zport/dmd/ZenEventManager/eventFields?evid=%s' % (
            self.options.zopeurl, evid)

    def getEventsUrl(self, device):
        """URL of the events view of a device object."""
        return '%s%s/viewEvents' % (
            self.options.zopeurl, device.getPrimaryUrlPath())

    def getAckUrl(self, evid):
        """URL that acknowledges event evid."""
        return '%s/zport/dmd/Events/manage_ackEvents?evids=%s&zenScreenName=viewEvents' % (self.options.zopeurl, evid)

    def getDeleteUrl(self, evid):
        """URL that deletes event evid."""
        return '%s/zport/dmd/Events/manage_deleteEvents' % self.options.zopeurl + \
            '?evids=%s&zenScreenName=viewHistoryEvents' % evid

    def getUndeleteUrl(self, evid):
        """URL that undeletes (history) event evid."""
        return '%s/zport/dmd/Events/manage_undeleteEvents' % self.options.zopeurl + \
            '?evid=%s&zenScreenName=viewEvents' % evid

    def processRules(self, zem):
        """Run through all rules matching them against events.

        A failing rule is logged and skipped. Process-exit, database
        connectivity (OperationalError) and ZODB errors are re-raised.
        """
        for ar in self.actions:
            try:
                self.lastCommand = None
                # call sendPage or sendEmail
                actfunc = getattr(self, "send" + ar.action.title())
                self.processEvent(zem, ar, actfunc)
            except (SystemExit, KeyboardInterrupt, OperationalError, POSError):
                raise
            except Exception:
                if self.lastCommand:
                    self.log.warning(self.lastCommand)
                self.log.exception("action:%s", ar.getId())

    def checkVersion(self, zem):
        """Run the update check, then commit the current transaction."""
        self.updateCheck.check(self.dmd, zem)
        import transaction
        transaction.commit()

    def processEvent(self, zem, context, action):
        """Apply one rule or command (context) to new and cleared events.

        action(context, data, clear) is called with a dict of event fields.
        For new events, a true return value records the alert in
        alert_state. Cleared events are passed with clear=True (unless
        context.sendClear is false), and their alert_state rows are deleted.
        """
        fields = context.getEventFields()
        userid = context.getUserid()
        self._processNewEvents(zem, context, action, fields, userid)
        self._processClearedEvents(zem, context, action, fields, userid)

    def _eventsUrl(self, data):
        """Events-view URL for data['device'], or 'n/a' if not found."""
        device = self.dmd.Devices.findDevice(data.get('device', None))
        if device:
            return self.getEventsUrl(device)
        return 'n/a'

    def _processNewEvents(self, zem, context, action, fields, userid):
        """Send action for status events not yet alerted for userid/rule."""
        nwhere = context.where.strip() or '1 = 1'
        if context.delay > 0:
            # only events first seen at least `delay` seconds ago
            nwhere += " and firstTime + %s < UNIX_TIMESTAMP()" % context.delay
        awhere = ''
        if context.repeatTime:
            # Treat an alert as already sent until repeatTime seconds after
            # lastSent. DATE_ADD is required: MySQL evaluates
            # DATETIME/TIMESTAMP + integer as arithmetic on the
            # YYYYMMDDHHMMSS number, not as "plus N seconds".
            awhere += (' and DATE_ADD(lastSent, INTERVAL %d SECOND) > now() '
                       % context.repeatTime)
        q = self.newsel % (",".join(fields), nwhere, userid,
                           context.getId(), awhere)
        for result in self.query(q):
            evid = result[-1]
            data = dict(zip(fields, map(zem.convert, fields, result[:-1])))
            data['eventUrl'] = self.getUrl(evid)
            data['eventsUrl'] = self._eventsUrl(data)
            data['ackUrl'] = self.getAckUrl(evid)
            data['deleteUrl'] = self.getDeleteUrl(evid)
            severity = data.get('severity', -1)
            data['severityString'] = zem.getSeverityString(severity)
            if action(context, data, False):
                self.execute(self.addstate % (evid, userid, context.getId()))

    def _processClearedEvents(self, zem, context, action, fields, userid):
        """Send clear notifications for alerted events now in history."""
        historyFields = ','.join([("h.%s" % f) for f in fields])
        q = self.clearsel % (historyFields, userid, context.getId())
        # Columns to read from the clearing event, and the names they get
        # in data (e.g. summary -> clearSummary). Both are loop-invariant.
        clearColumns = ",".join([('clear.%s' % x) for x in fields])
        clearNames = [('clear%s' % _capitalize(x)) for x in fields]
        for result in self.query(q):
            evid = result[-1]
            data = dict(zip(fields, map(zem.convert, fields, result[:-1])))

            # there might not be a clear event, so set empty defaults
            data.update({}.fromkeys(clearNames, ""))

            # pull in the clear event data
            for values in self.query(self.clearEventSelect % (clearColumns,
                                                              evid)):
                values = map(zem.convert, fields, values)
                data.update(dict(zip(clearNames, values)))

            # .get: 'summary' is only present if the rule selects it.
            data['clearOrEventSummary'] = (
                data.get('clearSummary') or data.get('summary', ''))

            data['eventUrl'] = self.getUrl(evid)
            data['eventsUrl'] = self._eventsUrl(data)
            data['undeleteUrl'] = self.getUndeleteUrl(evid)
            severity = data.get('severity', -1)
            data['severityString'] = zem.getSeverityString(severity)
            if getattr(context, 'sendClear', True):
                action(context, data, True)
            self.execute(self.clearstate % (evid, userid, context.getId()))

    def maintenance(self, zem):
        """Run stored procedures that maintain the events database."""
        sql = 'call age_events(%s, %s);' % (
            zem.eventAgingHours, zem.eventAgingSeverity)
        try:
            self.execute(sql)
        except ProgrammingError:
            self.log.exception("problem with proc: '%s'" % sql)

    def heartbeatEvents(self):
        """Create events for failed heartbeats and clear recovered ones."""
        # build cache of existing heartbeat issues
        q = ("SELECT device, component "
             "FROM status WHERE eventClass = '%s'" % Status_Heartbeat)
        heartbeatState = Set(self.query(q))

        # find current heartbeat failures
        sel = "SELECT device, component FROM heartbeat "
        sel += "WHERE DATE_ADD(lastTime, INTERVAL timeout SECOND) <= NOW();"
        for device, comp in self.query(sel):
            self.sendEvent(
                Event.Event(device=device, component=comp,
                            eventClass=Status_Heartbeat,
                            summary="%s %s heartbeat failure" % (device, comp),
                            severity=Event.Error))
            heartbeatState.discard((device, comp))

        # whatever is left in the cache is no longer failing: clear it
        for device, comp in heartbeatState:
            self.sendEvent(
                Event.Event(device=device, component=comp,
                            eventClass=Status_Heartbeat,
                            summary="%s %s heartbeat clear" % (device, comp),
                            severity=Event.Clear))

    def runEventCommand(self, cmd, data, clear = None):
        """Evaluate cmd's (clear)command as a TALES string and run it.

        Always returns True, so the alert is recorded even if evaluation or
        spawning failed (the failure is logged).
        """
        try:
            command = cmd.command
            if clear:
                command = cmd.clearCommand
            device = self.dmd.Devices.findDevice(data.get('device', ''))
            component = None
            if device:
                componentName = data.get('component')
                for c in device.getMonitoredComponents():
                    if c.id == componentName:
                        component = c
                        break
            compiled = talesCompile('string:' + command)
            environ = {'dev': device, 'component': component, 'evt': data}
            res = compiled(getEngine().getContext(environ))
            if isinstance(res, Exception):
                raise res
            prot = EventCommandProtocol(cmd, self)
            self.log.info('Running %s' % res)
            # SECURITY(review): event fields (e.g. evt/summary) may come
            # from untrusted sources and are interpolated unescaped into a
            # /bin/sh -c command line. Command authors must quote them.
            reactor.spawnProcess(prot, '/bin/sh',
                                 ('/bin/sh', '-c', res),
                                 env=None)
        except Exception:
            self.log.exception('Error running command %s', cmd.id)
        return True

    def eventCommands(self, zem):
        """Run every enabled event command against the current events.

        Each command is isolated like a rule in processRules, so one broken
        command no longer aborts the rest of the cycle.
        """
        now = time.time()
        count = 0
        for command in zem.commands():
            if not command.enabled:
                continue
            count += 1
            try:
                self.lastCommand = None
                self.processEvent(zem, command, self.runEventCommand)
            except (SystemExit, KeyboardInterrupt, OperationalError, POSError):
                raise
            except Exception:
                if self.lastCommand:
                    self.log.warning(self.lastCommand)
                self.log.exception("command:%s", command.id)
        self.log.info("Processed %d commands in %f", count, time.time() - now)

    def mainbody(self):
        """main loop to run actions."""
        from twisted.internet.process import reapAllProcesses
        reapAllProcesses()
        zem = self.dmd.ZenEventManager
        self.loadActionRules()
        self.eventCommands(zem)
        self.processRules(zem)
        self.checkVersion(zem)
        self.maintenance(zem)
        self.heartbeatEvents()

    def runCycle(self):
        """Run one cycle, then reschedule after options.cycletime seconds."""
        try:
            start = time.time()
            self.syncdb()
            self.mainbody()
            self.log.info("processed %s rules in %.2f secs",
                          len(self.actions), time.time() - start)
            self.sendHeartbeat()
        except:
            # Deliberately broad: a failed cycle must never stop the loop
            # from being rescheduled below.
            self.log.exception("unexpected exception")
        reactor.callLater(self.options.cycletime, self.runCycle)

    def run(self):
        """Run once (without --cycle) or start the reactor-driven loop."""
        if not self.options.cycle:
            self.schedule.run()
            return self.mainbody()
        self.schedule.start()
        self.runCycle()
        reactor.run()

    def sendEvent(self, evt):
        """Send event to the system."""
        self.dmd.ZenEventManager.sendEvent(evt)

    def sendHeartbeat(self):
        """Send a heartbeat event (timeout = 3 * cycletime) for this monitor."""
        timeout = self.options.cycletime * 3
        evt = Event.EventHeartbeat(socket.getfqdn(), "zenactions", timeout)
        self.sendEvent(evt)

    def stop(self):
        """Mark the daemon as stopping and send an App_Stop event."""
        self.running = False
        self.log.info("stopping")
        self.sendEvent(Event.Event(device=socket.getfqdn(),
                                   eventClass=App_Stop,
                                   summary="zenactions stopped",
                                   severity=3, component="zenactions"))

    def format(self, action, data, clear):
        """Return (subject, body) of action interpolated with data.

        Uses clearFormat/clearBody when clear is true.
        """
        fmt = action.format
        body = action.body
        if clear:
            fmt = action.clearFormat
            body = action.clearBody
        return fmt % data, body % data

    def stripTags(self, data):
        """A quick html => plaintext converter
        that retains and displays anchor hrefs
        """
        import re
        tags = re.compile(r'<(.|\n)+?>', re.I | re.M)
        aattrs = re.compile(r'<a(.|\n)+?href=["\']([^"\']*)[^>]*?>([^<>]*?)</a>', re.I | re.M)
        # "<a href=URL>text</a>" becomes "text: URL"
        for x in re.finditer(aattrs, data):
            data = data.replace(x.group(), "%s: %s" % (x.groups()[2], x.groups()[1]))
        data = re.sub(tags, '', data)
        return data

    def sendPage(self, action, data, clear = None):
        """Send an event to a pager.

        Return True if we think the page was sent to at least one recipient
        (or there was no address to page), False otherwise.
        """
        # format() has already interpolated data into the message. Applying
        # "% data" a second time could raise on, or garble, a literal '%'
        # in the event text (e.g. "disk 95% full").
        msg, body = self.format(action, data, clear)
        recipients = action.getAddresses()
        if not recipients:
            self.log.warning('failed to page %s on rule %s: %s',
                             action.getUser().id, action.id,
                             'Unspecified address.')
            return True

        result = False
        for recipient in recipients:
            success, errorMsg = Utils.sendPage(recipient,
                                               msg,
                                               self.dmd.snppHost,
                                               self.dmd.snppPort)
            if success:
                self.log.info('sent page to %s: %s', recipient, msg)
                # return True if anyone got the page
                result = result or success
            else:
                self.log.info('failed to send page to %s: %s %s',
                              recipient,
                              msg,
                              errorMsg)
        return result

    def sendEmail(self, action, data, clear = None):
        """Send an event to an email address.

        The message is multipart/alternative, with a plain-text part
        (tags stripped) and an HTML part. Return True if we think the email
        was sent (or there was no address), False otherwise.
        """
        from email.MIMEText import MIMEText
        from email.MIMEMultipart import MIMEMultipart
        addr = action.getAddresses()
        if not addr:
            self.log.warning('failed to email %s on rule %s: %s',
                             action.getUser().id, action.id,
                             'Unspecified address.')
            return True

        fmt, htmlbody = self.format(action, data, clear)
        htmlbody = htmlbody.replace('\n', '<br/>\n')
        body = self.stripTags(htmlbody)
        emsg = MIMEMultipart('related')
        emsgAlternative = MIMEMultipart('alternative')
        emsg.attach(emsgAlternative)
        plaintext = MIMEText(body)
        html = MIMEText(htmlbody)
        html.set_type('text/html')
        emsgAlternative.attach(plaintext)
        emsgAlternative.attach(html)
        emsg['Subject'] = fmt
        emsg['From'] = self.dmd.getEmailFrom()
        emsg['To'] = ', '.join(addr)
        emsg['Date'] = DateTime().rfc822()
        result, errorMsg = Utils.sendEmail(emsg, self.dmd.smtpHost,
                                           self.dmd.smtpPort,
                                           self.dmd.smtpUseTLS,
                                           self.dmd.smtpUser,
                                           self.dmd.smtpPass)
        if result:
            self.log.info("sent email:%s to:%s", fmt, addr)
        else:
            self.log.info("failed to send email to %s: %s %s",
                          ','.join(addr), fmt, errorMsg)
        return result

    def buildOptions(self):
        """Add --cycletime and --zopeurl to the base command-line options."""
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--cycletime',
                               dest='cycletime', default=60, type="int",
                               help="check events every cycletime seconds")
        self.parser.add_option(
            '--zopeurl', dest='zopeurl',
            default='http://%s:%d' % (socket.getfqdn(), 8080),
            help="http path to the root of the zope server")

    def sigTerm(self, signum, frame):
        'controlled shutdown of main loop on interrupt'
        try:
            ZCmdBase.sigTerm(self, signum, frame)
        except SystemExit:
            reactor.stop()
if __name__ == "__main__":
    import logging
    daemon = ZenActions()
    # logging.INFO == 20: show INFO and above from the zen.Events logger.
    logging.getLogger('zen.Events').setLevel(logging.INFO)
    daemon.run()