Package Products :: Package ZenEvents :: Package events2 :: Module processing
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenEvents.events2.processing

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2010, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  from Products.ZenEvents.events2.fields import EventField 
 12  from Products.ZenEvents.interfaces import IEventIdentifierPlugin 
 13  from Products.ZenModel.Device import Device 
 14  from Products.ZenModel.DeviceComponent import DeviceComponent 
 15  from Products.ZenModel.DataRoot import DataRoot 
 16  from Products.ZenEvents.events2.proxy import ZepRawEventProxy, EventProxy 
 17  from Products.ZenUtils.guid.interfaces import IGUIDManager, IGlobalIdentifier 
 18  from Products.ZenUtils.IpUtil import ipToDecimal, IpAddressError 
 19  from Products.Zuul.interfaces import ICatalogTool 
 20  from Products.AdvancedQuery import Eq, Or 
 21  from zope.component import getUtilitiesFor 
 22  from Acquisition import aq_chain 
 23  from Products.ZenEvents import ZenEventClasses 
 24  from itertools import ifilterfalse 
 25   
 26  from zenoss.protocols.jsonformat import to_dict 
 27  from zenoss.protocols.protobufs.model_pb2 import DEVICE, COMPONENT 
 28  from zenoss.protocols.protobufs.zep_pb2 import ( 
 29      STATUS_NEW, 
 30      STATUS_CLOSED, 
 31      STATUS_DROPPED, 
 32      ) 
 33   
 34  import logging 
 35   
# Module-level logger shared by the pipes below; it is registered under the
# fixed name "zen.eventd" rather than under this module's own name.
log = logging.getLogger("zen.eventd")
class ProcessingException(Exception):
    """
    Base error for event processing; optionally remembers the event
    that was being handled when the error occurred.
    """

    def __init__(self, message, event=None):
        # Keep a reference to the offending event (None when not supplied)
        # so that handlers catching the exception can inspect it.
        self.event = event
        super(ProcessingException, self).__init__(message)
42
class DropEvent(ProcessingException):
    """
    Signals that the event being processed must be discarded from the
    queue instead of being passed further along.
    """
48
class EventLoggerAdapter(logging.LoggerAdapter):
    """
    Logger adapter that prefixes every message with the UUID of the event
    it was created for.
    """

    def process(self, msg, kwargs):
        """
        Return ``(msg, kwargs)`` with ``msg`` rewritten as
        ``[<event_uuid>] <msg>``; ``kwargs`` is passed through untouched.
        """
        # The UUID is read from the ``extra`` mapping given to the adapter.
        event_uuid = self.extra['event_uuid']
        prefixed = '[{event_uuid}] {msg}'.format(event_uuid=event_uuid, msg=msg)
        return prefixed, kwargs
58
class Manager(object):
    """
    Resolves devices, components, event class organizers and their UUIDs
    on behalf of the event processing pipes.
    """

    # Model class searched in the catalog for each protobuf element type id.
    ELEMENT_TYPE_MAP = {
        DEVICE: Device,
        COMPONENT: DeviceComponent,
    }

    def __init__(self, dmd):
        """
        @param dmd: root object the GUID manager and the 'Devices',
            'Networks' and 'Events' objects are looked up on.
        """
        self.dmd = dmd
        self._initCatalogs()

    def _initCatalogs(self):
        """
        (Re)load the GUID manager and the top-level objects used for lookups.
        """
        self._guidManager = IGUIDManager(self.dmd)

        self._devices = self.dmd._getOb('Devices')
        self._networks = self.dmd._getOb('Networks')
        self._events = self.dmd._getOb('Events')

        # Default search roots per element type; only devices have one, so
        # other element types need an explicit catalog from the caller.
        self._catalogs = {
            DEVICE: self._devices,
        }

    def reset(self):
        """
        Reload every object reference held by the manager.
        """
        self._initCatalogs()

    def getEventClassOrganizer(self, eventClassName):
        """
        Return the event class organizer called eventClassName, or None when
        the lookup raises KeyError (unknown organizer).
        """
        try:
            return self._events.getOrganizer(eventClassName)
        except KeyError:
            # Unknown organizer
            return None

    def lookupEventClass(self, eventContext):
        """
        Find a Device's EventClass, using the event proxy and the device
        object stored on eventContext.
        """
        return self._events.lookup(eventContext.eventProxy,
                                   eventContext.deviceObject)

    def getElementByUuid(self, uuid):
        """
        Get a Device/Component by UUID. Returns None for an empty uuid.
        """
        if uuid:
            return self._guidManager.getObject(uuid)

    def uuidFromBrain(self, brain):
        """
        Helper method to deal with catalog brains which are out of date. If
        the uuid is not set on the brain, we attempt to load it from the
        object.
        """
        uuid = brain.uuid
        return uuid if uuid else IGlobalIdentifier(brain.getObject()).getGUID()

    def getElementUuidById(self, catalog, element_type_id, id):
        """
        Return the UUID of the first element whose 'id' or 'name' index
        equals id, or None when the element type is unknown, no catalog is
        available, or nothing matches.

        @param catalog: object to search; when falsy, the default search
            root registered for element_type_id is used instead.
        @param element_type_id: key of ELEMENT_TYPE_MAP (DEVICE or COMPONENT).
        @param id: id or name to look for.
        """
        cls = self.ELEMENT_TYPE_MAP.get(element_type_id)
        if not cls:
            return None

        catalog = catalog or self._catalogs.get(element_type_id)
        if not catalog:
            return None

        results = ICatalogTool(catalog).search(cls,
                                               query=Or(Eq('id', id),
                                                        Eq('name', id)),
                                               filterPermissions=False,
                                               limit=1)
        if results.total:
            # next() instead of the iterator's .next() method: same result,
            # but not tied to the Python 2 iterator protocol.
            return self.uuidFromBrain(next(results.results))
        return None

    def getElementById(self, catalog, element_type_id, id):
        """
        Find an element by id: look up its UUID, then load the element by
        that UUID. If the UUID does not resolve to an element, the UUID
        lookup is retried once (after clearing its cache, if it has one).

        Returns the element, or None when it cannot be found.
        """
        uuid = self.getElementUuidById(catalog, element_type_id, id)
        if not uuid:
            return None

        element = self.getElementByUuid(uuid)
        if not element:
            # Lookup cache must be invalid, try looking up again.
            # getElementUuidById only has a clear() attribute when it is
            # wrapped by a caching decorator; calling it unconditionally
            # raised AttributeError on the plain bound method.
            clear_cache = getattr(self.getElementUuidById, 'clear', None)
            if callable(clear_cache):
                clear_cache()
            log.warning(
                'Clearing ElementUuidById cache becase we could not find %s',
                uuid)
            uuid = self.getElementUuidById(catalog, element_type_id, id)
            element = self.getElementByUuid(uuid)
        return element

    def getElementUuid(self, obj):
        """
        Return the GUID of obj, or None when obj is falsy.
        """
        if obj:
            return IGlobalIdentifier(obj).getGUID()

    def _findDevices(self, identifier, ipAddress, limit=None):
        """returns a tuple ([device brains], [devices]) searching manage IP and interface IPs. limit is the maximum
        total number in both lists."""
        cat = ICatalogTool(self._devices)

        # The catalog is queried with the decimal string form of the address;
        # an unparsable ipAddress is discarded.
        if ipAddress:
            try:
                ipAddress = str(ipToDecimal(ipAddress))
            except IpAddressError:
                ipAddress = None

        # Without a usable address, the identifier itself may be an IP.
        if identifier and not ipAddress:
            try:
                ipAddress = str(ipToDecimal(identifier))
            except IpAddressError:
                pass

        querySet = Or(Eq('id', identifier), Eq('name', identifier))

        if ipAddress:
            querySet.addSubquery(Eq('ipAddress', ipAddress))

        device_brains = list(cat.search(types=Device, query=querySet,
                                        limit=limit, filterPermissions=False))

        # Components are only searched when there is an address to match and
        # the device search has not already used up the limit.
        if not ipAddress or (limit is not None and len(device_brains) >= limit):
            return device_brains, []

        querySet = Eq('ipAddress', ipAddress)
        component_limit = None if limit is None else limit - len(device_brains)
        component_results = cat.search(types=DeviceComponent, query=querySet,
                                       limit=component_limit,
                                       filterPermissions=False)
        devices = [component_brain.getObject().device()
                   for component_brain in component_results]
        return device_brains, devices

    def findDeviceUuid(self, identifier, ipAddress):
        """
        This will return the device's uuid, or None when no device matches.

        @type identifier: string
        @param identifier: The IP address or id of a device
        @type ipAddress: string
        @param ipAddress: The known ipaddress of the device
        """
        device_brains, devices = self._findDevices(identifier, ipAddress, limit=1)
        # A direct device match wins over a device found through a component.
        if device_brains:
            return self.uuidFromBrain(device_brains[0])
        if devices:
            return self.getElementUuid(devices[0])
        return None

    def findDevice(self, identifier, ipAddress):
        """
        Return the device matching identifier/ipAddress, or None.
        """
        uuid = self.findDeviceUuid(identifier, ipAddress)
        if uuid:
            return self.getElementByUuid(uuid)

    def getUuidsOfPath(self, node):
        """
        Looks up all the UUIDs in the tree path of an Organizer.

        Walks the acquisition chain of node up to the DataRoot; the object
        directly below the DataRoot is left out. Returns a list of the
        non-empty UUIDs found.
        """
        uuids = set()
        acquisition_chain = []
        for n in aq_chain(node.primaryAq()):
            if isinstance(n, DataRoot):
                # Drop the object sitting directly below the DataRoot. The
                # chain is empty when the DataRoot comes first, and an
                # unguarded pop() would raise IndexError.
                if acquisition_chain:
                    acquisition_chain.pop()
                break
            acquisition_chain.append(n)

        for obj in acquisition_chain:
            if not obj:
                continue
            try:
                uuids.add(self.getElementUuid(obj))
            except TypeError:
                log.debug("Unable to get a uuid for %s ", obj)

        # Always a list: callers concatenate the results of several calls.
        return [uuid for uuid in uuids if uuid]
231
class EventContext(object):
    """
    Holds everything known about one raw event while it moves through the
    processing pipes: the raw event, its proxy, a logger tagged with the
    event UUID, and the device/component objects attached so far.
    """

    def __init__(self, log, zepRawEvent):
        event = zepRawEvent.event
        self._zepRawEvent = zepRawEvent
        self._event = event
        self._eventProxy = ZepRawEventProxy(zepRawEvent)

        # Filled in later by the pipes, if the event belongs to a
        # device and/or component.
        self._deviceObject = None
        self._componentObject = None
        self.log = EventLoggerAdapter(log, {'event_uuid': event.uuid})

    # -- mutators ---------------------------------------------------------

    def setDeviceObject(self, device):
        self._deviceObject = device

    def setComponentObject(self, component):
        self._componentObject = component

    def refreshClearClasses(self):
        self._eventProxy._refreshClearClasses()

    # -- read-only views --------------------------------------------------

    @property
    def deviceObject(self):
        return self._deviceObject

    @property
    def componentObject(self):
        return self._componentObject

    @property
    def zepRawEvent(self):
        return self._zepRawEvent

    @property
    def event(self):
        return self._event

    @property
    def eventProxy(self):
        """
        The EventProxy wrapping the event protobuf so that it can be used
        like an old style event.
        """
        return self._eventProxy
279
class EventProcessorPipe(object):
    """
    Base class for the handlers that are chained together to process an
    event context.
    """
    dependencies = []

    def __init__(self, manager, name=None):
        self._manager = manager
        # An empty or missing name falls back to the concrete class name.
        self.name = name if name else self.__class__.__name__

    def __call__(self, eventContext):
        """
        Process eventContext and return it (possibly modified); subclasses
        must override this.
        """
        raise NotImplementedError()
298
class CheckInputPipe(EventProcessorPipe):
    """
    Checks that the mandatory event fields are present, dropping the event
    otherwise.
    """
    REQUIRED_EVENT_FIELDS = (
        EventField.ACTOR, EventField.SUMMARY, EventField.SEVERITY)

    def __call__(self, eventContext):
        event = eventContext.event

        # Summary and message back each other up: whichever is missing is
        # filled from the other (the summary is cut to 255 characters).
        has_message = event.HasField('message')
        has_summary = event.HasField('summary')
        if has_summary and not has_message:
            event.message = event.summary
        elif has_message and not has_summary:
            event.summary = event.message[:255]

        absent = (field for field in self.REQUIRED_EVENT_FIELDS
                  if not event.HasField(field))
        missingFields = ','.join(absent)
        if missingFields:
            raise DropEvent('Required event fields %s not found' % missingFields,
                            event)

        return eventContext
322
class EventIdentifierPluginException(ProcessingException):
    """
    Base class for the errors raised by event identifier plugins.
    """
class EventIdentifierPluginFailure(EventIdentifierPluginException):
    """
    An identifier plugin error that is not an abort.
    """
class EventIdentifierPluginAbort(EventIdentifierPluginException):
    """
    An identifier plugin error that aborts identification of the event.
    """
329
class BaseEventIdentifierPlugin(object):
    """
    Default identifier plugin: completes the identifier/uuid pair of the
    event actor's element and sub element.
    """

    def _resolveElement(self, evtProcessorManager, catalog, eventContext, type_id_field,
                        identifier_field, uuid_field):
        """
        Make sure the actor carries both the identifier and the uuid of an
        element, deriving whichever one is missing from the other.

        Nothing is done when the actor has no value in type_id_field.
        """
        actor = eventContext.event.actor
        if not actor.HasField(type_id_field):
            return

        has_identifier = actor.HasField(identifier_field)
        has_uuid = actor.HasField(uuid_field)

        if has_identifier and has_uuid:
            # Nothing to resolve; only report it when debugging is on.
            if log.isEnabledFor(logging.DEBUG):
                type_id = getattr(actor, type_id_field, None)
                identifier = getattr(actor, identifier_field, None)
                uuid = getattr(actor, uuid_field, None)
                eventContext.log.debug('Element %s already fully identified by %s/%s', type_id, identifier, uuid)
            return

        if has_uuid:
            # uuid known, identifier missing: load the element for its id.
            uuid = getattr(actor, uuid_field, None)
            element = evtProcessorManager.getElementByUuid(uuid)
            if element:
                eventContext.log.debug('Identified element %s by uuid %s',
                                       element, uuid)
                setattr(actor, identifier_field, element.id)
            else:
                eventContext.log.warning('Could not find element by uuid %s',
                                         uuid)
        elif has_identifier:
            # identifier known, uuid missing: devices are searched by
            # id/name/IP, everything else through the given catalog.
            type_id = getattr(actor, type_id_field, None)
            identifier = getattr(actor, identifier_field, None)
            if type_id == DEVICE:
                element_uuid = evtProcessorManager.findDeviceUuid(
                    identifier, eventContext.eventProxy.ipAddress)
            else:
                element_uuid = evtProcessorManager.getElementUuidById(
                    catalog, type_id, identifier)

            if element_uuid:
                eventContext.log.debug('Identified element %s by id %s',
                                       element_uuid, identifier)
                setattr(actor, uuid_field, element_uuid)
            else:
                eventContext.log.debug(
                    'Could not find element type %s with id %s', type_id,
                    identifier)

    def resolveIdentifiers(self, eventContext, evtProcessorManager):
        """
        Resolve the identifiers and uuids of the actor's element and sub
        element, modifying eventContext in place.
        """
        eventContext.log.debug('Identifying event (%s)' % self.__class__.__name__)

        # First the element itself (most likely a Device); no catalog is
        # passed for it.
        self._resolveElement(
            evtProcessorManager,
            None,
            eventContext,
            EventField.Actor.ELEMENT_TYPE_ID,
            EventField.Actor.ELEMENT_IDENTIFIER,
            EventField.Actor.ELEMENT_UUID
        )

        # Then the sub element (most likely a Component), searched below
        # the element resolved above when that one has a uuid.
        actor = eventContext.event.actor
        parent = None
        if actor.HasField(EventField.Actor.ELEMENT_UUID):
            parent = evtProcessorManager.getElementByUuid(actor.element_uuid)
        self._resolveElement(
            evtProcessorManager,
            parent,
            eventContext,
            EventField.Actor.ELEMENT_SUB_TYPE_ID,
            EventField.Actor.ELEMENT_SUB_IDENTIFIER,
            EventField.Actor.ELEMENT_SUB_UUID
        )
407
class IdentifierPipe(EventProcessorPipe):
    """
    Runs the identifier plugins so that element uuids and identifiers are
    both populated.
    """

    dependencies = [CheckInputPipe]

    def __call__(self, eventContext):
        eventContext.log.debug('Identifying event')

        # Registered IEventIdentifierPlugin utilities run first; the default
        # identifier always runs last.
        plugins = list(getUtilitiesFor(IEventIdentifierPlugin))
        plugins.append(('default', BaseEventIdentifierPlugin()))

        for name, plugin in plugins:
            try:
                eventContext.log.debug("running identifier plugin %s" % name)
                plugin.resolveIdentifiers(eventContext, self._manager)
            except EventIdentifierPluginException as e:
                eventContext.log.debug(e)
                # An abort stops the whole pipe; any other plugin error is
                # only logged and the next plugin gets its turn.
                if isinstance(e, EventIdentifierPluginAbort):
                    raise

        return eventContext
434
class AddDeviceContextAndTagsPipe(EventProcessorPipe):
    """
    Attaches the resolved device and component to the context and copies
    device information and organizer tags onto the event proxy.
    """
    dependencies = [IdentifierPipe]

    # Tag names reuse the detail keys defined on EventProxy.
    DEVICE_DEVICECLASS_TAG_KEY = EventProxy.DEVICE_CLASS_DETAIL_KEY
    DEVICE_LOCATION_TAG_KEY = EventProxy.DEVICE_LOCATION_DETAIL_KEY
    DEVICE_SYSTEMS_TAG_KEY = EventProxy.DEVICE_SYSTEMS_DETAIL_KEY
    DEVICE_GROUPS_TAG_KEY = EventProxy.DEVICE_GROUPS_DETAIL_KEY

    # tag key -> (function returning the device's organizer(s), organizer type name)
    DEVICE_TAGGERS = {
        DEVICE_DEVICECLASS_TAG_KEY: (lambda device: device.deviceClass(), 'DeviceClass'),
        DEVICE_LOCATION_TAG_KEY: (lambda device: device.location(), 'Location'),
        DEVICE_SYSTEMS_TAG_KEY: (lambda device: device.systems(), 'Systems'),
        DEVICE_GROUPS_TAG_KEY: (lambda device: device.groups(), 'DeviceGroups'),
    }

    @staticmethod
    def _pickByType(wanted_type_id, candidates):
        """
        Return the object of the first (type_id, object) pair whose type id
        equals wanted_type_id (the object may be None); None if no pair
        matches.
        """
        for type_id, candidate in candidates:
            if type_id == wanted_type_id:
                return candidate
        return None

    def _addDeviceOrganizerNames(self, orgs, orgtypename, evtproxy, proxydetailkey, asDelimitedList=False):
        """
        Copy the organizer names stored under orgtypename in orgs onto the
        proxy (read-only value and detail). Does nothing when there are no
        names for that type.
        """
        orgnames = orgs.get(orgtypename)
        if not orgnames:
            return

        if asDelimitedList:
            readOnlyValue = '|' + '|'.join(orgnames)
            detailValue = orgnames
        else:
            # Only the first name goes into the details.
            # NOTE(review): the read-only value receives the whole list
            # here, not just the first name - confirm this is intended.
            readOnlyValue = orgnames
            detailValue = orgnames[0]

        evtproxy.setReadOnly(orgtypename, readOnlyValue)
        evtproxy.details[proxydetailkey] = detailValue

    def _addDeviceContext(self, eventContext, device):
        """
        Copy IP address, production state and priority from device to the
        event proxy; values that are falsy are not copied.
        """
        proxy = eventContext.eventProxy

        # An address already present on the event wins over the manage IP.
        address = proxy.ipAddress or device.manageIp
        if address:
            proxy.ipAddress = address

        state = device.productionState
        if state:
            proxy.prodState = state

        priority = device.getPriority()
        if priority:
            proxy.DevicePriority = priority

    def _addDeviceOrganizers(self, eventContext, orgs):
        """
        Record the organizer names from orgs on the event proxy; groups and
        systems are stored as delimited lists.
        """
        proxy = eventContext.eventProxy
        layout = (
            ('Location', EventProxy.DEVICE_LOCATION_DETAIL_KEY, False),
            ('DeviceClass', EventProxy.DEVICE_CLASS_DETAIL_KEY, False),
            ('DeviceGroups', EventProxy.DEVICE_GROUPS_DETAIL_KEY, True),
            ('Systems', EventProxy.DEVICE_SYSTEMS_DETAIL_KEY, True),
        )
        for orgtypename, detailkey, delimited in layout:
            self._addDeviceOrganizerNames(orgs, orgtypename, proxy, detailkey,
                                          asDelimitedList=delimited)

    def _findTypeIdAndElement(self, eventContext, sub_element):
        """
        Return (type id, object) for the actor's element, or for its sub
        element when sub_element is true. Either value is None when the
        corresponding actor field is not set.
        """
        actor = eventContext.event.actor
        if sub_element:
            type_id_field = EventField.Actor.ELEMENT_SUB_TYPE_ID
            uuid_field = EventField.Actor.ELEMENT_SUB_UUID
        else:
            type_id_field = EventField.Actor.ELEMENT_TYPE_ID
            uuid_field = EventField.Actor.ELEMENT_UUID

        type_id = getattr(actor, type_id_field) if actor.HasField(type_id_field) else None
        element = None
        if actor.HasField(uuid_field):
            element = self._manager.getElementByUuid(getattr(actor, uuid_field))
        return type_id, element

    def __call__(self, eventContext):
        actor = eventContext.event.actor

        # Identifier and title of the element come from the resolved object.
        element_type_id, element = self._findTypeIdAndElement(eventContext, False)
        if element:
            actor.element_identifier = element.id
            title = element.titleOrId()
            if title != actor.element_identifier:
                try:
                    actor.element_title = title
                except ValueError:
                    # Retry with the title decoded from UTF-8.
                    actor.element_title = title.decode('utf8')

        # Same for the sub element.
        sub_element_type_id, sub_element = self._findTypeIdAndElement(eventContext, True)
        if sub_element:
            actor.element_sub_identifier = sub_element.id
            sub_title = sub_element.titleOrId()
            if sub_title != actor.element_sub_identifier:
                try:
                    actor.element_sub_title = sub_title
                except ValueError:
                    actor.element_sub_title = sub_title.decode('utf8')

        resolved = ((element_type_id, element),
                    (sub_element_type_id, sub_element))

        # Device context is only built when no device is attached yet.
        if eventContext.deviceObject is None:
            device = self._pickByType(DEVICE, resolved)
            if device:
                eventContext.setDeviceObject(device)

                # Tag the event with the uuids of every organizer path the
                # device belongs to, remembering the organizer names per type.
                deviceOrgs = {}
                for tagType, (getOrgFunc, orgTypeName) in self.DEVICE_TAGGERS.items():
                    organizers = getOrgFunc(device)
                    if not organizers:
                        continue
                    if not isinstance(organizers, list):
                        organizers = [organizers]

                    uuids = set()
                    for organizer in organizers:
                        uuids.update(self._manager.getUuidsOfPath(organizer))
                    if uuids:
                        eventContext.eventProxy.tags.addAll(tagType, uuids)

                    # Names are added to the device event context below.
                    deviceOrgs[orgTypeName] = [organizer.getOrganizerName()
                                               for organizer in organizers]

                self._addDeviceContext(eventContext, device)
                self._addDeviceOrganizers(eventContext, deviceOrgs)

        # Likewise, a component is only attached when none is set yet.
        if eventContext.componentObject is None:
            component = self._pickByType(COMPONENT, resolved)
            if component:
                eventContext.setComponentObject(component)

        return eventContext
572
class UpdateDeviceContextAndTagsPipe(AddDeviceContextAndTagsPipe):
    """
    Rebuilds the device context of an event whose device object has been
    cleared; events that still have a device object pass through unchanged.
    """

    def __call__(self, eventContext):
        evtproxy = eventContext.eventProxy

        if eventContext.deviceObject is not None:
            return eventContext

        # Clear title fields (and the element uuid)
        actor = eventContext.event.actor
        for field in (EventField.Actor.ELEMENT_TITLE,
                      EventField.Actor.ELEMENT_UUID,
                      EventField.Actor.ELEMENT_SUB_TITLE):
            actor.ClearField(field)

        eventContext.log.debug("device was cleared, must purge references in current event: %s" % to_dict(eventContext._zepRawEvent))

        # Device-specific names to purge: organizer type names, tag keys and
        # the fixed device detail keys.
        deviceDetailNames = set(
            orgTypeName for _getter, orgTypeName in self.DEVICE_TAGGERS.values())
        deviceDetailNames.update(self.DEVICE_TAGGERS.keys())
        deviceDetailNames.update([
            EventProxy.DEVICE_IP_ADDRESS_DETAIL_KEY,
            EventProxy.DEVICE_PRIORITY_DETAIL_KEY,
            EventProxy.PRODUCTION_STATE_DETAIL_KEY,
        ])

        # clear device context details
        details = evtproxy.details
        for detail in deviceDetailNames:
            evtproxy.resetReadOnly(detail)
            if detail in details:
                del details[detail]

        # clear device-dependent tags
        evtproxy.tags.clearType(self.DEVICE_TAGGERS.keys())
        eventContext.log.debug("reset device values in event before reidentifying: %s" % to_dict(eventContext._zepRawEvent))

        # With the old values gone, the parent pipe re-adds device context.
        return super(UpdateDeviceContextAndTagsPipe, self).__call__(eventContext)
610
class SerializeContextPipe(EventProcessorPipe):
    """
    Applies to the event protobuf those eventProxy fields that could not
    be mapped out of the proxy directly.
    """
    dependencies = [AddDeviceContextAndTagsPipe]

    def __call__(self, eventContext):
        # The body only logs; the context is handed back as received.
        context_log = eventContext.log
        context_log.debug('Saving context back to event')
        return eventContext
621
class AssignDefaultEventClassAndTagPipe(EventProcessorPipe):
    """
    Gives the event a default event class when none has been set by the
    time this pipe runs, and tags it with that class if it has no event
    class tags yet.
    """

    def __call__(self, eventContext):
        proxy = eventContext.eventProxy

        # Set event class to Unknown if not specified
        eventClassName = proxy.eventClass
        if not eventClassName:
            eventClassName = ZenEventClasses.Unknown
            proxy.eventClass = eventClassName

        # Tags are only added for a known organizer, and only when the
        # event carries no event class tags already.
        eventClass = self._manager.getEventClassOrganizer(eventClassName)
        if not eventClass or proxy.tags.getByType(TransformPipe.EVENT_CLASS_TAG):
            return eventContext

        try:
            eventClassUuids = self._manager.getUuidsOfPath(eventClass)
            if eventClassUuids:
                proxy.tags.addAll(TransformPipe.EVENT_CLASS_TAG, eventClassUuids)
        except (KeyError, AttributeError):
            log.info("Event has nonexistent event class %s." % eventClass)

        return eventContext
644
class FingerprintPipe(EventProcessorPipe):
    """
    Sets the dedupid of the event, either from a preset fingerprint or by
    joining a fixed set of event fields.
    """

    DEFAULT_FINGERPRINT_FIELDS = (
        'device', 'component', 'eventClass', 'eventKey', 'severity')
    NO_EVENT_KEY_FINGERPRINT_FIELDS = (
        'device', 'component', 'eventClass', 'severity', 'summary')

    dependencies = [AddDeviceContextAndTagsPipe]

    def __call__(self, eventContext):
        event = eventContext.event
        proxy = eventContext.eventProxy

        if event.HasField(EventField.FINGERPRINT):
            # A fingerprint supplied with the event is used verbatim.
            fp = event.fingerprint
            proxy.dedupid = fp
            eventContext.log.debug("incoming event has a preset fingerprint %s" % fp)
            return eventContext

        # Events without a (non-empty) event key are told apart by their
        # summary instead.
        has_event_key = (event.HasField(EventField.EVENT_KEY) and
                         getattr(event, EventField.EVENT_KEY, None))
        if has_event_key:
            dedupFields = self.DEFAULT_FINGERPRINT_FIELDS
        else:
            dedupFields = self.NO_EVENT_KEY_FINGERPRINT_FIELDS

        dedupIdList = []
        for field in dedupFields:
            dedupIdList.append(str(getattr(proxy, field, '')))

        proxy.dedupid = '|'.join(dedupIdList)

        eventContext.log.debug('Created dedupid of %s from %s',
                               proxy.dedupid, dedupIdList)

        return eventContext
679
class TransformAndReidentPipe(EventProcessorPipe):
    """
    Runs the transform pipe and, when the transform changed the device or
    component of the event, reruns the re-identification pipes.
    """
    dependencies = [AddDeviceContextAndTagsPipe]

    def __init__(self, manager, transformpipe, reidentpipes):
        super(TransformAndReidentPipe, self).__init__(manager)
        self.transformPipe = transformpipe
        self.reidentpipes = reidentpipes

    def __call__(self, eventContext):
        # Remember device and component as they were before the transform.
        proxy = eventContext.eventProxy
        device_before = proxy.device
        component_before = proxy.component

        eventContext = self.transformPipe(eventContext)

        proxy = eventContext.eventProxy
        device_changed = proxy.device != device_before
        component_changed = proxy.component != component_before
        if not (device_changed or component_changed):
            return eventContext

        # Object references that no longer match the event are cleared; a
        # new device invalidates the component as well.
        if device_changed:
            eventContext.setDeviceObject(None)
            eventContext.setComponentObject(None)

        if component_changed:
            eventContext.setComponentObject(None)

        # rerun any pipes necessary to reidentify event
        for reident in self.reidentpipes:
            eventContext = reident(eventContext)

        return eventContext
713
class TransformPipe(EventProcessorPipe):
    """
    Looks up the event class of an event, tags the event with it and
    applies the event class extraction, values and transform.
    """

    EVENT_CLASS_TAG = 'zenoss.event.event_class'

    ACTION_HISTORY = 'history'
    ACTION_DROP = 'drop'
    ACTION_STATUS = 'status'
    ACTION_HEARTBEAT = 'heartbeat'
    ACTION_LOG = 'log'
    ACTION_ALERT_STATE = 'alert_state'
    ACTION_DETAIL = 'detail'

    # Event status corresponding to each status-changing action.
    ACTION_STATUS_MAP = {
        ACTION_HISTORY: STATUS_CLOSED,
        ACTION_STATUS: STATUS_NEW,
        ACTION_DROP: STATUS_DROPPED,
    }

    def _tagEventClasses(self, eventContext, eventClass):
        """
        Tag the event with the uuids of the event class hierarchy.

        NOTE: this has to happen before a mapping is applied, otherwise only
        the mapped Event Class would be tagged and not the mapping instance.
        """
        try:
            path_uuids = self._manager.getUuidsOfPath(eventClass)
            if path_uuids:
                eventContext.eventProxy.tags.addAll(self.EVENT_CLASS_TAG, path_uuids)
        except (KeyError, AttributeError):
            log.info("Event has nonexistent event class %s." % eventClass)

    def __call__(self, eventContext):
        eventContext.log.debug('Mapping and Transforming event')

        # Transforms are applied unless the event says otherwise.
        apply_transforms = getattr(eventContext.event, 'apply_transforms', True)
        if not apply_transforms:
            eventContext.log.debug('Not applying transforms, regexes or zProperties because apply_transforms was false')

        evtclass = self._manager.lookupEventClass(eventContext)
        if not evtclass:
            return eventContext

        self._tagEventClasses(eventContext, evtclass)

        proxy = eventContext.eventProxy
        if apply_transforms:
            evtclass.applyExtraction(proxy)
            evtclass.applyValues(proxy)

        # The mapping uuid is recorded whether or not transforms run.
        if proxy.eventClassMapping:
            eventContext.event.event_class_mapping_uuid = IGlobalIdentifier(evtclass).getGUID()

        if apply_transforms:
            evtclass.applyTransform(proxy,
                                    eventContext.deviceObject,
                                    eventContext.componentObject)
        return eventContext
765
class EventPluginPipe(EventProcessorPipe):
    """
    Applies every utility registered for a plugin interface to the event.
    """

    def __init__(self, manager, pluginInterface, name=''):
        super(EventPluginPipe, self).__init__(manager, name)

        # The registered (name, plugin) pairs are collected once, here.
        self._eventPlugins = tuple(getUtilitiesFor(pluginInterface))

    def __call__(self, eventContext):
        dmd = self._manager.dmd
        for plugin_name, plugin in self._eventPlugins:
            try:
                plugin.apply(eventContext._eventProxy, dmd)
            except Exception as e:
                # A failing plugin is logged and skipped so that the
                # remaining plugins still run.
                eventContext.log.error(
                    'Event plugin %s encountered an error -- skipping.' % plugin_name)
                eventContext.log.exception(e)

        return eventContext
783
class ClearClassRefreshPipe(EventProcessorPipe):
    """
    Asks the event context to refresh its clear classes.
    """

    def __call__(self, eventContext):
        refresh = eventContext.refreshClearClasses
        refresh()
        return eventContext
788
class TestPipeExceptionPipe(EventProcessorPipe):
    """
    Pipe that always raises; used for testing exception handling in the
    event processor.
    """

    def __init__(self, exceptionClass=ProcessingException):
        # Only the exception type is stored; the base class initializer is
        # not called, so no manager or name is set on this pipe.
        self.exceptionClass = exceptionClass

    def __call__(self, eventContext):
        failure = self.exceptionClass('Testing pipe processing failure')
        raise failure
796