1
2
3
4
5
6
7
8
9
10
11 import Globals
12 from traceback import format_exc
13 from email.MIMEText import MIMEText
14 from email.MIMEMultipart import MIMEMultipart
15 from email.Utils import formatdate
16 from twisted.internet import reactor, defer
17
18 from zenoss.protocols.queueschema import SchemaException
19 from zenoss.protocols import hydrateQueueMessage
20 from zenoss.protocols.interfaces import IQueueSchema
21 from Products.ZenCallHome.transport.cycler import CallHomeCycler
22 from Products.ZenCollector.utils.maintenance import MaintenanceCycle, maintenanceBuildOptions, QueueHeartbeatSender
23 from Products.ZenCollector.utils.workers import ProcessWorkers, workersBuildOptions, exec_worker
24
25 from Products.ZenEvents.Schedule import Schedule
26 from Products.ZenUtils.ZCmdBase import ZCmdBase
27 from Products.ZenUtils.Utils import getDefaultZopeUrl
28 from Products.ZenUtils.guid.interfaces import IGlobalIdentifier
29 from Products.ZenUtils.guid.guid import GUIDManager
30
31 from Products.ZenModel.NotificationSubscription import NotificationSubscriptionManager
32 from Products.ZenModel.actions import ActionMissingException, TargetableAction, ActionExecutionException
33 from Products.ZenModel.interfaces import IAction
34 from Products.ZenEvents.Event import Event
35 from Products.ZenMessaging.queuemessaging.QueueConsumer import QueueConsumer
36 from Products.ZenMessaging.queuemessaging.interfaces import IQueueConsumerTask
37 from Products.ZenEvents.ZenEventClasses import Warning as SEV_WARNING
38 from zope.component import getUtility, getUtilitiesFor
39 from zope.component.interfaces import ComponentLookupError
40 from zope.interface import implements
41
42
43 import logging
44 log = logging.getLogger("zen.zenactiond")
45
46
47 DEFAULT_MONITOR = "localhost"
55
57 self.dmd._p_jar.sync()
58 return self.notification_manager.getChildNodes()
59
61 """
62 Given a signal, find which notifications match this signal. In order to
63 match, a notification must be active (enabled and if has maintenance
64 windows, at least one must be active) and must be subscribed to the
65 signal.
66
67 @param signal: The signal for which to get subscribers.
68 @type signal: protobuf zep.Signal
69 """
70 active_matching_notifications = []
71 for notification in self.getNotifications():
72 if notification.isActive():
73 if self.notificationSubscribesToSignal(notification, signal):
74 active_matching_notifications.append(notification)
75 log.debug('Found matching notification: %s' % notification)
76 else:
77 log.debug('Notification "%s" does not subscribe to this signal.' % notification)
78 else:
79 log.debug('Notification "%s" is not active.' % notification)
80
81 return active_matching_notifications
82
84 """
85 Determine if the notification matches the specified signal.
86
87 @param notification: The notification to check
88 @type notification: NotificationSubscription
89 @param signal: The signal to match.
90 @type signal: zenoss.protocols.protbufs.zep_pb2.Signal
91
92 @rtype boolean
93 """
94 return signal.subscriber_uuid == IGlobalIdentifier(notification).getGUID()
95
97 implements(IQueueConsumerTask)
98
100 self.notificationDao = notificationDao
101
102
103 self.queueConsumer = None
104
105 self.schema = getUtility(IQueueSchema)
106 self.queue = self.schema.getQueue("$Signals")
107
113
115 """
116 Handles a queue message, can call "acknowledge" on the Queue Consumer
117 class when it is done with the message
118 """
119 log.debug('processing message.')
120
121 if message.content.body == self.queueConsumer.MARKER:
122 log.info("Received MARKER sentinel, exiting message loop")
123 self.queueConsumer.acknowledge(message)
124 return
125 try:
126 signal = hydrateQueueMessage(message, self.schema)
127 self.processSignal(signal)
128 log.debug('Done processing signal.')
129 except SchemaException:
130 log.error("Unable to hydrate protobuf %s. " % message.content.body)
131 self.queueConsumer.acknowledge(message)
132 except Exception, e:
133 log.exception(e)
134
135 log.error('Acknowledging broken message.')
136 self.queueConsumer.acknowledge(message)
137 else:
138 log.debug('Acknowledging message. (%s)' % signal.message)
139 self.queueConsumer.acknowledge(message)
140
142 matches = self.notificationDao.getSignalNotifications(signal)
143 log.debug('Found these matching notifications: %s' % matches)
144
145 trigger = self.notificationDao.guidManager.getObject(signal.trigger_uuid)
146 audit_event_trigger_info = "Event:'%s' Trigger:%s" % (
147 signal.event.occurrence[0].fingerprint,
148 trigger.id)
149 for notification in matches:
150 if signal.clear and not notification.send_clear:
151 log.debug('Ignoring clearing signal since send_clear is set to False on this subscription %s' % notification.id)
152 continue
153 try:
154 target = signal.subscriber_uuid or '<none>'
155 action = self.getAction(notification.action)
156 action.setupAction(notification.dmd)
157 if isinstance(action, TargetableAction):
158 target = ','.join(action.getTargets(notification))
159 action.execute(notification, signal)
160 except ActionMissingException, e:
161 log.error('Error finding action: {action}'.format(action = notification.action))
162 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % (
163 audit_event_trigger_info, notification.action, "FAIL", target, "<action not found>")
164 except ActionExecutionException, aee:
165 log.error('Error executing action: {action} on notification {notification}'.format(
166 action = notification.action,
167 notification = notification.id,
168 ))
169 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % (
170 audit_event_trigger_info, notification.action, "FAIL", target, aee)
171 except Exception, e:
172 msg = 'Error executing action {notification}'.format(
173 notification = notification.id,
174 )
175 log.exception(e)
176 log.error(msg)
177 traceback = format_exc()
178 event = Event(device="localhost",
179 eventClass="/App/Failed",
180 summary=msg,
181 message=traceback,
182 severity=SEV_WARNING, component="zenactiond")
183 self.dmd.ZenEventManager.sendEvent(event)
184 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % (
185 audit_event_trigger_info, notification.action, "FAIL", target, action.getInfo(notification))
186 else:
187
188 audit_msg = "%s Action:%s Status:%s Target:%s Info:%s" % (
189 audit_event_trigger_info, notification.action, "SUCCESS", target, action.getInfo(notification))
190 log.info(audit_msg)
191 log.debug('Done processing signal. (%s)' % signal.message)
192
195 super(ZenActionD, self).__init__()
196 self._consumer = None
197 self._workers = ProcessWorkers(self.options.workers - 1,
198 exec_worker,
199 "zenactiond worker")
200 self._heartbeatSender = QueueHeartbeatSender('localhost',
201 'zenactiond',
202 self.options.maintenancecycle *3)
203
204 self._maintenanceCycle = MaintenanceCycle(self.options.maintenancecycle,
205 self._heartbeatSender)
206 self._callHomeCycler = CallHomeCycler(self.dmd)
207 self._schedule = Schedule(self.options, self.dmd)
208 self._schedule.sendEvent = self.dmd.ZenEventManager.sendEvent
209 self._schedule.monitor = self.options.monitor
210
212 super(ZenActionD, self).buildOptions()
213 maintenanceBuildOptions(self.parser)
214 workersBuildOptions(self.parser, 1)
215
216 default_max_commands = 10
217 self.parser.add_option('--maxcommands', dest="maxCommands", type="int", default=default_max_commands,
218 help='Max number of action commands to perform concurrently (default: %d)' % \
219 default_max_commands)
220 default_url = getDefaultZopeUrl()
221 self.parser.add_option('--zopeurl', dest='zopeurl', default=default_url,
222 help="http path to the root of the zope server (default: %s)" % default_url)
223 self.parser.add_option("--monitor", dest="monitor",
224 default=DEFAULT_MONITOR,
225 help="Name of monitor instance to use for heartbeat "
226 " events. Default is %s." % DEFAULT_MONITOR)
227 self.parser.add_option('--maintenance-window-cycletime',
228 dest='maintenceWindowCycletime', default=60, type="int",
229 help="How often to check to see if there are any maintenance windows to execute")
230
249
251 log.info('starting zenactiond consumer.')
252 reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
253 self._consumer.run()
254
255
256 @defer.inlineCallbacks
258 log.info("Shutting down...")
259 self._maintenanceCycle.stop()
260 self._workers.shutdown()
261 if self._consumer:
262 yield self._consumer.shutdown()
263
264
265 if __name__ == '__main__':
266 zad = ZenActionD()
267 zad.run()
268