1
2
3
4
5
6
7
8
9
10
11 from twisted.internet import reactor
12 from twisted.internet import defer
13
14 import time
15 from datetime import datetime, timedelta
16
17 import Globals
18 from zope.component import getUtility, adapter
19 from zope.interface import implements
20 from zope.component.event import objectEventNotify
21
22 from Products.ZenCollector.utils.maintenance import MaintenanceCycle, maintenanceBuildOptions, QueueHeartbeatSender
23 from Products.ZenMessaging.queuemessaging.interfaces import IQueueConsumerTask
24 from Products.ZenUtils.ZCmdBase import ZCmdBase
25 from Products.ZenUtils.guid import guid
26 from zenoss.protocols.interfaces import IAMQPConnectionInfo, IQueueSchema
27 from zenoss.protocols.protobufs.zep_pb2 import ZepRawEvent, Event, STATUS_DROPPED
28 from zenoss.protocols.jsonformat import from_dict, to_dict
29 from zenoss.protocols import hydrateQueueMessage
30 from Products.ZenMessaging.queuemessaging.QueueConsumer import QueueConsumer
31 from Products.ZenEvents.events2.processing import (Manager, EventPluginPipe, CheckInputPipe, IdentifierPipe,
32 AddDeviceContextAndTagsPipe, TransformAndReidentPipe, TransformPipe, UpdateDeviceContextAndTagsPipe,
33 AssignDefaultEventClassAndTagPipe, FingerprintPipe, SerializeContextPipe, ClearClassRefreshPipe,
34 EventContext, DropEvent, ProcessingException)
35 from Products.ZenEvents.interfaces import IPreEventPlugin, IPostEventPlugin
36 from Products.ZenEvents.daemonlifecycle import DaemonCreatedEvent, SigTermEvent, SigUsr1Event
37 from Products.ZenEvents.daemonlifecycle import DaemonStartRunEvent, BuildOptionsEvent
38
39 import logging
# Module-wide logger used by the event-processing code in this file.
log = logging.getLogger("zen.eventd")

# Name passed to the queue schema's getExchange() to find the exchange that
# processed events are destined for.
EXCHANGE_ZEP_ZEN_EVENTS = '$ZepZenEvents'
# Queue name constant; its consumer is not visible in this part of the file.
QUEUE_RAW_ZEN_EVENTS = '$RawZenEvents'
46
class EventPipelineProcessor(object):
    """
    Pushes raw events through an ordered chain of processing pipes.

    NOTE(review): the ``class`` and ``def`` lines of this block were absent
    from the listing this code was recovered from.  The class name comes
    from the ``EventPipelineProcessor.SYNC_EVERY_EVENT`` assignment made by
    ZenEventD, ``processMessage`` from the fingerprint it writes into its
    failure events, and the parameter names from the names the bodies use.
    Confirm the headers against the real file.
    """

    # When true, dmd's connection is sync()ed before every event; otherwise
    # at most once per syncInterval.
    SYNC_EVERY_EVENT = False

    def __init__(self, dmd):
        """
        Create the processing Manager and the tuple of pipes applied, in
        order, to every event.

        dmd is handed to Manager and kept on the instance; its
        ``_p_jar.sync()`` is what processMessage calls to refresh.
        """
        self.dmd = dmd
        self._manager = Manager(self.dmd)
        self._pipes = (
            EventPluginPipe(self._manager, IPreEventPlugin, 'PreEventPluginPipe'),
            CheckInputPipe(self._manager),
            IdentifierPipe(self._manager),
            AddDeviceContextAndTagsPipe(self._manager),
            TransformAndReidentPipe(self._manager,
                TransformPipe(self._manager),
                [
                    IdentifierPipe(self._manager),
                    UpdateDeviceContextAndTagsPipe(self._manager),
                ]),
            AssignDefaultEventClassAndTagPipe(self._manager),
            FingerprintPipe(self._manager),
            SerializeContextPipe(self._manager),
            EventPluginPipe(self._manager, IPostEventPlugin, 'PostEventPluginPipe'),
            ClearClassRefreshPipe(self._manager),
        )

        # Throttle state for the periodic sync().  It is set up even when
        # SYNC_EVERY_EVENT is on, so that switching the flag off later
        # cannot hit a missing attribute.
        self.nextSync = datetime.now()
        self.syncInterval = timedelta(0, 0, 500000)  # half a second

    def _isSyncDue(self, now=None):
        """
        Tell whether a sync() should be done before the next event.

        Always True when SYNC_EVERY_EVENT is set.  Otherwise True only once
        ``now`` has passed ``self.nextSync``, in which case the deadline is
        moved to ``now + self.syncInterval``.  The deadline is left alone
        when no sync is due: pushing it forward on every event would let a
        steady stream of events arriving faster than syncInterval postpone
        the sync forever.

        now defaults to datetime.now(); it is a parameter so the throttle
        can be exercised with fixed timestamps.
        """
        if self.SYNC_EVERY_EVENT:
            return True

        if now is None:
            now = datetime.now()
        if now > self.nextSync:
            self.nextSync = now + self.syncInterval
            return True
        return False

    def _buildFailureContext(self, message, e):
        """
        Build the EventContext of the event that reports a failure to
        process ``message``; ``e`` is the exception that was caught.
        """
        # Fresh copy of the incoming message for the report text.
        origzepevent = ZepRawEvent()
        origzepevent.event.CopyFrom(message)
        failReportEvent = dict(
            uuid=guid.generate(),
            created_time=int(time.time() * 1000),  # milliseconds
            fingerprint='|'.join(['zeneventd', 'processMessage', repr(e)]),
            eventClass='/',
            summary='Internal exception processing event: %r' % e,
            message='Internal exception processing event: %r/%s' % (e, to_dict(origzepevent.event)),
            severity=4,
        )
        zepevent = ZepRawEvent()
        zepevent.event.CopyFrom(from_dict(Event, failReportEvent))
        eventContext = EventContext(log, zepevent)
        eventContext.eventProxy.device = 'zeneventd'
        eventContext.eventProxy.component = 'processMessage'
        return eventContext

    def processMessage(self, message):
        """
        Run one event message through every pipe and return the resulting
        raw event (``eventContext.zepRawEvent``).

        A DropEvent, raised by a pipe or raised here when a pipe leaves the
        event with status STATUS_DROPPED, propagates to the caller.  Any
        other exception is logged and replaced by a failure-report event,
        which is returned instead of the processed event.
        """
        if self._isSyncDue():
            self.dmd._p_jar.sync()

        try:
            retry = True
            processed = False
            while not processed:
                try:
                    # Each attempt starts from a fresh copy of the message.
                    zepevent = ZepRawEvent()
                    zepevent.event.CopyFrom(message)
                    if log.isEnabledFor(logging.DEBUG):
                        log.debug("Received event: %s", to_dict(zepevent.event))

                    eventContext = EventContext(log, zepevent)

                    for pipe in self._pipes:
                        eventContext = pipe(eventContext)
                        if log.isEnabledFor(logging.DEBUG):
                            log.debug('After pipe %s, event context is %s',
                                      pipe.name, to_dict(eventContext.zepRawEvent))
                        if eventContext.event.status == STATUS_DROPPED:
                            raise DropEvent('Dropped by %s' % pipe, eventContext.event)

                    processed = True

                except AttributeError:
                    # Reset the manager and retry exactly once; a second
                    # AttributeError is re-raised and handled below like any
                    # other failure.  Presumably an AttributeError here means
                    # a stale catalog connection -- TODO confirm.
                    if retry:
                        retry = False
                        log.debug("Resetting connection to catalogs")
                        self._manager.reset()
                    else:
                        raise

        except DropEvent:
            # Dropped events are the caller's business.
            raise
        except Exception as e:
            log.info("Failed to process event, forward original raw event: %s", to_dict(zepevent.event))
            # Only exceptions that are not ProcessingException get a
            # traceback in the log.
            if not isinstance(e, ProcessingException):
                log.exception(e)

            # Publish a report of the failure in place of the event.
            eventContext = self._buildFailureContext(message, e)

        if log.isEnabledFor(logging.DEBUG):
            log.debug("Publishing event: %s", to_dict(eventContext.zepRawEvent))

        return eventContext.zepRawEvent
159
161
162 implements(IQueueConsumerTask)
163
165 self.processor = processor
166 self._queueSchema = getUtility(IQueueSchema)
167 self.dest_routing_key_prefix = 'zenoss.zenevent'
168 self._dest_exchange = self._queueSchema.getExchange(EXCHANGE_ZEP_ZEN_EVENTS)
169
171 return (self.dest_routing_key_prefix +
172 event.event.event_class.replace('/', '.').lower())
173
175
179
180 @defer.inlineCallbacks
206
215
217 reactor.callWhenRunning(self._start)
218 reactor.run()
219
def _start(self):
    """
    Register the shutdown hook with the reactor, then run the consumer.

    NOTE(review): the ``def`` line was absent from the listing this code
    was recovered from.  The name is taken from the
    ``reactor.callWhenRunning(self._start)`` call elsewhere in the file --
    confirm against the real file.
    """
    # Have the reactor call self._shutdown in the 'before' phase of its
    # 'shutdown' event.
    reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
    self._consumer.run()
223
@defer.inlineCallbacks
def _shutdown(self, *ignored):
    """
    Shut the consumer down, if there is one, and wait for that to finish.

    NOTE(review): the ``def`` line was absent from the listing this code
    was recovered from.  The name is taken from the
    ``addSystemEventTrigger(..., self._shutdown)`` registration elsewhere
    in the file; extra positional arguments are accepted and ignored so the
    reconstructed signature works however the hook is invoked -- confirm
    against the real file.
    """
    if self._consumer:
        yield self._consumer.shutdown()
228
230
def __init__(self, *args, **kwargs):
    """
    Initialise the ZenEventD daemon.

    After the base-class initialisation this copies the --synceveryevent
    option onto EventPipelineProcessor, creates the heartbeat sender and
    the maintenance cycle that uses it, and fires DaemonCreatedEvent.

    NOTE(review): the ``def`` line was absent from the listing this code
    was recovered from; the signature is taken from the super() call.
    """
    super(ZenEventD, self).__init__(*args, **kwargs)
    # Class-level switch: applies to every EventPipelineProcessor.
    EventPipelineProcessor.SYNC_EVERY_EVENT = self.options.syncEveryEvent
    # Third argument is three maintenance cycles -- presumably the
    # heartbeat timeout; TODO confirm against QueueHeartbeatSender.
    self._heartbeatSender = QueueHeartbeatSender('localhost',
                                                 'zeneventd',
                                                 self.options.maintenancecycle * 3)
    self._maintenanceCycle = MaintenanceCycle(self.options.maintenancecycle,
                                              self._heartbeatSender)
    objectEventNotify(DaemonCreatedEvent(self))
240
241 - def sigTerm(self, signum=None, frame=None):
246
251
256
def buildOptions(self):
    """
    Add this daemon's command-line options to self.parser.

    On top of the base-class options this adds the options from
    maintenanceBuildOptions() and the --synceveryevent flag, then fires
    BuildOptionsEvent.

    NOTE(review): the ``def`` line was absent from the listing this code
    was recovered from; the name is taken from the super() call.
    """
    super(ZenEventD, self).buildOptions()
    maintenanceBuildOptions(self.parser)
    self.parser.add_option('--synceveryevent', dest='syncEveryEvent',
        action="store_true", default=False,
        help='Force sync() before processing every event; default is to sync() no more often '
             'than once every 1/2 second.')
    objectEventNotify(BuildOptionsEvent(self))
265
273
if __name__ == '__main__':
    # Build the daemon from the class as imported through its package path
    # rather than from a copy defined in __main__ (presumably this file is
    # that module -- TODO confirm).
    from Products.ZenEvents.zeneventd import ZenEventD
    zed = ZenEventD()
    zed.run()
279