1
2
3
4
5
6
7
8
9
10
11
12
13
14
__doc__ = """zenhub

Provide remote, authenticated, and possibly encrypted two-way
communications with the Model and Event databases.

"""

from XmlRpcService import XmlRpcService

import time
import pickle

from twisted.cred import portal, checkers, credentials
from twisted.spread import pb, banana
# Raise Banana's SIZE_LIMIT (the largest single string/list the
# Perspective Broker wire format will accept) to 10 MiB so that larger
# pb payloads are not rejected.
banana.SIZE_LIMIT = 1024 * 1024 * 10

from twisted.internet import reactor, protocol, defer
from twisted.web import server, xmlrpc
from zope.interface import implements

# NOTE(review): imported ahead of the Products.* modules; presumably needed
# for its Zope start-up side effects -- confirm before reordering imports.
import Globals

from Products.DataCollector.Plugins import loadPlugins
from Products.ZenUtils.ZCmdBase import ZCmdBase
from Products.ZenUtils.Utils import zenPath, getExitMessage, unused
from Products.ZenUtils.DaemonStats import DaemonStats
from Products.ZenEvents.Event import Event, EventHeartbeat
from Products.ZenEvents.ZenEventClasses import App_Start
# NOTE(review): duplicates the `unused` import a few lines above; harmless.
from Products.ZenUtils.Utils import unused

# Register RemoteBadMonitor with pb's jelly layer (as its own local
# representation) so instances of it can be unjellied when received.
from Products.ZenHub.PBDaemon import RemoteBadMonitor
pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)

from Products.DataCollector.plugins.DataMaps import ObjectMap

import sys
# Put the plugins directory at the front of sys.path so the following
# `import DataMaps` loads the module under its bare top-level name as well
# as the full Products.DataCollector.plugins.DataMaps name imported above.
# NOTE(review): presumably so serialized ObjectMaps referring to either
# module name can be resolved -- confirm. Must stay before `import DataMaps`.
sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins'))
import DataMaps
# `unused` is a Zenoss helper; presumably it only marks these imports as
# intentionally kept (silencing unused-import warnings).
unused(DataMaps, ObjectMap)
60
61
62
# Default listening ports; the --xmlrpcport and --pbport command-line
# options use these as their defaults.
XML_RPC_PORT, PB_PORT = 8081, 8789
65
67 "Provide some level of authentication for XML/RPC calls"
68
72
73
75 """
76 Call the inherited render engine after authentication succeeds.
77 See @L{XmlRpcService.XmlRpcService.Render}.
78 """
79 return XmlRpcService.render(self, request)
80
81
83 """
84 Render an XMLRPC error indicating an authentication failure.
85 @type request: HTTPRequest
86 @param request: the request for this xmlrpc call.
87 @return: None
88 """
89 self._cbRender(xmlrpc.Fault(self.FAILURE, "Unauthorized"), request)
90
91
93 """
94 Unpack the authorization header and check the credentials.
95 @type request: HTTPRequest
96 @param request: the request for this xmlrpc call.
97 @return: NOT_DONE_YET
98 """
99 auth = request.received_headers.get('authorization', None)
100 if not auth:
101 self.unauthorized(request)
102 else:
103 try:
104 type, encoded = auth.split()
105 if type not in ('Basic',):
106 self.unauthorized(request)
107 else:
108 user, passwd = encoded.decode('base64').split(':')
109 c = credentials.UsernamePassword(user, passwd)
110 d = self.checker.requestAvatarId(c)
111 d.addCallback(self.doRender, request)
def error(_failure, request):
    """
    Errback for the credential-check Deferred.

    It is attached after the doRender callback, so it also catches failures
    raised while rendering. Any failure is answered as unauthorized.
    """
    self.unauthorized(request)
114 d.addErrback(error, request)
115 except Exception:
116 self.unauthorized(request)
117 return server.NOT_DONE_YET
118
119
121 """
122 Connect collectors to their configuration Services
123 """
124
127
132 """
133 Allow a collector to find a Hub service by name. It also
134 associates the service with a collector so that changes can be
135 pushed back out to collectors.
136
137 @type serviceName: string
138 @param serviceName: a name, like 'EventService'
139 @type instance: string
140 @param instance: the collector's instance name, like 'localhost'
141 @type listener: a remote reference to the collector
142 @param listener: the callback interface to the collector
143 @return a remote reference to a service
144 """
145 service = self.hub.getService(serviceName, instance)
146 if service is not None and listener:
147 service.addListener(listener)
148 return service
149
151 """
152 Allow a worker register for work.
153
154 @type worker: a pb.RemoteReference
155 @param worker: a reference to zenhubworker
156 @return None
157 """
158 worker.busy = False
159 self.hub.workers.append(worker)
def removeWorker(worker):
    """
    Disconnect handler for a worker reference.

    If the worker is still registered with the hub, drop it and schedule
    hub.createWorker to run one second later so a replacement is started.
    Workers that are already gone are ignored.
    """
    registered = self.hub.workers
    if worker not in registered:
        return
    registered.remove(worker)
    reactor.callLater(1, self.hub.createWorker)
164 worker.notifyOnDisconnect(removeWorker)
165
166
168 """
169 Following the Twisted authentication framework.
170 See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
171 """
172 implements(portal.IRealm)
173
176
178 if pb.IPerspective not in interfaces:
179 raise NotImplementedError
180 return pb.IPerspective, self.hubAvitar, lambda:None
181
182
184 """Redirect service requests to one of the worker processes. Note
185 that everything else (like change notifications) go through
186 locally hosted services."""
187
188 callTime = 0.
189
193
195 "Intercept requests and send them down to workers"
196 svc = str(self.service.__class__).rsplit('.', 1)[0]
197 instance = self.service.instance
198 args = broker.unserialize(args)
199 kw = broker.unserialize(kw)
200
201
202
203 args = pickle.dumps( (args, kw) )
204 result = self.zenhub.deferToWorker( (svc, instance, message, args) )
205 return broker.serialize(result, self.perspective)
206
208 "Implement the HubService interface by forwarding to the local service"
209 return self.service.addListener(listener)
210
212 "Implement the HubService interface by forwarding to the local service"
213 return self.service.removeListener(listener)
214
216 "Implement the HubService interface by forwarding to the local service"
217 return self.service.update(object)
218
220 "Implement the HubService interface by forwarding to the local service"
221 return self.service.deleted(object)
222
223
225 """
226 Listen for changes to objects in the Zeo database and update the
227 collectors' configuration.
228
229 The remote collectors connect the ZenHub and request configuration
230 information and stay connected. When changes are detected in the
231 Zeo database, configuration updates are sent out to collectors
232 asynchronously. In this way, changes made in the web GUI can
233 affect collection immediately, instead of waiting for a
234 configuration cycle.
235
236 Each collector uses a different, pluggable service within ZenHub
237 to translate objects into configuration and data. ZenPacks can
238 add services for their collectors. Collectors communicate using
239 Twisted's Perspective Broker, which provides authenticated,
240 asynchronous, bidirectional method invocation.
241
242 ZenHub also provides an XmlRPC interface to some common services
243 to support collectors written in other languages.
244 """
245
246 totalTime = 0.
247 totalEvents = 0
248 totalCallTime = 0.
249 name = 'zenhub'
250
252 """
253 Hook ourselves up to the Zeo database and wait for collectors
254 to connect.
255 """
256 self.changes = []
257 self.workers = []
258 self.workList = []
259
260 ZCmdBase.__init__(self)
261 import Products
262 Products.Five.zcml.load_config('event.zcml', Products.Five)
263 self.zem = self.dmd.ZenEventManager
264 loadPlugins(self.dmd)
265 self.services = {}
266
267 er = HubRealm(self)
268 checker = self.loadChecker()
269 pt = portal.Portal(er, [checker])
270 reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt))
271
272 xmlsvc = AuthXmlRpcService(self.dmd, checker)
273 reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc))
274
275 self.sendEvent(eventClass=App_Start,
276 summary="%s started" % self.name,
277 severity=0)
278 reactor.callLater(5, self.processQueue)
279 self.rrdStats = self.getRRDStats()
280 for i in range(int(self.options.workers)):
281 self.createWorker()
282
296
298 """
299 Override the kind of zeo connection we have so we can listen
300 to Zeo object updates. Updates comes as OID invalidations.
301
302 @return: None
303 """
304 from ZEO.cache import ClientCache as ClientCacheBase
class ClientCache(object):
    """
    Proxy around ZEO's ClientCache.

    Every invalidated OID is recorded on the hub's ``changes`` list before
    the call is forwarded to the real cache. All other attribute lookups are
    delegated to the wrapped cache.
    """
    def __init__(this, *args, **kwargs):
        # Build the real cache with exactly the arguments we were handed.
        this._cache = ClientCacheBase(*args, **kwargs)

    def __getattr__(this, name):
        # Only reached for names not found on the proxy itself.
        wrapped = this._cache
        return getattr(wrapped, name)

    def invalidate(this, oid, version, tid, server_invalidation=True):
        # `self` is the enclosing hub object (a closure variable), not this
        # proxy. New OIDs go on the front of its list; the change processor
        # pops from the end, so changes are handled oldest-first.
        self.changes.insert(0, oid)
        wrapped = this._cache
        return wrapped.invalidate(oid, version, tid, server_invalidation)
320
321 from ZEO.ClientStorage import ClientStorage as ClientStorageBase
class ClientStorage(ClientStorageBase):
    """ZEO ClientStorage variant whose cache records invalidated OIDs."""
    # NOTE(review): assumes ZEO's ClientStorage instantiates its cache via
    # this class attribute -- verify against the installed ZEO version.
    ClientCacheClass = ClientCache
325
326 storage = ClientStorage((self.options.host, self.options.port),
327 client=self.options.pcachename,
328 var=self.options.pcachedir,
329 cache_size=self.options.pcachesize*1024*1024)
330 from ZODB import DB
331 self.db = DB(storage, cache_size=self.options.cachesize)
332
333
349
350
352 """
353 Perform one cycle of update notifications.
354
355 @return: None
356 """
357 while self.changes:
358 oid = self.changes.pop()
359 self.log.debug("Got oid %r" % oid)
360 obj = self.dmd._p_jar[oid]
361 self.log.debug("Object %r changed" % obj)
362 try:
363 obj = obj.__of__(self.dmd).primaryAq()
364 self.log.debug("Noticing object %s changed" % obj.getPrimaryUrlPath())
365 except (AttributeError, KeyError), ex:
366 self.log.debug("Noticing object %s " % obj)
367 for s in self.services.values():
368 s.deleted(obj)
369 else:
370 for s in self.services.values():
371 s.update(obj)
372
373
375 """
376 Useful method for posting events to the EventManager.
377
378 @type kw: keywords (dict)
379 @param kw: the values for an event: device, summary, etc.
380 @return: None
381 """
382 if not 'device' in kw:
383 kw['device'] = self.options.monitor
384 if not 'component' in kw:
385 kw['component'] = self.name
386 try:
387 self.zem.sendEvent(Event(**kw))
388 except:
389 self.log.exception("Unable to send an event")
390
392 """
393 Load the password file
394
395 @return: an object satisfying the ICredentialsChecker
396 interface using a password file or an empty list if the file
397 is not available. Uses the file specified in the --passwd
398 command line option.
399 """
400 try:
401 checker = checkers.FilePasswordDB(self.options.passwordfile)
402
403 u, p = checker._loadCredentials().next()
404 self.workerUsername, self.workerPassword = u, p
405 return checker
406 except Exception, ex:
407 self.log.exception("Unable to load %s", self.options.passwordfile)
408 return []
409
410
412 """
413 Helper method to load services dynamically for a collector.
414 Returned instances are cached: reconnecting collectors will
415 get the same service object.
416
417 @type name: string
418 @param name: the dotted-name of the module to load
419 (uses @L{Products.ZenUtils.Utils.importClass})
420 @param instance: string
421 @param instance: each service serves only one specific collector
422 instances (like 'localhost'). instance defines the collector's
423 instance name.
424 @return: a service loaded from ZenHub/services or one of the zenpacks.
425 """
426
427 if not self.dmd.Monitors.Performance._getOb(instance, False):
428 raise RemoteBadMonitor( "The provided performance monitor '%s'" % \
429 self.options.monitor + " is not in the current list" )
430
431 try:
432 return self.services[name, instance]
433
434 except KeyError:
435 from Products.ZenUtils.Utils import importClass
436 try:
437 ctor = importClass(name)
438 except ImportError:
439 ctor = importClass('Products.ZenHub.services.%s' % name, name)
440 svc = ctor(self.dmd, instance)
441 if self.options.workers:
442 svc = WorkerInterceptor(self, svc)
443 self.services[name, instance] = svc
444 return svc
445
447 """Take a remote request and queue it for worker processes.
448
449 @type args: tuple
450 @param args: the arguments to the remote_execute() method in the worker
451 @return: a Deferred for the eventual results of the method call
452
453 """
454 d = defer.Deferred()
455 svcName, instance, method = args[:3]
456 service = self.getService(svcName, instance).service
457 priority = service.getMethodPriority(method)
458
459 if self.options.prioritize:
460
461 for i, job in enumerate(self.workList):
462 if priority < job[1]:
463 self.workList.insert(i, (d, priority, args) )
464 break
465 else:
466 self.workList.append( (d, priority, args) )
467 else:
468
469 self.workList.append( (d, priority, args) )
470
471 self.giveWorkToWorkers()
472 return d
473
474
476 """Parcel out a method invocation to an available worker process
477 """
478 self.log.debug("worklist has %d items", len(self.workList))
479 while self.workList:
480 for i, worker in enumerate(self.workers):
481
482 if not worker.busy:
483 job = self.getJobForWorker(i)
484 if job is None: continue
485 worker.busy = True
def finished(outcome, doneWorker):
    """
    Callback and errback for a worker's remote ``execute`` call.

    Marks the worker idle again, asks the hub to dispatch any queued work,
    and passes the outcome (result or failure) through unchanged.
    """
    doneWorker.busy = False
    self.giveWorkToWorkers()
    return outcome
490 self.log.debug("Giving %s to worker %d", job[2][2], i)
491 d2 = worker.callRemote('execute', *job[2])
492 d2.addBoth(finished, worker)
493 d2.chainDeferred(job[0])
494 break
495 else:
496 self.log.debug("all workers are busy")
497 break
498
500 if self.options.anyworker:
501 return self.workList.pop(0)
502 else:
503
504 lenWorkers = float(len(self.workers))
505 for i in range(len(self.workList)):
506 priority = self.workList[i][1]
507 if priority < (workerId+1) / lenWorkers:
508 return self.workList.pop(i)
509
511 """Start a worker subprocess
512
513 @return: None
514 """
515
516 if len(self.workers) >= self.options.workers:
517 return
518
519 import os, tempfile
520 fd, tmp = tempfile.mkstemp()
521 try:
522 os.write(fd, "hubport %s\n" % self.options.pbport)
523 os.write(fd, "username %s\n" % self.workerUsername)
524 os.write(fd, "password %s\n" % self.workerPassword)
525 os.write(fd, "host %s\n" % self.options.host)
526 os.write(fd, "logseverity %s\n" % self.options.logseverity)
527 os.write(fd, "cachesize %s\n" % self.options.cachesize)
528 finally:
529 os.close(fd)
530
531 exe = zenPath('bin', 'zenhubworker')
532
533
class WorkerRunningProtocol(protocol.ProcessProtocol):
    """
    Process protocol for one zenhubworker child process.

    Relays the worker's stdout/stderr to the hub log (`self` here is the
    enclosing hub object, a closure variable). When the worker exits, it
    removes the temporary config file and logs how the worker exited.
    """

    def outReceived(s, data):
        # Worker stdout is only logged at debug level.
        self.log.debug("Worker reports %s", data)

    def errReceived(s, data):
        # Worker stderr is logged at info level.
        self.log.info("Worker reports %s", data)

    def processEnded(s, reason):
        # The temporary config file holds the worker's username/password;
        # remove it once the worker process is gone.
        os.unlink(tmp)
        exitCode = reason.value.exitCode
        if exitCode is None:
            # Twisted reports exitCode=None (and sets .signal) when the
            # process was killed by a signal. Formatting None with "%d"
            # raised inside logging and the exit message was lost.
            self.log.warning("Worker exited on signal: %s",
                             getattr(reason.value, 'signal', None))
        else:
            self.log.warning("Worker exited with status: %d (%s)",
                             exitCode, getExitMessage(exitCode))
547 args = (exe, 'run', '-C', tmp)
548 self.log.debug("Starting %s", ' '.join(args))
549 reactor.spawnProcess(WorkerRunningProtocol(), exe, args, os.environ)
550
552 """
553 Since we don't do anything on a regular basis, just
554 push heartbeats regularly.
555
556 @return: None
557 """
558 seconds = 30
559 evt = EventHeartbeat(self.options.monitor, self.name, 3*seconds)
560 self.zem.sendEvent(evt)
561 self.niceDoggie(seconds)
562 reactor.callLater(seconds, self.heartbeat)
563 r = self.rrdStats
564 totalTime = sum([s.callTime for s in self.services.values()])
565 self.zem.sendEvents(
566 r.counter('totalTime', seconds, int(self.totalTime * 1000)) +
567 r.counter('totalEvents', seconds, self.totalEvents) +
568 r.gauge('services', seconds, len(self.services)) +
569 r.counter('totalCallTime', seconds, totalTime) +
570 r.gauge('workListLength', seconds, len(self.workList))
571 )
572
573
575 """
576 Start the main event loop.
577 """
578 if self.options.cycle:
579 self.heartbeat()
580 reactor.run()
581 for worker in self.workers:
582 worker.transport.signalProcess('KILL')
583
584
586 """
587 Adds our command line options to ZCmdBase command line options.
588 """
589 ZCmdBase.buildOptions(self)
590 self.parser.add_option('--xmlrpcport', '-x', dest='xmlrpcport',
591 type='int', default=XML_RPC_PORT,
592 help='Port to use for XML-based Remote Procedure Calls (RPC)')
593 self.parser.add_option('--pbport', dest='pbport',
594 type='int', default=PB_PORT,
595 help="Port to use for Twisted's pb service")
596 self.parser.add_option('--passwd', dest='passwordfile',
597 type='string', default=zenPath('etc','hubpasswd'),
598 help='File where passwords are stored')
599 self.parser.add_option('--monitor', dest='monitor',
600 default='localhost',
601 help='Name of the distributed monitor this hub runs on')
602 self.parser.add_option('--workers', dest='workers',
603 type='int', default=0,
604 help="Number of worker instances to handle requests")
605 self.parser.add_option('--prioritize', dest='prioritize',
606 action='store_true', default=False,
607 help="Run higher priority jobs before lower priority ones")
608 self.parser.add_option('--anyworker', dest='anyworker',
609 action='store_true', default=False,
610 help='Allow any priority job to run on any worker')
611
if __name__ == '__main__':
    # Script entry point: build the hub, then hand control to its main()
    # (which, per its body, runs the Twisted reactor).
    z = ZenHub()
    z.main()
615