| Trees | Indices | Help |
|
|---|
|
|
1 ##############################################################################
2 #
3 # Copyright (C) Zenoss, Inc. 2011, all rights reserved.
4 #
5 # This content is made available according to terms specified in
6 # License.zenoss under the directory where your Zenoss product is installed.
7 #
8 ##############################################################################
9
10
11 import logging
12 from zope.interface import implements, providedBy
13 from zope.component import adapter, getGlobalSiteManager
14 from twisted.internet import defer, reactor, task
15 from BTrees.IIBTree import IITreeSet
16 from ZODB.utils import u64
17 from Products.ZenRelations.PrimaryPathObjectManager import PrimaryPathObjectManager
18 from Products.ZenModel.DeviceComponent import DeviceComponent
19 from Products.ZenUtils.Utils import giveTimeToReactor
20 from .interfaces import IInvalidationProcessor, IHubCreatedEvent
21 from .zodb import UpdateEvent, DeletionEvent
22
23 log = logging.getLogger('zen.ZenHub')
27 """
28 This method re-implements zope.component.event.objectEventNotify to give
29 more time back to the reactor. It is slightly different, but works exactly
30 the same for our specific use case.
31 """
32 gsm = getGlobalSiteManager()
33 subscriptions = gsm.adapters.subscriptions(map(providedBy, (event.object, event)), None)
34 for subscription in subscriptions:
35 yield giveTimeToReactor(subscription, event.object, event)
36
38 # Go pull the object out of the database
39 obj = dmd._p_jar[oid]
40 # Don't bother with all the catalog stuff; we're depending on primaryAq
41 # existing anyway, so only deal with it if it actually has primaryAq.
42 if (isinstance(obj, PrimaryPathObjectManager)
43 or isinstance(obj, DeviceComponent)):
44 try:
45 # Try to get the object
46 obj = obj.__of__(dmd).primaryAq()
47 except (AttributeError, KeyError), ex:
48 # Object has been removed from its primary path (i.e. was
49 # deleted), so make a DeletionEvent
50 log.debug("Notifying services that %r has been deleted" % obj)
51 event = DeletionEvent(obj, oid)
52 else:
53 # Object was updated, so make an UpdateEvent
54 log.debug("Notifying services that %r has been updated" % obj)
55 event = UpdateEvent(obj, oid)
56 # Fire the event for all interested services to pick up
57 return betterObjectEventNotify(event)
58
62 """
63 Registered as a global utility. Given a database hook and a list of oids,
64 handles pushing updated objects to the appropriate services, which in turn
65 cause collectors to be pushed updates.
66 """
67 implements(IInvalidationProcessor)
68
69 _invalidation_queue = None
70 _hub = None
71 _hub_ready = None
72
74 self._invalidation_queue = IITreeSet()
75 self._hub_ready = defer.Deferred()
76 getGlobalSiteManager().registerHandler(self.onHubCreated)
77
78 @adapter(IHubCreatedEvent)
82
83 @defer.inlineCallbacks
85 yield self._hub_ready
86 i = 0
87 queue = self._invalidation_queue
88 if self._hub.dmd.pauseHubNotifications:
89 log.debug('notifications are currently paused')
90 return
91 for i, oid in enumerate(oids):
92 ioid = u64(oid)
93 # Try pushing it into the queue, which is an IITreeSet. If it inserted
94 # successfully it returns 1, else 0.
95 if queue.insert(ioid):
96 # Get the deferred that does the notification
97 d = self._dispatch(self._hub.dmd, oid, ioid, queue)
98 yield d
99 defer.returnValue(i)
100
102 """
103 Send to all the services that care by firing events.
104 """
105 try:
106 return handle_oid(dmd, oid)
107 finally:
108 queue.remove(ioid)
109
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1.1812 on Mon Jul 30 17:11:23 2012 | http://epydoc.sourceforge.net |