1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Module documentation: zenactions turns events into notifications.
__doc__ = ("zenactions\n"
           "\n"
           "Turn events into notifications (pages, emails).\n"
           "\n")
20
21 import re
22 import socket
23 import time
24 from sets import Set
25 import Globals
26
27 from ZODB.POSException import POSError
28 from _mysql_exceptions import OperationalError, ProgrammingError
29
30 from Products.ZenUtils.ProcessQueue import ProcessQueue
31 from Products.ZenUtils.ZCmdBase import ZCmdBase
32 from Products.ZenUtils.ZenTales import talesCompile, getEngine
33 from Products.ZenEvents.Exceptions import ZenEventNotFound
34 from ZenEventClasses import App_Start, App_Stop, Status_Heartbeat
35 from ZenEventClasses import Cmd_Fail
36 import Event
37 from Schedule import Schedule
38 from UpdateCheck import UpdateCheck
39 from Products.ZenUtils import Utils
40 from twisted.internet import reactor
41 from twisted.internet.protocol import ProcessProtocol
42 from email.Utils import formatdate
43
# Monitor name used for heartbeat/command events when --monitor is not given.
DEFAULT_MONITOR = "localhost"

# Matches device filters in an action rule's SQL where clause, e.g.
# "device like '%web%'" -> ("like ", "%web%"): group 1 is the operator text
# (with trailing whitespace), group 2 is the quoted search term.
# Raw string so that "\s" reaches the regex engine instead of being treated
# as an (invalid) string escape.
deviceFilterRegex = re.compile(r"Device\s(.*?)'(.*?)'", re.IGNORECASE)
47
48
50 return s[0:1].upper() + s[1:]
51
53
55 self.cmd = cmd
56 self.server = server
57 self.data = ''
58 self.error = ''
59
71
73 self.server.log.debug("Command finished: %s" % reason.getErrorMessage())
74 code = 1
75 try:
76 code = reason.value.exitCode
77 except AttributeError:
78 pass
79
80 if code == 0:
81 cmdData = self.data or "<command produced no output>"
82 self.server.log.debug("Command %s says: %s", self.cmd.id, cmdData)
83 self.server.sendEvent(Event.Event(
84 device=self.server.options.monitor,
85 eventClass=Cmd_Fail,
86 severity=Event.Clear,
87 component="zenactions",
88 eventKey=self.cmd.id,
89 summary="Command succeeded: %s: %s" % (
90 self.cmd.id, cmdData),
91 ))
92 else:
93 cmdError = self.error or "<command produced no output>"
94 self.server.log.error("Command %s says %s", self.cmd.id, cmdError)
95 self.server.sendEvent(Event.Event(
96 device=self.server.options.monitor,
97 eventClass=Cmd_Fail,
98 severity=Event.Error,
99 component="zenactions",
100 eventKey=self.cmd.id,
101 summary="Error running: %s: %s" % (
102 self.cmd.id, cmdError),
103 ))
104
107
110
111
113 """
114 Take actions based on events in the event manager.
115 Start off by sending emails and pages.
116 """
117
118 lastCommand = None
119
120 addstate = ("INSERT INTO alert_state "
121 "VALUES ('%s', '%s', '%s', NULL) "
122 "ON DUPLICATE KEY UPDATE lastSent = now()")
123
124
125 clearstate = ("DELETE FROM alert_state "
126 " WHERE evid='%s' "
127 " AND userid='%s' "
128 " AND rule='%s'")
129
130
131
132
133
134
135 newsel = ("SELECT %s, evid FROM status WHERE "
136 "%s AND evid NOT IN "
137 " (SELECT evid FROM alert_state "
138 " WHERE userid='%s' AND rule='%s' %s)")
139
140 clearsel = ("SELECT %s, h.evid FROM history h, alert_state a "
141 " WHERE h.evid=a.evid AND a.userid='%s' AND a.rule='%s'")
142
143 clearEventSelect = ("SELECT %s "
144 " FROM history clear, history event "
145 " WHERE clear.evid = event.clearid "
146 " AND event.evid = '%s'")
147
148
150 """Load the ActionRules into the system.
151 """
152 self.actions = []
153 for ar in self.dmd.ZenUsers.getAllActionRules():
154 if not ar.enabled: continue
155 userid = ar.getUser().id
156 self.actions.append(ar)
157 self.log.debug("action:%s for:%s loaded", ar.getId(), userid)
158
159
161 """
162 Get a cursor for the ZenEventManager connection. Execute statement
163 and call the callback with the cursor and number of row affected.
164 """
165 result = None
166 self.lastCommand = stmt
167 self.log.debug(stmt)
168 zem = self.dmd.ZenEventManager
169 conn = zem.connect()
170 try:
171 curs = conn.cursor()
172 rowsAffected = curs.execute(stmt)
173 result = callback(cursor=curs, rowsAffected=rowsAffected)
174 finally:
175 zem.close(conn)
176 return result
177
178
180 """
181 Execute stmt against ZenEventManager connection and return the number
182 of rows that were affected.
183 """
184 def callback(rowsAffected, **unused):
185 return rowsAffected
186 return self._getCursor(stmt, callback)
187
188
190 """
191 Execute stmt against ZenEventManager connection and fetch all results.
192 """
193 def callback(cursor, **unused):
194 return cursor.fetchall()
195 return self._getCursor(stmt, callback)
196
197
199 """
200 Execute stmt against ZenEventManager connection and return the cursor
201 description.
202 """
203 def callback(cursor, **unused):
204 return cursor.description
205 return self._getCursor(stmt, callback)
206
207
209 """
210 Returns the column names for the table using a ZenEventManager
211 connection.
212 """
213 description = self._describe("SELECT * FROM %s LIMIT 0" % table)
214 return [d[0] for d in description]
215
216
223
224
227
228
231
232
234 return "%s/manage_ackEvents?evids=%s&zenScreenName=viewEvents" % (
235 self.getBaseUrl(device), evid)
236
237
239 return "%s/manage_deleteEvents?evids=%s" % (
240 self.getBaseUrl(device), evid) + \
241 "&zenScreenName=viewHistoryEvents"
242
243
245 return "%s/manage_undeleteEvents?evids=%s" % (
246 self.getBaseUrl(device), evid) + \
247 "&zenScreenName=viewEvents"
248
249
251 """Run through all rules matching them against events.
252 """
253 for ar in self.actions:
254 try:
255 self.lastCommand = None
256
257 actfunc = getattr(self, "send"+ar.action.title())
258 self.processEvent(zem, ar, actfunc)
259 except (SystemExit, KeyboardInterrupt, OperationalError, POSError):
260 raise
261 except:
262 if self.lastCommand:
263 self.log.warning(self.lastCommand)
264 self.log.exception("action:%s",ar.getId())
265
267 self.updateCheck.check(self.dmd, zem)
268 import transaction
269 transaction.commit()
270
272 """This is somewhat janky but we only store the device id in the mysql
273 database but allow people to search based on the device "title".
274 This method resolves the disparity by searching the catalog first and using
275 those results.
276 """
277
278 if not 'device' in whereClause:
279 return whereClause
280 matches = deviceFilterRegex.findall(whereClause)
281
282
283 if not matches:
284 return whereClause
285
286
287 deviceids = []
288 include = 'IN'
289
290
291 for match in matches:
292 operator = match[0]
293 searchTerm = match[-1]
294 originalDeviceFilter = operator + "'" + searchTerm + "'"
295
296
297 if searchTerm.startswith('%'):
298 searchTerm = '.*' + searchTerm
299 if searchTerm.endswith('%'):
300 searchTerm = searchTerm + '.*'
301
302
303 deviceids = zem._getDeviceIdsMatching(searchTerm.replace("%", ""), globSearch=False)
304
305
306 if not deviceids:
307 continue
308
309 if not operator.lower().strip() in ('like', '='):
310 include = 'NOT IN'
311 deviceFilter = " %s ('%s') " % (include, "','".join(deviceids))
312 whereClause = whereClause.replace(originalDeviceFilter, deviceFilter)
313
314 return whereClause
315
316
318 userFields = context.getEventFields()
319 columnNames = self._columnNames('status')
320 fields = [f for f in userFields if f in columnNames]
321 userid = context.getUserid()
322
323 nwhere = context.where.strip() or '1 = 1'
324 nwhere = self.filterDeviceName(zem, nwhere)
325 if context.delay > 0:
326 nwhere += " and firstTime + %s < UNIX_TIMESTAMP()" % context.delay
327 awhere = ''
328 if context.repeatTime:
329 awhere += ' and DATE_ADD(lastSent, INTERVAL %d SECOND) > now() ' % (
330 context.repeatTime,)
331 q = self.newsel % (",".join(fields), nwhere, userid, context.getId(),
332 awhere)
333 for result in self.query(q):
334 evid = result[-1]
335 data = dict(zip(fields, map(zem.convert, fields, result[:-1])))
336
337
338
339 try:
340 details = dict( zem.getEventDetail(evid).getEventDetails() )
341 data.update( details )
342 except ZenEventNotFound:
343 pass
344
345 device = self.dmd.Devices.findDevice(data.get('device', None))
346 data['eventUrl'] = self.getEventUrl(evid, device)
347 if device:
348 data['eventsUrl'] = self.getEventsUrl(device)
349 else:
350 data['eventsUrl'] = 'n/a'
351 data['device'] = data.get('device', None) or ''
352 data['ackUrl'] = self.getAckUrl(evid, device)
353 data['deleteUrl'] = self.getDeleteUrl(evid, device)
354 severity = data.get('severity', -1)
355 data['severityString'] = zem.getSeverityString(severity)
356 if action(context, data, False):
357 addcmd = self.addstate % (evid, userid, context.getId())
358 self.execute(addcmd)
359
360
361 historyFields = [("h.%s" % f) for f in fields]
362 historyFields = ','.join(historyFields)
363 q = self.clearsel % (historyFields, userid, context.getId())
364 for result in self.query(q):
365 evid = result[-1]
366 data = dict(zip(fields, map(zem.convert, fields, result[:-1])))
367
368
369
370 try:
371 details = dict( zem.getEventDetailFromStatusOrHistory(evid).getEventDetails() )
372 data.update( details )
373 except ZenEventNotFound:
374 pass
375
376
377 cfields = [('clear.%s' % x) for x in fields]
378 q = self.clearEventSelect % (",".join(cfields), evid)
379
380
381 cfields = [('clear%s' % _capitalize(x)) for x in fields]
382
383
384 data.update({}.fromkeys(cfields, ""))
385
386
387 for values in self.query(q):
388 values = map(zem.convert, fields, values)
389 data.update(dict(zip(cfields, values)))
390
391
392
393
394 if data.get('clearid', None) and not data.get('clearEvid', None):
395 continue
396
397 data['clearOrEventSummary'] = (
398 data['clearSummary'] or data['summary'])
399
400
401
402
403 if not data.get('clearSummary', False) \
404 and data.get('ownerid', False):
405 data['clearSummary'] = data['ownerid']
406 data['clearFirstTime'] = data.get('stateChange', '')
407
408
409 device = self.dmd.Devices.findDevice(data.get('device', None))
410 data['eventUrl'] = self.getEventUrl(evid, device)
411 data['undeleteUrl'] = self.getUndeleteUrl(evid, device)
412 severity = data.get('severity', -1)
413 data['severityString'] = zem.getSeverityString(severity)
414 delcmd = self.clearstate % (evid, userid, context.getId())
415 if getattr(context, 'sendClear', True):
416 if action(context, data, True):
417 self.execute(delcmd)
418 else:
419 self.execute(delcmd)
420
421
def maintenance(self, zem):
    """Age events by calling the ``age_events`` stored procedure.

    The procedure receives the event manager's ``eventAgingHours`` and
    ``eventAgingSeverity`` settings as its two arguments.  A
    ``ProgrammingError`` raised while executing the call is logged (with
    traceback) instead of being propagated.
    """
    agingHours = zem.eventAgingHours
    agingSeverity = zem.eventAgingSeverity
    procCall = 'call age_events(%s, %s);' % (agingHours, agingSeverity)
    try:
        self.execute(procCall)
    except ProgrammingError:
        self.log.exception("problem with proc: '%s'" % procCall)
431
432
434 """
435 Once per day delete events from history table.
436 If force then run the deletion statement regardless of when it was
437 last run (the deletion will still not run if the historyMaxAgeDays
438 setting in the event manager is not greater than zero.)
439 If deferred then we are running in a twisted reactor. Run the
440 deletion script in a non-blocking manner (if it is to be run) and
441 return a deferred (if the deletion script is run.)
442 In all cases return None if the deletion script is not run.
443 """
444 import datetime
445 import os
446 import twisted.internet.utils
447 import Products.ZenUtils.Utils as Utils
448 import transaction
449 import subprocess
450
451 def onSuccess(unused, startTime):
452 self.log.info('Done deleting historical events in %.2f seconds' %
453 (time.time() - startTime))
454 return None
455 def onError(error, startTime):
456 self.log.error('Error deleting historical events after '
457 '%s seconds: %s' % (time.time()-startTime,
458 error))
459 return None
460
461
462
463 d = None
464
465
466
467
468 try:
469 maxDays = int(self.dmd.ZenEventManager.historyMaxAgeDays)
470 except ValueError:
471 maxDays = 0
472 if maxDays > 0:
473
474
475 lastRun = getattr(self.dmd,
476 'lastDeleteHistoricalEvents_datetime', None)
477
478
479
480
481 lastAge = getattr(self.dmd,
482 'lastDeleteHistoricalEvents_days', None)
483 now = datetime.datetime.now()
484 if not lastRun \
485 or now - lastRun > datetime.timedelta(1) \
486 or lastAge != maxDays \
487 or force:
488 self.log.info('Deleting historical events older than %s days' %
489 maxDays)
490 startTime = time.time()
491 cmd = Utils.zenPath('Products', 'ZenUtils',
492 'ZenDeleteHistory.py')
493 args = ['--numDays=%s' % maxDays]
494 if deferred:
495
496 d = twisted.internet.utils.getProcessOutput(
497 cmd, args, os.environ, errortoo=True)
498 d.addCallback(onSuccess, startTime)
499 d.addErrback(onError, startTime)
500 else:
501
502 proc = subprocess.Popen(
503 [cmd]+args, stdout=subprocess.PIPE,
504 stderr=subprocess.STDOUT, env=os.environ)
505
506
507 output, _ = proc.communicate()
508 code = proc.wait()
509 if code:
510 onError(output, startTime)
511 else:
512 onSuccess(output, startTime)
513
514 self.dmd.lastDeleteHistoricalEvents_datetime = now
515 self.dmd.lastDeleteHistoricalEvents_days = maxDays
516 transaction.commit()
517 return d
518
519
537
539 """Create events for failed heartbeats.
540 """
541
542 q = ("SELECT monitor, component "
543 "FROM status WHERE eventClass = '%s'" % Status_Heartbeat)
544 heartbeatState = Set(self.query(q))
545
546
547
548
549 sel = "SELECT device, component FROM heartbeat "
550 sel += "WHERE DATE_ADD(lastTime, INTERVAL timeout SECOND) <= NOW();"
551 for monitor, comp in self.query(sel):
552 hostname = self.fetchMonitorHostname(monitor)
553 self.sendEvent(
554 Event.Event(device=hostname, component=comp,
555 eventClass=Status_Heartbeat,
556 summary="%s %s heartbeat failure" % (monitor, comp),
557 prodState=self.prodState,
558 severity=Event.Error))
559 heartbeatState.discard((monitor, comp))
560
561
562 for monitor, comp in heartbeatState:
563 hostname = self.fetchMonitorHostname(monitor)
564 self.sendEvent(
565 Event.Event(device=hostname, component=comp,
566 eventClass=Status_Heartbeat,
567 summary="%s %s heartbeat clear" % (monitor, comp),
568 prodState=self.prodState,
569 severity=Event.Clear))
570
572 try:
573 command = cmd.command
574 if clear:
575 command = cmd.clearCommand
576 if not command:
577 return True;
578 device = self.dmd.Devices.findDevice(data.get('device', ''))
579 component = None
580 if device:
581 componentName = data.get('component')
582 for c in device.getMonitoredComponents():
583 if c.id == componentName:
584 component = c
585 break
586 compiled = talesCompile('string:' + command)
587 environ = {'dev':device, 'component':component, 'evt':data }
588 res = compiled(getEngine().getContext(environ))
589 if isinstance(res, Exception):
590 raise res
591 prot = EventCommandProtocol(cmd, self)
592 if res:
593 self.log.info('Queueing %s' % res)
594 self._processQ.queueProcess('/bin/sh', ('/bin/sh', '-c', res),
595 env=None, processProtocol=prot,
596 timeout=cmd.defaultTimeout,
597 timeout_callback=prot.timedOut)
598 except Exception:
599 self.log.exception('Error running command %s', cmd.id)
600 return True
601
602
611
612
def mainbody(self):
    """Run one pass of the zenactions main loop.

    Reaps finished child processes, reloads the enabled action rules, then
    runs the periodic tasks in a fixed order: event commands, rule
    processing, the version check, database maintenance, historical event
    deletion and heartbeat checks.
    """
    from twisted.internet.process import reapAllProcesses
    reapAllProcesses()
    eventManager = self.dmd.ZenEventManager
    self.loadActionRules()
    # Each of these steps takes the event manager; keep the original order.
    for step in (self.eventCommands, self.processRules,
                 self.checkVersion, self.maintenance):
        step(eventManager)
    # When cycling, deletion runs non-blocking inside the reactor.
    self.deleteHistoricalEvents(deferred=self.options.cycle)
    self.heartbeatEvents()
626
627
644 self.log.info("waiting for outstanding process to end")
645 d = self._processQ.stop()
646 d.addBoth(shutdown)
647
648
650 self.prodState = filter(lambda x: x.split(':')[0] == 'Production',
651 self.dmd.prodStateConversions)
652 import socket
653 self.daemonHostname = socket.getfqdn()
654 self.monitorToHost = {}
655 try:
656
657 self.prodState = int(self.prodState[0].split(':')[1])
658 except:
659 self.prodState = 1000
660
661 self._processQ = ProcessQueue(self.options.parallel)
662 def startup():
663 self._processQ.start()
664 self.schedule.start()
665 self.runCycle()
666 reactor.callWhenRunning(startup)
667 reactor.run()
668
669
671 """Send event to the system.
672 """
673 self.dmd.ZenEventManager.sendEvent(evt)
674
675
683
684
692
708
720
def sendPage(self, action, data, clear = None):
    """Send an event to a pager.

    Every address returned by ``action.getAddresses()`` is paged with the
    subject line produced by ``self.format(action, data, clear)`` using
    ``self.dmd.pageCommand``.

    Return True if we think at least one page was sent, False otherwise.
    A rule with no address is logged and reported as handled (True), so the
    caller records the event in alert_state instead of warning again about
    the same event on every cycle.
    """
    if self.options.cycle and not reactor.running:
        # Running as a daemon but the reactor is not up yet.  Report failure
        # so the event is not recorded as sent and is retried next cycle.
        return False

    fmt, _ = self.format(action, data, clear)
    recipients = action.getAddresses()
    if not recipients:
        self.log.warning('failed to page %s on rule %s: %s',
                         action.getUser().id, action.id,
                         'Unspecified address.')
        return True

    result = False
    for recipient in recipients:
        success, errorMsg = Utils.sendPage(
            recipient, fmt, self.dmd.pageCommand,
            deferred=self.options.cycle)
        if success:
            self.log.info('sent page to %s: %s', recipient, fmt)
            # The rule counts as handled if anyone got the page.
            result = result or success
        else:
            # A failed delivery is a real problem; don't hide it at INFO.
            self.log.warning('failed to send page to %s: %s %s',
                             recipient, fmt, errorMsg)
    return result
753
754
755
def sendEmail(self, action, data, clear = None):
    """Send an event to an email address.

    The subject and HTML body come from ``self.format(action, data, clear)``.
    If ``action.plainText`` is set, a plain-text message is sent.
    Otherwise the message is multipart/related, wrapping a
    multipart/alternative with plain-text and HTML renderings of the body.

    Return True if we think the email was sent, False otherwise.  A rule
    with no address is logged and reported as handled (True), so the caller
    records the event in alert_state instead of warning again about the
    same event on every cycle.
    """
    try:
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart
    except ImportError:
        # Interpreters older than 2.5 only provide the legacy module names
        # (which were removed in Python 3).
        from email.MIMEText import MIMEText
        from email.MIMEMultipart import MIMEMultipart

    addr = action.getAddresses()
    if not addr:
        self.log.warning('failed to email %s on rule %s: %s',
            action.getUser().id, action.id, 'Unspecified address.')
        return True

    fmt, htmlbody = self.format(action, data, clear)
    # Preserve line breaks in the HTML rendering; the plain-text part is the
    # same content with the tags stripped out again.
    htmlbody = htmlbody.replace('\n', '<br/>\n')
    body = self.stripTags(htmlbody)
    plaintext = MIMEText(body)

    if action.plainText:
        emsg = plaintext
    else:
        emsg = MIMEMultipart('related')
        emsgAlternative = MIMEMultipart('alternative')
        emsg.attach(emsgAlternative)
        html = MIMEText(htmlbody)
        html.set_type('text/html')
        emsgAlternative.attach(plaintext)
        emsgAlternative.attach(html)

    emsg['Subject'] = fmt
    emsg['From'] = self.dmd.getEmailFrom()
    emsg['To'] = ', '.join(addr)
    emsg['Date'] = formatdate(None, True)
    result, errorMsg = Utils.sendEmail(emsg, self.dmd.smtpHost,
        self.dmd.smtpPort, self.dmd.smtpUseTLS, self.dmd.smtpUser,
        self.dmd.smtpPass)
    if result:
        self.log.info("rule '%s' sent email:%s to:%s",
            action.id, fmt, addr)
    else:
        # A failed delivery is a real problem; don't hide it at INFO.
        self.log.warning("rule '%s' failed to send email to %s: %s %s",
            action.id, ','.join(addr), fmt, errorMsg)
    return result
799
800
802 ZCmdBase.buildOptions(self)
803 self.parser.add_option('--cycletime',
804 dest='cycletime', default=60, type="int",
805 help="check events every cycletime seconds")
806 self.parser.add_option(
807 '--zopeurl', dest='zopeurl',
808 default='http://%s:%d' % (socket.getfqdn(), 8080),
809 help="http path to the root of the zope server")
810 self.parser.add_option("--monitor", dest="monitor",
811 default=DEFAULT_MONITOR,
812 help="Name of monitor instance to use for heartbeat "
813 " events. Default is %s." % DEFAULT_MONITOR)
814 self.parser.add_option("--parallel", dest="parallel",
815 default=10, type='int',
816 help="Number of event commands to run concurrently")
817
818
def sigTerm(self, signum=None, frame=None):
    """Controlled shutdown of the main loop on interrupt.

    ``ZCmdBase.sigTerm`` may raise SystemExit.  In that case:

    * If the Twisted reactor is running, it is stopped so that
      ``reactor.run()`` returns.
    * If the reactor is not running, there is nothing to stop.  The
      SystemExit is re-raised so the process still exits, instead of the
      signal being swallowed or ``reactor.stop()`` failing.
    """
    try:
        ZCmdBase.sigTerm(self, signum, frame)
    except SystemExit:
        if reactor.running:
            reactor.stop()
        else:
            raise
825
826
842
843
if __name__ == "__main__":
    # Start the zenactions daemon with the zen.Events logger at INFO (20).
    import logging
    daemon = ZenActions()
    logging.getLogger('zen.Events').setLevel(logging.INFO)
    daemon.run()
849