1
2
3
4
5
6
7
8
9
10
11 import Globals
12 from Products.DataCollector.Plugins import loadPlugins
13 from Products.ZenHub import PB_PORT
14 from Products.ZenHub.zenhub import LastCallReturnValue
15 from Products.ZenHub.PBDaemon import translateError, RemoteConflictError
16 from Products.ZenUtils.Time import isoDateTime
17 from Products.ZenUtils.ZCmdBase import ZCmdBase
18 from Products.ZenUtils.Utils import unused, zenPath
19 from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory
20
21 from Products.DataCollector.plugins import DataMaps
22 unused(DataMaps)
23
24 from twisted.cred import credentials
25 from twisted.spread import pb
26 from twisted.internet import reactor, error
27 from ZODB.POSException import ConflictError
28 from collections import defaultdict
29
30 import cPickle as pickle
31 import time
32 import signal
33 import os
34
# Value of ``self.current`` while no request is being serviced; it has the
# same "service/method" shape that is stored while a request is in progress.
IDLE = "None/None"
# NOTE(review): the ``class`` and ``def`` header lines were missing from the
# reviewed text.  They are restored from the visible call sites
# (``defaultdict(_CumulativeWorkerStats)`` and
# ``addOccurrence(secs, finishTime)``) -- confirm against the original file.
class _CumulativeWorkerStats(object):
    """
    Internal class for maintaining cumulative stats on frequency and runtime
    for individual methods by service
    """

    def __init__(self):
        self.numoccurrences = 0   # how many calls have been recorded
        self.totaltime = 0.0      # summed elapsed time of those calls
        self.lasttime = 0         # timestamp passed with the latest call

    def addOccurrence(self, elapsed=0.0, now=None):
        """Record one more call.

        @type elapsed: float
        @param elapsed: time the call took, added to the running total
        @type now: float
        @param now: timestamp to store as the latest occurrence; the current
            time is used when omitted
        """
        if now is None:
            now = time.time()
        self.numoccurrences += 1
        self.totaltime += elapsed
        self.lasttime = now
52
88
91
# NOTE(review): the ``def`` line was missing from the reviewed text.  The name
# is taken from the ``self.reportStats()`` call made while shutting down --
# confirm against the original file.
def reportStats(self):
    """Write the worker's current activity and call statistics to the log.

    Everything is logged at debug level: first what the worker is doing
    right now, then one line per (service, method) pair giving the call
    count, total time, mean time per call and the time of the latest call.
    """
    now = time.time()
    if self.current != IDLE:
        self.log.debug("(%d) Currently performing %s, elapsed %.2f s",
                       self.pid, self.current, now - self.currentStart)
    else:
        self.log.debug("(%d) Currently IDLE", self.pid)

    if not self.services:
        self.log.debug("no service activity statistics")
        return

    def shortName(dottedName):
        # keep only the part after the last dot of the service name
        return dottedName.rpartition('.')[-1]

    def byInstanceThenService(item):
        # self.services is keyed by (service name, instance)
        (name, instance), _service = item
        return (instance, shortName(name))

    loglines = ["(%d) Running statistics:" % self.pid]
    ordered = sorted(self.services.items(), key=byInstanceThenService)
    for (name, instance), svcob in ordered:
        label = "%s/%s" % (instance, shortName(name))
        for method, stats in sorted(svcob.callStats.items()):
            count = stats.numoccurrences
            # a method with no recorded calls reports a mean of 0.0
            mean = stats.totaltime / count if count else 0.0
            loglines.append(" - %-48s %-32s %8d %12.2f %8.2f %s" %
                            (label, method,
                             count,
                             stats.totaltime,
                             mean,
                             isoDateTime(stats.lasttime)))
    self.log.debug('\n'.join(loglines))
113
# Registers this worker with zenhub once a connection has been made.
# NOTE(review): the enclosing ``def`` line is absent from the reviewed text;
# the body reads ``self`` and ``perspective``, so it presumably receives
# both -- confirm the name and signature against the original file.
115 "Once we are connected to zenhub, register ourselves"
# Hand ourselves to the hub through its 'reportingForWork' remote method.
116 d = perspective.callRemote('reportingForWork', self)
# Errback: log why registration failed, then stop the reactor.
117 def reportProblem(why):
118 self.log.error("Unable to report for work: %s", why)
119 reactor.stop()
120 d.addErrback(reportProblem)
121
# NOTE(review): the ``def`` line was missing from the reviewed text.  It is
# restored from the ``self._getService(service, instance)`` call site and the
# parameter names used in the docstring and body -- confirm against the
# original file.
def _getService(self, name, instance):
    """Utility method to create the service (like PingConfig)
    for instance (like localhost)

    A service is built at most once per (name, instance) pair; later
    requests get the cached object back.

    @type name: string
    @param name: the dotted-name of the module to load
        (uses @L{Products.ZenUtils.Utils.importClass})
    @type instance: string
    @param instance: each service serves only one specific collector
        instance (like 'localhost'). instance defines the collector's
        instance name.
    @return: a service loaded from ZenHub/services or one of the zenpacks.
    """
    key = (name, instance)
    try:
        return self.services[key]
    except KeyError:
        # Not cached yet.  Build it below, outside the handler, so errors
        # raised during construction are not handled as a cache miss.
        pass

    from Products.ZenUtils.Utils import importClass
    try:
        ctor = importClass(name)
    except ImportError:
        # fall back to the services shipped under Products.ZenHub.services
        ctor = importClass('Products.ZenHub.services.%s' % name, name)
    svc = ctor(self.dmd, instance)

    # Per-method cumulative call statistics for this service.  Attached
    # before the service is cached so a cached service always carries them.
    svc.callStats = defaultdict(_CumulativeWorkerStats)
    self.services[key] = svc
    return svc
150
# Services one request on behalf of zenhub and returns the pickled result as
# a list of string chunks.
# NOTE(review): the ``def`` line for this decorated body is absent from the
# reviewed text; the body reads ``self``, ``service``, ``instance``,
# ``method`` and ``args`` -- confirm the name and signature against the
# original file.
# NOTE(review): the docstring lists ``kw`` as a parameter, but the visible
# body obtains ``kw`` by unpickling ``args``.
151 @translateError
153 """Execute requests on behalf of zenhub
154 @type service: string
155 @param service: the name of a service, like PingConfig
156
157 @type instance: string
158 @param instance: each service serves only one specific collector instances (like 'localhost'). instance defines the collector's instance name.
159
160 @type method: string
161 @param method: the name of the called method, like getPingTree
162
163 @type args: tuple
164 @param args: arguments to the method
165
166 @type kw: dictionary
167 @param kw: keyword arguments to the method
168 """
# Record what is being serviced ("service/method", with the service name cut
# down to the part after its last dot) and when it started.
169 svcstr = service.rpartition('.')[-1]
170 self.current = "%s/%s" % (svcstr, method)
171 self.log.debug("Servicing %s in %s", method, service)
172 now = time.time()
173 self.currentStart = now
# A conflict raised by this first sync is ignored; runOnce() below syncs
# again before the method is invoked.
174 try:
175 self.syncdb()
176 except RemoteConflictError, ex:
177 pass
# ``service`` is rebound from the service name to the service object; ``m`` is
# that object's remote_<method> attribute.
178 service = self._getService(service, instance)
179 m = getattr(service, 'remote_' + method)
180
# ``args`` is joined into a single string and unpickled into the real
# (args, kw) pair.
# NOTE(review): pickle.loads trusts its input -- presumably only zenhub sends
# these payloads; confirm.
181 joinedArgs = "".join(args)
182 args, kw = pickle.loads(joinedArgs)
183
184
# Count this call; it is the last one once the calllimit option is reached.
185 self.numCalls += 1
186 lastCall = self.numCalls >= self.options.calllimit
187
# One attempt: sync, invoke the method, wrap the result in LastCallReturnValue
# on the last call, pickle it and cut the pickle into pieces of at most
# 102400 characters.
# NOTE(review): each pass of the while loop re-slices (copies) the remaining
# pickle.
188 def runOnce():
189 self.syncdb()
190 res = m(*args, **kw)
191 if lastCall:
192 res = LastCallReturnValue(res)
193 pickled_res = pickle.dumps(res, pickle.HIGHEST_PROTOCOL)
194 chunkedres=[]
195 chunkSize = 102400
196 while pickled_res:
197 chunkedres.append(pickled_res[:chunkSize])
198 pickled_res = pickled_res[chunkSize:]
199 return chunkedres
# Up to four attempts that swallow RemoteConflictError, followed by one more
# attempt whose exception is allowed to propagate.
200 try:
201 for i in range(4):
202 try:
203 return runOnce()
204 except RemoteConflictError, ex:
205 pass
206
207 return runOnce()
# Runs whether the call returned or raised: log and record the elapsed time
# against this method, mark the worker idle, and after the last call schedule
# the shutdown one second later.
208 finally:
209 finishTime = time.time()
210 secs = finishTime - now
211 self.log.debug("Time in %s: %.2f", method, secs)
212
213 service.callStats[method].addOccurrence(secs, finishTime)
214 service.callTime += secs
215 self.current = IDLE
216
217 if lastCall:
218 reactor.callLater(1, self._shutdown)
219
# NOTE(review): the ``def`` line was missing from the reviewed text.  It is
# restored from ``reactor.callLater(1, self._shutdown)``, which calls it with
# no arguments -- confirm against the original file.
def _shutdown(self):
    """Log the final statistics and stop the reactor.

    Scheduled through ``reactor.callLater`` after the last permitted call
    has been serviced.
    """
    self.log.info("Shutting down")
    self.reportStats()
    self.log.info("Stopping reactor")
    try:
        reactor.stop()
    except error.ReactorNotRunning:
        # the reactor is not running, so there is nothing to stop
        pass
228
# NOTE(review): the ``def`` line was missing from the reviewed text.  It is
# restored from the ``ZCmdBase.buildOptions(self)`` super call in the body --
# confirm against the original file.
def buildOptions(self):
    """Options, mostly to find where zenhub lives
    These options should be passed (by file) from zenhub.

    Adds the hub host/port, the login credentials and the call limit on top
    of the options that ZCmdBase already defines.
    """
    ZCmdBase.buildOptions(self)
    addOption = self.parser.add_option
    addOption('--hubhost',
              dest='hubhost',
              default='localhost',
              help="Host to use for connecting to ZenHub")
    addOption('--hubport',
              dest='hubport',
              type='int',
              help="Port to use for connecting to ZenHub",
              default=PB_PORT)
    # NOTE(review): the credential defaults below are hard-coded literals.
    addOption('--username',
              dest='username',
              help="Login name to use when connecting to ZenHub",
              default='zenoss')
    addOption('--password',
              dest='password',
              help="password to use when connecting to ZenHub",
              default='zenoss')
    addOption('--calllimit',
              dest='calllimit',
              type='int',
              help="Maximum number of remote calls before restarting worker",
              default=200)
256
# Script entry point: build the worker, then hand control to the Twisted
# reactor until it is stopped.
257 if __name__ == '__main__':
258 zhw = zenhubworker()
259 reactor.run()
260