Package ZenHub :: Module zenhub
[hide private]
[frames] | [no frames]

Source Code for Module ZenHub.zenhub

  1  #! /usr/bin/env python  
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14   
 15  __doc__ = """zenhub 
 16   
 17  Provide remote, authenticated, and possibly encrypted two-way 
 18  communications with the Model and Event databases. 
 19   
 20  """ 
 21   
 22  from XmlRpcService import XmlRpcService 
 23   
 24  import time 
 25  import pickle 
 26   
 27  from twisted.cred import portal, checkers, credentials 
 28  from twisted.spread import pb, banana 
 29  banana.SIZE_LIMIT = 1024 * 1024 * 10 
 30   
 31  from twisted.internet import reactor, protocol, defer 
 32  from twisted.web import server, xmlrpc 
 33  from zope.interface import implements 
 34   
 35  import Globals 
 36   
 37  from Products.DataCollector.Plugins import loadPlugins 
 38  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 39  from Products.ZenUtils.Utils import zenPath, getExitMessage, unused 
 40  from Products.ZenUtils.DaemonStats import DaemonStats 
 41  from Products.ZenEvents.Event import Event, EventHeartbeat 
 42  from Products.ZenEvents.ZenEventClasses import App_Start 
 43  from Products.ZenUtils.Utils import unused 
 44   
 45  from Products.ZenHub.PBDaemon import RemoteBadMonitor 
 46  pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor) 
 47   
 48  # Due to the manipulation of sys.path during the loading of plugins, 
 49  # we can get ObjectMap imported both as DataMaps.ObjectMap and the 
 50  # full-path from Products.  The following gets the class registered 
 51  # with the jelly serialization engine under both names: 
 52  # 
 53  #  1st: get Products.DataCollector.plugins.DataMaps.ObjectMap 
 54  from Products.DataCollector.plugins.DataMaps import ObjectMap 
 55  #  2nd: get DataMaps.ObjectMap 
 56  import sys 
 57  sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins')) 
 58  import DataMaps 
 59  unused(DataMaps, ObjectMap) 
 60   
 61   
 62   
# Default TCP port for the authenticated XML-RPC interface
# (overridable with --xmlrpcport / -x).
XML_RPC_PORT = 8081
# Default TCP port for the Twisted Perspective Broker service that
# collectors and zenhubworker processes connect to (overridable with --pbport).
PB_PORT = 8789
 65   
class AuthXmlRpcService(XmlRpcService):
    "Provide some level of authentication for XML/RPC calls"

    def __init__(self, dmd, checker):
        """
        @param dmd: the DataRoot handed to XmlRpcService
        @param checker: a credentials checker whose requestAvatarId()
            returns a Deferred (ZenHub passes the result of loadChecker())
        """
        XmlRpcService.__init__(self, dmd)
        self.checker = checker


    def doRender(self, unused, request):
        """
        Call the inherited render engine after authentication succeeds.
        See @L{XmlRpcService.XmlRpcService.render}.

        @param unused: the avatar id produced by the checker (ignored)
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        """
        return XmlRpcService.render(self, request)


    def unauthorized(self, request):
        """
        Render an XMLRPC error indicating an authentication failure.
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: None
        """
        self._cbRender(xmlrpc.Fault(self.FAILURE, "Unauthorized"), request)


    def render(self, request):
        """
        Unpack the HTTP Basic authorization header and check the
        credentials; render the call only if the checker accepts them,
        otherwise answer with an "Unauthorized" fault.

        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: NOT_DONE_YET (the response is written asynchronously)
        """
        auth = request.received_headers.get('authorization', None)
        if not auth:
            self.unauthorized(request)
            return server.NOT_DONE_YET
        try:
            scheme, encoded = auth.split()
            # The authentication scheme name is case-insensitive.
            if scheme.lower() != 'basic':
                self.unauthorized(request)
            else:
                # Split on the first colon only: the user-id may not contain
                # a colon, but the password may.
                user, passwd = encoded.decode('base64').split(':', 1)
                c = credentials.UsernamePassword(user, passwd)
                d = self.checker.requestAvatarId(c)
                d.addCallback(self.doRender, request)
                def error(unused, request):
                    self.unauthorized(request)
                d.addErrback(error, request)
        except Exception:
            # malformed header, bad base64, unusable checker, ...
            self.unauthorized(request)
        return server.NOT_DONE_YET
118 119
class HubAvitar(pb.Avatar):
    """
    The single avatar shared by every authenticated PB client: collectors
    use it to look up hub services, and zenhubworker processes use it to
    sign up for work.
    """

    def __init__(self, hub):
        """
        @param hub: the owning ZenHub instance
        """
        self.hub = hub

    def perspective_getService(self,
                               serviceName,
                               instance = None,
                               listener = None):
        """
        Look up a hub service by name for a collector, and optionally
        register the collector so changes can be pushed back to it.

        @type serviceName: string
        @param serviceName: a name, like 'EventService'
        @type instance: string
        @param instance: the collector's instance name, like 'localhost'
        @type listener: a remote reference to the collector
        @param listener: the callback interface to the collector
        @return a remote reference to a service
        """
        found = self.hub.getService(serviceName, instance)
        if found is None or not listener:
            return found
        found.addListener(listener)
        return found

    def perspective_reportingForWork(self, worker):
        """
        Add a zenhubworker to the hub's pool of idle workers.  When its
        connection drops, it is removed from the pool and a replacement
        is started a second later.

        @type worker: a pb.RemoteReference
        @param worker: a reference to zenhubworker
        @return None
        """
        def removeWorker(gone):
            pool = self.hub.workers
            if gone not in pool:
                return
            pool.remove(gone)
            reactor.callLater(1, self.hub.createWorker)

        worker.busy = False
        self.hub.workers.append(worker)
        worker.notifyOnDisconnect(removeWorker)
class HubRealm(object):
    """
    Realm for the Twisted cred framework: every successful login is
    given the same HubAvitar.
    See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
    """
    implements(portal.IRealm)

    def __init__(self, hub):
        """
        @param hub: the ZenHub that the shared avatar serves
        """
        self.hubAvitar = HubAvitar(hub)

    def requestAvatar(self, collName, mind, *interfaces):
        """
        Return (interface, avatar, logout) for a logged-in client.

        Only pb.IPerspective is supported; any other request raises
        NotImplementedError.  Logging out requires no cleanup.
        """
        if pb.IPerspective in interfaces:
            def logout():
                return None
            return pb.IPerspective, self.hubAvitar, logout
        raise NotImplementedError
class WorkerInterceptor(pb.Referenceable):
    """Redirect service requests to one of the worker processes. Note
    that everything else (like change notifications) go through
    locally hosted services."""

    # Read by ZenHub.heartbeat() when it sums callTime over all services;
    # nothing in this class updates it, so intercepted services report 0.
    callTime = 0.

    def __init__(self, zenhub, service):
        """
        @param zenhub: the ZenHub that owns the worker pool
        @param service: the locally hosted service being wrapped
        """
        self.zenhub = zenhub
        self.service = service

    def remoteMessageReceived(self, broker, message, args, kw):
        "Intercept requests and send them down to workers"
        # Dotted module path of the wrapped service's class.
        # str(cls).rsplit('.', 1)[0] only yields this for classic classes
        # (str() of a new-style class is "<class '...'>"); __module__ gives
        # the same value for classic classes and the right one otherwise.
        svc = self.service.__class__.__module__
        instance = self.service.instance
        args = broker.unserialize(args)
        kw = broker.unserialize(kw)
        # hide the types in the args: subverting the jelly protection
        # mechanism, but the types just passed through and the worker may
        # not have loaded the required service before we try passing types
        # for that service.
        # NOTE(review): the worker presumably unpickles this payload, and it
        # originates from remote PB clients; keep the hub port restricted
        # to trusted, authenticated collectors.
        args = pickle.dumps((args, kw))
        result = self.zenhub.deferToWorker((svc, instance, message, args))
        return broker.serialize(result, self.perspective)

    def addListener(self, listener):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.addListener(listener)

    def removeListener(self, listener):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.removeListener(listener)

    def update(self, object):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.update(object)

    def deleted(self, object):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.deleted(object)
class ZenHub(ZCmdBase):
    """
    Listen for changes to objects in the Zeo database and update the
    collectors' configuration.

    The remote collectors connect the ZenHub and request configuration
    information and stay connected. When changes are detected in the
    Zeo database, configuration updates are sent out to collectors
    asynchronously. In this way, changes made in the web GUI can
    affect collection immediately, instead of waiting for a
    configuration cycle.

    Each collector uses a different, pluggable service within ZenHub
    to translate objects into configuration and data. ZenPacks can
    add services for their collectors. Collectors communicate using
    Twisted's Perspective Broker, which provides authenticated,
    asynchronous, bidirectional method invocation.

    ZenHub also provides an XmlRPC interface to some common services
    to support collectors written in other languages.
    """

    totalTime = 0.
    totalEvents = 0
    totalCallTime = 0.
    name = 'zenhub'

    def __init__(self):
        """
        Hook ourselves up to the Zeo database and wait for collectors
        to connect.
        """
        # Set up before ZCmdBase.__init__: the ClientCache installed by
        # zeoConnect() appends to self.changes, and the base initializer
        # presumably opens the database connection.
        self.changes = []
        self.workers = []
        self.workList = []

        ZCmdBase.__init__(self)
        self.zem = self.dmd.ZenEventManager
        loadPlugins(self.dmd)
        self.services = {}

        er = HubRealm(self)
        checker = self.loadChecker()
        pt = portal.Portal(er, [checker])
        reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt))

        xmlsvc = AuthXmlRpcService(self.dmd, checker)
        reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc))

        self.sendEvent(eventClass=App_Start,
                       summary="%s started" % self.name,
                       severity=0)
        reactor.callLater(5, self.processQueue)
        self.rrdStats = self.getRRDStats()
        for _ in range(int(self.options.workers)):
            self.createWorker()

    def getRRDStats(self):
        """
        Return a DaemonStats object configured for the performance
        monitor named by --monitor.
        """
        rrdStats = DaemonStats()
        perfConf = self.dmd.Monitors.Performance._getOb(self.options.monitor, None)
        rrdStats.configWithMonitor('zenhub', perfConf)
        return rrdStats

    def zeoConnect(self):
        """
        Override the kind of zeo connection we have so we can listen
        to Zeo object updates. Updates come as OID invalidations.

        @return: None
        """
        from ZEO.cache import ClientCache as ClientCacheBase
        class ClientCache(ClientCacheBase):
            "A sub-class to notice object invalidation notifications"
            def invalidate(s, oid, version, tid):
                # queue at the front; doProcessQueue() pops from the back,
                # so invalidations are handled oldest first
                self.changes.insert(0, oid)
                ClientCacheBase.invalidate(s, oid, version, tid)

        from ZEO.ClientStorage import ClientStorage as ClientStorageBase
        class ClientStorage(ClientStorageBase):
            "Override the caching class to intercept messages"
            ClientCacheClass = ClientCache

        storage = ClientStorage((self.options.host, self.options.port),
                                client=self.options.pcachename,
                                var=self.options.pcachedir,
                                cache_size=self.options.pcachesize*1024*1024)
        from ZODB import DB
        self.db = DB(storage, cache_size=self.options.cachesize)


    def processQueue(self):
        """
        Periodically (once a second) process database changes

        @return: None
        """
        now = time.time()
        try:
            # syncdb() reads the object invalidations.  It sits inside the
            # try so that a failure there cannot stop the rescheduling
            # below and silently end change processing.
            self.syncdb()
            self.doProcessQueue()
        except Exception:
            # log the exception object itself, with its traceback
            self.log.exception(sys.exc_info()[1])
        reactor.callLater(1, self.processQueue)
        self.totalEvents += 1
        self.totalTime += time.time() - now


    def doProcessQueue(self):
        """
        Perform one cycle of update notifications: every queued OID is
        loaded and every service is told about it, via update() for
        objects with a primary path and deleted() for the rest.

        @return: None
        """
        while self.changes:
            oid = self.changes.pop()
            self.log.debug("Got oid %r", oid)
            obj = self.dmd._p_jar[oid]
            self.log.debug("Object %r changed", obj)
            try:
                obj = obj.__of__(self.dmd).primaryAq()
                # getPrimaryUrlPath() must stay inside the try: an
                # AttributeError here also routes the object to deleted()
                self.log.debug("Noticing object %s changed",
                               obj.getPrimaryUrlPath())
            except AttributeError:
                self.log.debug("Noticing object %s ", obj)
                for s in self.services.values():
                    s.deleted(obj)
            else:
                for s in self.services.values():
                    s.update(obj)


    def sendEvent(self, **kw):
        """
        Useful method for posting events to the EventManager.

        Fills in 'device' (the --monitor name) and 'component' (the
        daemon name) when the caller did not supply them.  Failures are
        logged, not raised.

        @type kw: keywords (dict)
        @param kw: the values for an event: device, summary, etc.
        @return: None
        """
        kw.setdefault('device', self.options.monitor)
        kw.setdefault('component', self.name)
        try:
            self.zem.sendEvent(Event(**kw))
        except Exception:
            self.log.exception("Unable to send an event")

    def loadChecker(self):
        """
        Load the password file

        Also remembers the first credentials in the file as the
        username/password that worker processes log in with.

        @return: an object satisfying the ICredentialsChecker
        interface using a password file or an empty list if the file
        is not available. Uses the file specified in the --passwd
        command line option.
        """
        try:
            checker = checkers.FilePasswordDB(self.options.passwordfile)
            # grab credentials for the workers to login
            u, p = checker._loadCredentials().next()
            self.workerUsername, self.workerPassword = u, p
            return checker
        except Exception:
            self.log.exception("Unable to load %s", self.options.passwordfile)
            return []


    def getService(self, name, instance):
        """
        Helper method to load services dynamically for a collector.
        Returned instances are cached: reconnecting collectors will
        get the same service object.

        @type name: string
        @param name: the dotted-name of the module to load
        (uses @L{Products.ZenUtils.Utils.importClass})
        @param instance: string
        @param instance: each service serves only one specific collector
        instances (like 'localhost'). instance defines the collector's
        instance name.
        @return: a service loaded from ZenHub/services or one of the zenpacks.
        @raise RemoteBadMonitor: if instance is not a known performance
        monitor.
        """
        # Sanity check the names given to us
        if not self.dmd.Monitors.Performance._getOb(instance, False):
            # report the rejected name, not this hub's own --monitor
            raise RemoteBadMonitor("The provided performance monitor '%s'"
                                   " is not in the current list" % (instance,))

        try:
            return self.services[name, instance]

        except KeyError:
            from Products.ZenUtils.Utils import importClass
            try:
                ctor = importClass(name)
            except ImportError:
                ctor = importClass('Products.ZenHub.services.%s' % name, name)
            svc = ctor(self.dmd, instance)
            if self.options.workers:
                svc = WorkerInterceptor(self, svc)
            self.services[name, instance] = svc
            return svc

    def deferToWorker(self, args):
        """Take a remote request and queue it for worker processes.

        @type args: tuple
        @param args: the arguments to the remote_execute() method in the worker
        @return: a Deferred for the eventual results of the method call

        """
        d = defer.Deferred()
        svcName, instance, method = args[:3]
        service = self.getService(svcName, instance).service
        priority = service.getMethodPriority(method)
        job = (d, priority, args)

        if self.options.prioritize:
            # Keep workList sorted by ascending priority value; a new job
            # goes after any queued jobs of equal priority (FIFO among ties).
            for i, queued in enumerate(self.workList):
                if priority < queued[1]:
                    self.workList.insert(i, job)
                    break
            else:
                self.workList.append(job)
        else:
            # Run jobs on a first come, first serve basis.
            self.workList.append(job)

        self.giveWorkToWorkers()
        return d


    def giveWorkToWorkers(self):
        """Parcel out queued method invocations to available worker processes.

        Keeps handing jobs to idle workers until the work list is empty or
        no idle worker will accept any queued job.

        @return: None
        """
        def finished(result, finishedWorker):
            # Runs on success and failure alike: free the worker, offer it
            # more work, and pass the result through to the job's Deferred.
            finishedWorker.busy = False
            self.giveWorkToWorkers()
            return result

        self.log.debug("worklist has %d items", len(self.workList))
        while self.workList:
            for i, worker in enumerate(self.workers):
                # linear search is not ideal, but simple enough
                if worker.busy:
                    continue
                job = self.getJobForWorker(i)
                if job is None:
                    continue
                worker.busy = True
                self.log.debug("Giving %s to worker %d", job[2][2], i)
                d2 = worker.callRemote('execute', *job[2])
                d2.addBoth(finished, worker)
                d2.chainDeferred(job[0])
                break
            else:
                self.log.debug("all workers are busy")
                break

    def getJobForWorker(self, workerId):
        """
        Remove and return the next queued job that worker number workerId
        may run, or None if there is none.

        With --anyworker every worker takes the head of the queue.
        Otherwise a job is only eligible when its priority value is below
        (workerId + 1) / number_of_workers, so jobs with larger priority
        values are confined to the higher-numbered workers.

        @type workerId: int
        @param workerId: index of the worker in self.workers
        @return: a (Deferred, priority, args) tuple or None
        """
        if self.options.anyworker:
            return self.workList.pop(0)
        # Restrict lower priority jobs to a subset of the workers.
        lenWorkers = float(len(self.workers))
        for i, job in enumerate(self.workList):
            if job[1] < (workerId + 1) / lenWorkers:
                return self.workList.pop(i)
        return None

    def createWorker(self):
        """Start a worker subprocess

        The worker's connection settings and credentials are passed in a
        private temporary config file, removed when the process ends.

        @return: None
        """
        # this probably can't happen, but let's make sure
        if len(self.workers) >= self.options.workers:
            return
        # create a config file for the slave to pass credentials
        import os, tempfile
        fd, tmp = tempfile.mkstemp()
        try:
            os.write(fd, "hubport %s\n" % self.options.pbport)
            os.write(fd, "username %s\n" % self.workerUsername)
            os.write(fd, "password %s\n" % self.workerPassword)
            os.write(fd, "host %s\n" % self.options.host)
            os.write(fd, "logseverity %s\n" % self.options.logseverity)
            os.write(fd, "cachesize %s\n" % self.options.cachesize)
        finally:
            os.close(fd)
        # start the worker
        exe = zenPath('bin', 'zenhubworker')

        # watch for output, and generally just take notice
        class WorkerRunningProtocol(protocol.ProcessProtocol):

            def outReceived(s, data):
                self.log.debug("Worker reports %s", data)

            def errReceived(s, data):
                self.log.info("Worker reports %s", data)

            def processEnded(s, reason):
                try:
                    os.unlink(tmp)
                except OSError:
                    self.log.debug("Unable to remove worker config %s", tmp)
                # exitCode is None when the process was killed by a signal,
                # so format it with %s rather than %d.
                self.log.warning("Worker exited with status: %s (%s)",
                                 reason.value.exitCode,
                                 getExitMessage(reason.value.exitCode))
        args = (exe, 'run', '-C', tmp)
        self.log.debug("Starting %s", ' '.join(args))
        reactor.spawnProcess(WorkerRunningProtocol(), exe, args, os.environ)

    def heartbeat(self):
        """
        Since we don't do anything on a regular basis, just
        push heartbeats regularly.

        Every 30 seconds: send a heartbeat event, pet the watchdog,
        reschedule, and send the daemon's RRD statistics.

        @return: None
        """
        seconds = 30
        evt = EventHeartbeat(self.options.monitor, self.name, 3*seconds)
        self.zem.sendEvent(evt)
        self.niceDoggie(seconds)
        reactor.callLater(seconds, self.heartbeat)
        r = self.rrdStats
        callTime = sum(s.callTime for s in self.services.values())
        self.zem.sendEvents(
            r.counter('totalTime', seconds, int(self.totalTime * 1000)) +
            r.counter('totalEvents', seconds, self.totalEvents) +
            r.gauge('services', seconds, len(self.services)) +
            r.counter('totalCallTime', seconds, callTime) +
            r.gauge('workListLength', seconds, len(self.workList))
            )


    def main(self):
        """
        Start the main event loop.
        """
        if self.options.cycle:
            self.heartbeat()
        reactor.run()
        for worker in self.workers:
            # NOTE(review): self.workers is filled with pb.RemoteReference
            # objects by HubAvitar.perspective_reportingForWork; confirm
            # they expose a process transport that can be signalled.
            worker.transport.signalProcess('KILL')


    def buildOptions(self):
        """
        Adds our command line options to ZCmdBase command line options.
        """
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--xmlrpcport', '-x', dest='xmlrpcport',
            type='int', default=XML_RPC_PORT,
            help='Port to use for XML-based Remote Procedure Calls (RPC)')
        self.parser.add_option('--pbport', dest='pbport',
            type='int', default=PB_PORT,
            help="Port to use for Twisted's pb service")
        self.parser.add_option('--passwd', dest='passwordfile',
            type='string', default=zenPath('etc','hubpasswd'),
            help='File where passwords are stored')
        self.parser.add_option('--monitor', dest='monitor',
            default='localhost',
            help='Name of the distributed monitor this hub runs on')
        self.parser.add_option('--workers', dest='workers',
            type='int', default=0,
            help="Number of worker instances to handle requests")
        self.parser.add_option('--prioritize', dest='prioritize',
            action='store_true', default=False,
            help="Run higher priority jobs before lower priority ones")
        self.parser.add_option('--anyworker', dest='anyworker',
            action='store_true', default=False,
            help='Allow any priority job to run on any worker')
if __name__ == '__main__':
    # Constructing the hub opens its PB and XML-RPC listeners and spawns
    # any configured workers; main() then runs the reactor.
    ZenHub().main()