Package Products :: Package ZenHub :: Module notify
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenHub.notify

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2011, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  __doc__ = """Provides a batch notifier to break up the expensive, blocking IO 
 12  involved with calls to DeviceOrganizer.getSubDevices which can call getObject 
 13  on the brains of every device in the system. Processes batches of 10 devices 
 14  giving way to the event loop between each batch. See ticket #26626.""" 
 15   
 16  import logging 
 17  import collections 
 18  from twisted.internet import reactor, defer 
 19   
 20  LOG = logging.getLogger("zen.hub.notify") 
 21   
22 -class NotifyItem(object):
23 """These items are held in the BatchNotifier's queue. They contain all the 24 context needed to process the subdevices of a specific device class. This 25 context includes... 26 27 device class UID: e.g. /zport/dmd/Devices/Server/Linux) 28 subdevices: an iterator over the device classes subdevices) 29 notify_functions: a dictionary mapping Service UID to notifyAll 30 function. An example Service UID is 31 ('CommandPerformanceConfig', 'localhost') 32 d: the deferred for this item. Always has the following callback 33 chain: 34 Slot Callback Errback 35 1 BatchNotifier._callback None 36 2 None BatchNotifier._errback 37 """ 38
39 - def __init__(self, device_class_uid, subdevices):
40 self.device_class_uid = device_class_uid 41 self.subdevices = subdevices 42 # keys are service_uids eg ('CommandPerformanceConfig', 'localhost') 43 self.notify_functions = {} 44 self.d = None
45
46 - def __repr__(self):
47 args = (self.device_class_uid, self.notify_functions.keys()) 48 return "<NotifyItem(device_class_uid=%s, notify_functions=%s)>" % args
49
50 -class BatchNotifier(object):
51 """Processes the expensive getSubDevices call in batches. A singleton 52 instance is registered as a utility in zcml. The queue contains NotifyItem 53 instances. If notify_subdevices is called and an item exists in the queue 54 for the same device class, then the new service UID and notify function 55 are appended to the existing item. Once an item is moved from the queue to 56 _current_item member, it is being processed and further notify_subdevices 57 calls for the same device class will append a new item to the queue. 58 """ 59 60 _BATCH_SIZE = 10 61 _DELAY = 0.05 62
63 - def __init__(self):
64 self._empty = None 65 self._current_item = None 66 self._queue = collections.deque() 67 self._stopping = False
68
69 - def notify_subdevices(self, device_class, service_uid, notify_function):
70 if not self._stopping: 71 LOG.debug("BatchNotifier.notify_subdevices: %r, %s" % (device_class, service_uid)) 72 item = self._find_or_create_item(device_class) 73 item.notify_functions[service_uid] = notify_function 74 else: 75 LOG.debug("notify_subdevices received a call while " 76 "stopping: %r, %s" % (device_class, service_uid))
77
78 - def _find_or_create_item(self, device_class):
79 device_class_uid = device_class.getPrimaryId() 80 for item in self._queue: 81 if item.device_class_uid == device_class_uid: 82 retval = item 83 break 84 else: 85 subdevices = device_class.getSubDevicesGen() 86 retval = NotifyItem(device_class_uid, subdevices) 87 retval.d = self._create_deferred() 88 if not self._queue and self._current_item is None: 89 self._call_later(retval.d) 90 self._queue.appendleft(retval) 91 return retval
92
93 - def _create_deferred(self):
94 d = defer.Deferred() 95 d.addCallback(self._callback) 96 d.addErrback(self._errback) 97 return d
98
99 - def _call_later(self, d):
100 reactor.callLater(BATCH_NOTIFIER._DELAY, d.callback, None)
101
102 - def _switch_to_next_item(self):
103 self._current_item = self._queue.pop() if self._queue else None
104
105 - def _call_notify_functions(self, device):
106 for service_uid, notify_function in self._current_item.notify_functions.items(): 107 try: 108 notify_function(device) 109 except Exception, e: 110 args = (service_uid, device.getPrimaryId(), type(e).__name__, e) 111 LOG.error("%s failed to notify %s: %s: %s" % args)
112
113 - def _callback(self, result):
114 if self._current_item is None: 115 self._current_item = self._queue.pop() 116 batch_count = 0 117 try: 118 for device in self._current_item.subdevices: 119 self._call_notify_functions(device) 120 batch_count += 1 121 if batch_count == BatchNotifier._BATCH_SIZE: 122 self._current_item.d = self._create_deferred() 123 break 124 else: 125 LOG.debug("BatchNotifier._callback: no more devices, %s in queue", len(self._queue)) 126 self._switch_to_next_item() 127 except Exception, e: 128 args = (self._current_item.device_class_uid, type(e).__name__, e) 129 LOG.warn("Failed to get subdevice of %s: %s: %s" % args) 130 self._switch_to_next_item() 131 if self._current_item is not None: 132 self._call_later(self._current_item.d) 133 elif self._empty and not self._queue: 134 empty, self._empty = (self._empty, None) 135 empty.callback(None)
136 137
138 - def _errback(self, failure):
139 LOG.error("Failure in batch notifier: %s: %s" % (failure.type.__name__, failure.value)) 140 LOG.debug("BatchNotifier._errback: failure=%s" % failure)
141
142 - def stop(self):
143 self._stopping = True 144 return defer.DeferredList([item.d for item in self._queue])
145
146 - def whenEmpty(self):
147 isEmptyDeferred = self._empty 148 def printEmpty(val): 149 LOG.debug("Notifier is now empty")
150 if not self._queue and not self._current_item: 151 isEmptyDeferred = defer.Deferred() 152 LOG.debug("Notifier is currently empty") 153 isEmptyDeferred.callback(None) 154 elif not self._empty: 155 LOG.debug("Notifier is not currently empty, returning deferred") 156 isEmptyDeferred = self._empty = defer.Deferred() 157 isEmptyDeferred.addCallback(printEmpty) 158 return isEmptyDeferred
159 160 BATCH_NOTIFIER = BatchNotifier() 161