1
2
3
4
5
6
7
8
9
10
11 __doc__ = """Provides a batch notifier to break up the expensive, blocking IO
12 involved with calls to DeviceOrganizer.getSubDevices which can call getObject
13 on the brains of every device in the system. Processes batches of 10 devices
14 giving way to the event loop between each batch. See ticket #26626."""
15
16 import logging
17 import collections
18 from twisted.internet import reactor, defer
19
20 LOG = logging.getLogger("zen.hub.notify")
21
"""These items are held in the BatchNotifier's queue. They contain all the
context needed to process the subdevices of a specific device class. This
context includes:

device class UID: e.g. /zport/dmd/Devices/Server/Linux
subdevices: an iterator over the device class's subdevices
notify_functions: a dictionary mapping Service UID to notifyAll
    function. An example Service UID is
    ('CommandPerformanceConfig', 'localhost')
d: the deferred for this item. Always has the following callback
    chain:
        Slot  Callback                 Errback
        1     BatchNotifier._callback  None
        2     None                     BatchNotifier._errback
"""
38
def __init__(self, device_class_uid, subdevices):
    """Store the context for one device class's queue item.

    device_class_uid -- UID of the device class this item is for
    subdevices -- iterator over that device class's subdevices

    notify_functions starts as an empty dictionary and d starts as None;
    both are filled in after construction.
    """
    # Tuple assignments bind left to right, so the attributes are created
    # in the order: device_class_uid, subdevices, notify_functions, d.
    self.device_class_uid, self.subdevices = device_class_uid, subdevices
    self.notify_functions, self.d = dict(), None
45
47 args = (self.device_class_uid, self.notify_functions.keys())
48 return "<NotifyItem(device_class_uid=%s, notify_functions=%s)>" % args
49
"""Processes the expensive getSubDevices call in batches. A singleton
instance is registered as a utility in zcml. The queue contains NotifyItem
instances. If notify_subdevices is called and an item exists in the queue
for the same device class, then the new service UID and notify function
are added to the existing item. Once an item is moved from the queue to
the _current_item member, it is being processed, and further
notify_subdevices calls for the same device class will add a new item to
the queue.
"""
59
60 _BATCH_SIZE = 10
61 _DELAY = 0.05
62
64 self._empty = None
65 self._current_item = None
66 self._queue = collections.deque()
67 self._stopping = False
68
70 if not self._stopping:
71 LOG.debug("BatchNotifier.notify_subdevices: %r, %s" % (device_class, service_uid))
72 item = self._find_or_create_item(device_class)
73 item.notify_functions[service_uid] = notify_function
74 else:
75 LOG.debug("notify_subdevices received a call while "
76 "stopping: %r, %s" % (device_class, service_uid))
77
79 device_class_uid = device_class.getPrimaryId()
80 for item in self._queue:
81 if item.device_class_uid == device_class_uid:
82 retval = item
83 break
84 else:
85 subdevices = device_class.getSubDevicesGen()
86 retval = NotifyItem(device_class_uid, subdevices)
87 retval.d = self._create_deferred()
88 if not self._queue and self._current_item is None:
89 self._call_later(retval.d)
90 self._queue.appendleft(retval)
91 return retval
92
98
101
103 self._current_item = self._queue.pop() if self._queue else None
104
106 for service_uid, notify_function in self._current_item.notify_functions.items():
107 try:
108 notify_function(device)
109 except Exception, e:
110 args = (service_uid, device.getPrimaryId(), type(e).__name__, e)
111 LOG.error("%s failed to notify %s: %s: %s" % args)
112
114 if self._current_item is None:
115 self._current_item = self._queue.pop()
116 batch_count = 0
117 try:
118 for device in self._current_item.subdevices:
119 self._call_notify_functions(device)
120 batch_count += 1
121 if batch_count == BatchNotifier._BATCH_SIZE:
122 self._current_item.d = self._create_deferred()
123 break
124 else:
125 LOG.debug("BatchNotifier._callback: no more devices, %s in queue", len(self._queue))
126 self._switch_to_next_item()
127 except Exception, e:
128 args = (self._current_item.device_class_uid, type(e).__name__, e)
129 LOG.warn("Failed to get subdevice of %s: %s: %s" % args)
130 self._switch_to_next_item()
131 if self._current_item is not None:
132 self._call_later(self._current_item.d)
133 elif self._empty and not self._queue:
134 empty, self._empty = (self._empty, None)
135 empty.callback(None)
136
137
141
143 self._stopping = True
144 return defer.DeferredList([item.d for item in self._queue])
145
147 isEmptyDeferred = self._empty
148 def printEmpty(val):
149 LOG.debug("Notifier is now empty")
150 if not self._queue and not self._current_item:
151 isEmptyDeferred = defer.Deferred()
152 LOG.debug("Notifier is currently empty")
153 isEmptyDeferred.callback(None)
154 elif not self._empty:
155 LOG.debug("Notifier is not currently empty, returning deferred")
156 isEmptyDeferred = self._empty = defer.Deferred()
157 isEmptyDeferred.addCallback(printEmpty)
158 return isEmptyDeferred
159
160 BATCH_NOTIFIER = BatchNotifier()
161