Package ZenHub :: Module zenhub
[hide private]
[frames] | [no frames]

Source Code for Module ZenHub.zenhub

  #! /usr/bin/env python
###########################################################################
#
# This program is part of Zenoss Core, an open source monitoring platform.
# Copyright (C) 2007, Zenoss Inc.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 2 as published by
# the Free Software Foundation.
#
# For complete information please visit: http://www.zenoss.com/oss/
#
###########################################################################

__doc__ = """zenhub

Provide remote, authenticated, and possibly encrypted two-way
communications with the Model and Event databases.

"""

from XmlRpcService import XmlRpcService

import time
import pickle

from twisted.cred import portal, checkers, credentials
from twisted.spread import pb, banana
# Raise Twisted Banana's maximum message size to 10 MB.
banana.SIZE_LIMIT = 10 * 1024 * 1024

from twisted.internet import reactor, protocol, defer
from twisted.web import server, xmlrpc
from zope.interface import implements

import Globals

from Products.DataCollector.Plugins import loadPlugins
from Products.ZenUtils.ZCmdBase import ZCmdBase
from Products.ZenUtils.Utils import zenPath, getExitMessage, unused
from Products.ZenUtils.DaemonStats import DaemonStats
from Products.ZenEvents.Event import Event, EventHeartbeat
from Products.ZenEvents.ZenEventClasses import App_Start
from Products.ZenUtils.Utils import unused

from Products.ZenHub.PBDaemon import RemoteBadMonitor
# Register RemoteBadMonitor with jelly so it can be unserialized when it
# arrives over Perspective Broker.
pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)

# Plugin loading rewrites sys.path, so ObjectMap can end up imported under
# two module names: the fully-qualified Products path and the bare
# "DataMaps" module.  Importing it both ways registers the class with the
# jelly serialization engine under each name.
#
# Name 1: Products.DataCollector.plugins.DataMaps.ObjectMap
from Products.DataCollector.plugins.DataMaps import ObjectMap
# Name 2: DataMaps.ObjectMap, importable once the plugins dir is on sys.path
import sys
sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins'))
import DataMaps
# Both imports are kept purely for their side effects.
unused(DataMaps, ObjectMap)


# Default listening ports; overridable with --xmlrpcport and --pbport.
XML_RPC_PORT = 8081
PB_PORT = 8789
class AuthXmlRpcService(XmlRpcService):
    """
    XML-RPC service that requires HTTP Basic authentication.

    Credentials are verified with a Twisted cred checker before the call
    is handed to the inherited XmlRpcService render engine.
    """

    def __init__(self, dmd, checker):
        """
        @param dmd: passed through to XmlRpcService.__init__
        @param checker: an object providing requestAvatarId(credentials),
            used to verify the Basic-auth user name and password
        """
        XmlRpcService.__init__(self, dmd)
        self.checker = checker

    def doRender(self, unused, request):
        """
        Call the inherited render engine after authentication succeeds.
        See @L{XmlRpcService.XmlRpcService.render}.

        @param unused: the avatar id returned by the checker (ignored)
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        """
        return XmlRpcService.render(self, request)

    def unauthorized(self, request):
        """
        Render an XMLRPC error indicating an authentication failure.
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: None
        """
        self._cbRender(xmlrpc.Fault(self.FAILURE, "Unauthorized"), request)

    def render(self, request):
        """
        Unpack the HTTP Basic authorization header and check the credentials.

        The check is asynchronous. If it succeeds, the call is dispatched
        through L{doRender}. If anything fails, an "Unauthorized" XML-RPC
        fault is rendered instead. Failures include a missing or malformed
        header, a non-Basic scheme, rejected credentials, or an error
        raised while rendering.

        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: NOT_DONE_YET
        """
        auth = request.received_headers.get('authorization', None)
        if not auth:
            self.unauthorized(request)
            return server.NOT_DONE_YET
        try:
            scheme, encoded = auth.split()
            # Authentication scheme names are case-insensitive (RFC 7235).
            if scheme.lower() != 'basic':
                self.unauthorized(request)
            else:
                # Split on the first ':' only: a Basic-auth user-id may not
                # contain a colon, but the password may (RFC 7617).
                user, passwd = encoded.decode('base64').split(':', 1)
                c = credentials.UsernamePassword(user, passwd)
                d = self.checker.requestAvatarId(c)
                d.addCallback(self.doRender, request)

                def error(unused, request):
                    # Rejected credentials, or a failure inside doRender.
                    self.unauthorized(request)
                d.addErrback(error, request)
        except Exception:
            # Malformed header, bad base64, missing ':', or a checker that
            # fails synchronously.
            self.unauthorized(request)
        return server.NOT_DONE_YET
class HubAvitar(pb.Avatar):
    """
    Perspective handed to every authenticated client. It lets collectors
    find their configuration services, and lets zenhubworker processes
    register themselves for work.
    """

    def __init__(self, hub):
        self.hub = hub

    def perspective_getService(self,
                               serviceName,
                               instance=None,
                               listener=None):
        """
        Look up a Hub service by name for a collector. When the collector
        supplies a listener, it is attached to the service so that changes
        can later be pushed back to that collector.

        @type serviceName: string
        @param serviceName: a name, like 'EventService'
        @type instance: string
        @param instance: the collector's instance name, like 'localhost'
        @type listener: a remote reference to the collector
        @param listener: the callback interface to the collector
        @return: a remote reference to a service
        """
        svc = self.hub.getService(serviceName, instance)
        if svc is None or not listener:
            return svc
        svc.addListener(listener)
        return svc

    def perspective_reportingForWork(self, worker):
        """
        Add a zenhubworker to the hub's pool of available workers. When the
        worker disconnects, it is dropped from the pool and a replacement
        worker is started one second later.

        @type worker: a pb.RemoteReference
        @param worker: a reference to zenhubworker
        @return: None
        """
        def _workerGone(deadRef):
            # Only respawn when the reference was actually still registered.
            if deadRef in self.hub.workers:
                self.hub.workers.remove(deadRef)
                reactor.callLater(1, self.hub.createWorker)

        worker.busy = False
        self.hub.workers.append(worker)
        worker.notifyOnDisconnect(_workerGone)
class HubRealm(object):
    """
    Realm for the Twisted authentication framework. Every successful
    Perspective Broker login gets the same shared HubAvitar.
    See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
    """
    implements(portal.IRealm)

    def __init__(self, hub):
        self.hubAvitar = HubAvitar(hub)

    def requestAvatar(self, collName, mind, *interfaces):
        """
        Return (interface, avatar, logout) for a PB login.

        @raise NotImplementedError: if pb.IPerspective was not requested
        """
        if pb.IPerspective in interfaces:
            # The shared avatar needs no per-login cleanup: logout is a no-op.
            return pb.IPerspective, self.hubAvitar, lambda: None
        raise NotImplementedError
class WorkerInterceptor(pb.Referenceable):
    """Redirect service requests to one of the worker processes. Note
    that everything else (like change notifications) goes through
    locally hosted services."""

    # Read by ZenHub.heartbeat(). Nothing in this class updates it, so it
    # stays 0 for intercepted services.
    callTime = 0.

    def __init__(self, zenhub, service):
        """
        @param zenhub: the ZenHub daemon that owns the worker queue
        @param service: the locally hosted service being wrapped
        """
        self.zenhub = zenhub
        self.service = service

    def remoteMessageReceived(self, broker, message, args, kw):
        """
        Intercept requests and send them down to workers.

        The call is queued via zenhub.deferToWorker(), and the eventual
        result is serialized back to the caller.
        """
        # The service's module path is used as the service name for the
        # worker and for ZenHub.getService. Read it from __module__ rather
        # than parsing str(cls): for a new-style class, str() gives
        # "<class 'pkg.mod.Cls'>", which rsplit('.') would mangle.
        svc = self.service.__class__.__module__
        instance = self.service.instance
        args = broker.unserialize(args)
        kw = broker.unserialize(kw)
        # hide the types in the args: subverting the jelly protection mechanism,
        # but the types just passed through and the worker may not have loaded
        # the required service before we try passing types for that service.
        # NOTE(review): the pickle is consumed by our own zenhubworker child;
        # keep it that way, since unpickling untrusted data is unsafe.
        args = pickle.dumps((args, kw))
        result = self.zenhub.deferToWorker((svc, instance, message, args))
        return broker.serialize(result, self.perspective)

    def addListener(self, listener):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.addListener(listener)

    def removeListener(self, listener):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.removeListener(listener)

    def update(self, object):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.update(object)

    def deleted(self, object):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.deleted(object)
class ZenHub(ZCmdBase):
    """
    Listen for changes to objects in the Zeo database and update the
    collectors' configuration.

    The remote collectors connect to ZenHub, request configuration
    information, and stay connected.  When changes are detected in the
    Zeo database, configuration updates are sent out to collectors
    asynchronously.  In this way, changes made in the web GUI can
    affect collection immediately, instead of waiting for a
    configuration cycle.

    Each collector uses a different, pluggable service within ZenHub
    to translate objects into configuration and data.  ZenPacks can
    add services for their collectors.  Collectors communicate using
    Twisted's Perspective Broker, which provides authenticated,
    asynchronous, bidirectional method invocation.

    ZenHub also provides an XmlRPC interface to some common services
    to support collectors written in other languages.
    """

    totalTime = 0.
    totalEvents = 0
    totalCallTime = 0.
    name = 'zenhub'

    def __init__(self):
        """
        Hook ourselves up to the Zeo database and wait for collectors
        to connect.
        """
        # Set before ZCmdBase.__init__: the ClientCache installed by
        # zeoConnect() appends invalidated oids to self.changes.
        self.changes = []
        # PB remote references of registered zenhubworker processes.
        self.workers = []
        # Queued (deferred, priority, args) jobs waiting for a worker.
        self.workList = []
        # Process transports returned by spawnProcess, so main() can kill
        # the workers on shutdown.
        self.workerProcesses = []

        ZCmdBase.__init__(self)
        self.zem = self.dmd.ZenEventManager
        loadPlugins(self.dmd)
        self.services = {}

        er = HubRealm(self)
        checker = self.loadChecker()
        pt = portal.Portal(er, [checker])
        reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt))

        xmlsvc = AuthXmlRpcService(self.dmd, checker)
        reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc))

        self.sendEvent(eventClass=App_Start,
                       summary="%s started" % self.name,
                       severity=0)
        reactor.callLater(5, self.processQueue)
        self.rrdStats = self.getRRDStats()
        for i in range(int(self.options.workers)):
            self.createWorker()

    def getRRDStats(self):
        """
        Build a DaemonStats object configured for this hub's performance
        monitor (--monitor), using that monitor's BuiltInDS thresholds and
        its defaultRRDCreateCommand, if any.

        NOTE(review): if the monitor does not exist, perfConf is None and
        this raises AttributeError.
        """
        rrdStats = DaemonStats()
        perfConf = self.dmd.Monitors.Performance._getOb(self.options.monitor, None)

        from Products.ZenModel.BuiltInDS import BuiltInDS
        threshs = perfConf.getThresholdInstances(BuiltInDS.sourcetype)
        createCommand = getattr(perfConf, 'defaultRRDCreateCommand', None)
        rrdStats.config(perfConf.id, 'zenhub', threshs, createCommand)

        return rrdStats

    def zeoConnect(self):
        """
        Override the kind of zeo connection we have so we can listen
        to Zeo object updates. Updates come as OID invalidations, which
        are queued on self.changes.

        @return: None
        """
        from ZEO.cache import ClientCache as ClientCacheBase
        class ClientCache(ClientCacheBase):
            "A sub-class to notice object invalidation notifications"
            def invalidate(s, oid, version, tid):
                # Queue at the front; doProcessQueue pops from the end (FIFO).
                self.changes.insert(0, oid)
                ClientCacheBase.invalidate(s, oid, version, tid)

        from ZEO.ClientStorage import ClientStorage as ClientStorageBase
        class ClientStorage(ClientStorageBase):
            "Override the caching class to intercept messages"
            ClientCacheClass = ClientCache

        storage = ClientStorage((self.options.host, self.options.port),
                                client=self.options.pcachename,
                                var=self.options.pcachedir,
                                cache_size=self.options.pcachesize*1024*1024)
        from ZODB import DB
        self.db = DB(storage, cache_size=self.options.cachesize)

    def processQueue(self):
        """
        Periodically (once a second) process database changes.

        @return: None
        """
        now = time.time()
        try:
            # syncdb() reads the object invalidations. It is inside the try
            # so that a failure here cannot skip the rescheduling below and
            # silently stop all change processing.
            self.syncdb()
            self.doProcessQueue()
        except Exception:
            self.log.exception("Unable to process database changes")
        reactor.callLater(1, self.processQueue)
        self.totalEvents += 1
        self.totalTime += time.time() - now

    def doProcessQueue(self):
        """
        Perform one cycle of update notifications.

        Each queued oid is loaded. If it can be wrapped in its primary
        acquisition path, every service is told it was updated. If that
        raises AttributeError, every service is told it was deleted.

        @return: None
        """
        while self.changes:
            oid = self.changes.pop()
            self.log.debug("Got oid %r", oid)
            obj = self.dmd._p_jar[oid]
            self.log.debug("Object %r changed", obj)
            try:
                obj = obj.__of__(self.dmd).primaryAq()
                self.log.debug("Noticing object %s changed",
                               obj.getPrimaryUrlPath())
            except AttributeError:
                self.log.debug("Noticing object %s ", obj)
                for s in self.services.values():
                    s.deleted(obj)
            else:
                for s in self.services.values():
                    s.update(obj)

    def sendEvent(self, **kw):
        """
        Useful method for posting events to the EventManager.

        device defaults to --monitor, and component defaults to this
        daemon's name. Failures are logged, not raised.

        @type kw: keywords (dict)
        @param kw: the values for an event: device, summary, etc.
        @return: None
        """
        kw.setdefault('device', self.options.monitor)
        kw.setdefault('component', self.name)
        try:
            self.zem.sendEvent(Event(**kw))
        except Exception:
            self.log.exception("Unable to send an event")

    def loadChecker(self):
        """
        Load the password file.

        The first entry of the file also supplies the credentials that
        spawned workers use to log in (self.workerUsername/workerPassword).

        @return: an object satisfying the ICredentialsChecker
            interface using a password file, or an empty list if the
            file is not available. Uses the file specified in the
            --passwd command line option.
        """
        # NOTE(review): the [] fallback does not provide ICredentialsChecker,
        # and worker credentials stay unset (createWorker would then fail);
        # confirm whether startup is expected to abort in that case.
        try:
            checker = checkers.FilePasswordDB(self.options.passwordfile)
            # grab credentials for the workers to login
            u, p = checker._loadCredentials().next()
            self.workerUsername, self.workerPassword = u, p
            return checker
        except Exception:
            self.log.exception("Unable to load %s", self.options.passwordfile)
            return []

    def getService(self, name, instance):
        """
        Helper method to load services dynamically for a collector.
        Returned instances are cached: reconnecting collectors will
        get the same service object.

        @type name: string
        @param name: the dotted-name of the module to load
            (uses @L{Products.ZenUtils.Utils.importClass})
        @type instance: string
        @param instance: each service serves only one specific collector
            instance (like 'localhost'). instance defines the collector's
            instance name.
        @return: a service loaded from ZenHub/services or one of the zenpacks.
        @raise RemoteBadMonitor: if instance is not a known performance
            monitor.
        """
        # Sanity check the names given to us. Report the monitor that was
        # actually requested, not this hub's own --monitor.
        if not self.dmd.Monitors.Performance._getOb(instance, False):
            raise RemoteBadMonitor(
                "The provided performance monitor '%s'" % instance
                + " is not in the current list")

        try:
            return self.services[name, instance]

        except KeyError:
            from Products.ZenUtils.Utils import importClass
            try:
                ctor = importClass(name)
            except ImportError:
                ctor = importClass('Products.ZenHub.services.%s' % name, name)
            svc = ctor(self.dmd, instance)
            if self.options.workers:
                svc = WorkerInterceptor(self, svc)
            self.services[name, instance] = svc
            return svc

    def deferToWorker(self, args):
        """Take a remote request and queue it for worker processes.

        With --prioritize, the work list is kept sorted by ascending
        priority value; a new job goes after existing jobs of equal
        priority. Otherwise jobs run first come, first served.

        @type args: tuple
        @param args: the arguments to the remote_execute() method in the worker
        @return: a Deferred for the eventual results of the method call
        """
        d = defer.Deferred()
        svcName, instance, method = args[:3]
        service = self.getService(svcName, instance).service
        priority = service.getMethodPriority(method)

        if self.options.prioritize:
            # Insert job into workList so that it stays sorted by priority.
            for i, job in enumerate(self.workList):
                if priority < job[1]:
                    self.workList.insert(i, (d, priority, args))
                    break
            else:
                self.workList.append((d, priority, args))
        else:
            # Run jobs on a first come, first serve basis.
            self.workList.append((d, priority, args))

        self.giveWorkToWorkers()
        return d

    def giveWorkToWorkers(self):
        """Parcel out method invocations to available worker processes.

        Hands jobs to idle workers until the work list is empty or no idle
        worker can accept any queued job. When a worker finishes, it is
        marked idle and this method runs again.
        """
        self.log.debug("worklist has %d items", len(self.workList))
        while self.workList:
            for i, worker in enumerate(self.workers):
                # linear search is not ideal, but simple enough
                if not worker.busy:
                    job = self.getJobForWorker(i)
                    if job is None:
                        continue
                    worker.busy = True
                    def finished(result, finishedWorker):
                        finishedWorker.busy = False
                        self.giveWorkToWorkers()
                        return result
                    self.log.debug("Giving %s to worker %d", job[2][2], i)
                    d2 = worker.callRemote('execute', *job[2])
                    d2.addBoth(finished, worker)
                    d2.chainDeferred(job[0])
                    break
            else:
                # No worker took a job on this pass (all busy, or none of
                # the queued jobs is eligible for the idle ones).
                self.log.debug("all workers are busy")
                break

    def getJobForWorker(self, workerId):
        """
        Remove and return the next job that worker number workerId may
        run, or None if no queued job is eligible for it.

        With --anyworker the first queued job is always taken. Otherwise a
        job is eligible only when its priority is below
        (workerId + 1) / len(self.workers). This restricts lower priority
        jobs to a subset of the workers.

        @type workerId: int
        @param workerId: index of the worker in self.workers
        @return: a (deferred, priority, args) tuple, or None
        """
        if self.options.anyworker:
            return self.workList.pop(0)
        lenWorkers = float(len(self.workers))
        for i, job in enumerate(self.workList):
            if job[1] < (workerId + 1) / lenWorkers:
                return self.workList.pop(i)
        return None

    def createWorker(self):
        """Start a worker subprocess.

        Worker settings, including the PB login credentials, are written
        to a temporary config file. The file is passed to zenhubworker with
        -C and deleted when the worker process ends.

        @return: None
        """
        # this probably can't happen, but let's make sure
        if len(self.workers) >= self.options.workers:
            return
        # create a config file for the slave to pass credentials
        import os
        import tempfile
        fd, tmp = tempfile.mkstemp()
        try:
            os.write(fd, "hubport %s\n" % self.options.pbport)
            os.write(fd, "username %s\n" % self.workerUsername)
            os.write(fd, "password %s\n" % self.workerPassword)
            os.write(fd, "host %s\n" % self.options.host)
            os.write(fd, "logseverity %s\n" % self.options.logseverity)
            os.write(fd, "cachesize %s\n" % self.options.cachesize)
        finally:
            os.close(fd)
        # start the worker
        exe = zenPath('bin', 'zenhubworker')

        # watch for output, and generally just take notice
        class WorkerRunningProtocol(protocol.ProcessProtocol):

            def outReceived(s, data):
                self.log.debug("Worker reports %s", data)

            def errReceived(s, data):
                self.log.info("Worker reports %s", data)

            def processEnded(s, reason):
                os.unlink(tmp)
                if s.transport in self.workerProcesses:
                    self.workerProcesses.remove(s.transport)
                exitCode = reason.value.exitCode
                if exitCode is None:
                    # Killed by a signal: there is no exit status to report
                    # (formatting None with %d would fail).
                    self.log.warning("Worker exited on signal: %s",
                                     getattr(reason.value, 'signal', None))
                else:
                    self.log.warning("Worker exited with status: %d (%s)",
                                     exitCode, getExitMessage(exitCode))

        args = (exe, 'run', '-C', tmp)
        self.log.debug("Starting %s", ' '.join(args))
        proc = reactor.spawnProcess(WorkerRunningProtocol(), exe, args,
                                    os.environ)
        self.workerProcesses.append(proc)

    def heartbeat(self):
        """
        Since we don't do anything on a regular basis, just
        push heartbeats regularly.

        Every 30 seconds this sends an EventHeartbeat, calls niceDoggie(),
        and sends the daemon's counters and gauges via the event manager.

        @return: None
        """
        seconds = 30
        # Reschedule first, so that a failure below (for example, the event
        # manager being unreachable) cannot stop all future heartbeats.
        reactor.callLater(seconds, self.heartbeat)
        evt = EventHeartbeat(self.options.monitor, self.name, 3*seconds)
        self.zem.sendEvent(evt)
        self.niceDoggie(seconds)
        r = self.rrdStats
        callTime = sum([s.callTime for s in self.services.values()])
        self.zem.sendEvents(
            r.counter('totalTime', seconds, int(self.totalTime * 1000)) +
            r.counter('totalEvents', seconds, self.totalEvents) +
            r.gauge('services', seconds, len(self.services)) +
            r.counter('totalCallTime', seconds, callTime) +
            r.gauge('workListLength', seconds, len(self.workList))
        )

    def main(self):
        """
        Start the main event loop, then kill any worker processes that are
        still running once the reactor stops.
        """
        if self.options.cycle:
            self.heartbeat()
        reactor.run()
        # self.workers holds PB remote references, which are not process
        # handles, so use the transports recorded by createWorker().
        for proc in list(self.workerProcesses):
            try:
                proc.signalProcess('KILL')
            except Exception:
                # Best effort: the process may already have exited.
                self.log.debug("Unable to signal worker process %r", proc)

    def buildOptions(self):
        """
        Adds our command line options to ZCmdBase command line options.
        """
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--xmlrpcport', '-x', dest='xmlrpcport',
            type='int', default=XML_RPC_PORT,
            help='Port to use for XML-based Remote Procedure Calls (RPC)')
        self.parser.add_option('--pbport', dest='pbport',
            type='int', default=PB_PORT,
            help="Port to use for Twisted's pb service")
        self.parser.add_option('--passwd', dest='passwordfile',
            type='string', default=zenPath('etc','hubpasswd'),
            help='File where passwords are stored')
        self.parser.add_option('--monitor', dest='monitor',
            default='localhost',
            help='Name of the distributed monitor this hub runs on')
        self.parser.add_option('--workers', dest='workers',
            type='int', default=0,
            help="Number of worker instances to handle requests")
        self.parser.add_option('--prioritize', dest='prioritize',
            action='store_true', default=False,
            help="Run higher priority jobs before lower priority ones")
        self.parser.add_option('--anyworker', dest='anyworker',
            action='store_true', default=False,
            help='Allow any priority job to run on any worker')
if __name__ == '__main__':
    # Running as a script: build the daemon and enter its event loop.
    hub = ZenHub()
    hub.main()