Package Products :: Package ZenEvents :: Module zenactiond
[hide private]
[frames | no frames]

Source Code for Module Products.ZenEvents.zenactiond

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2010, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  import Globals 
 12  from traceback import format_exc 
 13  from email.MIMEText import MIMEText 
 14  from email.MIMEMultipart import MIMEMultipart 
 15  from email.Utils import formatdate 
 16  from twisted.internet import reactor, defer 
 17   
 18  from zenoss.protocols.queueschema import SchemaException 
 19  from zenoss.protocols import hydrateQueueMessage 
 20  from zenoss.protocols.interfaces import IQueueSchema 
 21  from Products.ZenCallHome.transport.cycler import CallHomeCycler 
 22  from Products.ZenCollector.utils.maintenance import MaintenanceCycle, maintenanceBuildOptions, QueueHeartbeatSender 
 23  from Products.ZenCollector.utils.workers import ProcessWorkers, workersBuildOptions, exec_worker 
 24   
 25  from Products.ZenEvents.Schedule import Schedule 
 26  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 27  from Products.ZenUtils.Utils import getDefaultZopeUrl 
 28  from Products.ZenUtils.guid.interfaces import IGlobalIdentifier 
 29  from Products.ZenUtils.guid.guid import GUIDManager 
 30   
 31  from Products.ZenModel.NotificationSubscription import NotificationSubscriptionManager 
 32  from Products.ZenModel.actions import ActionMissingException, TargetableAction, ActionExecutionException 
 33  from Products.ZenModel.interfaces import IAction 
 34  from Products.ZenEvents.Event import Event 
 35  from Products.ZenMessaging.queuemessaging.QueueConsumer import QueueConsumer 
 36  from Products.ZenMessaging.queuemessaging.interfaces import IQueueConsumerTask 
 37  from Products.ZenEvents.ZenEventClasses import Warning as SEV_WARNING 
 38  from zope.component import getUtility, getUtilitiesFor 
 39  from zope.component.interfaces import ComponentLookupError 
 40  from zope.interface import implements 
 41   
 42   
 43  import logging 
 # Module-level logger used by the classes defined in this module.
 44  log = logging.getLogger("zen.zenactiond") 
 45   
 46   
 # Default value of the --monitor command-line option (see ZenActionD.buildOptions).
 47  DEFAULT_MONITOR = "localhost" 
class NotificationDao(object):
    """
    Data-access helper for notification subscriptions stored in the DMD.

    Holds the notification subscription manager node and a GUIDManager so
    callers can find the notifications that should fire for a signal.
    """

    def __init__(self, dmd):
        """
        @param dmd: The DMD root object. It is kept on the instance, used to
            look up the notification manager node and passed to GUIDManager.
        """
        self.dmd = dmd
        self.notification_manager = self.dmd.getDmdRoot(NotificationSubscriptionManager.root)
        self.guidManager = GUIDManager(dmd)

    def getNotifications(self):
        """
        Return the child nodes of the notification manager.

        The DMD's connection (dmd._p_jar) is synced before reading.
        NOTE(review): presumably so that notifications changed by other
        processes are visible here -- confirm.
        """
        self.dmd._p_jar.sync()
        return self.notification_manager.getChildNodes()

    def getSignalNotifications(self, signal):
        """
        Given a signal, find which notifications match this signal. In order to
        match, a notification must be active (enabled and if has maintenance
        windows, at least one must be active) and must be subscribed to the
        signal.

        @param signal: The signal for which to get subscribers.
        @type signal: protobuf zep.Signal
        @return: The active notifications subscribed to the signal.
        @rtype: list
        """
        active_matching_notifications = []
        for notification in self.getNotifications():
            # Logger arguments are passed separately so the notification is
            # only rendered to text when debug logging is actually enabled.
            if not notification.isActive():
                log.debug('Notification "%s" is not active.', notification)
                continue
            if not self.notificationSubscribesToSignal(notification, signal):
                log.debug('Notification "%s" does not subscribe to this signal.', notification)
                continue
            active_matching_notifications.append(notification)
            log.debug('Found matching notification: %s', notification)

        return active_matching_notifications

    def notificationSubscribesToSignal(self, notification, signal):
        """
        Determine if the notification matches the specified signal.

        The match is made by comparing the signal's subscriber UUID with the
        GUID of the notification.

        @param notification: The notification to check
        @type notification: NotificationSubscription
        @param signal: The signal to match.
        @type signal: zenoss.protocols.protobufs.zep_pb2.Signal

        @rtype: boolean
        """
        return signal.subscriber_uuid == IGlobalIdentifier(notification).getGUID()
95
class ProcessSignalTask(object):
    """
    Queue consumer task that hydrates messages from the "$Signals" queue and
    executes the action of every notification subscribed to each signal.
    """
    implements(IQueueConsumerTask)

    # Layout shared by every audit-trail line written by processSignal.
    _AUDIT_FORMAT = "%s Action:%s Status:%s Target:%s Info:%s"

    def __init__(self, notificationDao):
        """
        @param notificationDao: Used to find the notifications for a signal,
            to resolve trigger GUIDs and to reach the DMD.
        @type notificationDao: NotificationDao
        """
        self.notificationDao = notificationDao

        # set by the constructor of queueConsumer
        self.queueConsumer = None

        self.schema = getUtility(IQueueSchema)
        self.queue = self.schema.getQueue("$Signals")

    def getAction(self, action):
        """
        Look up the IAction utility registered under the given name.

        @param action: Name of the action utility to look up.
        @raise ActionMissingException: if no utility is registered under
            that name.
        """
        try:
            return getUtility(IAction, action)
        except ComponentLookupError:
            raise ActionMissingException(action)

    def processMessage(self, message):
        """
        Handles a queue message, can call "acknowledge" on the Queue Consumer
        class when it is done with the message

        Every message is acknowledged, whether it is the MARKER sentinel, it
        could not be hydrated, its processing failed or it succeeded.
        """
        log.debug('processing message.')

        if message.content.body == self.queueConsumer.MARKER:
            log.info("Received MARKER sentinel, exiting message loop")
            self.queueConsumer.acknowledge(message)
            return
        try:
            signal = hydrateQueueMessage(message, self.schema)
            self.processSignal(signal)
            log.debug('Done processing signal.')
        except SchemaException:
            log.error("Unable to hydrate protobuf %s. ", message.content.body)
            self.queueConsumer.acknowledge(message)
        except Exception as e:
            log.exception(e)
            # FIXME: Send to an error queue instead of acknowledge.
            log.error('Acknowledging broken message.')
            self.queueConsumer.acknowledge(message)
        else:
            log.debug('Acknowledging message. (%s)', signal.message)
            self.queueConsumer.acknowledge(message)

    def processSignal(self, signal):
        """
        Execute the action of every notification matching the signal and log
        one audit line per attempted action.

        A failing action is logged and audited as "FAIL"; the remaining
        notifications are still processed. Unexpected failures additionally
        send a /App/Failed event.

        @param signal: The signal to process.
        @type signal: protobuf zep.Signal
        """
        matches = self.notificationDao.getSignalNotifications(signal)
        log.debug('Found these matching notifications: %s', matches)

        trigger = self.notificationDao.guidManager.getObject(signal.trigger_uuid)
        audit_event_trigger_info = "Event:'%s' Trigger:%s" % (
            signal.event.occurrence[0].fingerprint,
            trigger.id)
        for notification in matches:
            if signal.clear and not notification.send_clear:
                log.debug('Ignoring clearing signal since send_clear is set to False on this subscription %s',
                          notification.id)
                continue

            # Bound before the try block so that every handler below sees
            # values belonging to this notification, never an unbound name
            # or a leftover from the previous iteration.
            target = signal.subscriber_uuid or '<none>'
            action = None
            try:
                action = self.getAction(notification.action)
                action.setupAction(notification.dmd)
                if isinstance(action, TargetableAction):
                    target = ','.join(action.getTargets(notification))
                action.execute(notification, signal)
            except ActionMissingException:
                log.error('Error finding action: %s', notification.action)
                audit_msg = self._auditMessage(
                    audit_event_trigger_info, notification, "FAIL", target,
                    "<action not found>")
            except ActionExecutionException as aee:
                log.error('Error executing action: %s on notification %s',
                          notification.action, notification.id)
                audit_msg = self._auditMessage(
                    audit_event_trigger_info, notification, "FAIL", target, aee)
            except Exception as e:
                msg = 'Error executing action {notification}'.format(
                    notification=notification.id,
                )
                log.exception(e)
                log.error(msg)
                self._sendFailureEvent(msg, format_exc())
                if action is None:
                    # The failure happened before an action was obtained.
                    info = "<unknown>"
                else:
                    info = action.getInfo(notification)
                audit_msg = self._auditMessage(
                    audit_event_trigger_info, notification, "FAIL", target, info)
            else:
                # audit trail of performed actions
                audit_msg = self._auditMessage(
                    audit_event_trigger_info, notification, "SUCCESS", target,
                    action.getInfo(notification))
            log.info(audit_msg)
        log.debug('Done processing signal. (%s)', signal.message)

    def _auditMessage(self, trigger_info, notification, status, target, info):
        """
        Build one audit-trail line for an attempted notification action.
        """
        return self._AUDIT_FORMAT % (
            trigger_info, notification.action, status, target, info)

    def _sendFailureEvent(self, summary, traceback):
        """
        Send a /App/Failed warning event describing an unexpected action error.
        """
        event = Event(device="localhost",
                      eventClass="/App/Failed",
                      summary=summary,
                      message=traceback,
                      severity=SEV_WARNING, component="zenactiond")
        # This task has no dmd attribute of its own; the DMD is reached
        # through the DAO, which stores it in its constructor.
        self.notificationDao.dmd.ZenEventManager.sendEvent(event)
192
class ZenActionD(ZCmdBase):
    """
    The zenactiond daemon: consumes signals from the queue and hands them to
    ProcessSignalTask, alongside the call-home cycler, the maintenance window
    schedule, the maintenance cycle and optional worker processes.
    """

    def __init__(self):
        """
        Build the helper objects the daemon drives; nothing is started here.
        """
        super(ZenActionD, self).__init__()
        self._consumer = None

        opts = self.options
        # One fewer than --workers worker processes are managed here.
        extra_workers = opts.workers - 1
        self._workers = ProcessWorkers(extra_workers, exec_worker, "zenactiond worker")

        cycle = opts.maintenancecycle
        self._heartbeatSender = QueueHeartbeatSender('localhost', 'zenactiond', cycle * 3)
        self._maintenanceCycle = MaintenanceCycle(cycle, self._heartbeatSender)

        self._callHomeCycler = CallHomeCycler(self.dmd)

        self._schedule = schedule = Schedule(opts, self.dmd)
        schedule.sendEvent = self.dmd.ZenEventManager.sendEvent
        schedule.monitor = opts.monitor

    def buildOptions(self):
        """
        Register the daemon's command-line options on top of the inherited,
        maintenance and worker options.
        """
        super(ZenActionD, self).buildOptions()
        parser = self.parser
        maintenanceBuildOptions(parser)
        workersBuildOptions(parser, 1)

        max_commands = 10
        parser.add_option(
            '--maxcommands',
            dest="maxCommands",
            type="int",
            default=max_commands,
            help='Max number of action commands to perform concurrently (default: %d)' % max_commands)

        zope_url = getDefaultZopeUrl()
        parser.add_option(
            '--zopeurl',
            dest='zopeurl',
            default=zope_url,
            help="http path to the root of the zope server (default: %s)" % zope_url)

        parser.add_option(
            "--monitor",
            dest="monitor",
            default=DEFAULT_MONITOR,
            help="Name of monitor instance to use for heartbeat "
                 " events. Default is %s." % DEFAULT_MONITOR)

        # NOTE(review): the dest spelling ("maintence") is kept as-is; other
        # code may read the option under this name -- confirm before renaming.
        parser.add_option(
            '--maintenance-window-cycletime',
            dest='maintenceWindowCycletime',
            default=60,
            type="int",
            help="How often to check to see if there are any maintenance windows to execute")

    def run(self):
        """
        Configure the actions, start the background cycles and run the
        reactor until shutdown.
        """
        # Configure all actions with the command-line options
        action_options = dict(vars(self.options))
        for _name, action in getUtilitiesFor(IAction):
            action.configure(action_options)

        signal_task = ProcessSignalTask(NotificationDao(self.dmd))

        self._callHomeCycler.start()
        self._schedule.start()  # maintenance windows
        if self.options.daemon:
            self._maintenanceCycle.start()  # heartbeats, etc.
            if self.options.workers > 1:
                self._workers.startWorkers()

        self._consumer = QueueConsumer(signal_task, self.dmd)
        reactor.callWhenRunning(self._start)
        reactor.run()

    def _start(self):
        """
        Reactor start-up hook: register the shutdown trigger and start the
        queue consumer.
        """
        log.info('starting zenactiond consumer.')
        reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
        self._consumer.run()

    @defer.inlineCallbacks
    def _shutdown(self, *ignored):
        """
        Stop the maintenance cycle and the workers, then shut the consumer
        down if one was created.
        """
        log.info("Shutting down...")
        self._maintenanceCycle.stop()
        self._workers.shutdown()
        consumer = self._consumer
        if consumer:
            yield consumer.shutdown()
if __name__ == '__main__':
    # Script entry point: build the daemon and run it.
    ZenActionD().run()