
Source Code for Module Products.ZenHub.zenhub

#! /usr/bin/env python
##############################################################################
#
# Copyright (C) Zenoss, Inc. 2007, all rights reserved.
#
# This content is made available according to terms specified in
# License.zenoss under the directory where your Zenoss product is installed.
#
##############################################################################


"""zenhub daemon

Provide remote, authenticated, and possibly encrypted two-way
communications with the Model and Event databases.
"""
import Globals

if __name__ == "__main__":
    # Install the 'best' reactor available, BUT only if run as a script.
    from Products.ZenHub import installReactor
    installReactor()

from XmlRpcService import XmlRpcService

import collections
import socket
import time
import signal
import cPickle as pickle
import os
from random import choice

from twisted.cred import portal, checkers, credentials
from twisted.spread import pb, banana
banana.SIZE_LIMIT = 1024 * 1024 * 10

from twisted.internet import reactor, protocol, defer
from twisted.python import failure
from twisted.web import server, xmlrpc
from twisted.internet.error import ProcessExitedAlready
from zope.event import notify
from zope.interface import implements
from zope.component import getUtility, getUtilitiesFor, adapts
from ZODB.POSException import POSKeyError

from Products.DataCollector.Plugins import loadPlugins
from Products.Five import zcml
from Products.ZenUtils.ZCmdBase import ZCmdBase
from Products.ZenUtils.Utils import zenPath, getExitMessage, unused, load_config, load_config_override, ipv6_available, atomicWrite
from Products.ZenUtils.DaemonStats import DaemonStats
from Products.ZenEvents.Event import Event, EventHeartbeat
from Products.ZenEvents.ZenEventClasses import App_Start
from Products.ZenMessaging.queuemessaging.interfaces import IEventPublisher
from Products.ZenRelations.PrimaryPathObjectManager import PrimaryPathObjectManager
from Products.ZenModel.DeviceComponent import DeviceComponent
from Products.ZenHub.services.RenderConfig import RenderConfig
from Products.ZenHub.interfaces import IInvalidationProcessor, IServiceAddedEvent, IHubCreatedEvent, IHubWillBeCreatedEvent, IInvalidationOid, IHubConfProvider, IHubHeartBeatCheck
from Products.ZenHub.interfaces import IParserReadyForOptionsEvent, IInvalidationFilter
from Products.ZenHub.interfaces import FILTER_INCLUDE, FILTER_EXCLUDE
from Products.ZenHub.WorkerSelection import WorkerSelector

from Products.ZenHub.PBDaemon import RemoteBadMonitor
pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)

from BTrees.IIBTree import IITreeSet

# Due to the manipulation of sys.path during the loading of plugins,
# we can get ObjectMap imported both as DataMaps.ObjectMap and the
# full-path from Products.  The following gets the class registered
# with the jelly serialization engine under both names:
#
#  1st: get Products.DataCollector.plugins.DataMaps.ObjectMap
from Products.DataCollector.plugins.DataMaps import ObjectMap
#  2nd: get DataMaps.ObjectMap
import sys
sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins'))
import DataMaps
unused(DataMaps, ObjectMap)

from Products.ZenHub import XML_RPC_PORT
from Products.ZenHub import PB_PORT
from Products.ZenHub import ZENHUB_ZENRENDER

HubWorklistItem = collections.namedtuple('HubWorklistItem', 'recvtime deferred priority servicename instance method args')
WorkerStats = collections.namedtuple('WorkerStats', 'status description lastupdate previdle')
LastCallReturnValue = collections.namedtuple('LastCallReturnValue', 'returnvalue')

class AuthXmlRpcService(XmlRpcService):
    """Provide some level of authentication for XML/RPC calls"""

    def __init__(self, dmd, checker):
        XmlRpcService.__init__(self, dmd)
        self.checker = checker


    def doRender(self, unused, request):
        """
        Call the inherited render engine after authentication succeeds.
        See L{XmlRpcService.XmlRpcService.render}.
        """
        return XmlRpcService.render(self, request)


    def unauthorized(self, request):
        """
        Render an XMLRPC error indicating an authentication failure.
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: None
        """
        self._cbRender(xmlrpc.Fault(self.FAILURE, "Unauthorized"), request)


    def render(self, request):
        """
        Unpack the authorization header and check the credentials.
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: NOT_DONE_YET
        """
        auth = request.received_headers.get('authorization', None)
        if not auth:
            self.unauthorized(request)
        else:
            try:
                type, encoded = auth.split()
                if type not in ('Basic',):
                    self.unauthorized(request)
                else:
                    user, passwd = encoded.decode('base64').split(':')
                    c = credentials.UsernamePassword(user, passwd)
                    d = self.checker.requestAvatarId(c)
                    d.addCallback(self.doRender, request)
                    def error(unused, request):
                        self.unauthorized(request)
                    d.addErrback(error, request)
            except Exception:
                self.unauthorized(request)
        return server.NOT_DONE_YET


class HubAvitar(pb.Avatar):
    """
    Connect collectors to their configuration Services
    """

    def __init__(self, hub):
        self.hub = hub


    def perspective_ping(self):
        return 'pong'

    def perspective_getService(self,
                               serviceName,
                               instance = None,
                               listener = None):
        """
        Allow a collector to find a Hub service by name. It also
        associates the service with a collector so that changes can be
        pushed back out to collectors.

        @type serviceName: string
        @param serviceName: a name, like 'EventService'
        @type instance: string
        @param instance: the collector's instance name, like 'localhost'
        @type listener: a remote reference to the collector
        @param listener: the callback interface to the collector
        @return: a remote reference to a service
        """
        try:
            service = self.hub.getService(serviceName, instance)
        except Exception:
            self.hub.log.exception("Failed to get service '%s'", serviceName)
            return None
        else:
            if service is not None and listener:
                service.addListener(listener)
            return service

    def perspective_reportingForWork(self, worker):
        """
        Allow a worker to register for work.

        @type worker: a pb.RemoteReference
        @param worker: a reference to zenhubworker
        @return: None
        """
        worker.busy = False
        self.hub.workers.append(worker)
        def removeWorker(worker):
            if worker in self.hub.workers:
                self.hub.workers.remove(worker)
        worker.notifyOnDisconnect(removeWorker)


class ServiceAddedEvent(object):
    implements(IServiceAddedEvent)
    def __init__(self, name, instance):
        self.name = name
        self.instance = instance


class HubWillBeCreatedEvent(object):
    implements(IHubWillBeCreatedEvent)
    def __init__(self, hub):
        self.hub = hub


class HubCreatedEvent(object):
    implements(IHubCreatedEvent)
    def __init__(self, hub):
        self.hub = hub

class ParserReadyForOptionsEvent(object):
    implements(IParserReadyForOptionsEvent)
    def __init__(self, parser):
        self.parser = parser

class HubRealm(object):
    """
    Following the Twisted authentication framework.
    See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
    """
    implements(portal.IRealm)

    def __init__(self, hub):
        self.hubAvitar = HubAvitar(hub)

    def requestAvatar(self, collName, mind, *interfaces):
        if pb.IPerspective not in interfaces:
            raise NotImplementedError
        return pb.IPerspective, self.hubAvitar, lambda: None


class WorkerInterceptor(pb.Referenceable):
    """Redirect service requests to one of the worker processes. Note
    that everything else (like change notifications) goes through
    locally hosted services."""

    callTime = 0.

    def __init__(self, zenhub, service):
        self.zenhub = zenhub
        self.service = service

    def remoteMessageReceived(self, broker, message, args, kw):
        "Intercept requests and send them down to workers"
        svc = str(self.service.__class__).rpartition('.')[0]
        instance = self.service.instance
        args = broker.unserialize(args)
        kw = broker.unserialize(kw)
        # hide the types in the args: subverting the jelly protection mechanism,
        # but the types just passed through and the worker may not have loaded
        # the required service before we try passing types for that service
        # PB has a 640k limit, not bytes but len of sequences. When args are
        # pickled the resulting string may be larger than 640k, split into
        # 100k chunks
        pickledArgs = pickle.dumps( (args, kw), pickle.HIGHEST_PROTOCOL )
        chunkedArgs = []
        chunkSize = 102400
        while pickledArgs:
            chunk = pickledArgs[:chunkSize]
            chunkedArgs.append(chunk)
            pickledArgs = pickledArgs[chunkSize:]

        result = self.zenhub.deferToWorker( (svc, instance, message, chunkedArgs) )
        return broker.serialize(result, self.perspective)

    def __getattr__(self, attr):
        "Implement the HubService interface by forwarding to the local service"
        return getattr(self.service, attr)


class _ZenHubWorklist(object):

    def __init__(self):
        self.eventworklist = []
        self.otherworklist = []
        self.applyworklist = []

        # priority lists for eventual task selection. All queues are appended in case
        # any of them are empty.
        self.eventPriorityList = [self.eventworklist, self.otherworklist, self.applyworklist]
        self.otherPriorityList = [self.otherworklist, self.applyworklist, self.eventworklist]
        self.applyPriorityList = [self.applyworklist, self.eventworklist, self.otherworklist]
        self.dispatch = {
            'sendEvents': self.eventworklist,
            'applyDataMaps': self.applyworklist
        }

    def __getitem__(self, item):
        return self.dispatch.get(item, self.otherworklist)

    def __len__(self):
        return len(self.eventworklist) + len(self.otherworklist) + len(self.applyworklist)

    def pop(self):
        """
        Select a single task to be distributed to a worker. We prioritize tasks as follows:
            sendEvents > configuration service calls > applyDataMaps
        To prevent starving any queue in an event storm, we randomize the task selection,
        preferring tasks according to the above priority.
        """
        eventchain = filter(None, self.eventPriorityList)
        otherchain = filter(None, self.otherPriorityList)
        applychain = filter(None, self.applyPriorityList)
        seq = choice([eventchain]*4 +
                     [otherchain]*2 +
                     [applychain]
                     )
        ret = seq[0].pop(0)
        return ret

    def push(self, job):
        self[job.method].insert(0, job)

    def append(self, job):
        self[job.method].append(job)

class ZenHub(ZCmdBase):
    """
    Listen for changes to objects in the Zeo database and update the
    collectors' configuration.

    The remote collectors connect to ZenHub, request configuration
    information, and stay connected. When changes are detected in the
    Zeo database, configuration updates are sent out to collectors
    asynchronously. In this way, changes made in the web GUI can
    affect collection immediately, instead of waiting for a
    configuration cycle.

    Each collector uses a different, pluggable service within ZenHub
    to translate objects into configuration and data. ZenPacks can
    add services for their collectors. Collectors communicate using
    Twisted's Perspective Broker, which provides authenticated,
    asynchronous, bidirectional method invocation.

    ZenHub also provides an XmlRPC interface to some common services
    to support collectors written in other languages.

    ZenHub does very little work in its own process, but instead dispatches
    the work to a pool of zenhubworkers, running zenhubworker.py. zenhub
    manages these workers with 3 data structures:
    - workers - a list of remote PB instances
    - worker_processes - a set of WorkerRunningProtocol instances
    - workerprocessmap - a dict mapping pid to process instance created
      by reactor.spawnProcess
    Callbacks and handlers that detect worker shutdown update these
    structures automatically. ONLY ONE HANDLER must take care of restarting
    new workers, to avoid accidentally spawning too many workers. This
    handler also verifies that zenhub is not in the process of shutting
    down, so that callbacks triggered during daemon shutdown don't keep
    starting new workers.

    TODO: document invalidation workers
    """

    totalTime = 0.
    totalEvents = 0
    totalCallTime = 0.
    name = 'zenhub'

    def __init__(self):
        """
        Hook ourselves up to the Zeo database and wait for collectors
        to connect.
        """
        # list of remote worker references
        self.workers = []
        self.workTracker = {}
        self.workList = _ZenHubWorklist()
        # set of worker processes
        self.worker_processes = set()
        # map of worker pids -> worker processes
        self.workerprocessmap = {}
        self.shutdown = False
        self.counters = collections.Counter()

        ZCmdBase.__init__(self)
        import Products.ZenHub
        load_config("hub.zcml", Products.ZenHub)
        notify(HubWillBeCreatedEvent(self))

        # Worker selection handler
        self.workerselector = WorkerSelector(self.options)
        self.workList.log = self.log

        # make sure we don't reserve more than n-1 workers for events
        maxReservedEventsWorkers = 0
        if self.options.workers:
            maxReservedEventsWorkers = self.options.workers - 1
        if self.options.workersReservedForEvents > maxReservedEventsWorkers:
            self.options.workersReservedForEvents = maxReservedEventsWorkers
            self.log.info("reduced number of workers reserved for sending events to %d",
                          self.options.workersReservedForEvents)

        self.zem = self.dmd.ZenEventManager
        loadPlugins(self.dmd)
        self.services = {}

        er = HubRealm(self)
        checker = self.loadChecker()
        pt = portal.Portal(er, [checker])
        interface = '::' if ipv6_available() else ''
        reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt), interface=interface)

        xmlsvc = AuthXmlRpcService(self.dmd, checker)
        reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc), interface=interface)

        # start listening for zenrender requests
        if self.options.graph_proxy:
            self.renderConfig = RenderConfig(self.dmd, ZENHUB_ZENRENDER)

        # responsible for sending messages to the queues
        import Products.ZenMessaging.queuemessaging
        load_config_override('twistedpublisher.zcml', Products.ZenMessaging.queuemessaging)
        notify(HubCreatedEvent(self))
        self.sendEvent(eventClass=App_Start,
                       summary="%s started" % self.name,
                       severity=0)

        self._initialize_invalidation_filters()
        reactor.callLater(5, self.processQueue)

        self.rrdStats = self.getRRDStats()

        if self.options.workers:
            self.workerconfig = zenPath('var', 'zenhub', '%s_worker.conf' % self._getConf().id)
            self._createWorkerConf()
            for i in range(self.options.workers):
                self.createWorker()

            # start cyclic call to giveWorkToWorkers
            reactor.callLater(2, self.giveWorkToWorkers, True)

        # set up SIGUSR2 handling
        try:
            signal.signal(signal.SIGUSR2, self.sighandler_USR2)
        except ValueError:
            # If we get called multiple times, this will generate an exception:
            # ValueError: signal only works in main thread
            # Ignore it as we've already set up the signal handler.
            pass

    def sighandler_USR2(self, signum, frame):
        # log zenhub's worker stats
        self._workerStats()

        # send SIGUSR2 signal to all workers
        for worker in self.workerprocessmap.values():
            try:
                worker.signalProcess(signal.SIGUSR2)
                time.sleep(0.5)
            except Exception:
                pass

    def stop(self):
        self.shutdown = True

    def _getConf(self):
        confProvider = IHubConfProvider(self)
        return confProvider.getHubConf()

    def getRRDStats(self):
        """
        Return the most recent RRD statistic information.
        """
        rrdStats = DaemonStats()
        perfConf = self._getConf()

        from Products.ZenModel.BuiltInDS import BuiltInDS
        threshs = perfConf.getThresholdInstances(BuiltInDS.sourcetype)
        createCommand = getattr(perfConf, 'defaultRRDCreateCommand', None)
        rrdStats.config(perfConf.id, 'zenhub', threshs, createCommand)

        return rrdStats

    def processQueue(self):
        """
        Periodically (once a second) process database changes

        @return: None
        """
        now = time.time()
        try:
            self.syncdb()  # reads the object invalidations
        except Exception, ex:
            self.log.warn("Unable to poll invalidations, will try again.")
        else:
            try:
                self.doProcessQueue()
            except Exception, ex:
                self.log.exception("Unable to poll invalidations.")
        reactor.callLater(1, self.processQueue)
        self.totalEvents += 1
        self.totalTime += time.time() - now

    def _initialize_invalidation_filters(self):
        filters = (f for n, f in getUtilitiesFor(IInvalidationFilter))
        self._invalidation_filters = []
        for fltr in sorted(filters, key=lambda f: getattr(f, 'weight', 100)):
            fltr.initialize(self.dmd)
            self._invalidation_filters.append(fltr)
        self.log.debug('Registered %s invalidation filters.' %
                       len(self._invalidation_filters))

    def _filter_oids(self, oids):
        app = self.dmd.getPhysicalRoot()
        i = 0
        for oid in oids:
            i += 1
            try:
                obj = app._p_jar[oid]
            except POSKeyError:
                # State is gone from the database. Send it along.
                yield oid
            else:
                if isinstance(obj, (PrimaryPathObjectManager, DeviceComponent)):
                    try:
                        obj = obj.__of__(self.dmd).primaryAq()
                    except (AttributeError, KeyError):
                        # It's a delete. This should go through.
                        yield oid
                    else:
                        included = True
                        for fltr in self._invalidation_filters:
                            result = fltr.include(obj)
                            if result in (FILTER_INCLUDE, FILTER_EXCLUDE):
                                included = (result == FILTER_INCLUDE)
                                break
                        if included:
                            oids = self._transformOid(oid, obj)
                            if oids:
                                for oid in oids:
                                    yield oid

    def _transformOid(self, oid, obj):
        oidTransform = IInvalidationOid(obj)
        newOids = oidTransform.transformOid(oid)
        if isinstance(newOids, str):
            newOids = [newOids]
        for newOid in newOids:
            yield newOid

    def doProcessQueue(self):
        """
        Perform one cycle of update notifications.

        @return: None
        """
        changes_dict = self.storage.poll_invalidations()
        if changes_dict is not None:
            processor = getUtility(IInvalidationProcessor)
            d = processor.processQueue(tuple(set(self._filter_oids(changes_dict))))
            def done(n):
                if n:
                    self.log.debug('Processed %s oids' % n)
            d.addCallback(done)


    def sendEvent(self, **kw):
        """
        Useful method for posting events to the EventManager.

        @type kw: keywords (dict)
        @param kw: the values for an event: device, summary, etc.
        @return: None
        """
        if not 'device' in kw:
            kw['device'] = self.options.monitor
        if not 'component' in kw:
            kw['component'] = self.name
        try:
            self.zem.sendEvent(Event(**kw))
        except Exception:
            self.log.exception("Unable to send an event")

    def loadChecker(self):
        """
        Load the password file

        @return: an object satisfying the ICredentialsChecker
        interface using a password file or an empty list if the file
        is not available. Uses the file specified in the --passwd
        command line option.
        """
        try:
            checker = checkers.FilePasswordDB(self.options.passwordfile)
            # grab credentials for the workers to login
            u, p = checker._loadCredentials().next()
            self.workerUsername, self.workerPassword = u, p
            return checker
        except Exception, ex:
            self.log.exception("Unable to load %s", self.options.passwordfile)
        return []

    def getService(self, name, instance):
        """
        Helper method to load services dynamically for a collector.
        Returned instances are cached: reconnecting collectors will
        get the same service object.

        @type name: string
        @param name: the dotted-name of the module to load
        (uses L{Products.ZenUtils.Utils.importClass})
        @type instance: string
        @param instance: each service serves only one specific collector
        instance (like 'localhost'). instance defines the collector's
        instance name.
        @return: a service loaded from ZenHub/services or one of the zenpacks.
        """
        # Sanity check the names given to us
        if not self.dmd.Monitors.Performance._getOb(instance, False):
            raise RemoteBadMonitor( "The provided performance monitor '%s'" % \
                                    self.options.monitor + " is not in the current list" )

        try:
            return self.services[name, instance]

        except KeyError:
            from Products.ZenUtils.Utils import importClass
            try:
                ctor = importClass(name)
            except ImportError:
                ctor = importClass('Products.ZenHub.services.%s' % name, name)
            try:
                svc = ctor(self.dmd, instance)
            except Exception:
                self.log.exception("Failed to initialize %s", ctor)
                # Module can't be used, so unload it.
                if ctor.__module__ in sys.modules:
                    del sys.modules[ctor.__module__]
                return None
            else:
                if self.options.workers:
                    svc = WorkerInterceptor(self, svc)
                self.services[name, instance] = svc
                notify(ServiceAddedEvent(name, instance))
                return svc

    def deferToWorker(self, args):
        """Take a remote request and queue it for worker processes.

        @type args: tuple
        @param args: the arguments to the remote_execute() method in the worker
        @return: a Deferred for the eventual results of the method call

        """
        d = defer.Deferred()
        svcName, instance, method = args[:3]
        service = self.getService(svcName, instance).service
        priority = service.getMethodPriority(method)

        self.workList.append(HubWorklistItem(time.time(), d, priority, svcName, instance, method, args))

        self.giveWorkToWorkers()
        return d

    def updateStatusAtStart(self, wId, job):
        now = time.time()
        jobDesc = "%s:%s.%s" % (job.instance, job.servicename, job.method)
        stats = self.workTracker.pop(wId, None)
        idletime = 0
        if stats:
            idletime = now - stats.lastupdate
        self.log.debug("Giving %s to worker %d, (%s)", job.method, wId, jobDesc)
        self.workTracker[wId] = WorkerStats('Busy', jobDesc, now, idletime)

    def updateStatusAtFinish(self, wId, error=None):
        now = time.time()
        stats = self.workTracker.pop(wId, None)
        if stats:
            elapsed = now - stats.lastupdate
            self.log.debug("worker %s, work %s finished in %s" % (wId, stats.description, elapsed))
            self.workTracker[wId] = WorkerStats('Error: %s' % error if error else 'Idle',
                                                stats.description, now, 0)


    def giveWorkToWorkers(self, requeue=False):
        """Parcel out a method invocation to an available worker process
        """
        if self.workList:
            self.log.debug("worklist has %d items", len(self.workList))
        incompleteJobs = []
        while self.workList:
            if all(w.busy for w in self.workers):
                self.log.debug("all workers are busy")
                break

            job = self.workList.pop()
            self.log.debug("get candidate workers for %s..." % job.method)
            candidateWorkers = list(self.workerselector.getCandidateWorkerIds(job.method, self.workers))
            self.log.debug("candidate workers are %r", candidateWorkers)
            for i in candidateWorkers:
                worker = self.workers[i]
                worker.busy = True
                def finished(result, finishedWorker, wId):
                    finishedWorker.busy = False
                    error = None
                    if not isinstance(result, failure.Failure):
                        try:
                            result = pickle.loads(''.join(result))
                        except Exception as e:
                            error = e
                            self.log.exception("Error un-pickling result from worker")

                        # if zenhubworker is about to shutdown, it will wrap the actual result
                        # in a LastCallReturnValue tuple - remove worker from worker list to
                        # keep from accidentally sending it any more work while it shuts down
                        if isinstance(result, LastCallReturnValue):
                            self.log.debug("worker %s is shutting down" % wId)
                            result = result.returnvalue
                            if finishedWorker in self.workers:
                                self.workers.remove(finishedWorker)

                    else:
                        error = result.getErrorMessage()
                    self.updateStatusAtFinish(wId, error)
                    reactor.callLater(0, self.giveWorkToWorkers)
                    return result

                self.counters['workerItems'] += 1
                self.updateStatusAtStart(i, job)
                try:
                    d2 = worker.callRemote('execute', *job.args)
                    d2.addBoth(finished, worker, i)
                except Exception:
                    self.log.warning("Failed to execute job on zenhub worker")
                    d2 = defer.maybeDeferred(finished, failure.Failure(), worker, i)
                finally:
                    d2.chainDeferred(job.deferred)
                break
            else:
                self.log.debug("no worker available for %s" % job.method)
                # could not complete this job, put it back in the queue once
                # we're finished saturating the workers
                incompleteJobs.append(job)

        for job in reversed(incompleteJobs):
            # could not complete this job, put it back in the queue
            self.workList.push(job)

        if incompleteJobs:
            reactor.callLater(0, self.giveWorkToWorkers)

        if requeue and not self.shutdown:
            reactor.callLater(5, self.giveWorkToWorkers, True)

    def _workerStats(self):
        now = time.time()
        lines = ['Worklist Stats:',
                 '\tEvents:\t%s' % len(self.workList.eventworklist),
                 '\tOther:\t%s' % len(self.workList.otherworklist),
                 '\tApplyDataMaps:\t%s' % len(self.workList.applyworklist),
                 '\tTotal:\t%s' % len(self.workList),
                 'Worker Stats:']
        for wId, worker in enumerate(self.workers):
            stat = self.workTracker.get(wId, None)
            linePattern = '\t%d:%s\t[%s%s]\t%.3fs'
            lines.append(linePattern % (
                wId,
                'Busy' if worker.busy else 'Idle',
                '%s %s' % (stat.status, stat.description) if stat else 'No Stats',
                ' Idle:%.3fs' % stat.previdle if stat and stat.previdle else '',
                now - stat.lastupdate if stat else 0
            ))
        self.log.info('\n'.join(lines))

    def _createWorkerConf(self):
        workerconfigdir = os.path.dirname(self.workerconfig)
        if not os.path.exists(workerconfigdir):
            os.makedirs(workerconfigdir)
        with open(self.workerconfig, 'w') as workerfd:
            workerfd.write("hubport %s\n" % self.options.pbport)
            workerfd.write("username %s\n" % self.workerUsername)
            workerfd.write("password %s\n" % self.workerPassword)
            workerfd.write("logseverity %s\n" % self.options.logseverity)
            workerfd.write("zodb-cachesize %s\n" % self.options.zodb_cachesize)
            workerfd.write("calllimit %s\n" % self.options.worker_call_limit)

    def createWorker(self):
        """Start a worker subprocess

        @return: None
        """
        # this probably can't happen, but let's make sure
        if len(self.worker_processes) >= self.options.workers:
            self.log.info("already at maximum number of worker processes, no worker will be created")
            return
        exe = zenPath('bin', 'zenhubworker')

        # watch for output, and generally just take notice
        class WorkerRunningProtocol(protocol.ProcessProtocol):

            def __init__(self, parent):
                self._pid = 0
                self.parent = parent
                self.log = parent.log

            @property
            def pid(self):
                return self._pid

            def connectionMade(self):
                self._pid = self.transport.pid
                reactor.callLater(1, self.parent.giveWorkToWorkers)

            def outReceived(self, data):
                self.log.debug("Worker (%d) reports %s" % (self.pid, data.rstrip(),))

            def errReceived(self, data):
                self.log.info("Worker (%d) reports %s" % (self.pid, data.rstrip(),))

            def processEnded(self, reason):
                self.parent.worker_processes.discard(self)
                self.parent.workerprocessmap.pop(self.pid, None)
                self.log.warning("Worker (%d) exited with status: %d (%s)",
                                 self.pid,
                                 reason.value.exitCode,
                                 getExitMessage(reason.value.exitCode))
                # if not shutting down, restart a new worker
                if not self.parent.shutdown:
                    self.log.info("Starting new zenhubworker")
                    self.parent.createWorker()

        args = (exe, 'run', '-C', self.workerconfig)
        self.log.debug("Starting %s", ' '.join(args))
        prot = WorkerRunningProtocol(self)
        proc = reactor.spawnProcess(prot, exe, args, os.environ)
        self.workerprocessmap[proc.pid] = proc
        self.worker_processes.add(prot)

    def heartbeat(self):
        """
        Since we don't do anything on a regular basis, just
        push heartbeats regularly.

        @return: None
        """
        seconds = 30
        evt = EventHeartbeat(self.options.monitor, self.name, 3*seconds)
        self.zem.sendEvent(evt)
        self.niceDoggie(seconds)
        reactor.callLater(seconds, self.heartbeat)
        r = self.rrdStats
        totalTime = sum(s.callTime for s in self.services.values())
        events = r.counter('totalTime', seconds, int(self.totalTime * 1000))
        events += r.counter('totalEvents', seconds, self.totalEvents)
        events += r.gauge('services', seconds, len(self.services))
        events += r.counter('totalCallTime', seconds, totalTime)
        events += r.gauge('workListLength', seconds, len(self.workList))
        for name, value in self.counters.items():
            events += r.counter(name, seconds, value)
        self.zem.sendEvents(events)

        # persist counter values
        self.saveCounters()
        try:
            hbcheck = IHubHeartBeatCheck(self)
            hbcheck.check()
        except:
            self.log.exception("Error processing heartbeat hook")

    def saveCounters(self):
        atomicWrite(
            zenPath('var/zenhub_counters.pickle'),
            pickle.dumps(self.counters),
            raiseException=False,
        )

    def loadCounters(self):
        try:
            self.counters = pickle.load(open(zenPath('var/zenhub_counters.pickle')))
        except Exception:
            pass

    def main(self):
        """
        Start the main event loop.
        """
        if self.options.cycle:
            self.heartbeat()
        reactor.run()
        self.shutdown = True
        for proc in self.workerprocessmap.itervalues():
            try:
                proc.signalProcess('KILL')
            except ProcessExitedAlready:
                pass
            except Exception:
                pass
        workerconfig = getattr(self, 'workerconfig', None)
        if workerconfig and os.path.exists(workerconfig):
            os.unlink(self.workerconfig)
        getUtility(IEventPublisher).close()

    def buildOptions(self):
        """
        Adds our command line options to ZCmdBase command line options.
        """
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--xmlrpcport', '-x', dest='xmlrpcport',
                               type='int', default=XML_RPC_PORT,
                               help='Port to use for XML-based Remote Procedure Calls (RPC)')
        self.parser.add_option('--pbport', dest='pbport',
                               type='int', default=PB_PORT,
                               help="Port to use for Twisted's pb service")
        self.parser.add_option('--passwd', dest='passwordfile',
                               type='string', default=zenPath('etc', 'hubpasswd'),
                               help='File where passwords are stored')
        self.parser.add_option('--monitor', dest='monitor',
                               default='localhost',
                               help='Name of the distributed monitor this hub runs on')
        self.parser.add_option('--workers', dest='workers',
                               type='int', default=2,
                               help="Number of worker instances to handle requests")
        self.parser.add_option('--prioritize', dest='prioritize',
                               action='store_true', default=False,
                               help="Run higher priority jobs before lower priority ones")
        self.parser.add_option('--anyworker', dest='anyworker',
                               action='store_true', default=False,
                               help='Allow any priority job to run on any worker')
        self.parser.add_option('--logworkerstats', dest='logworkerstats',
                               action='store_true', default=False,
                               help='Log current worker state to $ZENHOME/log/workerstats')
        self.parser.add_option('--no-graph-proxy', dest='graph_proxy',
                               action='store_false', default=True,
                               help="Don't listen to proxy graph requests to zenrender")
        self.parser.add_option('--workers-reserved-for-events', dest='workersReservedForEvents',
                               type='int', default=1,
                               help="Number of worker instances to reserve for handling events")
        self.parser.add_option('--worker-call-limit', dest='worker_call_limit',
                               type='int', default=200,
                               help="Maximum number of remote calls a worker can run before restarting")

        notify(ParserReadyForOptionsEvent(self.parser))

class DefaultConfProvider(object):
    implements(IHubConfProvider)
    adapts(ZenHub)

    def __init__(self, zenhub):
        self._zenhub = zenhub

    def getHubConf(self):
        zenhub = self._zenhub
        return zenhub.dmd.Monitors.Performance._getOb(zenhub.options.monitor, None)

class DefaultHubHeartBeatCheck(object):
    implements(IHubHeartBeatCheck)
    adapts(ZenHub)

    def __init__(self, zenhub):
        self._zenhub = zenhub

    def check(self):
        pass


if __name__ == '__main__':
    from Products.ZenHub.zenhub import ZenHub
    z = ZenHub()

    # during startup, restore performance counters
    z.loadCounters()

    z.main()

    # during shutdown, attempt to save our performance counters
    z.saveCounters()
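
For reference, a minimal illustrative sketch (not part of this module) of how a collector-style client could exercise the HubAvitar API above over Perspective Broker: log in, ping the hub, then look up a configuration service. The host, port, username, and password below are placeholders rather than values taken from this module, and real collectors normally go through Products.ZenHub.PBDaemon instead of talking to the hub directly.

    from twisted.cred import credentials
    from twisted.internet import reactor
    from twisted.spread import pb

    def connected(avatar):
        # 'avatar' is a remote reference to HubAvitar: callRemote('ping') reaches
        # perspective_ping() and callRemote('getService', ...) reaches
        # perspective_getService() as defined above.
        d = avatar.callRemote('ping')
        d.addCallback(lambda _: avatar.callRemote('getService', 'EventService', 'localhost'))
        return d

    factory = pb.PBClientFactory()
    reactor.connectTCP('localhost', 8789, factory)  # placeholder host/port; see --pbport
    creds = credentials.UsernamePassword('admin', 'zenoss')  # placeholders; see the --passwd file
    factory.login(creds).addCallback(connected).addBoth(lambda _: reactor.stop())
    reactor.run()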