1
2
3
4
5
6
7
8
9
10
11 from Products.ZenEvents.events2.fields import EventField
12 from Products.ZenEvents.interfaces import IEventIdentifierPlugin
13 from Products.ZenModel.Device import Device
14 from Products.ZenModel.DeviceComponent import DeviceComponent
15 from Products.ZenModel.DataRoot import DataRoot
16 from Products.ZenEvents.events2.proxy import ZepRawEventProxy, EventProxy
17 from Products.ZenUtils.guid.interfaces import IGUIDManager, IGlobalIdentifier
18 from Products.ZenUtils.IpUtil import ipToDecimal, IpAddressError
19 from Products.Zuul.interfaces import ICatalogTool
20 from Products.AdvancedQuery import Eq, Or
21 from zope.component import getUtilitiesFor
22 from Acquisition import aq_chain
23 from Products.ZenEvents import ZenEventClasses
24 from itertools import ifilterfalse
25
26 from zenoss.protocols.jsonformat import to_dict
27 from zenoss.protocols.protobufs.model_pb2 import DEVICE, COMPONENT
28 from zenoss.protocols.protobufs.zep_pb2 import (
29 STATUS_NEW,
30 STATUS_CLOSED,
31 STATUS_DROPPED,
32 )
33
34 import logging
35
36 log = logging.getLogger("zen.eventd")
39 - def __init__(self, message, event=None):
42
44 """
45 Raised when an event should be dropped from the queue.
46 """
47 pass
48
50 """
51 A logging adapter that adds the event UUID to the log output.
52 """
53
55 msg = '[{event_uuid}] {msg}'.format(event_uuid=self.extra['event_uuid'],
56 msg=msg)
57 return msg, kwargs
58
60 """
61 Provides lookup access to processing pipes and performs caching.
62 """
63
64 ELEMENT_TYPE_MAP = {
65 DEVICE: Device,
66 COMPONENT: DeviceComponent,
67 }
68
72
74 self._guidManager = IGUIDManager(self.dmd)
75
76 self._devices = self.dmd._getOb('Devices')
77 self._networks = self.dmd._getOb('Networks')
78 self._events = self.dmd._getOb('Events')
79
80 self._catalogs = {
81 DEVICE: self._devices,
82 }
83
86
88 try:
89 return self._events.getOrganizer(eventClassName)
90 except KeyError:
91
92 return None
93
100
102 """
103 Get a Device/Component by UUID
104 """
105 if uuid:
106 return self._guidManager.getObject(uuid)
107
109 """
110 Helper method to deal with catalog brains which are out of date. If
111 the uuid is not set on the brain, we attempt to load it from the
112 object.
113 """
114 uuid = brain.uuid
115 return uuid if uuid else IGlobalIdentifier(brain.getObject()).getGUID()
116
118 """
119 Find element by ID but only cache UUID. This forces us to lookup elements
120 each time by UUID (pretty fast) which gives us a chance to see if the element
121 has been deleted.
122 """
123 cls = self.ELEMENT_TYPE_MAP.get(element_type_id)
124 if cls:
125 catalog = catalog or self._catalogs.get(element_type_id)
126 if catalog:
127 results = ICatalogTool(catalog).search(cls,
128 query=Or(Eq('id', id),
129 Eq('name', id)),
130 filterPermissions=False,
131 limit=1)
132
133 if results.total:
134 return self.uuidFromBrain(results.results.next())
135
153
157
159 """returns a tuple ([device brains], [devices]) searching manage IP and interface IPs. limit is the maximum
160 total number in both lists."""
161 cat = ICatalogTool(self._devices)
162
163 if ipAddress:
164 try:
165 ipAddress = str(ipToDecimal(ipAddress))
166 except IpAddressError:
167 ipAddress = None
168
169 if identifier and not ipAddress:
170 try:
171 ipAddress = str(ipToDecimal(identifier))
172 except IpAddressError:
173 pass
174
175 querySet = Or(Eq('id', identifier), Eq('name', identifier))
176
177 if ipAddress:
178 querySet.addSubquery(Eq('ipAddress', ipAddress))
179
180 device_brains = list(cat.search(types=Device, query=querySet, limit=limit, filterPermissions=False))
181
182 if not ipAddress or (limit is not None and len(device_brains) >= limit):
183 return device_brains, []
184
185 querySet = Eq('ipAddress', ipAddress)
186 component_limit = None if limit is None else limit - len(device_brains)
187 component_results = cat.search(types=DeviceComponent, query=querySet, limit=component_limit, filterPermissions=False)
188 devices = [component_brain.getObject().device() for component_brain in component_results]
189 return device_brains, devices
190
192 """
193 This will return the device's
194 @type identifier: string
195 @param identifier: The IP address or id of a device
196 @type ipaddress: string
197 @param ipaddress: The known ipaddress of the device
198 """
199 device_brains, devices = self._findDevices(identifier, ipAddress, limit=1)
200 if device_brains:
201 return self.uuidFromBrain(device_brains[0])
202 if devices:
203 return self.getElementUuid(devices[0])
204 return None
205
210
212 """
213 Looks up all the UUIDs in the tree path of an Organizer
214 """
215 uuids = set()
216 acquisition_chain = []
217 for n in aq_chain(node.primaryAq()):
218 if isinstance(n, DataRoot):
219 acquisition_chain.pop()
220 break
221 acquisition_chain.append(n)
222
223 if acquisition_chain:
224 for obj in filter(None, acquisition_chain):
225 try:
226 uuids.add(self.getElementUuid(obj))
227 except TypeError:
228 log.debug("Unable to get a uuid for %s " % obj)
229
230 return filter(None, uuids)
231
232
233 -class EventContext(object):
234 """
235 Maintains the event context while processing.
236 """
237
238 - def __init__(self, log, zepRawEvent):
239 self._zepRawEvent = zepRawEvent
240 self._event = self._zepRawEvent.event
241 self._eventProxy = ZepRawEventProxy(self._zepRawEvent)
242
243
244 self._deviceObject = None
245 self._componentObject = None
246 self.log = EventLoggerAdapter(log, {'event_uuid': self._event.uuid})
247
248 - def setDeviceObject(self, device):
249 self._deviceObject = device
250
252 self._eventProxy._refreshClearClasses()
253
254 @property
255 - def deviceObject(self):
256 return self._deviceObject
257
258 - def setComponentObject(self, component):
259 self._componentObject = component
260
261 @property
262 - def componentObject(self):
263 return self._componentObject
264
265 @property
266 - def zepRawEvent(self):
267 return self._zepRawEvent
268
269 @property
272
273 @property
274 - def eventProxy(self):
275 """
276 A EventProxy that wraps the event protobuf and makes it look like an old style event.
277 """
278 return self._eventProxy
279
281 """
282 An event context handler that is called in a chain.
283 """
284 dependencies = []
285
286 - def __init__(self, manager, name=None):
292
294 """
295 Called in a chain, must return modified eventContext.
296 """
297 raise NotImplementedError()
298
322
329
331 - def _resolveElement(self, evtProcessorManager, catalog, eventContext, type_id_field,
332 identifier_field, uuid_field):
333 """
334 Lookup an element by identifier or uuid and make sure both
335 identifier and uuid are set.
336 """
337 actor = eventContext.event.actor
338 if actor.HasField(type_id_field):
339 if not (actor.HasField(identifier_field) and actor.HasField(uuid_field)):
340 if actor.HasField(uuid_field):
341 uuid = getattr(actor, uuid_field, None)
342 element = evtProcessorManager.getElementByUuid(uuid)
343 if element:
344 eventContext.log.debug('Identified element %s by uuid %s',
345 element, uuid)
346 setattr(actor, identifier_field, element.id)
347 else:
348 eventContext.log.warning('Could not find element by uuid %s'
349 , uuid)
350
351 elif actor.HasField(identifier_field):
352 type_id = getattr(actor, type_id_field, None)
353 identifier = getattr(actor, identifier_field, None)
354 if type_id == DEVICE:
355 element_uuid = evtProcessorManager.findDeviceUuid(identifier,
356 eventContext.eventProxy.ipAddress)
357 else:
358 element_uuid = evtProcessorManager.getElementUuidById(catalog,
359 type_id,
360 identifier)
361
362 if element_uuid:
363 eventContext.log.debug('Identified element %s by id %s',
364 element_uuid, identifier)
365 setattr(actor, uuid_field, element_uuid)
366 else:
367 eventContext.log.debug(
368 'Could not find element type %s with id %s', type_id
369 , identifier)
370 else:
371 if log.isEnabledFor(logging.DEBUG):
372 type_id = getattr(actor, type_id_field, None)
373 identifier = getattr(actor, identifier_field, None)
374 uuid = getattr(actor, uuid_field, None)
375 eventContext.log.debug('Element %s already fully identified by %s/%s', type_id, identifier, uuid)
376
378 """
379 Update eventContext in place, updating/resolving identifiers and respective uuid's
380 """
381 eventContext.log.debug('Identifying event (%s)' % self.__class__.__name__)
382
383
384 self._resolveElement(
385 evtProcessorManager,
386 None,
387 eventContext,
388 EventField.Actor.ELEMENT_TYPE_ID,
389 EventField.Actor.ELEMENT_IDENTIFIER,
390 EventField.Actor.ELEMENT_UUID
391 )
392
393
394 actor = eventContext.event.actor
395 if actor.HasField(EventField.Actor.ELEMENT_UUID):
396 parent = evtProcessorManager.getElementByUuid(actor.element_uuid)
397 else:
398 parent = None
399 self._resolveElement(
400 evtProcessorManager,
401 parent,
402 eventContext,
403 EventField.Actor.ELEMENT_SUB_TYPE_ID,
404 EventField.Actor.ELEMENT_SUB_IDENTIFIER,
405 EventField.Actor.ELEMENT_SUB_UUID
406 )
407
409 """
410 Resolves element uuids and identifiers to make sure both are populated.
411 """
412
413 dependencies = [CheckInputPipe]
414
434
436 """
437 Adds device and component info to the context and event proxy.
438 """
439 dependencies = [IdentifierPipe]
440
441
442 DEVICE_DEVICECLASS_TAG_KEY = EventProxy.DEVICE_CLASS_DETAIL_KEY
443 DEVICE_LOCATION_TAG_KEY = EventProxy.DEVICE_LOCATION_DETAIL_KEY
444 DEVICE_SYSTEMS_TAG_KEY = EventProxy.DEVICE_SYSTEMS_DETAIL_KEY
445 DEVICE_GROUPS_TAG_KEY = EventProxy.DEVICE_GROUPS_DETAIL_KEY
446
447 DEVICE_TAGGERS = {
448 DEVICE_DEVICECLASS_TAG_KEY : (lambda device: device.deviceClass(), 'DeviceClass'),
449 DEVICE_LOCATION_TAG_KEY : (lambda device: device.location(), 'Location'),
450 DEVICE_SYSTEMS_TAG_KEY : (lambda device: device.systems(), 'Systems'),
451 DEVICE_GROUPS_TAG_KEY : (lambda device: device.groups(), 'DeviceGroups'),
452 }
453
455 if orgtypename not in orgs:
456 return
457
458 orgnames = orgs[orgtypename]
459 if orgnames:
460 if asDelimitedList:
461 detailOrgnames = orgnames
462 proxyOrgname = '|' + '|'.join(orgnames)
463 else:
464
465 detailOrgnames = orgnames[0]
466 proxyOrgname = orgnames
467 evtproxy.setReadOnly(orgtypename, proxyOrgname)
468 evtproxy.details[proxydetailkey] = detailOrgnames
469
483
485 evtproxy = eventContext.eventProxy
486 self._addDeviceOrganizerNames(orgs, 'Location', evtproxy, EventProxy.DEVICE_LOCATION_DETAIL_KEY)
487 self._addDeviceOrganizerNames(orgs, 'DeviceClass', evtproxy, EventProxy.DEVICE_CLASS_DETAIL_KEY)
488 self._addDeviceOrganizerNames(orgs, 'DeviceGroups', evtproxy, EventProxy.DEVICE_GROUPS_DETAIL_KEY, asDelimitedList=True)
489 self._addDeviceOrganizerNames(orgs, 'Systems', evtproxy, EventProxy.DEVICE_SYSTEMS_DETAIL_KEY, asDelimitedList=True)
490
506
508 actor = eventContext.event.actor
509
510
511 element_type_id, element = self._findTypeIdAndElement(eventContext, False)
512 if element:
513 actor.element_identifier = element.id
514 elementTitle = element.titleOrId()
515 if elementTitle != actor.element_identifier:
516 try:
517 actor.element_title = elementTitle
518 except ValueError:
519 actor.element_title = elementTitle.decode('utf8')
520
521 sub_element_type_id, sub_element = self._findTypeIdAndElement(eventContext, True)
522 if sub_element:
523 actor.element_sub_identifier = sub_element.id
524 subElementTitle = sub_element.titleOrId()
525 if subElementTitle != actor.element_sub_identifier:
526 try:
527 actor.element_sub_title = subElementTitle
528 except ValueError:
529 actor.element_sub_title = subElementTitle.decode('utf8')
530
531 device = eventContext.deviceObject
532 if device is None:
533 if element_type_id == DEVICE:
534 device = element
535 elif sub_element_type_id == DEVICE:
536 device = sub_element
537
538 if device:
539 eventContext.setDeviceObject(device)
540
541
542
543 deviceOrgs = {}
544 for tagType, orgProcessValues in self.DEVICE_TAGGERS.iteritems():
545 getOrgFunc,orgTypeName = orgProcessValues
546 objList = getOrgFunc(device)
547 if objList:
548 if not isinstance(objList, list):
549 objList = [objList]
550 uuids = set(sum((self._manager.getUuidsOfPath(obj) for obj in objList), []))
551 if uuids:
552 eventContext.eventProxy.tags.addAll(tagType, uuids)
553
554
555
556 deviceOrgs[orgTypeName] = [obj.getOrganizerName() for obj in objList]
557
558 self._addDeviceContext(eventContext, device)
559 self._addDeviceOrganizers(eventContext, deviceOrgs)
560
561 component = eventContext.componentObject
562 if component is None:
563 if element_type_id == COMPONENT:
564 component = element
565 elif sub_element_type_id == COMPONENT:
566 component = sub_element
567
568 if component:
569 eventContext.setComponentObject(component)
570
571 return eventContext
572
610
611 -class SerializeContextPipe(EventProcessorPipe):
612 """
613 Takes fields added to the eventProxy that couldn't directly be mapped out of the
614 proxy and applies them to the event protobuf.
615 """
616 dependencies = [AddDeviceContextAndTagsPipe]
617
618 - def __call__(self, eventContext):
619 eventContext.log.debug('Saving context back to event')
620 return eventContext
621
623 """
624 If the event class has not yet been set by the time this pipe is reached, set
625 it to a default value.
626 """
644
679
713
765
767 - def __init__(self, manager, pluginInterface, name=''):
771
773 for name, plugin in self._eventPlugins:
774 try:
775 plugin.apply(eventContext._eventProxy, self._manager.dmd)
776 except Exception as e:
777 eventContext.log.error(
778 'Event plugin %s encountered an error -- skipping.' % name)
779 eventContext.log.exception(e)
780 continue
781
782 return eventContext
783
788
790
792 self.exceptionClass = exceptionClass
793
795 raise self.exceptionClass('Testing pipe processing failure')
796