1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 __doc__ = """zenhub
16
17 Provide remote, authenticated, and possibly encrypted two-way
18 communications with the Model and Event databases.
19
20 """
21
22 from XmlRpcService import XmlRpcService
23
24 import time
25 import pickle
26
27 from twisted.cred import portal, checkers, credentials
28 from twisted.spread import pb, banana
29 banana.SIZE_LIMIT = 1024 * 1024 * 10
30
31 from twisted.internet import reactor, protocol, defer
32 from twisted.web import server, xmlrpc
33 from zope.interface import implements
34
35 import Globals
36
37 from Products.DataCollector.Plugins import loadPlugins
38 from Products.ZenUtils.ZCmdBase import ZCmdBase
39 from Products.ZenUtils.Utils import zenPath, getExitMessage, unused
40 from Products.ZenUtils.DaemonStats import DaemonStats
41 from Products.ZenEvents.Event import Event, EventHeartbeat
42 from Products.ZenEvents.ZenEventClasses import App_Start
43 from Products.ZenUtils.Utils import unused
44
45 from Products.ZenHub.PBDaemon import RemoteBadMonitor
46 pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)
47
48
49
50
51
52
53
54 from Products.DataCollector.plugins.DataMaps import ObjectMap
55
56 import sys
57 sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins'))
58 import DataMaps
59 unused(DataMaps, ObjectMap)
60
61
62
63 XML_RPC_PORT = 8081
64 PB_PORT = 8789
65
67 "Provide some level of authentication for XML/RPC calls"
68
72
73
75 """
76 Call the inherited render engine after authentication succeeds.
77 See L{XmlRpcService.XmlRpcService.render}.
78 """
79 return XmlRpcService.render(self, request)
80
81
83 """
84 Render an XMLRPC error indicating an authentication failure.
85 @type request: HTTPRequest
86 @param request: the request for this xmlrpc call.
87 @return: None
88 """
89 self._cbRender(xmlrpc.Fault(self.FAILURE, "Unauthorized"), request)
90
91
93 """
94 Unpack the authorization header and check the credentials.
95 @type request: HTTPRequest
96 @param request: the request for this xmlrpc call.
97 @return: NOT_DONE_YET
98 """
99 auth = request.received_headers.get('authorization', None)
100 if not auth:
101 self.unauthorized(request)
102 else:
103 try:
104 type, encoded = auth.split()
105 if type not in ('Basic',):
106 self.unauthorized(request)
107 else:
108 user, passwd = encoded.decode('base64').split(':')
109 c = credentials.UsernamePassword(user, passwd)
110 d = self.checker.requestAvatarId(c)
111 d.addCallback(self.doRender, request)
112 def error(unused, request):
113 self.unauthorized(request)
114 d.addErrback(error, request)
115 except Exception:
116 self.unauthorized(request)
117 return server.NOT_DONE_YET
118
119
121 """
122 Connect collectors to their configuration Services
123 """
124
127
132 """
133 Allow a collector to find a Hub service by name. It also
134 associates the service with a collector so that changes can be
135 pushed back out to collectors.
136
137 @type serviceName: string
138 @param serviceName: a name, like 'EventService'
139 @type instance: string
140 @param instance: the collector's instance name, like 'localhost'
141 @type listener: a remote reference to the collector
142 @param listener: the callback interface to the collector
143 @return: a remote reference to a service
144 """
145 service = self.hub.getService(serviceName, instance)
146 if service is not None and listener:
147 service.addListener(listener)
148 return service
149
151 """
152 Allow a worker to register for work.
153
154 @type worker: a pb.RemoteReference
155 @param worker: a reference to zenhubworker
156 @return: None
157 """
158 worker.busy = False
159 self.hub.workers.append(worker)
160 def removeWorker(worker):
161 if worker in self.hub.workers:
162 self.hub.workers.remove(worker)
163 reactor.callLater(1, self.hub.createWorker)
164 worker.notifyOnDisconnect(removeWorker)
165
166
168 """
169 Following the Twisted authentication framework.
170 See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
171 """
172 implements(portal.IRealm)
173
176
178 if pb.IPerspective not in interfaces:
179 raise NotImplementedError
180 return pb.IPerspective, self.hubAvitar, lambda:None
181
182
184 """Redirect service requests to one of the worker processes. Note
185 that everything else (like change notifications) goes through
186 locally hosted services."""
187
188 callTime = 0.
189
193
195 "Intercept requests and send them down to workers"
196 svc = str(self.service.__class__).rsplit('.', 1)[0]
197 instance = self.service.instance
198 args = broker.unserialize(args)
199 kw = broker.unserialize(kw)
200
201
202
203 args = pickle.dumps( (args, kw) )
204 result = self.zenhub.deferToWorker( (svc, instance, message, args) )
205 return broker.serialize(result, self.perspective)
206
208 "Implement the HubService interface by forwarding to the local service"
209 return self.service.addListener(listener)
210
212 "Implement the HubService interface by forwarding to the local service"
213 return self.service.removeListener(listener)
214
216 "Implement the HubService interface by forwarding to the local service"
217 return self.service.update(object)
218
220 "Implement the HubService interface by forwarding to the local service"
221 return self.service.deleted(object)
222
223
225 """
226 Listen for changes to objects in the Zeo database and update the
227 collectors' configuration.
228
229 The remote collectors connect to the ZenHub and request configuration
230 information and stay connected. When changes are detected in the
231 Zeo database, configuration updates are sent out to collectors
232 asynchronously. In this way, changes made in the web GUI can
233 affect collection immediately, instead of waiting for a
234 configuration cycle.
235
236 Each collector uses a different, pluggable service within ZenHub
237 to translate objects into configuration and data. ZenPacks can
238 add services for their collectors. Collectors communicate using
239 Twisted's Perspective Broker, which provides authenticated,
240 asynchronous, bidirectional method invocation.
241
242 ZenHub also provides an XmlRPC interface to some common services
243 to support collectors written in other languages.
244 """
245
246 totalTime = 0.
247 totalEvents = 0
248 totalCallTime = 0.
249 name = 'zenhub'
250
252 """
253 Hook ourselves up to the Zeo database and wait for collectors
254 to connect.
255 """
256 self.changes = []
257 self.workers = []
258 self.workList = []
259
260 ZCmdBase.__init__(self)
261 self.zem = self.dmd.ZenEventManager
262 loadPlugins(self.dmd)
263 self.services = {}
264
265 er = HubRealm(self)
266 checker = self.loadChecker()
267 pt = portal.Portal(er, [checker])
268 reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt))
269
270 xmlsvc = AuthXmlRpcService(self.dmd, checker)
271 reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc))
272
273 self.sendEvent(eventClass=App_Start,
274 summary="%s started" % self.name,
275 severity=0)
276 reactor.callLater(5, self.processQueue)
277 self.rrdStats = self.getRRDStats()
278 for i in range(int(self.options.workers)):
279 self.createWorker()
280
294
296 """
297 Override the kind of zeo connection we have so we can listen
298 to Zeo object updates. Updates come as OID invalidations.
299
300 @return: None
301 """
302 from ZEO.cache import ClientCache as ClientCacheBase
class ClientCache(ClientCacheBase):
    """
    ZEO client cache that records every invalidated oid.

    The method names its own instance C{cache} so that C{self} keeps
    referring to the enclosing object, whose C{changes} list receives
    the oids.
    """
    def invalidate(cache, oid, version, tid):
        "Queue the invalidated oid, then invalidate it in the base cache"
        # Newest oids go on the front of the list.
        self.changes.insert(0, oid)
        ClientCacheBase.invalidate(cache, oid, version, tid)
308
309 from ZEO.ClientStorage import ClientStorage as ClientStorageBase
class ClientStorage(ClientStorageBase):
    """
    ZEO client storage whose C{ClientCacheClass} is overridden with
    the local ClientCache so that invalidation messages are intercepted.
    """
    ClientCacheClass = ClientCache
313
314 storage = ClientStorage((self.options.host, self.options.port),
315 client=self.options.pcachename,
316 var=self.options.pcachedir,
317 cache_size=self.options.pcachesize*1024*1024)
318 from ZODB import DB
319 self.db = DB(storage, cache_size=self.options.cachesize)
320
321
337
338
340 """
341 Perform one cycle of update notifications.
342
343 @return: None
344 """
345 while self.changes:
346 oid = self.changes.pop()
347 self.log.debug("Got oid %r" % oid)
348 obj = self.dmd._p_jar[oid]
349 self.log.debug("Object %r changed" % obj)
350 try:
351 obj = obj.__of__(self.dmd).primaryAq()
352 self.log.debug("Noticing object %s changed" % obj.getPrimaryUrlPath())
353 except AttributeError, ex:
354 self.log.debug("Noticing object %s " % obj)
355 for s in self.services.values():
356 s.deleted(obj)
357 else:
358 for s in self.services.values():
359 s.update(obj)
360
361
363 """
364 Useful method for posting events to the EventManager.
365
366 @type kw: keywords (dict)
367 @param kw: the values for an event: device, summary, etc.
368 @return: None
369 """
370 if not 'device' in kw:
371 kw['device'] = self.options.monitor
372 if not 'component' in kw:
373 kw['component'] = self.name
374 try:
375 self.zem.sendEvent(Event(**kw))
376 except:
377 self.log.exception("Unable to send an event")
378
380 """
381 Load the password file
382
383 @return: an object satisfying the ICredentialsChecker
384 interface using a password file or an empty list if the file
385 is not available. Uses the file specified in the --passwd
386 command line option.
387 """
388 try:
389 checker = checkers.FilePasswordDB(self.options.passwordfile)
390
391 u, p = checker._loadCredentials().next()
392 self.workerUsername, self.workerPassword = u, p
393 return checker
394 except Exception, ex:
395 self.log.exception("Unable to load %s", self.options.passwordfile)
396 return []
397
398
400 """
401 Helper method to load services dynamically for a collector.
402 Returned instances are cached: reconnecting collectors will
403 get the same service object.
404
405 @type name: string
406 @param name: the dotted-name of the module to load
407 (uses @L{Products.ZenUtils.Utils.importClass})
408 @type instance: string
409 @param instance: each service serves only one specific collector
410 instance (like 'localhost'). instance defines the collector's
411 instance name.
412 @return: a service loaded from ZenHub/services or one of the zenpacks.
413 """
414
415 if not self.dmd.Monitors.Performance._getOb(instance, False):
416 raise RemoteBadMonitor( "The provided performance monitor '%s'" % \
417 self.options.monitor + " is not in the current list" )
418
419 try:
420 return self.services[name, instance]
421
422 except KeyError:
423 from Products.ZenUtils.Utils import importClass
424 try:
425 ctor = importClass(name)
426 except ImportError:
427 ctor = importClass('Products.ZenHub.services.%s' % name, name)
428 svc = ctor(self.dmd, instance)
429 if self.options.workers:
430 svc = WorkerInterceptor(self, svc)
431 self.services[name, instance] = svc
432 return svc
433
435 """Take a remote request and queue it for worker processes.
436
437 @type args: tuple
438 @param args: the arguments to the remote_execute() method in the worker
439 @return: a Deferred for the eventual results of the method call
440
441 """
442 d = defer.Deferred()
443 svcName, instance, method = args[:3]
444 service = self.getService(svcName, instance).service
445 priority = service.getMethodPriority(method)
446
447 if self.options.prioritize:
448
449 for i, job in enumerate(self.workList):
450 if priority < job[1]:
451 self.workList.insert(i, (d, priority, args) )
452 break
453 else:
454 self.workList.append( (d, priority, args) )
455 else:
456
457 self.workList.append( (d, priority, args) )
458
459 self.giveWorkToWorkers()
460 return d
461
462
464 """Parcel out a method invocation to an available worker process
465 """
466 self.log.debug("worklist has %d items", len(self.workList))
467 while self.workList:
468 for i, worker in enumerate(self.workers):
469
470 if not worker.busy:
471 job = self.getJobForWorker(i)
472 if job is None: continue
473 worker.busy = True
474 def finished(result, finishedWorker):
475 finishedWorker.busy = False
476 self.giveWorkToWorkers()
477 return result
478 self.log.debug("Giving %s to worker %d", job[2][2], i)
479 d2 = worker.callRemote('execute', *job[2])
480 d2.addBoth(finished, worker)
481 d2.chainDeferred(job[0])
482 break
483 else:
484 self.log.debug("all workers are busy")
485 break
486
488 if self.options.anyworker:
489 return self.workList.pop(0)
490 else:
491
492 lenWorkers = float(len(self.workers))
493 for i in range(len(self.workList)):
494 priority = self.workList[i][1]
495 if priority < (workerId+1) / lenWorkers:
496 return self.workList.pop(i)
497
499 """Start a worker subprocess
500
501 @return: None
502 """
503
504 if len(self.workers) >= self.options.workers:
505 return
506
507 import os, tempfile
508 fd, tmp = tempfile.mkstemp()
509 try:
510 os.write(fd, "hubport %s\n" % self.options.pbport)
511 os.write(fd, "username %s\n" % self.workerUsername)
512 os.write(fd, "password %s\n" % self.workerPassword)
513 os.write(fd, "host %s\n" % self.options.host)
514 os.write(fd, "logseverity %s\n" % self.options.logseverity)
515 os.write(fd, "cachesize %s\n" % self.options.cachesize)
516 finally:
517 os.close(fd)
518
519 exe = zenPath('bin', 'zenhubworker')
520
521
class WorkerRunningProtocol(protocol.ProcessProtocol):
    """
    Process protocol that relays a worker subprocess's output to the
    log and cleans up when the worker exits.

    The methods name their own instance C{proto} so that C{self}
    keeps referring to the enclosing object, whose logger is used.
    """

    def outReceived(proto, data):
        "Log data the worker wrote to stdout at debug level"
        self.log.debug("Worker reports %s" % (data,))

    def errReceived(proto, data):
        "Log data the worker wrote to stderr at info level"
        self.log.info("Worker reports %s" % (data,))

    def processEnded(proto, reason):
        """
        Remove the worker's temporary configuration file and log
        how the worker exited.

        @type reason: twisted.python.failure.Failure
        @param reason: wraps the ProcessDone or ProcessTerminated
            describing the exit
        @return: None
        """
        os.unlink(tmp)
        exitCode = reason.value.exitCode
        # exitCode is None when the worker was killed by a signal, so
        # format it with %s: %d would fail on None and the warning
        # would be lost.  Output for integer exit codes is unchanged.
        self.log.warning("Worker exited with status: %s (%s)",
                         exitCode, getExitMessage(exitCode))
535 args = (exe, 'run', '-C', tmp)
536 self.log.debug("Starting %s", ' '.join(args))
537 reactor.spawnProcess(WorkerRunningProtocol(), exe, args, os.environ)
538
540 """
541 Since we don't do anything on a regular basis, just
542 push heartbeats regularly.
543
544 @return: None
545 """
546 seconds = 30
547 evt = EventHeartbeat(self.options.monitor, self.name, 3*seconds)
548 self.zem.sendEvent(evt)
549 self.niceDoggie(seconds)
550 reactor.callLater(seconds, self.heartbeat)
551 r = self.rrdStats
552 totalTime = sum([s.callTime for s in self.services.values()])
553 self.zem.sendEvents(
554 r.counter('totalTime', seconds, int(self.totalTime * 1000)) +
555 r.counter('totalEvents', seconds, self.totalEvents) +
556 r.gauge('services', seconds, len(self.services)) +
557 r.counter('totalCallTime', seconds, totalTime) +
558 r.gauge('workListLength', seconds, len(self.workList))
559 )
560
561
563 """
564 Start the main event loop.
565 """
566 if self.options.cycle:
567 self.heartbeat()
568 reactor.run()
569 for worker in self.workers:
570 worker.transport.signalProcess('KILL')
571
572
574 """
575 Adds our command line options to ZCmdBase command line options.
576 """
577 ZCmdBase.buildOptions(self)
578 self.parser.add_option('--xmlrpcport', '-x', dest='xmlrpcport',
579 type='int', default=XML_RPC_PORT,
580 help='Port to use for XML-based Remote Procedure Calls (RPC)')
581 self.parser.add_option('--pbport', dest='pbport',
582 type='int', default=PB_PORT,
583 help="Port to use for Twisted's pb service")
584 self.parser.add_option('--passwd', dest='passwordfile',
585 type='string', default=zenPath('etc','hubpasswd'),
586 help='File where passwords are stored')
587 self.parser.add_option('--monitor', dest='monitor',
588 default='localhost',
589 help='Name of the distributed monitor this hub runs on')
590 self.parser.add_option('--workers', dest='workers',
591 type='int', default=0,
592 help="Number of worker instances to handle requests")
593 self.parser.add_option('--prioritize', dest='prioritize',
594 action='store_true', default=False,
595 help="Run higher priority jobs before lower priority ones")
596 self.parser.add_option('--anyworker', dest='anyworker',
597 action='store_true', default=False,
598 help='Allow any priority job to run on any worker')
599
600 if __name__ == '__main__':
601 z = ZenHub()
602 z.main()
603