1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 __doc__ = """zenhub
16
17 Provide remote, authenticated, and possibly encrypted two-way
18 communications with the Model and Event databases.
19
20 """
21
22 from XmlRpcService import XmlRpcService
23
24 import time
25 import pickle
26
27 from twisted.cred import portal, checkers, credentials
28 from twisted.spread import pb, banana
29 banana.SIZE_LIMIT = 1024 * 1024 * 10
30
31 from twisted.internet import reactor, protocol, defer
32 from twisted.web import server, xmlrpc
33 from zope.interface import implements
34
35 import Globals
36
37 from Products.DataCollector.Plugins import loadPlugins
38 from Products.ZenUtils.ZCmdBase import ZCmdBase
39 from Products.ZenUtils.Utils import zenPath, getExitMessage, unused
40 from Products.ZenUtils.DaemonStats import DaemonStats
41 from Products.ZenEvents.Event import Event, EventHeartbeat
42 from Products.ZenEvents.ZenEventClasses import App_Start
43 from Products.ZenUtils.Utils import unused
44
45 from Products.ZenHub.PBDaemon import RemoteBadMonitor
46 pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)
47
48
49
50
51
52
53
54 from Products.DataCollector.plugins.DataMaps import ObjectMap
55
56 import sys
57 sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins'))
58 import DataMaps
59 unused(DataMaps, ObjectMap)
60
61
62
# Default TCP port for the XML-RPC listener (default of --xmlrpcport).
XML_RPC_PORT = 8081
# Default TCP port for the Perspective Broker listener (default of --pbport).
PB_PORT = 8789
65
67 "Provide some level of authentication for XML/RPC calls"
68
72
73
# NOTE(review): the `def` line is missing from the extracted source; name and
# argument order follow d.addCallback(self.doRender, request) -- confirm.
def doRender(self, avatar, request):
    """
    Call the inherited render engine after authentication succeeds.
    See @L{XmlRpcService.XmlRpcService.Render}.

    @param avatar: the result of the credentials check (not used here)
    @type request: HTTPRequest
    @param request: the request for this xmlrpc call.
    @return: whatever XmlRpcService.render() returns
    """
    return XmlRpcService.render(self, request)
80
81
# NOTE(review): the `def` line is missing from the extracted source; the
# signature follows the self.unauthorized(request) call sites -- confirm.
def unauthorized(self, request):
    """
    Render an XMLRPC error indicating an authentication failure.

    @type request: HTTPRequest
    @param request: the request for this xmlrpc call.
    @return: None
    """
    fault = xmlrpc.Fault(self.FAILURE, "Unauthorized")
    self._cbRender(fault, request)
90
91
# NOTE(review): the `def` line is missing from the extracted source; the
# signature is inferred from the docstring and the `request` usage -- confirm.
def render(self, request):
    """
    Unpack the authorization header and check the credentials.

    Only HTTP Basic authentication is accepted.  Any failure (missing
    header, other scheme, undecodable payload, rejected credentials)
    is answered through self.unauthorized().

    @type request: HTTPRequest
    @param request: the request for this xmlrpc call.
    @return: NOT_DONE_YET
    """
    auth = request.received_headers.get('authorization', None)
    if not auth:
        self.unauthorized(request)
        return server.NOT_DONE_YET

    try:
        scheme, encoded = auth.split()
        # The auth scheme token is case-insensitive.
        if scheme.lower() != 'basic':
            self.unauthorized(request)
        else:
            # Split on the first colon only: the password part of
            # "user:password" may itself contain colons.
            user, passwd = encoded.decode('base64').split(':', 1)
            creds = credentials.UsernamePassword(user, passwd)
            d = self.checker.requestAvatarId(creds)
            d.addCallback(self.doRender, request)

            def error(failure, request):
                self.unauthorized(request)
            d.addErrback(error, request)
    except Exception:
        # Malformed header or payload: treat it as a failed login.
        self.unauthorized(request)
    return server.NOT_DONE_YET
118
119
121 """
122 Connect collectors to their configuration Services
123 """
124
127
# NOTE(review): the `def` line is missing from the extracted source; the
# method name and the None defaults are reconstructed, not visible -- confirm.
def perspective_getService(self, serviceName, instance=None, listener=None):
    """
    Allow a collector to find a Hub service by name.  It also
    associates the service with a collector so that changes can be
    pushed back out to collectors.

    @type serviceName: string
    @param serviceName: a name, like 'EventService'
    @type instance: string
    @param instance: the collector's instance name, like 'localhost'
    @type listener: a remote reference to the collector
    @param listener: the callback interface to the collector
    @return: a remote reference to a service
    """
    service = self.hub.getService(serviceName, instance)
    # Only register a listener when there is both a service and a listener.
    if listener and service is not None:
        service.addListener(listener)
    return service
149
# NOTE(review): the `def` line is missing from the extracted source and the
# method name is reconstructed -- confirm against the original file.
def perspective_reportingForWork(self, worker):
    """
    Allow a worker to register for work.

    The worker is marked idle and added to the hub's worker list.
    When it disconnects it is dropped from the list and the hub is
    asked, one second later, to create a worker.

    @type worker: a pb.RemoteReference
    @param worker: a reference to zenhubworker
    @return: None
    """
    worker.busy = False
    self.hub.workers.append(worker)

    def removeWorker(worker):
        # NOTE(review): indentation was lost in the source; the respawn is
        # assumed to happen only for a worker that was still registered.
        if worker in self.hub.workers:
            self.hub.workers.remove(worker)
            reactor.callLater(1, self.hub.createWorker)

    worker.notifyOnDisconnect(removeWorker)
165
166
168 """
169 Following the Twisted authentication framework.
170 See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
171 """
172 implements(portal.IRealm)
173
176
# NOTE(review): the `def` line is missing from the extracted source; the
# signature follows twisted's portal.IRealm.requestAvatar -- confirm.
def requestAvatar(self, collName, mind, *interfaces):
    """
    Hand out the hub avatar for a Perspective Broker login.

    @param collName: the avatar id produced by the credentials checker
    @param mind: unused here
    @param interfaces: the interfaces the caller is asking for
    @raise NotImplementedError: when pb.IPerspective was not requested
    @return: (pb.IPerspective, the avatar, a no-op logout callable)
    """
    if pb.IPerspective not in interfaces:
        raise NotImplementedError
    return pb.IPerspective, self.hubAvitar, lambda: None
181
182
184 """Redirect service requests to one of the worker processes. Note
185 that everything else (like change notifications) go through
186 locally hosted services."""
187
188 callTime = 0.
189
193
# NOTE(review): the `def` line is missing from the extracted source; the
# parameters are the names the body reads -- confirm their order.
def remoteMessageReceived(self, broker, message, args, kw):
    """
    Intercept requests and send them down to workers.

    The call's arguments are unserialized, pickled as one
    (args, kw) tuple and queued through zenhub.deferToWorker().

    @param broker: the broker used to unserialize arguments and
        serialize the result
    @param message: the name of the remote method being invoked
    @param args: the serialized positional arguments
    @param kw: the serialized keyword arguments
    @return: the serialized result of deferToWorker()
    """
    # str() of the service class with its last dotted component removed.
    svc = str(self.service.__class__).rsplit('.', 1)[0]
    instance = self.service.instance
    positional = broker.unserialize(args)
    keywords = broker.unserialize(kw)
    payload = pickle.dumps((positional, keywords))
    result = self.zenhub.deferToWorker((svc, instance, message, payload))
    return broker.serialize(result, self.perspective)
206
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the service.addListener(listener) call sites -- confirm.
def addListener(self, listener):
    "Implement the HubService interface by forwarding to the local service"
    return self.service.addListener(listener)
210
# NOTE(review): the `def` line is missing from the extracted source; the name
# mirrors the wrapped service's removeListener() -- confirm.
def removeListener(self, listener):
    "Implement the HubService interface by forwarding to the local service"
    return self.service.removeListener(listener)
214
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the s.update(obj) call made on every cached service -- confirm.
def update(self, object):
    "Implement the HubService interface by forwarding to the local service"
    return self.service.update(object)
218
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the s.deleted(obj) call made on every cached service -- confirm.
def deleted(self, object):
    "Implement the HubService interface by forwarding to the local service"
    return self.service.deleted(object)
222
223
225 """
226 Listen for changes to objects in the Zeo database and update the
227 collectors' configuration.
228
229 The remote collectors connect the ZenHub and request configuration
230 information and stay connected. When changes are detected in the
231 Zeo database, configuration updates are sent out to collectors
232 asynchronously. In this way, changes made in the web GUI can
233 affect collection immediately, instead of waiting for a
234 configuration cycle.
235
236 Each collector uses a different, pluggable service within ZenHub
237 to translate objects into configuration and data. ZenPacks can
238 add services for their collectors. Collectors communicate using
239 Twisted's Perspective Broker, which provides authenticated,
240 asynchronous, bidirectional method invocation.
241
242 ZenHub also provides an XmlRPC interface to some common services
243 to support collectors written in other languages.
244 """
245
246 totalTime = 0.
247 totalEvents = 0
248 totalCallTime = 0.
249 name = 'zenhub'
250
# NOTE(review): the `def` line is missing from the extracted source; a
# no-argument constructor is inferred from `z = ZenHub()` -- confirm.
def __init__(self):
    """
    Hook ourselves up to the Zeo database and wait for collectors
    to connect.

    Opens the Perspective Broker and XML-RPC listeners, posts an
    App_Start event, schedules the first processQueue() call and
    starts the configured number of workers.
    """
    # NOTE(review): presumably these must exist before ZCmdBase.__init__
    # runs, since the invalidation hook appends to self.changes -- confirm.
    self.changes = []
    self.workers = []
    self.workList = []

    ZCmdBase.__init__(self)
    self.zem = self.dmd.ZenEventManager
    loadPlugins(self.dmd)
    self.services = {}

    # Both listeners authenticate against the same credentials checker.
    realm = HubRealm(self)
    checker = self.loadChecker()
    pt = portal.Portal(realm, [checker])
    reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt))

    xmlsvc = AuthXmlRpcService(self.dmd, checker)
    reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc))

    self.sendEvent(eventClass=App_Start,
                   summary="%s started" % self.name,
                   severity=0)
    reactor.callLater(5, self.processQueue)
    self.rrdStats = self.getRRDStats()
    for _ in range(int(self.options.workers)):
        self.createWorker()
280
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the self.getRRDStats() call in the constructor -- confirm.
def getRRDStats(self):
    """
    Build the statistics helper for this daemon.

    @return: a DaemonStats configured for 'zenhub' with the performance
        monitor named by --monitor (None when no such monitor exists)
    """
    rrdStats = DaemonStats()
    monitors = self.dmd.Monitors.Performance
    perfConf = monitors._getOb(self.options.monitor, None)
    rrdStats.configWithMonitor('zenhub', perfConf)
    return rrdStats
289
# NOTE(review): the `def` line is missing from the extracted source and the
# method name is NOT visible anywhere in it; `zeoConnect` is an assumption
# based on the docstring ("Override the kind of zeo connection") -- confirm.
def zeoConnect(self):
    """
    Override the kind of zeo connection we have so we can listen
    to Zeo object updates.  Updates come as OID invalidations.

    Every invalidated oid is put at the front of self.changes before
    the stock cache handles it.

    @return: None
    """
    from ZEO.cache import ClientCache as ClientCacheBase
    from ZEO.ClientStorage import ClientStorage as ClientStorageBase
    from ZODB import DB

    hub = self

    class ClientCache(ClientCacheBase):
        "A sub-class to notice object invalidation notifications"
        def invalidate(self, oid, version, tid):
            hub.changes.insert(0, oid)
            ClientCacheBase.invalidate(self, oid, version, tid)

    class ClientStorage(ClientStorageBase):
        "Override the caching class to intercept messages"
        ClientCacheClass = ClientCache

    options = self.options
    storage = ClientStorage((options.host, options.port),
                            client=options.pcachename,
                            var=options.pcachedir,
                            cache_size=options.pcachesize*1024*1024)
    self.db = DB(storage, cache_size=options.cachesize)
315
316
332
333
# NOTE(review): the `def` line is missing from the extracted source and the
# method name is reconstructed, not visible -- confirm.
def doProcessQueue(self):
    """
    Perform one cycle of update notifications.

    Drains self.changes.  Each oid is loaded from the database
    connection; if the object can be wrapped and resolved to its
    primary path every service gets update(obj), otherwise (an
    AttributeError) every service gets deleted(obj).

    @return: None
    """
    while self.changes:
        oid = self.changes.pop()
        self.log.debug("Got oid %r", oid)
        obj = self.dmd._p_jar[oid]
        self.log.debug("Object %r changed", obj)
        try:
            obj = obj.__of__(self.dmd).primaryAq()
            self.log.debug("Noticing object %s changed",
                           obj.getPrimaryUrlPath())
        except AttributeError:
            self.log.debug("Noticing object %s ", obj)
            notify = 'deleted'
        else:
            notify = 'update'
        for s in self.services.values():
            getattr(s, notify)(obj)
355
356
# NOTE(review): the `def` line is missing from the extracted source; the
# **kw signature follows the docstring and the keyword call site -- confirm.
def sendEvent(self, **kw):
    """
    Useful method for posting events to the EventManager.

    'device' defaults to the --monitor option and 'component' to the
    daemon name.  Failures are logged, never raised.

    @type kw: keywords (dict)
    @param kw: the values for an event: device, summary, etc.
    @return: None
    """
    if 'device' not in kw:
        kw['device'] = self.options.monitor
    if 'component' not in kw:
        kw['component'] = self.name
    try:
        self.zem.sendEvent(Event(**kw))
    except Exception:
        # Best effort: an event that cannot be posted must not stop the hub.
        self.log.exception("Unable to send an event")
373
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the self.loadChecker() call in the constructor -- confirm.
def loadChecker(self):
    """
    Load the password file

    The first username/password pair in the file is remembered as
    self.workerUsername / self.workerPassword.

    @return: an object satisfying the ICredentialsChecker
    interface using a password file or an empty list if the file
    is not available.  Uses the file specified in the --passwd
    command line option.
    """
    passwordfile = self.options.passwordfile
    try:
        checker = checkers.FilePasswordDB(passwordfile)
        # Reads the first entry through FilePasswordDB's private loader.
        u, p = checker._loadCredentials().next()
        self.workerUsername, self.workerPassword = u, p
        return checker
    except Exception:
        self.log.exception("Unable to load %s", passwordfile)
    return []
392
393
# NOTE(review): the `def` line is missing from the extracted source; the
# signature follows the self.hub.getService(serviceName, instance) call.
def getService(self, name, instance):
    """
    Helper method to load services dynamically for a collector.
    Returned instances are cached: reconnecting collectors will
    get the same service object.

    @type name: string
    @param name: the dotted-name of the module to load
    (uses @L{Products.ZenUtils.Utils.importClass})
    @type instance: string
    @param instance: each service serves only one specific collector
    instances (like 'localhost').  instance defines the collector's
    instance name.
    @raise RemoteBadMonitor: if instance is not a known performance monitor
    @return: a service loaded from ZenHub/services or one of the zenpacks.
    """
    if not self.dmd.Monitors.Performance._getOb(instance, False):
        # Report the monitor that was asked for, not the hub's own
        # --monitor option.
        raise RemoteBadMonitor(
            "The provided performance monitor '%s'" % (instance,) +
            " is not in the current list")

    key = (name, instance)
    try:
        return self.services[key]
    except KeyError:
        pass

    from Products.ZenUtils.Utils import importClass
    try:
        ctor = importClass(name)
    except ImportError:
        # Not importable as given: look in the hub's own services package.
        ctor = importClass('Products.ZenHub.services.%s' % name, name)
    svc = ctor(self.dmd, instance)
    if self.options.workers:
        # With workers configured, calls are routed through the interceptor.
        svc = WorkerInterceptor(self, svc)
    self.services[key] = svc
    return svc
428
# NOTE(review): the `def` line is missing from the extracted source; the
# single-tuple signature follows the self.zenhub.deferToWorker((...)) call.
def deferToWorker(self, args):
    """Take a remote request and queue it for worker processes.

    With --prioritize the job goes in front of the first queued job
    whose priority value is larger; otherwise it goes to the end.

    @type args: tuple
    @param args: the arguments to the remote_execute() method in the worker
    @return: a Deferred for the eventual results of the method call

    """
    d = defer.Deferred()
    svcName, instance, method = args[:3]
    service = self.getService(svcName, instance).service
    priority = service.getMethodPriority(method)
    job = (d, priority, args)

    # Default slot is the end of the queue.
    position = len(self.workList)
    if self.options.prioritize:
        for i, queued in enumerate(self.workList):
            if priority < queued[1]:
                position = i
                break
    self.workList.insert(position, job)

    self.giveWorkToWorkers()
    return d
456
457
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the self.giveWorkToWorkers() call sites -- confirm.
def giveWorkToWorkers(self):
    """Parcel out a method invocation to an available worker process

    Keeps dispatching while there is queued work and some idle worker
    accepts a job; each job's Deferred is chained to the remote call.

    @return: None
    """
    def finished(result, finishedWorker):
        # Free the worker, look for more work, pass the result through.
        finishedWorker.busy = False
        self.giveWorkToWorkers()
        return result

    self.log.debug("worklist has %d items", len(self.workList))
    while self.workList:
        for i, worker in enumerate(self.workers):
            if worker.busy:
                continue
            job = self.getJobForWorker(i)
            if job is None:
                continue
            worker.busy = True
            self.log.debug("Giving %s to worker %d", job[2][2], i)
            d2 = worker.callRemote('execute', *job[2])
            d2.addBoth(finished, worker)
            d2.chainDeferred(job[0])
            break
        else:
            # No idle worker took a job on this pass.
            self.log.debug("all workers are busy")
            break
481
# NOTE(review): the `def` line is missing from the extracted source; the
# signature follows the self.getJobForWorker(i) call and the body -- confirm.
def getJobForWorker(self, workerId):
    """Pick the next queued job that the given worker may run.

    With --anyworker the oldest job is taken.  Otherwise worker i of
    n only takes the first job whose priority is below (i+1)/n.

    @type workerId: int
    @param workerId: the worker's index in self.workers
    @return: the (deferred, priority, args) job removed from the work
        list, or None if nothing suitable is queued
    """
    if not self.workList:
        return None
    if self.options.anyworker:
        return self.workList.pop(0)

    limit = (workerId + 1) / float(len(self.workers))
    for i, job in enumerate(self.workList):
        if job[1] < limit:
            return self.workList.pop(i)
    return None
492
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the self.createWorker() call sites -- confirm.
def createWorker(self):
    """Start a worker subprocess

    Does nothing when the configured number of workers is already
    connected.  Otherwise writes a temporary config file for
    zenhubworker and spawns it; the file is removed when the process
    ends.

    @return: None
    """
    if len(self.workers) >= self.options.workers:
        return

    import os
    import tempfile
    settings = (
        ("hubport", self.options.pbport),
        ("username", self.workerUsername),
        ("password", self.workerPassword),
        ("host", self.options.host),
        ("logseverity", self.options.logseverity),
        ("cachesize", self.options.cachesize),
    )
    fd, tmp = tempfile.mkstemp()
    try:
        for key, value in settings:
            os.write(fd, "%s %s\n" % (key, value))
    finally:
        os.close(fd)

    exe = zenPath('bin', 'zenhubworker')
    hub = self

    class WorkerRunningProtocol(protocol.ProcessProtocol):
        "Relay the worker's output to the hub log and clean up on exit"

        def outReceived(self, data):
            hub.log.debug("Worker reports %s" % (data,))

        def errReceived(self, data):
            hub.log.info("Worker reports %s" % (data,))

        def processEnded(self, reason):
            os.unlink(tmp)
            # %s, not %d: exitCode is None when the worker was ended by a
            # signal, and %d would fail to format it.
            hub.log.warning("Worker exited with status: %s (%s)",
                            reason.value.exitCode,
                            getExitMessage(reason.value.exitCode))

    args = (exe, 'run', '-C', tmp)
    self.log.debug("Starting %s", ' '.join(args))
    reactor.spawnProcess(WorkerRunningProtocol(), exe, args, os.environ)
533
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the reactor.callLater(seconds, self.heartbeat) call -- confirm.
def heartbeat(self):
    """
    Since we don't do anything on a regular basis, just
    push heartbeats regularly.

    Every 30 seconds: send a heartbeat event, call niceDoggie(),
    reschedule this method and post the daemon's counters and gauges.

    @return: None
    """
    seconds = 30
    evt = EventHeartbeat(self.options.monitor, self.name, 3*seconds)
    self.zem.sendEvent(evt)
    self.niceDoggie(seconds)
    reactor.callLater(seconds, self.heartbeat)

    stats = self.rrdStats
    # Sum of the callTime attribute over every cached service.
    serviceCallTime = sum([s.callTime for s in self.services.values()])
    events = (
        stats.counter('totalTime', seconds, int(self.totalTime * 1000)) +
        stats.counter('totalEvents', seconds, self.totalEvents) +
        stats.gauge('services', seconds, len(self.services)) +
        stats.counter('totalCallTime', seconds, serviceCallTime) +
        stats.gauge('workListLength', seconds, len(self.workList))
    )
    self.zem.sendEvents(events)
555
556
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the z.main() call in the __main__ guard -- confirm.
def main(self):
    """
    Start the main event loop.

    Starts the heartbeat first when the cycle option is set.  Once
    the reactor has stopped, every known worker is sent KILL.
    """
    if self.options.cycle:
        self.heartbeat()
    reactor.run()
    # Only reached after the reactor has stopped.
    for worker in self.workers:
        worker.transport.signalProcess('KILL')
566
567
# NOTE(review): the `def` line is missing from the extracted source; the name
# follows the ZCmdBase.buildOptions(self) call in the body -- confirm.
def buildOptions(self):
    """
    Adds our command line options to ZCmdBase command line options.
    """
    ZCmdBase.buildOptions(self)
    add = self.parser.add_option
    add('--xmlrpcport', '-x', dest='xmlrpcport',
        type='int', default=XML_RPC_PORT,
        help='Port to use for XML-based Remote Procedure Calls (RPC)')
    add('--pbport', dest='pbport',
        type='int', default=PB_PORT,
        help="Port to use for Twisted's pb service")
    add('--passwd', dest='passwordfile',
        type='string', default=zenPath('etc','hubpasswd'),
        help='File where passwords are stored')
    add('--monitor', dest='monitor',
        default='localhost',
        help='Name of the distributed monitor this hub runs on')
    add('--workers', dest='workers',
        type='int', default=0,
        help="Number of worker instances to handle requests")
    add('--prioritize', dest='prioritize',
        action='store_true', default=False,
        help="Run higher priority jobs before lower priority ones")
    add('--anyworker', dest='anyworker',
        action='store_true', default=False,
        help='Allow any priority job to run on any worker')
594
# Script entry point: build the hub daemon and run it.
if __name__ == '__main__':
    z = ZenHub()
    z.main()
598