1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 __doc__='''zenactions
16
17 Turn events into notifications (pages, emails).
18
19 $Id$
20 '''
21
22 __version__ = "$Revision$"[11:-2]
23
24
25 import socket
26 import time
27 from sets import Set
28 import Globals
29
30 from ZODB.POSException import POSError
31 from _mysql_exceptions import OperationalError, ProgrammingError
32
33 from Products.ZenUtils.ZCmdBase import ZCmdBase
34 from Products.ZenUtils.ZenTales import talesCompile, getEngine
35 from ZenEventClasses import App_Start, App_Stop, Status_Heartbeat
36 from ZenEventClasses import Cmd_Fail
37 import Event
38 from Schedule import Schedule
39 from UpdateCheck import UpdateCheck
40 from Products.ZenUtils import Utils
41 from twisted.internet import reactor
42 from twisted.internet.protocol import ProcessProtocol
43 from email.Utils import formatdate
44
45 DEFAULT_MONITOR = "localhost"
46
48 return s[0:1].upper() + s[1:]
49
51
58
70
72 self.server.log.debug("Command finished: %s" % reason.getErrorMessage())
73 code = 1
74 try:
75 code = reason.value.exitCode
76 except AttributeError:
77 pass
78
79
80 if self.timeout:
81 self.timeout.cancel()
82 self.timeout = None
83
84 if code == 0:
85 cmdData = self.data or "<command produced no output>"
86 self.server.log.debug("Command %s says: %s", self.cmd.id, cmdData)
87 self.server.sendEvent(Event.Event(
88 device=self.server.options.monitor,
89 eventClass=Cmd_Fail,
90 severity=Event.Clear,
91 component="zenactions",
92 eventKey=self.cmd.id,
93 summary="Command succeeded: %s: %s" % (
94 self.cmd.id, cmdData),
95 ))
96 else:
97 cmdError = self.error or "<command produced no output>"
98 self.server.log.error("Command %s says %s", self.cmd.id, cmdError)
99 self.server.sendEvent(Event.Event(
100 device=self.server.options.monitor,
101 eventClass=Cmd_Fail,
102 severity=Event.Error,
103 component="zenactions",
104 eventKey=self.cmd.id,
105 summary="Error running: %s: %s" % (
106 self.cmd.id, cmdError),
107 ))
108
111
114
115
117 """
118 Take actions based on events in the event manager.
119 Start off by sending emails and pages.
120 """
121
122 lastCommand = None
123
124 addstate = ("INSERT INTO alert_state "
125 "VALUES ('%s', '%s', '%s', NULL) "
126 "ON DUPLICATE KEY UPDATE lastSent = now()")
127
128
129 clearstate = ("DELETE FROM alert_state "
130 " WHERE evid='%s' "
131 " AND userid='%s' "
132 " AND rule='%s'")
133
134
135
136
137
138
139 newsel = ("SELECT %s, evid FROM status WHERE "
140 "%s AND evid NOT IN "
141 " (SELECT evid FROM alert_state "
142 " WHERE userid='%s' AND rule='%s' %s)")
143
144 clearsel = ("SELECT %s, h.evid FROM history h, alert_state a "
145 " WHERE h.evid=a.evid AND a.userid='%s' AND a.rule='%s'")
146
147 clearEventSelect = ("SELECT %s "
148 " FROM history clear, history event "
149 " WHERE clear.evid = event.clearid "
150 " AND event.evid = '%s'")
151
152
166
168 """Load the ActionRules into the system.
169 """
170 self.actions = []
171 for ar in self.dmd.ZenUsers.getAllActionRules():
172 if not ar.enabled: continue
173 userid = ar.getUser().id
174 self.actions.append(ar)
175 self.log.debug("action:%s for:%s loaded", ar.getId(), userid)
176
177
189
190
203
204
211
212
215
216
219
220
222 return "%s/manage_ackEvents?evids=%s&zenScreenName=viewEvents" % (
223 self.getBaseUrl(device), evid)
224
225
227 return "%s/manage_deleteEvents?evids=%s" % (
228 self.getBaseUrl(device), evid) + \
229 "&zenScreenName=viewHistoryEvents"
230
231
233 return "%s/manage_undeleteEvents?evids=%s" % (
234 self.getBaseUrl(device), evid) + \
235 "&zenScreenName=viewEvents"
236
237
239 """Run through all rules matching them against events.
240 """
241 for ar in self.actions:
242 try:
243 self.lastCommand = None
244
245 actfunc = getattr(self, "send"+ar.action.title())
246 self.processEvent(zem, ar, actfunc)
247 except (SystemExit, KeyboardInterrupt, OperationalError, POSError):
248 raise
249 except:
250 if self.lastCommand:
251 self.log.warning(self.lastCommand)
252 self.log.exception("action:%s",ar.getId())
253
255 self.updateCheck.check(self.dmd, zem)
256 import transaction
257 transaction.commit()
258
260 fields = context.getEventFields()
261 userid = context.getUserid()
262
263 nwhere = context.where.strip() or '1 = 1'
264 if context.delay > 0:
265 nwhere += " and firstTime + %s < UNIX_TIMESTAMP()" % context.delay
266 awhere = ''
267 if context.repeatTime:
268 awhere += ' and DATE_ADD(lastSent, INTERVAL %d SECOND) > now() ' % (
269 context.repeatTime,)
270 q = self.newsel % (",".join(fields), nwhere, userid, context.getId(),
271 awhere)
272 for result in self.query(q):
273 evid = result[-1]
274 data = dict(zip(fields, map(zem.convert, fields, result[:-1])))
275
276
277
278 try:
279 details = dict( zem.getEventDetail(evid).getEventDetails() )
280 data.update( details )
281 except ZenEventNotFound:
282 pass
283
284 device = self.dmd.Devices.findDevice(data.get('device', None))
285 data['eventUrl'] = self.getEventUrl(evid, device)
286 if device:
287 data['eventsUrl'] = self.getEventsUrl(device)
288 else:
289 data['eventsUrl'] = 'n/a'
290 data['device'] = data.get('device', None) or ''
291 data['ackUrl'] = self.getAckUrl(evid, device)
292 data['deleteUrl'] = self.getDeleteUrl(evid, device)
293 severity = data.get('severity', -1)
294 data['severityString'] = zem.getSeverityString(severity)
295 if action(context, data, False):
296 addcmd = self.addstate % (evid, userid, context.getId())
297 self.execute(addcmd)
298
299
300 historyFields = [("h.%s" % f) for f in fields]
301 historyFields = ','.join(historyFields)
302 q = self.clearsel % (historyFields, userid, context.getId())
303 for result in self.query(q):
304 evid = result[-1]
305 data = dict(zip(fields, map(zem.convert, fields, result[:-1])))
306
307
308
309 try:
310 details = dict( zem.getEventDetailFromStatusOrHistory(evid).getEventDetails() )
311 data.update( details )
312 except ZenEventNotFound:
313 pass
314
315
316 cfields = [('clear.%s' % x) for x in fields]
317 q = self.clearEventSelect % (",".join(cfields), evid)
318
319
320 cfields = [('clear%s' % _capitalize(x)) for x in fields]
321
322
323 data.update({}.fromkeys(cfields, ""))
324
325
326 for values in self.query(q):
327 values = map(zem.convert, fields, values)
328 data.update(dict(zip(cfields, values)))
329
330 self.log.debug( "Executing clear action with data %s", data )
331
332
333
334
335 if data.get('clearid', None) and not data.get('clearEvid', None):
336 continue
337
338 data['clearOrEventSummary'] = (
339 data['clearSummary'] or data['summary'])
340
341
342 device = self.dmd.Devices.findDevice(data.get('device', None))
343 data['eventUrl'] = self.getEventUrl(evid, device)
344 data['undeleteUrl'] = self.getUndeleteUrl(evid, device)
345 severity = data.get('severity', -1)
346 data['severityString'] = zem.getSeverityString(severity)
347 delcmd = self.clearstate % (evid, userid, context.getId())
348 if getattr(context, 'sendClear', True):
349 if action(context, data, True):
350 self.execute(delcmd)
351 else:
352 self.execute(delcmd)
353
354
355 - def maintenance(self, zem):
356 """Run stored procedures that maintain the events database.
357 """
358 sql = 'call age_events(%s, %s);' % (
359 zem.eventAgingHours, zem.eventAgingSeverity)
360 try:
361 self.execute(sql)
362 except ProgrammingError:
363 self.log.exception("problem with proc: '%s'" % sql)
364
365
367 """
368 Once per day delete events from history table.
369 If force then run the deletion statement regardless of when it was
370 last run (the deletion will still not run if the historyMaxAgeDays
371 setting in the event manager is not greater than zero.)
372 If deferred then we are running in a twisted reactor. Run the
373 deletion script in a non-blocking manner (if it is to be run) and
374 return a deferred (if the deletion script is run.)
375 In all cases return None if the deletion script is not run.
376 """
377 import datetime
378 import os
379 import twisted.internet.utils
380 import Products.ZenUtils.Utils as Utils
381 import transaction
382 import subprocess
383
384 def onSuccess(unused, startTime):
385 self.log.info('Done deleting historical events in %.2f seconds' %
386 (time.time() - startTime))
387 return None
388 def onError(error, startTime):
389 self.log.error('Error deleting historical events after '
390 '%s seconds: %s' % (time.time()-startTime,
391 error))
392 return None
393
394
395
396 d = None
397
398
399
400
401 try:
402 maxDays = int(self.dmd.ZenEventManager.historyMaxAgeDays)
403 except ValueError:
404 maxDays = 0
405 if maxDays > 0:
406
407
408 lastRun = getattr(self.dmd,
409 'lastDeleteHistoricalEvents_datetime', None)
410
411
412
413
414 lastAge = getattr(self.dmd,
415 'lastDeleteHistoricalEvents_days', None)
416 now = datetime.datetime.now()
417 if not lastRun \
418 or now - lastRun > datetime.timedelta(1) \
419 or lastAge != maxDays \
420 or force:
421 self.log.info('Deleting historical events older than %s days' %
422 maxDays)
423 startTime = time.time()
424 cmd = Utils.zenPath('Products', 'ZenUtils',
425 'ZenDeleteHistory.py')
426 args = ['--numDays=%s' % maxDays]
427 if deferred:
428
429 d = twisted.internet.utils.getProcessOutput(
430 cmd, args, os.environ, errortoo=True)
431 d.addCallback(onSuccess, startTime)
432 d.addErrback(onError, startTime)
433 else:
434
435 proc = subprocess.Popen(
436 [cmd]+args, stdout=subprocess.PIPE,
437 stderr=subprocess.STDOUT, env=os.environ)
438
439
440 output, _ = proc.communicate()
441 code = proc.wait()
442 if code:
443 onError(output, startTime)
444 else:
445 onSuccess(output, startTime)
446
447 self.dmd.lastDeleteHistoricalEvents_datetime = now
448 self.dmd.lastDeleteHistoricalEvents_days = maxDays
449 transaction.commit()
450 return d
451
452
454 """Create events for failed heartbeats.
455 """
456
457 q = ("SELECT device, component "
458 "FROM status WHERE eventClass = '%s'" % Status_Heartbeat)
459 heartbeatState = Set(self.query(q))
460
461
462 sel = "SELECT device, component FROM heartbeat "
463 sel += "WHERE DATE_ADD(lastTime, INTERVAL timeout SECOND) <= NOW();"
464 for device, comp in self.query(sel):
465 self.sendEvent(
466 Event.Event(device=device, component=comp,
467 eventClass=Status_Heartbeat,
468 summary="%s %s heartbeat failure" % (device, comp),
469 severity=Event.Error))
470 heartbeatState.discard((device, comp))
471
472
473 for device, comp in heartbeatState:
474 self.sendEvent(
475 Event.Event(device=device, component=comp,
476 eventClass=Status_Heartbeat,
477 summary="%s %s heartbeat clear" % (device, comp),
478 severity=Event.Clear))
479
481 try:
482 command = cmd.command
483 if clear:
484 command = cmd.clearCommand
485 device = self.dmd.Devices.findDevice(data.get('device', ''))
486 component = None
487 if device:
488 componentName = data.get('component')
489 for c in device.getMonitoredComponents():
490 if c.id == componentName:
491 component = c
492 break
493 compiled = talesCompile('string:' + command)
494 environ = {'dev':device, 'component':component, 'evt':data }
495 res = compiled(getEngine().getContext(environ))
496 if isinstance(res, Exception):
497 raise res
498 prot = EventCommandProtocol(cmd, self)
499 self.log.info('Running %s' % res)
500 reactor.spawnProcess(prot, '/bin/sh',
501 ('/bin/sh', '-c', res),
502 env=None)
503 except Exception:
504 self.log.exception('Error running command %s', cmd.id)
505 return True
506
507
516
517
518 - def mainbody(self):
519 """main loop to run actions.
520 """
521 from twisted.internet.process import reapAllProcesses
522 reapAllProcesses()
523 zem = self.dmd.ZenEventManager
524 self.loadActionRules()
525 self.eventCommands(zem)
526 self.processRules(zem)
527 self.checkVersion(zem)
528 self.maintenance(zem)
529 self.deleteHistoricalEvents(deferred=self.options.cycle)
530 self.heartbeatEvents()
531
532
544
545
554
555
557 """Send event to the system.
558 """
559 self.dmd.ZenEventManager.sendEvent(evt)
560
561
569
570
578
594
606
607 - def sendPage(self, action, data, clear = None):
608 """Send and event to a pager. Return True if we think page was sent,
609 False otherwise.
610 """
611 fmt, body = self.format(action, data, clear)
612 recipients = action.getAddresses()
613 if not recipients:
614 self.log.warning('failed to page %s on rule %s: %s',
615 action.getUser().id, action.id,
616 'Unspecified address.')
617 return True
618
619 result = False
620 for recipient in recipients:
621 success, errorMsg = Utils.sendPage(recipient,
622 fmt,
623 self.dmd.pageCommand)
624 if success:
625 self.log.info('sent page to %s: %s', recipient, fmt)
626
627 result = result or success
628 else:
629 self.log.info('failed to send page to %s: %s %s',
630 recipient,
631 fmt,
632 errorMsg)
633 return result
634
635
636
637 - def sendEmail(self, action, data, clear = None):
638 """Send an event to an email address.
639 Return True if we think the email was sent, False otherwise.
640 """
641 from email.MIMEText import MIMEText
642 from email.MIMEMultipart import MIMEMultipart
643 addr = action.getAddresses()
644 if not addr:
645 self.log.warning('failed to email %s on rule %s: %s',
646 action.getUser().id, action.id, 'Unspecified address.')
647 return True
648
649 fmt, htmlbody = self.format(action, data, clear)
650 htmlbody = htmlbody.replace('\n','<br/>\n')
651 body = self.stripTags(htmlbody)
652 plaintext = MIMEText(body)
653
654 emsg = None
655 if action.plainText:
656 emsg = plaintext
657 else:
658 emsg = MIMEMultipart('related')
659 emsgAlternative = MIMEMultipart('alternative')
660 emsg.attach( emsgAlternative )
661 html = MIMEText(htmlbody)
662 html.set_type('text/html')
663 emsgAlternative.attach(plaintext)
664 emsgAlternative.attach(html)
665
666 emsg['Subject'] = fmt
667 emsg['From'] = self.dmd.getEmailFrom()
668 emsg['To'] = ', '.join(addr)
669 emsg['Date'] = formatdate(None, True)
670 result, errorMsg = Utils.sendEmail(emsg, self.dmd.smtpHost,
671 self.dmd.smtpPort, self.dmd.smtpUseTLS, self.dmd.smtpUser,
672 self.dmd.smtpPass)
673 if result:
674 self.log.info("rule '%s' sent email:%s to:%s",
675 action.id, fmt, addr)
676 else:
677 self.log.info("rule '%s' failed to send email to %s: %s %s",
678 action.id, ','.join(addr), fmt, errorMsg)
679 return result
680
681
683 ZCmdBase.buildOptions(self)
684 self.parser.add_option('--cycletime',
685 dest='cycletime', default=60, type="int",
686 help="check events every cycletime seconds")
687 self.parser.add_option(
688 '--zopeurl', dest='zopeurl',
689 default='http://%s:%d' % (socket.getfqdn(), 8080),
690 help="http path to the root of the zope server")
691 self.parser.add_option("--monitor", dest="monitor",
692 default=DEFAULT_MONITOR,
693 help="Name of monitor instance to use for heartbeat "
694 " events. Default is %s." % DEFAULT_MONITOR)
695
696
697 - def sigTerm(self, signum=None, frame=None):
698 'controlled shutdown of main loop on interrupt'
699 try:
700 ZCmdBase.sigTerm(self, signum, frame)
701 except SystemExit:
702 reactor.stop()
703
704 if __name__ == "__main__":
705 za = ZenActions()
706 import logging
707 logging.getLogger('zen.Events').setLevel(20)
708 za.run()
709