Package Products :: Package ZenHub :: Module zenhub
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenHub.zenhub

  1  #! /usr/bin/env python  
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14   
 15  __doc__ = """zenhub 
 16   
 17  Provide remote, authenticated, and possibly encrypted two-way 
 18  communications with the Model and Event databases. 
 19   
 20  """ 
 21  import Globals 
 22   
 23  from XmlRpcService import XmlRpcService 
 24   
 25  import time 
 26  import pickle 
 27   
 28  from twisted.cred import portal, checkers, credentials 
 29  from twisted.spread import pb, banana 
 30  banana.SIZE_LIMIT = 1024 * 1024 * 10 
 31   
 32  from twisted.internet import reactor, protocol, defer 
 33  from twisted.web import server, xmlrpc 
 34  from zope.interface import implements 
 35   
 36   
 37  from Products.DataCollector.Plugins import loadPlugins 
 38  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 39  from Products.ZenUtils.Utils import zenPath, getExitMessage, unused 
 40  from Products.ZenUtils.DaemonStats import DaemonStats 
 41  from Products.ZenEvents.Event import Event, EventHeartbeat 
 42  from Products.ZenEvents.ZenEventClasses import App_Start 
 43  from Products.ZenUtils.Utils import unused 
 44   
 45  from Products.ZenHub.PBDaemon import RemoteBadMonitor 
 46  pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor) 
 47   
 48  # Due to the manipulation of sys.path during the loading of plugins, 
 49  # we can get ObjectMap imported both as DataMaps.ObjectMap and the 
 50  # full-path from Products.  The following gets the class registered 
 51  # with the jelly serialization engine under both names: 
 52  # 
 53  #  1st: get Products.DataCollector.plugins.DataMaps.ObjectMap 
 54  from Products.DataCollector.plugins.DataMaps import ObjectMap 
 55  #  2nd: get DataMaps.ObjectMap 
 56  import sys 
 57  sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins')) 
 58  import DataMaps 
 59  unused(DataMaps, ObjectMap) 
 60   
 61   
 62   
 63  XML_RPC_PORT = 8081 
 64  PB_PORT = 8789 
 65   
class AuthXmlRpcService(XmlRpcService):
    "Provide some level of authentication for XML/RPC calls"

    def __init__(self, dmd, checker):
        """
        @param dmd: the DataRoot passed through to XmlRpcService
        @param checker: credentials checker whose requestAvatarId() is
            used to validate HTTP Basic credentials
        """
        XmlRpcService.__init__(self, dmd)
        self.checker = checker


    def doRender(self, unused, request):
        """
        Call the inherited render engine after authentication succeeds.
        See @L{XmlRpcService.XmlRpcService.render}.

        @param unused: the avatar id produced by the checker (ignored)
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        """
        return XmlRpcService.render(self, request)


    def unauthorized(self, request):
        """
        Render an XMLRPC error indicating an authentication failure.
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: None
        """
        self._cbRender(xmlrpc.Fault(self.FAILURE, "Unauthorized"), request)


    def render(self, request):
        """
        Unpack the HTTP Basic authorization header and check the credentials.

        Rendering happens asynchronously: on success the request is handed
        to doRender(); on any failure (missing/malformed header, unknown
        scheme, rejected credentials) an "Unauthorized" fault is rendered.

        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: NOT_DONE_YET
        """
        auth = request.received_headers.get('authorization', None)
        if not auth:
            self.unauthorized(request)
            return server.NOT_DONE_YET
        try:
            scheme, encoded = auth.split()
            # RFC 7617: the authentication scheme name is case-insensitive.
            if scheme.lower() != 'basic':
                self.unauthorized(request)
            else:
                # The user-id cannot contain ':' but the password may, so
                # split on the first colon only.
                user, passwd = encoded.decode('base64').split(':', 1)
                c = credentials.UsernamePassword(user, passwd)
                d = self.checker.requestAvatarId(c)
                d.addCallback(self.doRender, request)
                def error(_failure, request):
                    self.unauthorized(request)
                d.addErrback(error, request)
        except Exception:
            # Malformed header, bad base64, or a checker that cannot be used.
            self.unauthorized(request)
        return server.NOT_DONE_YET

class HubAvitar(pb.Avatar):
    """
    Connect collectors to their configuration Services
    """

    def __init__(self, hub):
        """
        @param hub: the ZenHub instance whose services and worker pool
            this avatar exposes
        """
        self.hub = hub

    def perspective_getService(self,
                               serviceName,
                               instance = None,
                               listener = None):
        """
        Allow a collector to find a Hub service by name.  It also
        associates the service with a collector so that changes can be
        pushed back out to collectors.

        @type serviceName: string
        @param serviceName: a name, like 'EventService'
        @type instance: string
        @param instance: the collector's instance name, like 'localhost'
        @type listener: a remote reference to the collector
        @param listener: the callback interface to the collector
        @return: the service returned by the hub's getService()
        """
        service = self.hub.getService(serviceName, instance)
        if service is not None and listener:
            service.addListener(listener)
        return service

    def perspective_reportingForWork(self, worker):
        """
        Allow a worker to register for work.

        The worker is added to the hub's pool. When it disconnects it is
        removed and a replacement worker is started one second later.

        @type worker: a pb.RemoteReference
        @param worker: a reference to zenhubworker
        @return: None
        """
        worker.busy = False
        self.hub.workers.append(worker)

        def removeWorker(worker):
            if worker in self.hub.workers:
                self.hub.workers.remove(worker)
                reactor.callLater(1, self.hub.createWorker)
        worker.notifyOnDisconnect(removeWorker)

        # Jobs may already be queued (e.g. requests that arrived before any
        # worker connected); hand them to the new idle worker now rather
        # than waiting for the next incoming request to trigger dispatch.
        self.hub.giveWorkToWorkers()

class HubRealm(object):
    """
    Realm handing every successful PB login the same, shared HubAvitar.

    Follows the Twisted authentication (cred) framework.
    See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
    """
    implements(portal.IRealm)

    def __init__(self, hub):
        """
        @param hub: the ZenHub instance the shared avatar is bound to
        """
        self.hubAvitar = HubAvitar(hub)

    def requestAvatar(self, collName, mind, *interfaces):
        """
        Return the shared avatar for an authenticated login.

        @param collName: the avatar id produced by the credentials checker
        @param mind: client-supplied object (unused)
        @param interfaces: the interfaces the client is asking for
        @return: (pb.IPerspective, avatar, logout callable)
        @raise NotImplementedError: if pb.IPerspective was not requested
        """
        if pb.IPerspective in interfaces:
            def logout():
                return None
            return pb.IPerspective, self.hubAvitar, logout
        raise NotImplementedError

class WorkerInterceptor(pb.Referenceable):
    """Redirect service requests to one of the worker processes.  Note
    that everything else (like change notifications) go through
    locally hosted services."""

    # Read by ZenHub.heartbeat(), which sums callTime over all services.
    callTime = 0.

    def __init__(self, zenhub, service):
        """
        @param zenhub: the ZenHub that owns the worker queue
        @param service: the locally hosted service being wrapped
        """
        self.zenhub = zenhub
        self.service = service

    def remoteMessageReceived(self, broker, message, args, kw):
        "Intercept requests and send them down to workers"
        # Dotted name of the module defining the service class; getService()
        # can re-import the class from it.  Using __module__ directly avoids
        # parsing str(cls), which is "<class '...'>" for new-style classes.
        svc = self.service.__class__.__module__
        instance = self.service.instance
        args = broker.unserialize(args)
        kw = broker.unserialize(kw)
        # hide the types in the args: subverting the jelly protection mechanism,
        # but the types just passed through and the worker may not have loaded
        # the required service before we try passing types for that service
        args = pickle.dumps( (args, kw) )
        result = self.zenhub.deferToWorker( (svc, instance, message, args) )
        return broker.serialize(result, self.perspective)

    def addListener(self, listener):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.addListener(listener)

    def removeListener(self, listener):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.removeListener(listener)

    def update(self, object):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.update(object)

    def deleted(self, object):
        "Implement the HubService interface by forwarding to the local service"
        return self.service.deleted(object)

class ZenHub(ZCmdBase):
    """
    Listen for changes to objects in the Zeo database and update the
    collectors' configuration.

    The remote collectors connect to ZenHub and request configuration
    information and stay connected.  When changes are detected in the
    Zeo database, configuration updates are sent out to collectors
    asynchronously.  In this way, changes made in the web GUI can
    affect collection immediately, instead of waiting for a
    configuration cycle.

    Each collector uses a different, pluggable service within ZenHub
    to translate objects into configuration and data.  ZenPacks can
    add services for their collectors.  Collectors communicate using
    Twisted's Perspective Broker, which provides authenticated,
    asynchronous, bidirectional method invocation.

    ZenHub also provides an XmlRPC interface to some common services
    to support collectors written in other languages.
    """

    totalTime = 0.
    totalEvents = 0
    totalCallTime = 0.
    name = 'zenhub'

    def __init__(self):
        """
        Hook ourselves up to the Zeo database and wait for collectors
        to connect.
        """
        # NOTE(review): set before ZCmdBase.__init__, presumably because the
        # database connection made there (zeoConnect) may already append to
        # self.changes -- confirm against ZCmdBase.
        self.changes = []
        self.workers = []
        self.workList = []
        self.worker_processes = set()

        ZCmdBase.__init__(self)
        self.zem = self.dmd.ZenEventManager
        loadPlugins(self.dmd)
        self.services = {}

        er = HubRealm(self)
        checker = self.loadChecker()
        if checkers.ICredentialsChecker.providedBy(checker):
            pt = portal.Portal(er, [checker])
        else:
            # loadChecker() failed and returned its [] placeholder, which is
            # not a checker; registering it would crash Portal.  With no
            # checkers every PB login is simply refused.
            pt = portal.Portal(er, [])
        reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt))

        xmlsvc = AuthXmlRpcService(self.dmd, checker)
        reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc))

        self.sendEvent(eventClass=App_Start,
                       summary="%s started" % self.name,
                       severity=0)
        reactor.callLater(5, self.processQueue)
        self.rrdStats = self.getRRDStats()
        for _ in range(int(self.options.workers)):
            self.createWorker()

    def getRRDStats(self):
        """
        Build the DaemonStats object used to report zenhub statistics.

        It is configured from the performance monitor named by --monitor.

        @return: a configured DaemonStats
        """
        rrdStats = DaemonStats()
        perfConf = self.dmd.Monitors.Performance._getOb(self.options.monitor, None)

        from Products.ZenModel.BuiltInDS import BuiltInDS
        threshs = perfConf.getThresholdInstances(BuiltInDS.sourcetype)
        createCommand = getattr(perfConf, 'defaultRRDCreateCommand', None)
        rrdStats.config(perfConf.id, 'zenhub', threshs, createCommand)

        return rrdStats

    def zeoConnect(self):
        """
        Override the kind of zeo connection we have so we can listen
        to Zeo object updates.  Updates come as OID invalidations, which
        are queued on self.changes for processQueue() to handle.

        @return: None
        """
        from ZEO.cache import ClientCache as ClientCacheBase
        class ClientCache(object):
            """
            A ClientCache wrapper that hooks into invalidation so that zenhub
            can notice when they occur.
            """
            def __init__(s, *args, **kwargs):
                s._cache = ClientCacheBase(*args, **kwargs)

            def __getattr__(s, name):
                return getattr(s._cache, name)

            def invalidate(s, oid, tid, server_invalidation=True):
                # Newest at the front; doProcessQueue pops from the end, so
                # changes are handled oldest first.
                self.changes.insert(0, oid)
                return s._cache.invalidate(oid, tid,
                                           server_invalidation)

        from ZEO.ClientStorage import ClientStorage as ClientStorageBase
        class ClientStorage(ClientStorageBase):
            "Override the caching class to intercept messages"
            ClientCacheClass = ClientCache

        storage = ClientStorage((self.options.host, self.options.port),
                                client=self.options.pcachename,
                                var=self.options.pcachedir,
                                cache_size=self.options.pcachesize*1024*1024)
        from ZODB import DB
        self.db = DB(storage, cache_size=self.options.cachesize)

    def processQueue(self):
        """
        Periodically (once a second) process database changes.

        Errors from syncing the database or from dispatching change
        notifications are logged.  The next cycle is always scheduled, so a
        single failure cannot stop change processing for good.

        @return: None
        """
        now = time.time()
        try:
            self.syncdb() # reads the object invalidations
            self.doProcessQueue()
        except Exception:
            self.log.exception("Error while processing database changes")
        reactor.callLater(1, self.processQueue)
        self.totalEvents += 1
        self.totalTime += time.time() - now

    def doProcessQueue(self):
        """
        Perform one cycle of update notifications.

        Each queued OID is loaded and wrapped in its primary acquisition
        context.  Every service is then told about it via update().  If the
        object cannot be placed in the tree (AttributeError/KeyError), the
        services are told via deleted() instead.

        @return: None
        """
        while self.changes:
            oid = self.changes.pop()
            self.log.debug("Got oid %r", oid)
            obj = self.dmd._p_jar[oid]
            self.log.debug("Object %r changed", obj)
            try:
                obj = obj.__of__(self.dmd).primaryAq()
                self.log.debug("Noticing object %s changed",
                               obj.getPrimaryUrlPath())
            except (AttributeError, KeyError):
                self.log.debug("Noticing object %s ", obj)
                for s in self.services.values():
                    s.deleted(obj)
            else:
                for s in self.services.values():
                    s.update(obj)

    def sendEvent(self, **kw):
        """
        Useful method for posting events to the EventManager.

        'device' defaults to the --monitor name and 'component' to this
        daemon's name.  Failures are logged, never raised.

        @type kw: keywords (dict)
        @param kw: the values for an event: device, summary, etc.
        @return: None
        """
        kw.setdefault('device', self.options.monitor)
        kw.setdefault('component', self.name)
        try:
            self.zem.sendEvent(Event(**kw))
        except Exception:
            self.log.exception("Unable to send an event")

    def loadChecker(self):
        """
        Load the password file.

        On success, the first credentials in the file are also remembered
        as self.workerUsername / self.workerPassword for zenhubworker logins.

        @return: an object satisfying the ICredentialsChecker
        interface using a password file, or an empty list if the file
        is not available.  Uses the file specified in the --passwd
        command line option.
        """
        try:
            checker = checkers.FilePasswordDB(self.options.passwordfile)
            # grab credentials for the workers to login
            u, p = checker._loadCredentials().next()
            self.workerUsername, self.workerPassword = u, p
            return checker
        except Exception:
            self.log.exception("Unable to load %s", self.options.passwordfile)
            return []

    def getService(self, name, instance):
        """
        Helper method to load services dynamically for a collector.
        Returned instances are cached: reconnecting collectors will
        get the same service object.

        @type name: string
        @param name: the dotted-name of the module to load
        (uses @L{Products.ZenUtils.Utils.importClass})
        @type instance: string
        @param instance: each service serves only one specific collector
        instance (like 'localhost').  instance defines the collector's
        instance name.
        @return: a service loaded from ZenHub/services or one of the
        zenpacks, wrapped in a WorkerInterceptor when workers are enabled.
        @raise RemoteBadMonitor: if instance is not a known performance
        monitor.
        """
        # Sanity check the names given to us
        if not self.dmd.Monitors.Performance._getOb(instance, False):
            raise RemoteBadMonitor("The provided performance monitor '%s'"
                                   " is not in the current list" % instance)

        try:
            return self.services[name, instance]
        except KeyError:
            from Products.ZenUtils.Utils import importClass
            try:
                ctor = importClass(name)
            except ImportError:
                ctor = importClass('Products.ZenHub.services.%s' % name, name)
            svc = ctor(self.dmd, instance)
            if self.options.workers:
                svc = WorkerInterceptor(self, svc)
            self.services[name, instance] = svc
            return svc

    def deferToWorker(self, args):
        """Take a remote request and queue it for worker processes.

        @type args: tuple
        @param args: the arguments to the remote_execute() method in the
        worker: (service name, instance, method, pickled args, ...)
        @return: a Deferred for the eventual results of the method call
        """
        d = defer.Deferred()
        svcName, instance, method = args[:3]
        service = self.getService(svcName, instance).service
        priority = service.getMethodPriority(method)
        job = (d, priority, args)

        if self.options.prioritize:
            # Keep workList sorted by priority (lower value runs first);
            # jobs with equal priority stay in arrival order.
            for i, queued in enumerate(self.workList):
                if priority < queued[1]:
                    self.workList.insert(i, job)
                    break
            else:
                self.workList.append(job)
        else:
            # Run jobs on a first come, first serve basis.
            self.workList.append(job)

        self.giveWorkToWorkers()
        return d

    def giveWorkToWorkers(self):
        """Parcel out queued method invocations to available worker processes.

        Runs until the work list is empty or no idle worker can take any
        remaining job.
        """
        self.log.debug("worklist has %d items", len(self.workList))

        def finished(result, finishedWorker):
            # Free the worker, try to dispatch more work, and pass the
            # result (or failure) through unchanged to the caller's Deferred.
            finishedWorker.busy = False
            self.giveWorkToWorkers()
            return result

        while self.workList:
            for i, worker in enumerate(self.workers):
                # linear search is not ideal, but simple enough
                if worker.busy:
                    continue
                job = self.getJobForWorker(i)
                if job is None:
                    continue
                worker.busy = True
                self.log.debug("Giving %s to worker %d", job[2][2], i)
                d2 = worker.callRemote('execute', *job[2])
                d2.addBoth(finished, worker)
                d2.chainDeferred(job[0])
                break
            else:
                self.log.debug("all workers are busy")
                break

    def getJobForWorker(self, workerId):
        """
        Remove and return the next job that worker number workerId may run.

        With --anyworker, the first queued job is returned.  Otherwise, for
        n workers, worker i only takes jobs whose priority is below (i+1)/n.
        This reserves the higher-numbered workers for lower priority (larger
        value) jobs.  The last worker accepts any job, so a job whose
        priority is >= 1.0 cannot be starved forever.

        @type workerId: int
        @param workerId: index of the worker in self.workers
        @return: a (deferred, priority, args) job tuple, or None if no
        queued job is eligible for this worker
        """
        if self.options.anyworker:
            return self.workList.pop(0)
        # Restrict lower priority jobs to a subset of the workers.
        lenWorkers = float(len(self.workers))
        isLastWorker = workerId >= len(self.workers) - 1
        for i, job in enumerate(self.workList):
            if isLastWorker or job[1] < (workerId + 1) / lenWorkers:
                return self.workList.pop(i)
        return None

    def createWorker(self):
        """Start a worker subprocess.

        The worker gets a temporary config file holding the hub port, the
        worker credentials and some logging/cache settings.  The file is
        removed when the worker process ends.

        @return: None
        """
        # this probably can't happen, but let's make sure
        if len(self.workers) >= self.options.workers:
            return
        # loadChecker() only sets worker credentials when the password file
        # could be read; without them a worker could never log in.
        if getattr(self, 'workerUsername', None) is None:
            self.log.error("No worker credentials loaded from %s; "
                           "not starting a worker", self.options.passwordfile)
            return
        # create a config file for the slave to pass credentials
        import os, tempfile
        fd, tmp = tempfile.mkstemp()
        try:
            os.write(fd, "hubport %s\n" % self.options.pbport)
            os.write(fd, "username %s\n" % self.workerUsername)
            os.write(fd, "password %s\n" % self.workerPassword)
            os.write(fd, "host %s\n" % self.options.host)
            os.write(fd, "logseverity %s\n" % self.options.logseverity)
            os.write(fd, "cachesize %s\n" % self.options.cachesize)
        finally:
            os.close(fd)
        # start the worker
        exe = zenPath('bin', 'zenhubworker')

        # watch for output, and generally just take notice
        class WorkerRunningProtocol(protocol.ProcessProtocol):

            def outReceived(s, data):
                self.log.debug("Worker reports %s" % (data,))

            def errReceived(s, data):
                self.log.info("Worker reports %s" % (data,))

            def processEnded(s, reason):
                try:
                    os.unlink(tmp)
                except OSError:
                    self.log.debug("Unable to remove worker config %s", tmp)
                # worker_processes holds what spawnProcess() returned, which
                # is this protocol's transport (not the protocol itself).
                self.worker_processes.discard(s.transport)
                # exitCode is None when the worker was killed by a signal.
                exitCode = reason.value.exitCode
                self.log.warning("Worker exited with status: %s (%s)",
                                 exitCode, getExitMessage(exitCode))

        args = (exe, 'run', '-C', tmp)
        self.log.debug("Starting %s", ' '.join(args))
        proc = reactor.spawnProcess(WorkerRunningProtocol(), exe, args,
                                    os.environ)
        self.worker_processes.add(proc)

    def heartbeat(self):
        """
        Send a heartbeat event and daemon statistics, then reschedule
        itself to run again in 30 seconds.

        @return: None
        """
        seconds = 30
        # NOTE(review): 3*seconds is presumably the heartbeat timeout.
        evt = EventHeartbeat(self.options.monitor, self.name, 3*seconds)
        self.zem.sendEvent(evt)
        self.niceDoggie(seconds)
        reactor.callLater(seconds, self.heartbeat)
        r = self.rrdStats
        totalTime = sum([s.callTime for s in self.services.values()])
        self.zem.sendEvents(
            r.counter('totalTime', seconds, int(self.totalTime * 1000)) +
            r.counter('totalEvents', seconds, self.totalEvents) +
            r.gauge('services', seconds, len(self.services)) +
            r.counter('totalCallTime', seconds, totalTime) +
            r.gauge('workListLength', seconds, len(self.workList))
            )

    def main(self):
        """
        Start the main event loop.  When it returns, kill any worker
        processes that are still running.
        """
        if self.options.cycle:
            self.heartbeat()
        reactor.run()
        from twisted.internet.error import ProcessExitedAlready
        for proc in list(self.worker_processes):
            try:
                proc.signalProcess('KILL')
            except ProcessExitedAlready:
                # Already gone; keep going so the remaining workers die too.
                pass

    def buildOptions(self):
        """
        Adds our command line options to ZCmdBase command line options.
        """
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--xmlrpcport', '-x', dest='xmlrpcport',
            type='int', default=XML_RPC_PORT,
            help='Port to use for XML-based Remote Procedure Calls (RPC)')
        self.parser.add_option('--pbport', dest='pbport',
            type='int', default=PB_PORT,
            help="Port to use for Twisted's pb service")
        self.parser.add_option('--passwd', dest='passwordfile',
            type='string', default=zenPath('etc','hubpasswd'),
            help='File where passwords are stored')
        self.parser.add_option('--monitor', dest='monitor',
            default='localhost',
            help='Name of the distributed monitor this hub runs on')
        self.parser.add_option('--workers', dest='workers',
            type='int', default=0,
            help="Number of worker instances to handle requests")
        self.parser.add_option('--prioritize', dest='prioritize',
            action='store_true', default=False,
            help="Run higher priority jobs before lower priority ones")
        self.parser.add_option('--anyworker', dest='anyworker',
            action='store_true', default=False,
            help='Allow any priority job to run on any worker')

# Script entry point: build the hub (which opens its listeners) and run the
# reactor until shutdown.
if __name__ == '__main__':
    hub = ZenHub()
    hub.main()