Package Products :: Package ZenHub :: Module invalidations
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenHub.invalidations

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2011, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  import logging 
 12  from zope.interface import implements, providedBy 
 13  from zope.component import adapter, getGlobalSiteManager 
 14  from twisted.internet import defer, reactor, task 
 15  from BTrees.IIBTree import IITreeSet 
 16  from ZODB.utils import u64 
 17  from Products.ZenRelations.PrimaryPathObjectManager import PrimaryPathObjectManager 
 18  from Products.ZenModel.DeviceComponent import DeviceComponent 
 19  from Products.ZenUtils.Utils import giveTimeToReactor 
 20  from .interfaces import IInvalidationProcessor, IHubCreatedEvent 
 21  from .zodb import UpdateEvent, DeletionEvent 
 22   
 23  log = logging.getLogger('zen.ZenHub') 
@defer.inlineCallbacks
def betterObjectEventNotify(event):
    """
    Reactor-friendly stand-in for zope.component.event.objectEventNotify.

    Looks up, in the global site manager, every subscription registered for
    the interfaces provided by (event.object, event) and invokes them one at
    a time through giveTimeToReactor, waiting on each call's result before
    moving on to the next subscription. It is slightly different from the
    zope implementation, but works exactly the same for our specific use case.

    @param event: an object event; must expose the affected object as
        C{event.object}
    @return: a Deferred (via inlineCallbacks) that fires once every
        subscription has been invoked
    """
    # Interfaces of the (object, event) pair, in the order the registry
    # lookup expects them.
    provided = [providedBy(event.object), providedBy(event)]
    registry = getGlobalSiteManager().adapters
    for handler in registry.subscriptions(provided, None):
        yield giveTimeToReactor(handler, event.object, event)
36
def handle_oid(dmd, oid):
    """
    Load the object behind an invalidated oid and notify interested services.

    The object is fetched from the ZODB connection of C{dmd}. Only instances
    of PrimaryPathObjectManager or DeviceComponent are handled; anything else
    is ignored. If the object can still be resolved through its primary path
    an UpdateEvent is fired, otherwise a DeletionEvent is fired.

    @param dmd: persistent root object; its C{_p_jar} is used to load the oid
        and it is the acquisition parent for C{__of__}
    @param oid: the oid of the invalidated object, as accepted by
        C{dmd._p_jar[...]}
    @return: the result of betterObjectEventNotify for the generated event,
        or None when the object is not of a handled type
    """
    # Go pull the object out of the database
    obj = dmd._p_jar[oid]
    # Don't bother with all the catalog stuff; we're depending on primaryAq
    # existing anyway, so only deal with it if it actually has primaryAq.
    if not isinstance(obj, (PrimaryPathObjectManager, DeviceComponent)):
        return None
    try:
        # Try to get the object
        obj = obj.__of__(dmd).primaryAq()
    except (AttributeError, KeyError):
        # Object has been removed from its primary path (i.e. was
        # deleted), so make a DeletionEvent
        # Lazy logging args: repr(obj) is only computed if debug is enabled.
        log.debug("Notifying services that %r has been deleted", obj)
        event = DeletionEvent(obj, oid)
    else:
        # Object was updated, so make an UpdateEvent
        log.debug("Notifying services that %r has been updated", obj)
        event = UpdateEvent(obj, oid)
    # Fire the event for all interested services to pick up
    return betterObjectEventNotify(event)
58
class InvalidationProcessor(object):
    """
    Registered as a global utility. Given a database hook and a list of oids,
    handles pushing updated objects to the appropriate services, which in turn
    cause collectors to be pushed updates.
    """
    implements(IInvalidationProcessor)

    # Set of integer oids currently being dispatched; replaced by a
    # per-instance IITreeSet in __init__.
    _invalidation_queue = None
    # The hub taken from the IHubCreatedEvent; None until that event arrives.
    _hub = None
    # Deferred fired with the hub by onHubCreated; created in __init__.
    _hub_ready = None

    def __init__(self):
        """
        Create the in-flight oid set and the hub-ready Deferred, then register
        onHubCreated as a handler with the global site manager.
        """
        self._invalidation_queue = IITreeSet()
        self._hub_ready = defer.Deferred()
        site_manager = getGlobalSiteManager()
        site_manager.registerHandler(self.onHubCreated)

    @adapter(IHubCreatedEvent)
    def onHubCreated(self, event):
        """
        Remember the hub carried by the event and fire the hub-ready Deferred
        with it.

        @param event: an IHubCreatedEvent exposing the hub as C{event.hub}
        """
        hub = event.hub
        self._hub = hub
        self._hub_ready.callback(hub)

    @defer.inlineCallbacks
    def processQueue(self, oids):
        """
        Dispatch notifications for each oid in C{oids}, one after another.

        Waits for the hub to be available first. If the hub's dmd has
        C{pauseHubNotifications} set, nothing is dispatched.

        @param oids: iterable of oids, each convertible with ZODB's C{u64}
        @return: a Deferred (via inlineCallbacks) firing with None when
            notifications are paused, otherwise with the enumerate index of
            the last oid looked at (0 when C{oids} is empty)
        """
        # NOTE(review): the value returned is the last index, not a count of
        # processed oids -- confirm which one callers expect.
        yield self._hub_ready
        if self._hub.dmd.pauseHubNotifications:
            log.debug('notifications are currently paused')
            return
        in_flight = self._invalidation_queue
        last_index = 0
        for last_index, oid in enumerate(oids):
            int_oid = u64(oid)
            # insert() on the IITreeSet returns 1 when the value was added
            # and 0 when it was already present; skip oids already queued.
            if not in_flight.insert(int_oid):
                continue
            # Wait for the notification before moving on to the next oid
            yield self._dispatch(self._hub.dmd, oid, int_oid, in_flight)
        defer.returnValue(last_index)

    def _dispatch(self, dmd, oid, ioid, queue):
        """
        Send to all the services that care by firing events.

        @param dmd: passed through to handle_oid
        @param oid: the oid to notify about, passed through to handle_oid
        @param ioid: integer form of C{oid}, as stored in C{queue}
        @param queue: the set of in-flight integer oids; C{ioid} is removed
            from it whether or not handle_oid raises
        @return: whatever handle_oid returns
        """
        # NOTE(review): ioid is removed as soon as handle_oid returns, which
        # can be before a Deferred it returned has fired -- confirm intended.
        try:
            outcome = handle_oid(dmd, oid)
        finally:
            queue.remove(ioid)
        return outcome
109