Package Products :: Package ZenEvents :: Module zeneventd
[hide private]
[frames | no frames]

Source Code for Module Products.ZenEvents.zeneventd

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2010, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  from twisted.internet import reactor 
 12  from twisted.internet import defer 
 13   
 14  import time 
 15  from datetime import datetime, timedelta 
 16   
 17  import Globals 
 18  from zope.component import getUtility, adapter 
 19  from zope.interface import implements 
 20  from zope.component.event import objectEventNotify 
 21   
 22  from Products.ZenCollector.utils.maintenance import MaintenanceCycle, maintenanceBuildOptions, QueueHeartbeatSender 
 23  from Products.ZenMessaging.queuemessaging.interfaces import IQueueConsumerTask 
 24  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 25  from Products.ZenUtils.guid import guid 
 26  from zenoss.protocols.interfaces import IAMQPConnectionInfo, IQueueSchema 
 27  from zenoss.protocols.protobufs.zep_pb2 import ZepRawEvent, Event, STATUS_DROPPED 
 28  from zenoss.protocols.jsonformat import from_dict, to_dict 
 29  from zenoss.protocols import hydrateQueueMessage 
 30  from Products.ZenMessaging.queuemessaging.QueueConsumer import QueueConsumer 
 31  from Products.ZenEvents.events2.processing import (Manager, EventPluginPipe, CheckInputPipe, IdentifierPipe, 
 32      AddDeviceContextAndTagsPipe, TransformAndReidentPipe, TransformPipe, UpdateDeviceContextAndTagsPipe, 
 33      AssignDefaultEventClassAndTagPipe, FingerprintPipe, SerializeContextPipe, ClearClassRefreshPipe, 
 34      EventContext, DropEvent, ProcessingException) 
 35  from Products.ZenEvents.interfaces import IPreEventPlugin, IPostEventPlugin 
 36  from Products.ZenEvents.daemonlifecycle import DaemonCreatedEvent, SigTermEvent, SigUsr1Event 
 37  from Products.ZenEvents.daemonlifecycle import DaemonStartRunEvent, BuildOptionsEvent 
 38   
 39  import logging 
 40  log = logging.getLogger("zen.eventd") 
 41   
 42  EXCHANGE_ZEP_ZEN_EVENTS = '$ZepZenEvents' 
 43  QUEUE_RAW_ZEN_EVENTS = '$RawZenEvents' 
class EventPipelineProcessor(object):
    """
    Runs raw events through an ordered sequence of processing pipes and
    returns the resulting ZepRawEvent.
    """

    # When True, the database connection (dmd._p_jar) is sync()'ed before
    # every message; when False, sync() calls are throttled by syncInterval.
    SYNC_EVERY_EVENT = False

    def __init__(self, dmd):
        """
        Build the pipe sequence around a Manager for the given dmd.

        dmd -- object handed to Manager(); its _p_jar.sync() is called from
               processMessage.
        """
        self.dmd = dmd
        self._manager = Manager(self.dmd)
        self._pipes = (
            EventPluginPipe(self._manager, IPreEventPlugin, 'PreEventPluginPipe'),
            CheckInputPipe(self._manager),
            IdentifierPipe(self._manager),
            AddDeviceContextAndTagsPipe(self._manager),
            TransformAndReidentPipe(self._manager,
                TransformPipe(self._manager),
                [
                    IdentifierPipe(self._manager),
                    UpdateDeviceContextAndTagsPipe(self._manager),
                ]),
            AssignDefaultEventClassAndTagPipe(self._manager),
            FingerprintPipe(self._manager),
            SerializeContextPipe(self._manager),
            EventPluginPipe(self._manager, IPostEventPlugin, 'PostEventPluginPipe'),
            ClearClassRefreshPipe(self._manager),
        )

        # don't call sync() more often than 1 every 0.5 sec - helps throughput
        # when receiving events in bursts.  These are set unconditionally so
        # that changing SYNC_EVERY_EVENT after construction cannot leave the
        # throttle without its state.
        self.nextSync = datetime.now()
        self.syncInterval = timedelta(0, 0, 500000)

    def _syncDue(self):
        """
        Return True if the database connection should be sync()'ed now.

        Always True when SYNC_EVERY_EVENT is set.  Otherwise True only once
        nextSync has passed, in which case nextSync is moved syncInterval
        into the future.  The deadline is deliberately NOT moved when the
        answer is False: pushing it back on every message would mean that a
        steady stream of events arriving faster than syncInterval never
        triggers a sync at all.
        """
        if self.SYNC_EVERY_EVENT:
            return True

        currentTime = datetime.now()
        if currentTime > self.nextSync:
            self.nextSync = currentTime + self.syncInterval
            return True
        return False

    def _failureContext(self, message, error):
        """
        Build the EventContext for an event that reports a processing failure.

        message -- the message whose processing failed; its content is
                   embedded in the report's 'message' text.
        error   -- the exception that was raised.

        Returns an EventContext wrapping a new ZepRawEvent with device
        'zeneventd' and component 'processMessage'.
        """
        # construct wrapper event to report this event processing failure
        # (including content of the original event)
        origzepevent = ZepRawEvent()
        origzepevent.event.CopyFrom(message)
        failReportEvent = dict(
            uuid=guid.generate(),
            created_time=int(time.time() * 1000),
            fingerprint='|'.join(['zeneventd', 'processMessage', repr(error)]),
            # Don't send the *same* event class or we thrash and crash endlessly
            eventClass='/',
            summary='Internal exception processing event: %r' % error,
            message='Internal exception processing event: %r/%s' % (error, to_dict(origzepevent.event)),
            severity=4,
        )
        zepevent = ZepRawEvent()
        zepevent.event.CopyFrom(from_dict(Event, failReportEvent))
        eventContext = EventContext(log, zepevent)
        eventContext.eventProxy.device = 'zeneventd'
        eventContext.eventProxy.component = 'processMessage'
        return eventContext

    def processMessage(self, message):
        """
        Run one event through the pipeline and return the result.

        message -- the event to process; it is copied into a new
                   ZepRawEvent's 'event' field with CopyFrom (presumably a
                   hydrated Event protobuf - confirm against the caller).

        Returns the ZepRawEvent held by the event context after the last
        pipe.  If processing fails with anything other than DropEvent, the
        return value is instead a ZepRawEvent describing the failure.

        Raises DropEvent if a pipe raises it, or if the event's status is
        STATUS_DROPPED after a pipe has run.  This method does not
        acknowledge or reject the queue message; that is left to the caller.
        """
        if self._syncDue():
            self.dmd._p_jar.sync()

        try:
            retry = True
            processed = False
            while not processed:
                try:
                    # extract event from message body
                    zepevent = ZepRawEvent()
                    zepevent.event.CopyFrom(message)
                    if log.isEnabledFor(logging.DEBUG):
                        log.debug("Received event: %s", to_dict(zepevent.event))

                    eventContext = EventContext(log, zepevent)

                    for pipe in self._pipes:
                        eventContext = pipe(eventContext)
                        if log.isEnabledFor(logging.DEBUG):
                            log.debug('After pipe %s, event context is %s',
                                      pipe.name, to_dict(eventContext.zepRawEvent))
                        if eventContext.event.status == STATUS_DROPPED:
                            raise DropEvent('Dropped by %s' % pipe, eventContext.event)

                    processed = True

                except AttributeError:
                    # _manager throws Attribute errors if connection to zope is lost - reset
                    # and retry ONE time
                    if retry:
                        retry = False
                        log.debug("Resetting connection to catalogs")
                        self._manager.reset()
                    else:
                        raise

        except DropEvent:
            # we want these to propagate out
            raise
        except Exception as e:
            log.info("Failed to process event, forward original raw event: %s", to_dict(zepevent.event))
            # Pipes and plugins may raise ProcessingException's for their own reasons - only log unexpected
            # exceptions of other type (will insert stack trace in log)
            if not isinstance(e, ProcessingException):
                log.exception(e)

            eventContext = self._failureContext(message, e)

        if log.isEnabledFor(logging.DEBUG):
            log.debug("Publishing event: %s", to_dict(eventContext.zepRawEvent))

        return eventContext.zepRawEvent
class BaseQueueConsumerTask(object):
    """
    Common state for queue consumer tasks: the event processor, the queue
    schema utility and the destination exchange for processed events.
    """

    implements(IQueueConsumerTask)

    def __init__(self, processor):
        """
        processor -- object whose processMessage() does the actual event
                     processing; stored as self.processor.
        """
        self.processor = processor
        schema = getUtility(IQueueSchema)
        self._queueSchema = schema
        self.dest_routing_key_prefix = 'zenoss.zenevent'
        self._dest_exchange = schema.getExchange(EXCHANGE_ZEP_ZEN_EVENTS)

    def _routing_key(self, event):
        """
        Return the routing key for a processed event: the routing key prefix
        followed by the event class, lower-cased and with every '/' replaced
        by '.' (e.g. '/Status/Ping' gives 'zenoss.zenevent.status.ping').
        """
        class_suffix = event.event.event_class.replace('/', '.').lower()
        return self.dest_routing_key_prefix + class_suffix
class TwistedQueueConsumerTask(BaseQueueConsumerTask):
    """
    Consumer task for the raw events queue: hydrates each message, runs it
    through the processor and publishes the result.
    """

    def __init__(self, processor):
        """Set up the base task state and look up the raw events queue."""
        BaseQueueConsumerTask.__init__(self, processor)
        self.queue = self._queueSchema.getQueue(QUEUE_RAW_ZEN_EVENTS)

    @defer.inlineCallbacks
    def processMessage(self, message):
        """
        Process one queue message.

        Outcome for the message:
          - cannot be hydrated: error is logged, message is acknowledged
          - processed and published: message is acknowledged
          - DropEvent raised: message is acknowledged (logged at debug only)
          - any other exception: logged, message is rejected

        NOTE(review): self.queueConsumer is not assigned anywhere in this
        class - presumably the owning QueueConsumer sets it; confirm.
        """
        try:
            hydrated = hydrateQueueMessage(message, self._queueSchema)
        except Exception as hydrateError:
            # Nothing useful can be done with a message we cannot decode
            log.error("Failed to hydrate raw event: %s", hydrateError)
            yield self.queueConsumer.acknowledge(message)
            return

        try:
            zepRawEvent = self.processor.processMessage(hydrated)
            if log.isEnabledFor(logging.DEBUG):
                log.debug("Publishing event: %s", to_dict(zepRawEvent))
            routingKey = self._routing_key(zepRawEvent)
            yield self.queueConsumer.publishMessage(
                EXCHANGE_ZEP_ZEN_EVENTS, routingKey, zepRawEvent, declareExchange=False)
            yield self.queueConsumer.acknowledge(message)
        except DropEvent as dropped:
            # A dropped event is an expected outcome, so the message is
            # acknowledged rather than rejected
            if log.isEnabledFor(logging.DEBUG):
                log.debug('%s - %s' % (dropped.message, to_dict(dropped.event)))
            yield self.queueConsumer.acknowledge(message)
        except ProcessingException as failure:
            log.error('%s - %s' % (failure.message, to_dict(failure.event)))
            log.exception(failure)
            yield self.queueConsumer.reject(message)
        except Exception as unexpected:
            log.exception(unexpected)
            yield self.queueConsumer.reject(message)
class EventDTwistedWorker(object):
    """
    Wires an EventPipelineProcessor to a QueueConsumer and runs it under the
    Twisted reactor.
    """

    def __init__(self, dmd):
        """
        dmd -- passed both to the EventPipelineProcessor and to the
               QueueConsumer.
        """
        super(EventDTwistedWorker, self).__init__()
        self._amqpConnectionInfo = getUtility(IAMQPConnectionInfo)
        self._queueSchema = getUtility(IQueueSchema)
        processor = EventPipelineProcessor(dmd)
        self._consumer_task = TwistedQueueConsumerTask(processor)
        self._consumer = QueueConsumer(self._consumer_task, dmd)

    def run(self):
        """Schedule _start for reactor startup, then run the reactor."""
        reactor.callWhenRunning(self._start)
        reactor.run()

    def _start(self):
        """Register the shutdown hook and start the queue consumer."""
        reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
        self._consumer.run()

    @defer.inlineCallbacks
    def _shutdown(self):
        """Shut the queue consumer down, if there is one."""
        if not self._consumer:
            return
        yield self._consumer.shutdown()
class ZenEventD(ZCmdBase):
    """
    The zeneventd daemon command.  Lifecycle steps (creation, option
    building, run, SIGTERM, SIGUSR1) are announced through
    objectEventNotify so that subscribers can hook into them.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialise the command, apply the --synceveryevent option to
        EventPipelineProcessor and create the heartbeat sender and
        maintenance cycle.
        """
        super(ZenEventD, self).__init__(*args, **kwargs)
        EventPipelineProcessor.SYNC_EVERY_EVENT = self.options.syncEveryEvent
        cycle = self.options.maintenancecycle
        # NOTE(review): the third argument is three maintenance cycles -
        # presumably the heartbeat timeout; confirm against QueueHeartbeatSender
        self._heartbeatSender = QueueHeartbeatSender('localhost', 'zeneventd', cycle * 3)
        self._maintenanceCycle = MaintenanceCycle(cycle, self._heartbeatSender)
        objectEventNotify(DaemonCreatedEvent(self))

    def sigTerm(self, signum=None, frame=None):
        """Stop the maintenance cycle, notify subscribers, then defer to the base handler."""
        log.info("Shutting down...")
        self._maintenanceCycle.stop()
        objectEventNotify(SigTermEvent(self))
        super(ZenEventD, self).sigTerm(signum, frame)

    def run(self):
        """
        Start the maintenance cycle (daemon mode only) and fire
        DaemonStartRunEvent; the actual work is done by that event's
        subscribers.
        """
        daemonized = self.options.daemon
        if daemonized:
            self._maintenanceCycle.start()
        objectEventNotify(DaemonStartRunEvent(self))

    def sighandler_USR1(self, signum, frame):
        """Run the base SIGUSR1 handling, then notify subscribers."""
        super(ZenEventD, self).sighandler_USR1(signum, frame)
        log.debug('sighandler_USR1 called %s' % signum)
        objectEventNotify(SigUsr1Event(self, signum))

    def buildOptions(self):
        """Add the maintenance options and --synceveryevent to the base options."""
        super(ZenEventD, self).buildOptions()
        maintenanceBuildOptions(self.parser)
        syncHelp = ('Force sync() before processing every event; default is to sync() no more often '
                    'than once every 1/2 second.')
        self.parser.add_option('--synceveryevent', dest='syncEveryEvent',
                               action="store_true", default=False,
                               help=syncHelp)
        objectEventNotify(BuildOptionsEvent(self))
@adapter(ZenEventD, DaemonStartRunEvent)
def onDaemonStartRun(daemon, event):
    """
    Subscriber for DaemonStartRunEvent on a ZenEventD: create an
    EventDTwistedWorker for the daemon's dmd and run it.
    """
    worker = EventDTwistedWorker(daemon.dmd)
    worker.run()
if __name__ == '__main__':
    # ZenEventD is imported explicitly through its package path (rather than
    # using the class defined in this script module) to activate enterprise
    # extensions
    from Products.ZenEvents.zeneventd import ZenEventD
    daemon = ZenEventD()
    daemon.run()