Package ZenHub :: Module zenhubworker
[hide private]
[frames | no frames]

Source Code for Module ZenHub.zenhubworker

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2008, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  import Globals 
 14  from Products.DataCollector.Plugins import loadPlugins 
 15  from Products.ZenHub.zenhub import PB_PORT 
 16  from Products.ZenHub.PBDaemon import translateError, RemoteConflictError 
 17  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 18  from Products.ZenUtils.Utils import unused 
 19  from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory 
 20  # required to allow modeling with zenhubworker 
 21  from Products.DataCollector.plugins import DataMaps 
 22  unused(DataMaps) 
 23   
 24  from twisted.cred import credentials 
 25  from twisted.spread import pb 
 26  from twisted.internet import reactor 
 27  from ZODB.POSException import ConflictError 
 28  from transaction import commit 
 29   
 30  import pickle 
 31  import time 
 32   
class zenhubworker(ZCmdBase, pb.Referenceable):
    "Execute ZenHub requests in separate process"

    def __init__(self):
        """Set up the worker and start logging in to zenhub.

        Loads the DataCollector plugins, creates an empty service cache,
        then opens a TCP connection to the configured hub host/port and
        starts a username/password login over it.  When the connection to
        zenhub is lost the reactor is stopped.
        """
        ZCmdBase.__init__(self)
        self.zem = self.dmd.ZenEventManager
        loadPlugins(self.dmd)
        # cache of instantiated services, keyed by (name, instance)
        self.services = {}
        factory = ReconnectingPBClientFactory()
        self.log.debug("Connecting to %s:%d",
                       self.options.hubhost,
                       self.options.hubport)
        reactor.connectTCP(self.options.hubhost, self.options.hubport, factory)
        self.log.debug("Logging in as %s", self.options.username)
        c = credentials.UsernamePassword(self.options.username,
                                         self.options.password)
        factory.gotPerspective = self.gotPerspective

        def stop(*args):
            # defer the stop to the next reactor iteration instead of
            # stopping from inside the connection-lost callback
            reactor.callLater(0, reactor.stop)
        factory.clientConnectionLost = stop
        factory.startLogin(c)

    def gotPerspective(self, perspective):
        """Once we are connected to zenhub, register ourselves

        @param perspective: the remote perspective handed over after login
        """
        d = perspective.callRemote('reportingForWork', self)

        def reportProblem(why):
            # registration failed: log the reason and stop the reactor
            self.log.error("Unable to report for work: %s", why)
            reactor.stop()
        d.addErrback(reportProblem)

    def _getService(self, name, instance):
        """Utility method to create the service (like PingConfig)
        for instance (like localhost)

        Services are cached per (name, instance), so each one is only
        constructed once.

        @type name: string
        @param name: the dotted-name of the module to load
        (uses @L{Products.ZenUtils.Utils.importClass})
        @type instance: string
        @param instance: each service serves only one specific collector
        instance (like 'localhost'). instance defines the collector's
        instance name.
        @return: a service loaded from ZenHub/services or one of the zenpacks.
        """
        try:
            return self.services[name, instance]
        except KeyError:
            from Products.ZenUtils.Utils import importClass
            try:
                ctor = importClass(name)
            except ImportError:
                # not importable as given: fall back to the module of the
                # same name under Products.ZenHub.services
                ctor = importClass('Products.ZenHub.services.%s' % name, name)
            svc = ctor(self.dmd, instance)
            self.services[name, instance] = svc
            return svc

    @translateError
    def remote_execute(self, service, instance, method, args):
        """Execute requests on behalf of zenhub

        The call is attempted up to five times: conflict errors raised by
        the first four attempts are swallowed and the call is retried, the
        fifth attempt lets any exception propagate.  The elapsed time is
        always added to the service's callTime.

        @type service: string
        @param service: the name of a service, like PingConfig

        @type instance: string
        @param instance: each service serves only one specific collector
        instance (like 'localhost'). instance defines the collector's
        instance name.

        @type method: string
        @param method: the name of the called method, like getPingTree
        (looked up on the service as 'remote_' + method)

        @type args: string
        @param args: a pickle of the tuple (args, kw) holding the positional
        and keyword arguments to the method

        @return: whatever the service method returns
        """
        self.log.debug("Servicing %s in %s", method, service)
        now = time.time()
        service = self._getService(service, instance)
        m = getattr(service, 'remote_' + method)
        # now that the service is loaded, we can unpack the arguments
        # NOTE(review): pickle.loads runs on data received from the remote
        # caller; this is only safe while that peer is trusted.
        args, kw = pickle.loads(args)

        def runOnce():
            self.syncdb()
            res = m(*args, **kw)
            commit()
            return res
        try:
            for i in range(4):
                try:
                    return runOnce()
                except (ConflictError, RemoteConflictError):
                    # commit() is called here directly, so a conflict at
                    # commit time arrives as a plain ConflictError rather
                    # than a RemoteConflictError; retry on both.
                    # NOTE(review): relies on the syncdb() at the start of
                    # runOnce() to discard the failed transaction - confirm.
                    pass
            # one last try, but don't hide the exception
            return runOnce()
        finally:
            secs = time.time() - now
            self.log.debug("Time in %s: %.2f", method, secs)
            service.callTime += secs

    def buildOptions(self):
        """Options, mostly to find where zenhub lives
        These options should be passed (by file) from zenhub.
        """
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--hubhost',
                               dest='hubhost',
                               default='localhost',
                               help="Host to use for connecting to ZenHub")
        self.parser.add_option('--hubport',
                               dest='hubport',
                               type='int',
                               help="Port to use for connecting to ZenHub",
                               default=PB_PORT)
        self.parser.add_option('--username',
                               dest='username',
                               help="Login name to use when connecting to ZenHub",
                               default='zenoss')
        self.parser.add_option('--password',
                               dest='password',
                               help="password to use when connecting to ZenHub",
                               default='zenoss')

if __name__ == '__main__':
    # Script entry point: constructing the worker kicks off the connection
    # and login to zenhub; the reactor then runs until it is stopped.
    zhw = zenhubworker()
    reactor.run()