Trees | Indices | Help |
|
---|
|
1 ############################################################################## 2 # 3 # Copyright (C) Zenoss, Inc. 2011, all rights reserved. 4 # 5 # This content is made available according to terms specified in 6 # License.zenoss under the directory where your Zenoss product is installed. 7 # 8 ############################################################################## 9 10 11 import logging 12 from zope.interface import implements, providedBy 13 from zope.component import adapter, getGlobalSiteManager 14 from twisted.internet import defer, reactor, task 15 from BTrees.IIBTree import IITreeSet 16 from ZODB.utils import u64 17 from Products.ZenRelations.PrimaryPathObjectManager import PrimaryPathObjectManager 18 from Products.ZenModel.DeviceComponent import DeviceComponent 19 from Products.ZenUtils.Utils import giveTimeToReactor 20 from .interfaces import IInvalidationProcessor, IHubCreatedEvent 21 from .zodb import UpdateEvent, DeletionEvent 22 23 log = logging.getLogger('zen.ZenHub')27 """ 28 This method re-implements zope.component.event.objectEventNotify to give 29 more time back to the reactor. It is slightly different, but works exactly 30 the same for our specific use case. 31 """ 32 gsm = getGlobalSiteManager() 33 subscriptions = gsm.adapters.subscriptions(map(providedBy, (event.object, event)), None) 34 for subscription in subscriptions: 35 yield giveTimeToReactor(subscription, event.object, event)3638 # Go pull the object out of the database 39 obj = dmd._p_jar[oid] 40 # Don't bother with all the catalog stuff; we're depending on primaryAq 41 # existing anyway, so only deal with it if it actually has primaryAq. 42 if (isinstance(obj, PrimaryPathObjectManager) 43 or isinstance(obj, DeviceComponent)): 44 try: 45 # Try to get the object 46 obj = obj.__of__(dmd).primaryAq() 47 except (AttributeError, KeyError), ex: 48 # Object has been removed from its primary path (i.e. was 49 # deleted), so make a DeletionEvent 50 log.debug("Notifying services that %r has been deleted" % obj) 51 event = DeletionEvent(obj, oid) 52 else: 53 # Object was updated, so make an UpdateEvent 54 log.debug("Notifying services that %r has been updated" % obj) 55 event = UpdateEvent(obj, oid) 56 # Fire the event for all interested services to pick up 57 return betterObjectEventNotify(event)5862 """ 63 Registered as a global utility. Given a database hook and a list of oids, 64 handles pushing updated objects to the appropriate services, which in turn 65 cause collectors to be pushed updates. 66 """ 67 implements(IInvalidationProcessor) 68 69 _invalidation_queue = None 70 _hub = None 71 _hub_ready = None 7210974 self._invalidation_queue = IITreeSet() 75 self._hub_ready = defer.Deferred() 76 getGlobalSiteManager().registerHandler(self.onHubCreated)77 78 @adapter(IHubCreatedEvent) 82 83 @defer.inlineCallbacks85 yield self._hub_ready 86 i = 0 87 queue = self._invalidation_queue 88 if self._hub.dmd.pauseHubNotifications: 89 log.debug('notifications are currently paused') 90 return 91 for i, oid in enumerate(oids): 92 ioid = u64(oid) 93 # Try pushing it into the queue, which is an IITreeSet. If it inserted 94 # successfully it returns 1, else 0. 95 if queue.insert(ioid): 96 # Get the deferred that does the notification 97 d = self._dispatch(self._hub.dmd, oid, ioid, queue) 98 yield d 99 defer.returnValue(i)100102 """ 103 Send to all the services that care by firing events. 104 """ 105 try: 106 return handle_oid(dmd, oid) 107 finally: 108 queue.remove(ioid)
Trees | Indices | Help |
|
---|
Generated by Epydoc 3.0.1.1812 on Mon Jul 30 17:11:23 2012 | http://epydoc.sourceforge.net |