Package Products :: Package ZenHub :: Module zenhubworker
[hide private]
[frames | no frames]

Source Code for Module Products.ZenHub.zenhubworker

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2008, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  import Globals 
 12  from Products.DataCollector.Plugins import loadPlugins 
 13  from Products.ZenHub import PB_PORT 
 14  from Products.ZenHub.zenhub import LastCallReturnValue 
 15  from Products.ZenHub.PBDaemon import translateError, RemoteConflictError 
 16  from Products.ZenUtils.Time import isoDateTime 
 17  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 18  from Products.ZenUtils.Utils import unused, zenPath 
 19  from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory 
 20  # required to allow modeling with zenhubworker 
 21  from Products.DataCollector.plugins import DataMaps 
 22  unused(DataMaps) 
 23   
 24  from twisted.cred import credentials 
 25  from twisted.spread import pb 
 26  from twisted.internet import reactor, error 
 27  from ZODB.POSException import ConflictError 
 28  from collections import defaultdict 
 29   
 30  import cPickle as pickle 
 31  import time 
 32  import signal 
 33  import os 
 34   
 35  IDLE = "None/None" 
class _CumulativeWorkerStats(object):
    """
    Running totals for a single method of a single service: how many
    times it has been recorded, the summed elapsed time of those calls,
    and the timestamp passed with (or taken at) the most recent one.
    """

    def __init__(self):
        # Nothing recorded yet: zero calls, zero seconds, no timestamp.
        self.numoccurrences, self.totaltime, self.lasttime = 0, 0.0, 0

    def addOccurrence(self, elapsed, now=None):
        """
        Record one more call.

        @param elapsed: duration of the call, added to C{totaltime}
        @param now: timestamp to store in C{lasttime}; when None the
            current C{time.time()} is used
        """
        timestamp = time.time() if now is None else now
        self.numoccurrences = self.numoccurrences + 1
        self.totaltime = self.totaltime + elapsed
        self.lasttime = timestamp
52
class zenhubworker(ZCmdBase, pb.Referenceable):
    """Execute ZenHub requests in separate process.

    On construction the worker connects to the hub named by the
    --hubhost/--hubport options, logs in with --username/--password and
    registers itself through the remote 'reportingForWork' call.
    Requests are then served by L{remote_execute}.
    """

    def __init__(self):
        """Initialise worker state, the SIGUSR2 handler and the hub login."""
        ZCmdBase.__init__(self)

        # "<service>/<method>" of the request in progress, or IDLE.
        self.current = IDLE
        self.currentStart = 0
        self.numCalls = 0
        try:
            self.log.debug("establishing SIGUSR2 signal handler")
            signal.signal(signal.SIGUSR2, self.sighandler_USR2)
        except ValueError:
            # If we get called multiple times, this will generate an exception:
            # ValueError: signal only works in main thread
            # Ignore it as we've already set up the signal handler.
            pass

        self.zem = self.dmd.ZenEventManager
        loadPlugins(self.dmd)
        self.pid = os.getpid()
        # Cache of service objects keyed by (name, instance).
        self.services = {}
        factory = ReconnectingPBClientFactory()
        self.log.debug("Connecting to %s:%d",
                       self.options.hubhost,
                       self.options.hubport)
        reactor.connectTCP(self.options.hubhost, self.options.hubport, factory)
        self.log.debug("Logging in as %s", self.options.username)
        c = credentials.UsernamePassword(self.options.username,
                                         self.options.password)
        factory.gotPerspective = self.gotPerspective

        def stop(*args):
            # Losing the hub connection ends this worker; the stop is
            # deferred to the next reactor iteration.
            reactor.callLater(0, self._stopReactor)
        factory.clientConnectionLost = stop
        factory.startLogin(c)

    def _stopReactor(self):
        """Stop the reactor, ignoring the error raised if it is not running.

        Several paths (connection loss, failed registration, call-limit
        shutdown) may each request a stop; only the first can succeed.
        """
        try:
            reactor.stop()
        except error.ReactorNotRunning:
            pass

    def sighandler_USR2(self, *args):
        """SIGUSR2 handler: write the current statistics to the debug log."""
        self.reportStats()

    def reportStats(self):
        """Log (at debug level) the current activity and per-method totals.

        One line is produced per (service, method) pair with the call
        count, total time, average time and time of the last call.
        """
        now = time.time()
        if self.current != IDLE:
            self.log.debug("(%d) Currently performing %s, elapsed %.2f s",
                           self.pid, self.current, now - self.currentStart)
        else:
            self.log.debug("(%d) Currently IDLE", self.pid)

        if not self.services:
            self.log.debug("no service activity statistics")
            return

        def sortKey(item):
            # Order by collector instance, then by the short service name.
            (name, instance), _ = item
            return (instance, name.rpartition('.')[-1])

        loglines = ["(%d) Running statistics:" % self.pid]
        for (name, instance), svcob in sorted(self.services.items(),
                                              key=sortKey):
            label = "%s/%s" % (instance, name.rpartition('.')[-1])
            for method, stats in sorted(svcob.callStats.items()):
                if stats.numoccurrences:
                    average = stats.totaltime / stats.numoccurrences
                else:
                    average = 0.0
                loglines.append(" - %-48s %-32s %8d %12.2f %8.2f %s" %
                                (label, method,
                                 stats.numoccurrences,
                                 stats.totaltime,
                                 average,
                                 isoDateTime(stats.lasttime)))
        self.log.debug('\n'.join(loglines))

    def gotPerspective(self, perspective):
        "Once we are connected to zenhub, register ourselves"
        d = perspective.callRemote('reportingForWork', self)

        def reportProblem(why):
            self.log.error("Unable to report for work: %s", why)
            self._stopReactor()
        d.addErrback(reportProblem)

    def _getService(self, name, instance):
        """Utility method to create the service (like PingConfig)
        for instance (like localhost)

        Services are created once and cached in C{self.services} under
        the key C{(name, instance)}.

        @type name: string
        @param name: the dotted-name of the module to load
        (uses L{Products.ZenUtils.Utils.importClass}); if that import
        fails, C{Products.ZenHub.services.<name>} is tried instead
        @type instance: string
        @param instance: each service serves only one specific collector
        instance (like 'localhost'). instance defines the collector's
        instance name.
        @return: a service loaded from ZenHub/services or one of the zenpacks.
        """
        try:
            return self.services[name, instance]
        except KeyError:
            from Products.ZenUtils.Utils import importClass
            try:
                ctor = importClass(name)
            except ImportError:
                ctor = importClass('Products.ZenHub.services.%s' % name, name)
            svc = ctor(self.dmd, instance)
            self.services[name, instance] = svc

            # dict for tracking statistics on method calls invoked on this service,
            # including number of times called and total elapsed time, keyed
            # by method name
            svc.callStats = defaultdict(_CumulativeWorkerStats)

            return svc

    @translateError
    def remote_execute(self, service, instance, method, args):
        """Execute requests on behalf of zenhub

        @type service: string
        @param service: the name of a service, like PingConfig

        @type instance: string
        @param instance: each service serves only one specific collector
        instance (like 'localhost'). instance defines the collector's
        instance name.

        @type method: string
        @param method: the name of the called method, like getPingTree;
        the service's C{remote_<method>} attribute is invoked

        @type args: sequence of strings
        @param args: chunks which, concatenated, form a pickle of the
        C{(args, kw)} pair to pass to the method

        @return: the pickled result split into a list of strings of at
        most 102400 bytes each; when this call reaches the --calllimit
        the result is wrapped in LastCallReturnValue before pickling
        """
        svcstr = service.rpartition('.')[-1]
        self.current = "%s/%s" % (svcstr, method)
        self.log.debug("Servicing %s in %s", method, service)
        now = time.time()
        self.currentStart = now

        # If anything fails before the call is actually dispatched, put the
        # worker back to IDLE so reportStats() does not show a stale request.
        prepared = False
        try:
            try:
                self.syncdb()
            except RemoteConflictError:
                # Best effort only: runOnce() syncs again before the call.
                pass
            svc = self._getService(service, instance)
            m = getattr(svc, 'remote_' + method)
            # now that the service is loaded, we can unpack the arguments
            # NOTE(review): this unpickles data received over the hub
            # connection; it presumes the hub is a trusted peer.
            callArgs, kw = pickle.loads("".join(args))

            # see if this is our last call
            self.numCalls += 1
            lastCall = self.numCalls >= self.options.calllimit
            prepared = True
        finally:
            if not prepared:
                self.current = IDLE

        def runOnce():
            self.syncdb()
            res = m(*callArgs, **kw)
            if lastCall:
                res = LastCallReturnValue(res)
            pickled_res = pickle.dumps(res, pickle.HIGHEST_PROTOCOL)
            chunkSize = 102400
            # Slice by offset so each byte is copied once, rather than
            # repeatedly re-copying the remaining tail of the string.
            return [pickled_res[start:start + chunkSize]
                    for start in range(0, len(pickled_res), chunkSize)]

        try:
            for _ in range(4):
                try:
                    return runOnce()
                except RemoteConflictError:
                    pass
            # one last try, but don't hide the exception
            return runOnce()
        finally:
            finishTime = time.time()
            secs = finishTime - now
            self.log.debug("Time in %s: %.2f", method, secs)
            # update call stats for this method on this service
            svc.callStats[method].addOccurrence(secs, finishTime)
            svc.callTime += secs
            self.current = IDLE

            if lastCall:
                reactor.callLater(1, self._shutdown)

    def _shutdown(self):
        """Log the final statistics and stop the reactor."""
        self.log.info("Shutting down")
        self.reportStats()
        self.log.info("Stopping reactor")
        self._stopReactor()

    def buildOptions(self):
        """Options, mostly to find where zenhub lives
        These options should be passed (by file) from zenhub.
        """
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--hubhost',
                               dest='hubhost',
                               default='localhost',
                               help="Host to use for connecting to ZenHub")
        self.parser.add_option('--hubport',
                               dest='hubport',
                               type='int',
                               help="Port to use for connecting to ZenHub",
                               default=PB_PORT)
        self.parser.add_option('--username',
                               dest='username',
                               help="Login name to use when connecting to ZenHub",
                               default='zenoss')
        self.parser.add_option('--password',
                               dest='password',
                               help="password to use when connecting to ZenHub",
                               default='zenoss')
        self.parser.add_option('--calllimit',
                               dest='calllimit',
                               type='int',
                               help="Maximum number of remote calls before restarting worker",
                               default=200)

if __name__ == '__main__':
    # Constructing the worker opens the TCP connection to the hub and
    # starts the login; the work itself happens once the reactor runs.
    zhw = zenhubworker()
    reactor.run()