1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 __doc__='''zenactions
16
17 Turn events into notifications (pages, emails).
18
19 '''
20
21
22 import socket
23 import time
24 from sets import Set
25 import Globals
26
27 from ZODB.POSException import POSError
28 from _mysql_exceptions import OperationalError, ProgrammingError
29
30 from Products.ZenUtils.ZCmdBase import ZCmdBase
31 from Products.ZenUtils.ZenTales import talesCompile, getEngine
32 from Products.ZenEvents.Exceptions import ZenEventNotFound
33 from ZenEventClasses import App_Start, App_Stop, Status_Heartbeat
34 from ZenEventClasses import Cmd_Fail
35 import Event
36 from Schedule import Schedule
37 from UpdateCheck import UpdateCheck
38 from Products.ZenUtils import Utils
39 from twisted.internet import reactor
40 from twisted.internet.protocol import ProcessProtocol
41 from email.Utils import formatdate
42
43 DEFAULT_MONITOR = "localhost"
44
46 return s[0:1].upper() + s[1:]
47
49
56
68
70 self.server.log.debug("Command finished: %s" % reason.getErrorMessage())
71 code = 1
72 try:
73 code = reason.value.exitCode
74 except AttributeError:
75 pass
76
77
78 if self.timeout:
79 self.timeout.cancel()
80 self.timeout = None
81
82 if code == 0:
83 cmdData = self.data or "<command produced no output>"
84 self.server.log.debug("Command %s says: %s", self.cmd.id, cmdData)
85 self.server.sendEvent(Event.Event(
86 device=self.server.options.monitor,
87 eventClass=Cmd_Fail,
88 severity=Event.Clear,
89 component="zenactions",
90 eventKey=self.cmd.id,
91 summary="Command succeeded: %s: %s" % (
92 self.cmd.id, cmdData),
93 ))
94 else:
95 cmdError = self.error or "<command produced no output>"
96 self.server.log.error("Command %s says %s", self.cmd.id, cmdError)
97 self.server.sendEvent(Event.Event(
98 device=self.server.options.monitor,
99 eventClass=Cmd_Fail,
100 severity=Event.Error,
101 component="zenactions",
102 eventKey=self.cmd.id,
103 summary="Error running: %s: %s" % (
104 self.cmd.id, cmdError),
105 ))
106
109
112
113
115 """
116 Take actions based on events in the event manager.
117 Start off by sending emails and pages.
118 """
119
120 lastCommand = None
121
122 addstate = ("INSERT INTO alert_state "
123 "VALUES ('%s', '%s', '%s', NULL) "
124 "ON DUPLICATE KEY UPDATE lastSent = now()")
125
126
127 clearstate = ("DELETE FROM alert_state "
128 " WHERE evid='%s' "
129 " AND userid='%s' "
130 " AND rule='%s'")
131
132
133
134
135
136
137 newsel = ("SELECT %s, evid FROM status WHERE "
138 "%s AND evid NOT IN "
139 " (SELECT evid FROM alert_state "
140 " WHERE userid='%s' AND rule='%s' %s)")
141
142 clearsel = ("SELECT %s, h.evid FROM history h, alert_state a "
143 " WHERE h.evid=a.evid AND a.userid='%s' AND a.rule='%s'")
144
145 clearEventSelect = ("SELECT %s "
146 " FROM history clear, history event "
147 " WHERE clear.evid = event.clearid "
148 " AND event.evid = '%s'")
149
150
164
166 """Load the ActionRules into the system.
167 """
168 self.actions = []
169 for ar in self.dmd.ZenUsers.getAllActionRules():
170 if not ar.enabled: continue
171 userid = ar.getUser().id
172 self.actions.append(ar)
173 self.log.debug("action:%s for:%s loaded", ar.getId(), userid)
174
175
187
188
201
202
209
210
213
214
217
218
220 return "%s/manage_ackEvents?evids=%s&zenScreenName=viewEvents" % (
221 self.getBaseUrl(device), evid)
222
223
225 return "%s/manage_deleteEvents?evids=%s" % (
226 self.getBaseUrl(device), evid) + \
227 "&zenScreenName=viewHistoryEvents"
228
229
231 return "%s/manage_undeleteEvents?evids=%s" % (
232 self.getBaseUrl(device), evid) + \
233 "&zenScreenName=viewEvents"
234
235
237 """Run through all rules matching them against events.
238 """
239 for ar in self.actions:
240 try:
241 self.lastCommand = None
242
243 actfunc = getattr(self, "send"+ar.action.title())
244 self.processEvent(zem, ar, actfunc)
245 except (SystemExit, KeyboardInterrupt, OperationalError, POSError):
246 raise
247 except:
248 if self.lastCommand:
249 self.log.warning(self.lastCommand)
250 self.log.exception("action:%s",ar.getId())
251
253 self.updateCheck.check(self.dmd, zem)
254 import transaction
255 transaction.commit()
256
258 fields = context.getEventFields()
259 userid = context.getUserid()
260
261 nwhere = context.where.strip() or '1 = 1'
262 if context.delay > 0:
263 nwhere += " and firstTime + %s < UNIX_TIMESTAMP()" % context.delay
264 awhere = ''
265 if context.repeatTime:
266 awhere += ' and DATE_ADD(lastSent, INTERVAL %d SECOND) > now() ' % (
267 context.repeatTime,)
268 q = self.newsel % (",".join(fields), nwhere, userid, context.getId(),
269 awhere)
270 for result in self.query(q):
271 evid = result[-1]
272 data = dict(zip(fields, map(zem.convert, fields, result[:-1])))
273
274
275
276 try:
277 details = dict( zem.getEventDetail(evid).getEventDetails() )
278 data.update( details )
279 except ZenEventNotFound:
280 pass
281
282 device = self.dmd.Devices.findDevice(data.get('device', None))
283 data['eventUrl'] = self.getEventUrl(evid, device)
284 if device:
285 data['eventsUrl'] = self.getEventsUrl(device)
286 else:
287 data['eventsUrl'] = 'n/a'
288 data['device'] = data.get('device', None) or ''
289 data['ackUrl'] = self.getAckUrl(evid, device)
290 data['deleteUrl'] = self.getDeleteUrl(evid, device)
291 severity = data.get('severity', -1)
292 data['severityString'] = zem.getSeverityString(severity)
293 if action(context, data, False):
294 addcmd = self.addstate % (evid, userid, context.getId())
295 self.execute(addcmd)
296
297
298 historyFields = [("h.%s" % f) for f in fields]
299 historyFields = ','.join(historyFields)
300 q = self.clearsel % (historyFields, userid, context.getId())
301 for result in self.query(q):
302 evid = result[-1]
303 data = dict(zip(fields, map(zem.convert, fields, result[:-1])))
304
305
306
307 try:
308 details = dict( zem.getEventDetailFromStatusOrHistory(evid).getEventDetails() )
309 data.update( details )
310 except ZenEventNotFound:
311 pass
312
313
314 cfields = [('clear.%s' % x) for x in fields]
315 q = self.clearEventSelect % (",".join(cfields), evid)
316
317
318 cfields = [('clear%s' % _capitalize(x)) for x in fields]
319
320
321 data.update({}.fromkeys(cfields, ""))
322
323
324 for values in self.query(q):
325 values = map(zem.convert, fields, values)
326 data.update(dict(zip(cfields, values)))
327
328
329
330
331 if data.get('clearid', None) and not data.get('clearEvid', None):
332 continue
333
334 data['clearOrEventSummary'] = (
335 data['clearSummary'] or data['summary'])
336
337
338
339
340 if not data.get('clearSummary', False) \
341 and data.get('ownerid', False):
342 data['clearSummary'] = data['ownerid']
343 data['clearFirstTime'] = data.get('stateChange', '')
344
345
346 device = self.dmd.Devices.findDevice(data.get('device', None))
347 data['eventUrl'] = self.getEventUrl(evid, device)
348 data['undeleteUrl'] = self.getUndeleteUrl(evid, device)
349 severity = data.get('severity', -1)
350 data['severityString'] = zem.getSeverityString(severity)
351 delcmd = self.clearstate % (evid, userid, context.getId())
352 if getattr(context, 'sendClear', True):
353 if action(context, data, True):
354 self.execute(delcmd)
355 else:
356 self.execute(delcmd)
357
358
def maintenance(self, zem):
    """Age events by calling the ``age_events`` stored procedure.

    zem supplies the two procedure arguments through its
    ``eventAgingHours`` and ``eventAgingSeverity`` attributes.

    A ProgrammingError raised while executing the call is logged (with
    traceback) and not propagated; any other exception escapes to the
    caller.
    """
    agingArgs = (zem.eventAgingHours, zem.eventAgingSeverity)
    statement = 'call age_events(%s, %s);' % agingArgs
    try:
        self.execute(statement)
    except ProgrammingError:
        self.log.exception("problem with proc: '%s'" % statement)
368
369
371 """
372 Once per day delete events from history table.
373 If force then run the deletion statement regardless of when it was
374 last run (the deletion will still not run if the historyMaxAgeDays
375 setting in the event manager is not greater than zero.)
376 If deferred then we are running in a twisted reactor. Run the
377 deletion script in a non-blocking manner (if it is to be run) and
378 return a deferred (if the deletion script is run.)
379 In all cases return None if the deletion script is not run.
380 """
381 import datetime
382 import os
383 import twisted.internet.utils
384 import Products.ZenUtils.Utils as Utils
385 import transaction
386 import subprocess
387
388 def onSuccess(unused, startTime):
389 self.log.info('Done deleting historical events in %.2f seconds' %
390 (time.time() - startTime))
391 return None
392 def onError(error, startTime):
393 self.log.error('Error deleting historical events after '
394 '%s seconds: %s' % (time.time()-startTime,
395 error))
396 return None
397
398
399
400 d = None
401
402
403
404
405 try:
406 maxDays = int(self.dmd.ZenEventManager.historyMaxAgeDays)
407 except ValueError:
408 maxDays = 0
409 if maxDays > 0:
410
411
412 lastRun = getattr(self.dmd,
413 'lastDeleteHistoricalEvents_datetime', None)
414
415
416
417
418 lastAge = getattr(self.dmd,
419 'lastDeleteHistoricalEvents_days', None)
420 now = datetime.datetime.now()
421 if not lastRun \
422 or now - lastRun > datetime.timedelta(1) \
423 or lastAge != maxDays \
424 or force:
425 self.log.info('Deleting historical events older than %s days' %
426 maxDays)
427 startTime = time.time()
428 cmd = Utils.zenPath('Products', 'ZenUtils',
429 'ZenDeleteHistory.py')
430 args = ['--numDays=%s' % maxDays]
431 if deferred:
432
433 d = twisted.internet.utils.getProcessOutput(
434 cmd, args, os.environ, errortoo=True)
435 d.addCallback(onSuccess, startTime)
436 d.addErrback(onError, startTime)
437 else:
438
439 proc = subprocess.Popen(
440 [cmd]+args, stdout=subprocess.PIPE,
441 stderr=subprocess.STDOUT, env=os.environ)
442
443
444 output, _ = proc.communicate()
445 code = proc.wait()
446 if code:
447 onError(output, startTime)
448 else:
449 onSuccess(output, startTime)
450
451 self.dmd.lastDeleteHistoricalEvents_datetime = now
452 self.dmd.lastDeleteHistoricalEvents_days = maxDays
453 transaction.commit()
454 return d
455
456
473
475 """Create events for failed heartbeats.
476 """
477
478 q = ("SELECT monitor, component "
479 "FROM status WHERE eventClass = '%s'" % Status_Heartbeat)
480 heartbeatState = Set(self.query(q))
481
482
483
484
485 sel = "SELECT device, component FROM heartbeat "
486 sel += "WHERE DATE_ADD(lastTime, INTERVAL timeout SECOND) <= NOW();"
487 for monitor, comp in self.query(sel):
488 hostname = self.fetchMonitorHostname(monitor)
489 self.sendEvent(
490 Event.Event(device=hostname, component=comp,
491 eventClass=Status_Heartbeat,
492 summary="%s %s heartbeat failure" % (monitor, comp),
493 prodState=self.prodState,
494 monitor=monitor,
495 severity=Event.Error))
496 heartbeatState.discard((monitor, comp))
497
498
499 for monitor, comp in heartbeatState:
500 hostname = self.fetchMonitorHostname(monitor)
501 self.sendEvent(
502 Event.Event(device=hostname, component=comp,
503 eventClass=Status_Heartbeat,
504 summary="%s %s heartbeat clear" % (monitor, comp),
505 severity=Event.Clear))
506
508 try:
509 command = cmd.command
510 if clear:
511 command = cmd.clearCommand
512 device = self.dmd.Devices.findDevice(data.get('device', ''))
513 component = None
514 if device:
515 componentName = data.get('component')
516 for c in device.getMonitoredComponents():
517 if c.id == componentName:
518 component = c
519 break
520 compiled = talesCompile('string:' + command)
521 environ = {'dev':device, 'component':component, 'evt':data }
522 res = compiled(getEngine().getContext(environ))
523 if isinstance(res, Exception):
524 raise res
525 prot = EventCommandProtocol(cmd, self)
526 self.log.info('Running %s' % res)
527 reactor.spawnProcess(prot, '/bin/sh',
528 ('/bin/sh', '-c', res),
529 env=None)
530 except Exception:
531 self.log.exception('Error running command %s', cmd.id)
532 return True
533
534
543
544
def mainbody(self):
    """Run one pass of the action loop.

    Reaps finished child processes, reloads the action rules and then,
    in this order, runs event commands, rule processing, the version
    check, database maintenance, historical event deletion and the
    heartbeat check.  Historical event deletion is asked to run
    deferred exactly when the ``cycle`` option is set.
    """
    from twisted.internet.process import reapAllProcesses
    reapAllProcesses()

    eventManager = self.dmd.ZenEventManager
    self.loadActionRules()
    # Each of these steps works against the same event manager.
    self.eventCommands(eventManager)
    self.processRules(eventManager)
    self.checkVersion(eventManager)
    self.maintenance(eventManager)

    self.deleteHistoricalEvents(deferred=self.options.cycle)
    self.heartbeatEvents()
558
559
571
572
574 self.prodState = filter(lambda x: x.split(':')[0] == 'Production',
575 self.dmd.prodStateConversions)
576 import socket
577 self.daemonHostname = socket.getfqdn()
578 self.monitorToHost = {}
579 try:
580
581 self.prodState = int(self.prodState[0].split(':')[1])
582 except:
583 self.prodState = 1000
584
585 if not self.options.cycle:
586 self.sendHeartbeat()
587 self.schedule.run()
588 return self.mainbody()
589 self.schedule.start()
590 self.runCycle()
591 reactor.run()
592
593
595 """Send event to the system.
596 """
597 self.dmd.ZenEventManager.sendEvent(evt)
598
599
607
608
616
632
644
def sendPage(self, action, data, clear = None):
    """Send an event to every pager address configured on a rule.

    action is the rule being fired, data the event fields and clear
    says whether this is the clearing notification; all three are
    handed to self.format() to build the page text.

    Returns True when the rule has no address at all (a warning is
    logged instead), otherwise the first truthy success value reported
    by Utils.sendPage, or False if no page could be sent.
    """
    # Only the first element of the formatted pair is used for pages.
    pageText, _unusedBody = self.format(action, data, clear)
    pagers = action.getAddresses()
    if not pagers:
        self.log.warning('failed to page %s on rule %s: %s',
                         action.getUser().id, action.id,
                         'Unspecified address.')
        return True

    anySent = False
    for pager in pagers:
        ok, problem = Utils.sendPage(pager, pageText, self.dmd.pageCommand)
        if not ok:
            self.log.info('failed to send page to %s: %s %s',
                          pager, pageText, problem)
            continue
        self.log.info('sent page to %s: %s', pager, pageText)
        # A single delivered page is enough to count the rule as sent.
        anySent = anySent or ok
    return anySent
672
673
674
def sendEmail(self, action, data, clear = None):
    """Send an event to the email addresses configured on a rule.

    action is the rule being fired, data the event fields and clear
    says whether this is the clearing notification; all three are
    handed to self.format() to build the subject and HTML body.

    Returns True when the rule has no address at all (a warning is
    logged instead), otherwise the result reported by Utils.sendEmail.
    """
    from email.MIMEText import MIMEText
    from email.MIMEMultipart import MIMEMultipart
    recipients = action.getAddresses()
    if not recipients:
        self.log.warning('failed to email %s on rule %s: %s',
            action.getUser().id, action.id, 'Unspecified address.')
        return True

    subject, markup = self.format(action, data, clear)
    markup = markup.replace('\n','<br/>\n')
    # The plain-text part is the HTML body with its tags stripped.
    textPart = MIMEText(self.stripTags(markup))

    if action.plainText:
        message = textPart
    else:
        # multipart/related wrapping a multipart/alternative that holds
        # the plain-text part first and the HTML part second.
        message = MIMEMultipart('related')
        alternatives = MIMEMultipart('alternative')
        message.attach(alternatives)
        htmlPart = MIMEText(markup)
        htmlPart.set_type('text/html')
        alternatives.attach(textPart)
        alternatives.attach(htmlPart)

    headers = (
        ('Subject', subject),
        ('From', self.dmd.getEmailFrom()),
        ('To', ', '.join(recipients)),
        ('Date', formatdate(None, True)),
    )
    for headerName, headerValue in headers:
        message[headerName] = headerValue

    sent, problem = Utils.sendEmail(
        message, self.dmd.smtpHost, self.dmd.smtpPort,
        self.dmd.smtpUseTLS, self.dmd.smtpUser, self.dmd.smtpPass)
    if not sent:
        self.log.info("rule '%s' failed to send email to %s: %s %s",
            action.id, ','.join(recipients), subject, problem)
    else:
        self.log.info("rule '%s' sent email:%s to:%s",
            action.id, subject, recipients)
    return sent
718
719
721 ZCmdBase.buildOptions(self)
722 self.parser.add_option('--cycletime',
723 dest='cycletime', default=60, type="int",
724 help="check events every cycletime seconds")
725 self.parser.add_option(
726 '--zopeurl', dest='zopeurl',
727 default='http://%s:%d' % (socket.getfqdn(), 8080),
728 help="http path to the root of the zope server")
729 self.parser.add_option("--monitor", dest="monitor",
730 default=DEFAULT_MONITOR,
731 help="Name of monitor instance to use for heartbeat "
732 " events. Default is %s." % DEFAULT_MONITOR)
733
734
def sigTerm(self, signum=None, frame=None):
    """Controlled shutdown of the main loop on interrupt.

    Delegates to ZCmdBase.sigTerm with the same signum and frame.  If
    that call raises SystemExit, the exit is swallowed and the twisted
    reactor is stopped instead.
    """
    try:
        ZCmdBase.sigTerm(self, signum, frame)
    except SystemExit:
        reactor.stop()
741
if __name__ == "__main__":
    # Script entry point: build the daemon, set the 'zen.Events' logger
    # to INFO (numeric level 20) and start it.
    import logging
    za = ZenActions()
    eventsLog = logging.getLogger('zen.Events')
    eventsLog.setLevel(logging.INFO)
    za.run()
747