1
2
3
4
5
6
7
8
9
10
11
12 """zenhub daemon
13
14 Provide remote, authenticated, and possibly encrypted two-way
15 communications with the Model and Event databases.
16 """
17 import Globals
18
19 if __name__ == "__main__":
20
21 from Products.ZenHub import installReactor
22 installReactor()
23
24 from XmlRpcService import XmlRpcService
25
26 import collections
27 import socket
28 import time
29 import signal
30 import cPickle as pickle
31 import os
32 from random import choice
33
34 from twisted.cred import portal, checkers, credentials
35 from twisted.spread import pb, banana
36 banana.SIZE_LIMIT = 1024 * 1024 * 10
37
38 from twisted.internet import reactor, protocol, defer
39 from twisted.python import failure
40 from twisted.web import server, xmlrpc
41 from twisted.internet.error import ProcessExitedAlready
42 from zope.event import notify
43 from zope.interface import implements
44 from zope.component import getUtility, getUtilitiesFor, adapts
45 from ZODB.POSException import POSKeyError
46
47 from Products.DataCollector.Plugins import loadPlugins
48 from Products.Five import zcml
49 from Products.ZenUtils.ZCmdBase import ZCmdBase
50 from Products.ZenUtils.Utils import zenPath, getExitMessage, unused, load_config, load_config_override, ipv6_available, atomicWrite
51 from Products.ZenUtils.DaemonStats import DaemonStats
52 from Products.ZenEvents.Event import Event, EventHeartbeat
53 from Products.ZenEvents.ZenEventClasses import App_Start
54 from Products.ZenMessaging.queuemessaging.interfaces import IEventPublisher
55 from Products.ZenRelations.PrimaryPathObjectManager import PrimaryPathObjectManager
56 from Products.ZenModel.DeviceComponent import DeviceComponent
57 from Products.ZenHub.services.RenderConfig import RenderConfig
58 from Products.ZenHub.interfaces import IInvalidationProcessor, IServiceAddedEvent, IHubCreatedEvent, IHubWillBeCreatedEvent, IInvalidationOid, IHubConfProvider, IHubHeartBeatCheck
59 from Products.ZenHub.interfaces import IParserReadyForOptionsEvent, IInvalidationFilter
60 from Products.ZenHub.interfaces import FILTER_INCLUDE, FILTER_EXCLUDE
61 from Products.ZenHub.WorkerSelection import WorkerSelector
62
63 from Products.ZenHub.PBDaemon import RemoteBadMonitor
64 pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)
65
66 from BTrees.IIBTree import IITreeSet
67
68
69
70
71
72
73
74 from Products.DataCollector.plugins.DataMaps import ObjectMap
75
76 import sys
77 sys.path.insert(0, zenPath('Products', 'DataCollector', 'plugins'))
78 import DataMaps
79 unused(DataMaps, ObjectMap)
80
81 from Products.ZenHub import XML_RPC_PORT
82 from Products.ZenHub import PB_PORT
83 from Products.ZenHub import ZENHUB_ZENRENDER
84
# One queued remote call waiting for a zenhubworker; the dispatcher reads
# .method, .args, .instance, .servicename and .deferred from these records.
HubWorklistItem = collections.namedtuple(
    'HubWorklistItem',
    'recvtime deferred priority servicename instance method args')

# Per-worker status record stored in ZenHub.workTracker, keyed by worker index.
WorkerStats = collections.namedtuple(
    'WorkerStats',
    'status description lastupdate previdle')

# Wrapper around a worker's result; the dispatcher treats a result of this
# type as "this worker is shutting down" and unwraps .returnvalue.
LastCallReturnValue = collections.namedtuple(
    'LastCallReturnValue',
    'returnvalue')
class AuthXmlRpcService(XmlRpcService):
    """Provide some level of authentication for XML/RPC calls.

    Every request must carry an HTTP Basic ``Authorization`` header whose
    credentials are accepted by the configured checker; otherwise an XML-RPC
    fault is rendered instead of the requested call.

    NOTE(review): the ``class`` statement, the constructor and the method
    headers were missing from the extracted listing. They were rebuilt from
    ``AuthXmlRpcService(self.dmd, checker)``, the uses of ``self.checker``,
    ``self.doRender`` and ``self.unauthorized`` -- confirm against upstream.
    """

    def __init__(self, dmd, checker):
        """
        @param dmd: handed to the XmlRpcService constructor
        @param checker: object providing requestAvatarId(credentials)
        """
        # NOTE(review): presumably XmlRpcService takes only dmd -- confirm.
        XmlRpcService.__init__(self, dmd)
        self.checker = checker

    def doRender(self, unused, request):
        """
        Call the inherited render engine after authentication succeeds.
        See @L{XmlRpcService.XmlRpcService.Render}.

        @param unused: the avatar id delivered by the checker (ignored)
        @param request: the request for this xmlrpc call.
        """
        return XmlRpcService.render(self, request)

    def unauthorized(self, request):
        """
        Render an XMLRPC error indicating an authentication failure.
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: None
        """
        self._cbRender(xmlrpc.Fault(self.FAILURE, "Unauthorized"), request)

    def render(self, request):
        """
        Unpack the authorization header and check the credentials.
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: NOT_DONE_YET
        """
        import base64

        # getHeader() replaces the deprecated request.received_headers dict.
        auth = request.getHeader('authorization')
        if not auth:
            self.unauthorized(request)
            return server.NOT_DONE_YET

        try:
            scheme, encoded = auth.split()
            if scheme != 'Basic':
                self.unauthorized(request)
            else:
                # Split on the first colon only: the password itself may
                # legitimately contain ':' characters.
                user, passwd = base64.b64decode(encoded).split(':', 1)
                creds = credentials.UsernamePassword(user, passwd)
                d = self.checker.requestAvatarId(creds)
                d.addCallback(self.doRender, request)

                def error(unused, request):
                    self.unauthorized(request)
                d.addErrback(error, request)
        except Exception:
            # Malformed header or undecodable payload: treat as a failed login.
            self.unauthorized(request)
        return server.NOT_DONE_YET
141
class HubAvitar(pb.Avatar):
    """
    Connect collectors to their configuration Services

    NOTE(review): the ``class`` statement and every ``def`` line were missing
    from the extracted listing. The constructor is inferred from the uses of
    ``self.hub``; the ``perspective_*`` method names and the base class are
    taken from the upstream file as remembered and are NOT visible in this
    extract -- confirm before merging. Other short methods of this class may
    have been lost by the extraction as well.
    """

    def __init__(self, hub):
        """
        @param hub: the ZenHub instance whose services and workers are exposed
        """
        self.hub = hub

    def perspective_getService(self, serviceName, instance=None,
                               listener=None, options=None):
        """
        Allow a collector to find a Hub service by name.  It also
        associates the service with a collector so that changes can be
        pushed back out to collectors.

        @type serviceName: string
        @param serviceName: a name, like 'EventService'
        @type instance: string
        @param instance: the collector's instance name, like 'localhost'
        @type listener: a remote reference to the collector
        @param listener: the callback interface to the collector
        @param options: accepted but not used by this method
            (NOTE(review): parameter reconstructed -- confirm)
        @return a remote reference to a service, or None when the hub
            could not provide the service
        """
        try:
            service = self.hub.getService(serviceName, instance)
        except Exception:
            self.hub.log.exception("Failed to get service '%s'", serviceName)
            return None

        if service is not None and listener:
            service.addListener(listener)
        return service

    def perspective_reportingForWork(self, worker):
        """
        Allow a worker register for work.

        @type worker: a pb.RemoteReference
        @param worker: a reference to zenhubworker
        @return None
        """
        worker.busy = False
        self.hub.workers.append(worker)

        def removeWorker(worker):
            # Drop the reference once the worker's connection goes away.
            if worker in self.hub.workers:
                self.hub.workers.remove(worker)
        worker.notifyOnDisconnect(removeWorker)
196
203
209
215
220
class HubRealm(object):
    """
    Following the Twisted authentication framework.
    See http://twistedmatrix.com/projects/core/documentation/howto/cred.html

    NOTE(review): the ``class`` statement, constructor and ``requestAvatar``
    header were missing from the extracted listing; they were rebuilt from
    ``HubRealm(self)``, the use of ``self.hubAvitar`` and the IRealm
    interface -- confirm against upstream.
    """
    implements(portal.IRealm)

    def __init__(self, hub):
        """
        @param hub: the ZenHub instance handed to the shared avatar
        """
        # A single avatar object is handed to every authenticated client.
        self.hubAvitar = HubAvitar(hub)

    def requestAvatar(self, collName, mind, *interfaces):
        """
        Return the shared hub avatar for a Perspective Broker login.

        @param collName: avatar id supplied by the portal (unused)
        @param mind: client-side object supplied by the portal (unused)
        @param interfaces: interfaces the client asked for
        @return: (pb.IPerspective, avatar, logout callable)
        @raise NotImplementedError: if pb.IPerspective was not requested
        """
        if pb.IPerspective not in interfaces:
            raise NotImplementedError
        return pb.IPerspective, self.hubAvitar, lambda: None
235
class WorkerInterceptor(pb.Referenceable):
    """Redirect service requests to one of the worker processes. Note
    that everything else (like change notifications) go through
    locally hosted services.

    NOTE(review): the ``class`` statement, constructor and method headers
    were missing from the extracted listing; they were rebuilt from
    ``WorkerInterceptor(self, svc)`` and the names used in the bodies --
    confirm against upstream.
    """

    callTime = 0.

    # Size in bytes of each slice of the pickled arguments sent to a worker.
    CHUNK_SIZE = 102400

    def __init__(self, zenhub, service):
        """
        @param zenhub: object providing deferToWorker()
        @param service: the locally hosted service being wrapped
        """
        self.zenhub = zenhub
        self.service = service

    def remoteMessageReceived(self, broker, message, args, kw):
        "Intercept requests and send them down to workers"
        # The module path of the service class identifies it to the worker.
        # __module__ equals what str(cls).rpartition('.')[0] yields for
        # classic classes and stays correct for new-style classes too.
        svc = self.service.__class__.__module__
        instance = self.service.instance
        args = broker.unserialize(args)
        kw = broker.unserialize(kw)

        # Pickle the call arguments and slice the payload into fixed-size
        # chunks (single pass; no repeated re-copying of the remainder).
        pickledArgs = pickle.dumps((args, kw), pickle.HIGHEST_PROTOCOL)
        size = self.CHUNK_SIZE
        chunkedArgs = [pickledArgs[start:start + size]
                       for start in range(0, len(pickledArgs), size)]

        result = self.zenhub.deferToWorker(
            (svc, instance, message, chunkedArgs))
        return broker.serialize(result, self.perspective)

    def __getattr__(self, attr):
        "Implement the HubService interface by forwarding to the local service"
        # Without this guard, looking up 'service' before __init__ has set it
        # would re-enter __getattr__ forever.
        if attr == 'service':
            raise AttributeError(attr)
        return getattr(self.service, attr)
274
277
class _ZenHubWorklist(object):
    """Three FIFO job queues (events, applyDataMaps, everything else)
    behaving as one prioritized work list.

    Jobs are routed by their ``method`` attribute: 'sendEvents' goes to the
    event queue, 'applyDataMaps' to the apply queue, anything else to the
    "other" queue.

    NOTE(review): the ``class`` statement and most ``def`` lines were missing
    from the extracted listing; names were rebuilt from the call sites
    (``_ZenHubWorklist()``, ``len(worklist)``, ``self[job.method]``,
    ``worklist.pop()``, ``worklist.push(job)``). ``append`` is taken from the
    upstream file as remembered -- confirm.
    """

    def __init__(self):
        self.eventworklist = []
        self.otherworklist = []
        self.applyworklist = []

        # Each list is the order in which queues are tried when the
        # corresponding kind of work has been selected by pop().
        self.eventPriorityList = [self.eventworklist, self.otherworklist, self.applyworklist]
        self.otherPriorityList = [self.otherworklist, self.applyworklist, self.eventworklist]
        self.applyPriorityList = [self.applyworklist, self.eventworklist, self.otherworklist]
        self.dispatch = {
            'sendEvents': self.eventworklist,
            'applyDataMaps': self.applyworklist
        }

    def __getitem__(self, item):
        """Return the queue that holds jobs for method name *item*."""
        return self.dispatch.get(item, self.otherworklist)

    def __len__(self):
        """Return the total number of queued jobs."""
        return len(self.eventworklist) + len(self.otherworklist) + len(self.applyworklist)

    def pop(self):
        """
        Select a single task to be distributed to a worker. We prioritize tasks as follows:
            sendEvents > configuration service calls > applyDataMaps
        To prevent starving any queue in an event storm, we randomize the task selection,
        preferring tasks according to the above priority.

        @raise IndexError: if every queue is empty
        """
        # List comprehensions (not filter()) so the results stay indexable
        # on any Python version.
        eventchain = [q for q in self.eventPriorityList if q]
        otherchain = [q for q in self.otherPriorityList if q]
        applychain = [q for q in self.applyPriorityList if q]
        if not eventchain:
            # All three chains hold the same queues, so one being empty
            # means the whole work list is empty.
            raise IndexError('pop from empty worklist')
        # Weighted 4:2:1 in favour of events, then other, then apply.
        seq = choice([eventchain]*4 +
                     [otherchain]*2 +
                     [applychain]
                     )
        return seq[0].pop(0)

    def push(self, job):
        """Put *job* at the front of its queue (next to be popped from it)."""
        self[job.method].insert(0, job)

    def append(self, job):
        """Put *job* at the back of its queue."""
        self[job.method].append(job)
321
323 """
324 Listen for changes to objects in the Zeo database and update the
325 collectors' configuration.
326
327 The remote collectors connect the ZenHub and request configuration
328 information and stay connected. When changes are detected in the
329 Zeo database, configuration updates are sent out to collectors
330 asynchronously. In this way, changes made in the web GUI can
331 affect collection immediately, instead of waiting for a
332 configuration cycle.
333
334 Each collector uses a different, pluggable service within ZenHub
335 to translate objects into configuration and data. ZenPacks can
336 add services for their collectors. Collectors communicate using
337 Twisted's Perspective Broker, which provides authenticated,
338 asynchronous, bidirectional method invocation.
339
340 ZenHub also provides an XmlRPC interface to some common services
341 to support collectors written in other languages.
342
343 ZenHub does very little work in its own process, but instead dispatches
344 the work to a pool of zenhubworkers, running zenhubworker.py. zenhub
345 manages these workers with 3 data structures:
346 - workers - a list of remote PB instances
347 - worker_processes - a set of WorkerRunningProtocol instances
348 - workerprocessmap - a dict mapping pid to process instance created
349 by reactor.spawnprocess
350 Callbacks and handlers that detect worker shutdown update these
351 structures automatically. ONLY ONE HANDLER must take care of restarting
352 new workers, to avoid accidentally spawning too many workers. This
353 handler also verifies that zenhub is not in the process of shutting
354 down, so that callbacks triggered during daemon shutdown don't keep
355 starting new workers.
356
357 TODO: document invalidation workers
358 """
359
360 totalTime = 0.
361 totalEvents = 0
362 totalCallTime = 0.
363 name = 'zenhub'
364
def __init__(self):
    """
    Hook ourselves up to the Zeo database and wait for collectors
    to connect.

    NOTE(review): the ``def`` line, the indentation and the original
    comments were missing from the extracted listing; the nesting below is
    inferred from syntax -- confirm against upstream.
    """
    # Worker bookkeeping (see the class docstring).
    self.workers = []
    self.workTracker = {}
    self.workList = _ZenHubWorklist()
    self.worker_processes = set()
    self.workerprocessmap = {}
    self.shutdown = False
    self.counters = collections.Counter()

    ZCmdBase.__init__(self)
    import Products.ZenHub
    load_config("hub.zcml", Products.ZenHub)
    notify(HubWillBeCreatedEvent(self))

    self.workerselector = WorkerSelector(self.options)
    self.workList.log = self.log

    # Never reserve more than (workers - 1) workers for events.
    maxReservedEventsWorkers = 0
    if self.options.workers:
        maxReservedEventsWorkers = self.options.workers - 1
    if self.options.workersReservedForEvents > maxReservedEventsWorkers:
        self.options.workersReservedForEvents = maxReservedEventsWorkers
        self.log.info("reduced number of workers reserved for sending events to %d",
                      self.options.workersReservedForEvents)

    self.zem = self.dmd.ZenEventManager
    loadPlugins(self.dmd)
    self.services = {}

    # Perspective Broker listener, authenticated through the password file.
    er = HubRealm(self)
    checker = self.loadChecker()
    pt = portal.Portal(er, [checker])
    interface = '::' if ipv6_available() else ''
    reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt), interface=interface)

    # XML-RPC listener sharing the same credentials checker.
    xmlsvc = AuthXmlRpcService(self.dmd, checker)
    reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc), interface=interface)

    if self.options.graph_proxy:
        self.renderConfig = RenderConfig(self.dmd, ZENHUB_ZENRENDER)

    import Products.ZenMessaging.queuemessaging
    load_config_override('twistedpublisher.zcml', Products.ZenMessaging.queuemessaging)
    notify(HubCreatedEvent(self))
    self.sendEvent(eventClass=App_Start,
                   summary="%s started" % self.name,
                   severity=0)

    self._initialize_invalidation_filters()
    reactor.callLater(5, self.processQueue)

    self.rrdStats = self.getRRDStats()

    if self.options.workers:
        self.workerconfig = zenPath('var', 'zenhub', '%s_worker.conf' % self._getConf().id)
        self._createWorkerConf()
        for i in range(self.options.workers):
            self.createWorker()

        # Start the periodic (self-rescheduling) work distribution.
        reactor.callLater(2, self.giveWorkToWorkers, True)

    try:
        signal.signal(signal.SIGUSR2, self.sighandler_USR2)
    except ValueError:
        # signal.signal() raises ValueError when called outside the main
        # thread; the handler is optional, so carry on without it.
        pass
446
448
def sighandler_USR2(self, *args):
    """
    Log this hub's worker statistics, then forward SIGUSR2 to every worker
    process.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from ``signal.signal(signal.SIGUSR2,
    self.sighandler_USR2)`` and ``*args`` absorbs the (signum, frame) pair
    passed to signal handlers.

    @return: None
    """
    self._workerStats()

    for worker in self.workerprocessmap.values():
        try:
            worker.signalProcess(signal.SIGUSR2)
            # Pause after each successful signal, as the original did
            # (presumably to keep the workers' output from interleaving).
            time.sleep(0.5)
        except ProcessExitedAlready:
            # The worker is gone; nothing to signal.
            pass
        except Exception:
            # Best effort: a failure to signal one worker must not stop the
            # others, but leave a trace instead of swallowing it silently.
            self.log.debug("Unable to signal worker process", exc_info=True)
458
461
465
479
499
def _initialize_invalidation_filters(self):
    """
    Collect every registered IInvalidationFilter utility, initialize it with
    the dmd, and store the filters on self._invalidation_filters ordered by
    their ``weight`` attribute (filters without one sort as weight 100).

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from the call in the constructor.

    @return: None
    """
    filters = (f for n, f in getUtilitiesFor(IInvalidationFilter))
    self._invalidation_filters = []
    for fltr in sorted(filters, key=lambda f: getattr(f, 'weight', 100)):
        fltr.initialize(self.dmd)
        self._invalidation_filters.append(fltr)
    self.log.debug('Registered %s invalidation filters.',
                   len(self._invalidation_filters))
508
538
546
def doProcessQueue(self):
    """
    Perform one cycle of update notifications.

    Polls the storage for invalidated oids, filters them, and hands the
    de-duplicated result to the registered IInvalidationProcessor.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name ``doProcessQueue`` is taken from the upstream file as
    remembered and is NOT visible in this extract -- confirm.

    @return: None
    """
    changes_dict = self.storage.poll_invalidations()
    if changes_dict is None:
        return

    processor = getUtility(IInvalidationProcessor)
    d = processor.processQueue(tuple(set(self._filter_oids(changes_dict))))

    def done(n):
        if n:
            self.log.debug('Processed %s oids' % n)

    def failed(reason):
        # Log the failure here rather than leaving it unhandled on the
        # deferred.
        self.log.error('Error processing invalidations: %s',
                       reason.getErrorMessage())

    d.addCallback(done)
    d.addErrback(failed)
561
562
def sendEvent(self, **kw):
    """
    Useful method for posting events to the EventManager.

    ``device`` defaults to the configured monitor and ``component`` to this
    daemon's name when the caller does not supply them.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the signature comes from the keyword-only call in the constructor.

    @type kw: keywords (dict)
    @param kw: the values for an event: device, summary, etc.
    @return: None
    """
    kw.setdefault('device', self.options.monitor)
    kw.setdefault('component', self.name)
    try:
        self.zem.sendEvent(Event(**kw))
    except Exception:
        # Event delivery is best effort; never let it take the hub down.
        self.log.exception("Unable to send an event")
579
def loadChecker(self):
    """
    Load the password file

    As a side effect the first username/password pair in the file is stored
    on self.workerUsername / self.workerPassword.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from ``checker = self.loadChecker()``.

    @return: an object satisfying the ICredentialsChecker
        interface using a password file or an empty list if the file
        is not available.  Uses the file specified in the --passwd
        command line option.
    """
    try:
        checker = checkers.FilePasswordDB(self.options.passwordfile)
        # _loadCredentials is a private FilePasswordDB helper; an empty
        # password file raises StopIteration here and is handled below.
        u, p = next(iter(checker._loadCredentials()))
        self.workerUsername, self.workerPassword = u, p
        return checker
    except Exception:
        self.log.exception("Unable to load %s", self.options.passwordfile)
    # NOTE(review): on this path workerUsername/workerPassword stay unset
    # and the caller receives a list instead of a checker -- confirm that
    # callers can cope.
    return []
598
def getService(self, name, instance):
    """
    Helper method to load services dynamically for a collector.
    Returned instances are cached: reconnecting collectors will
    get the same service object.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the signature comes from ``self.hub.getService(serviceName, instance)``.

    @type name: string
    @param name: the dotted-name of the module to load
        (uses @L{Products.ZenUtils.Utils.importClass})
    @type instance: string
    @param instance: each service serves only one specific collector
        instances (like 'localhost').  instance defines the collector's
        instance name.
    @return: a service loaded from ZenHub/services or one of the zenpacks,
        or None if the service could not be constructed.
    @raise RemoteBadMonitor: if *instance* is not a known performance monitor
    """
    if not self.dmd.Monitors.Performance._getOb(instance, False):
        # Report the monitor that was actually looked up (the original
        # message interpolated the hub's own --monitor option instead).
        raise RemoteBadMonitor(
            "The provided performance monitor '%s'" % instance +
            " is not in the current list")

    key = (name, instance)
    try:
        return self.services[key]
    except KeyError:
        pass

    from Products.ZenUtils.Utils import importClass
    try:
        ctor = importClass(name)
    except ImportError:
        # Fall back to the services shipped with ZenHub itself.
        ctor = importClass('Products.ZenHub.services.%s' % name, name)

    try:
        svc = ctor(self.dmd, instance)
    except Exception:
        self.log.exception("Failed to initialize %s", ctor)
        # Forget the module so that a later request imports it afresh.
        if ctor.__module__ in sys.modules:
            del sys.modules[ctor.__module__]
        return None

    if self.options.workers:
        # Route remote calls through the worker pool.
        svc = WorkerInterceptor(self, svc)
    self.services[key] = svc
    notify(ServiceAddedEvent(name, instance))
    return svc
642
660
def updateStatusAtStart(self, wId, job):
    """
    Record that worker *wId* has just been handed *job*.

    Replaces the worker's workTracker entry with a 'Busy' WorkerStats whose
    ``previdle`` is the time elapsed since the previous entry was written
    (0 when there was none).

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the signature comes from ``self.updateStatusAtStart(i, job)``.

    @param wId: index of the worker in self.workers
    @param job: work item providing .instance, .servicename and .method
    @return: None
    """
    now = time.time()
    jobDesc = "%s:%s.%s" % (job.instance, job.servicename, job.method)
    previous = self.workTracker.pop(wId, None)
    idletime = now - previous.lastupdate if previous else 0
    self.log.debug("Giving %s to worker %d, (%s)", job.method, wId, jobDesc)
    self.workTracker[wId] = WorkerStats('Busy', jobDesc, now, idletime)
670
679
680
def giveWorkToWorkers(self, requeue=False):
    """Parcel out a method invocation to an available worker process

    Pops jobs off the work list until it is empty or every worker is busy.
    A job for which no candidate worker exists is put back at the front of
    its queue and another pass is scheduled.

    NOTE(review): the ``def`` line, indentation and original comments were
    missing from the extracted listing; the signature is inferred from the
    call sites (called with no argument and with ``True``) -- confirm.

    @param requeue: when true (and the hub is not shutting down) schedule
        this method to run again in 5 seconds
    @return: None
    """
    def finished(result, finishedWorker, wId):
        # Runs for both success and failure of the remote call; defined
        # once per invocation since it captures nothing from the loop.
        finishedWorker.busy = False
        error = None
        if not isinstance(result, failure.Failure):
            try:
                # The worker returns its pickled result as a list of chunks.
                # NOTE: unpickling is only acceptable because the data comes
                # from an authenticated worker process.
                result = pickle.loads(''.join(result))
            except Exception as e:
                error = e
                self.log.exception("Error un-pickling result from worker")

            # A LastCallReturnValue wrapper means the worker is about to
            # exit: unwrap it and stop sending this worker any more work.
            if isinstance(result, LastCallReturnValue):
                self.log.debug("worker %s is shutting down" % wId)
                result = result.returnvalue
                if finishedWorker in self.workers:
                    self.workers.remove(finishedWorker)
        else:
            error = result.getErrorMessage()
        self.updateStatusAtFinish(wId, error)
        reactor.callLater(0, self.giveWorkToWorkers)
        return result

    if self.workList:
        self.log.debug("worklist has %d items", len(self.workList))
    incompleteJobs = []
    while self.workList:
        if all(w.busy for w in self.workers):
            self.log.debug("all workers are busy")
            break

        job = self.workList.pop()
        self.log.debug("get candidate workers for %s..." % job.method)
        candidateWorkers = list(self.workerselector.getCandidateWorkerIds(job.method, self.workers))
        self.log.debug("candidate workers are %r", candidateWorkers)
        for i in candidateWorkers:
            worker = self.workers[i]
            worker.busy = True
            self.counters['workerItems'] += 1
            self.updateStatusAtStart(i, job)
            try:
                d2 = worker.callRemote('execute', *job.args)
                d2.addBoth(finished, worker, i)
            except Exception:
                self.log.warning("Failed to execute job on zenhub worker")
                d2 = defer.maybeDeferred(finished, failure.Failure(), worker, i)
            finally:
                # Whatever happened, deliver the outcome to the caller.
                d2.chainDeferred(job.deferred)
            # Only the first candidate is used.
            break
        else:
            self.log.debug("no worker available for %s" % job.method)
            # Hold the job aside until the loop is done so that it is not
            # popped again during this pass.
            incompleteJobs.append(job)

    # push() inserts at the front, so walk backwards to keep the order.
    for job in reversed(incompleteJobs):
        self.workList.push(job)

    if incompleteJobs:
        reactor.callLater(0, self.giveWorkToWorkers)

    if requeue and not self.shutdown:
        reactor.callLater(5, self.giveWorkToWorkers, True)
750
def _workerStats(self):
    """
    Write a summary of the work list sizes and of every worker's state to
    the log at INFO level, as one multi-line message.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from the call ``self._workerStats()``.

    @return: None
    """
    now = time.time()
    lines = ['Worklist Stats:',
             '\tEvents:\t%s' % len(self.workList.eventworklist),
             '\tOther:\t%s' % len(self.workList.otherworklist),
             '\tApplyDataMaps:\t%s' % len(self.workList.applyworklist),
             '\tTotal:\t%s' % len(self.workList),
             'Worker Stats:']
    linePattern = '\t%d:%s\t[%s%s]\t%.3fs'
    for wId, worker in enumerate(self.workers):
        stat = self.workTracker.get(wId, None)
        if stat:
            state = '%s %s' % (stat.status, stat.description)
            previdle = ' Idle:%.3fs' % stat.previdle if stat.previdle else ''
            age = now - stat.lastupdate
        else:
            # Nothing recorded for this worker yet.
            state, previdle, age = 'No Stats', '', 0
        lines.append(linePattern % (
            wId,
            'Busy' if worker.busy else 'Idle',
            state,
            previdle,
            age,
        ))
    self.log.info('\n'.join(lines))
770
def _createWorkerConf(self):
    """
    Write the configuration file named by self.workerconfig, creating its
    directory first if necessary.

    The file carries the hub port, the worker credentials and a few hub
    options, one "key value" pair per line.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from the call ``self._createWorkerConf()``.
    NOTE(review): the file contains a clear-text password and is created
    with default permissions -- consider restricting its mode.

    @return: None
    """
    import errno

    workerconfigdir = os.path.dirname(self.workerconfig)
    try:
        os.makedirs(workerconfigdir)
    except OSError as ex:
        # An already existing directory is fine; try-then-handle avoids the
        # check-then-create race of os.path.exists() + os.makedirs().
        if ex.errno != errno.EEXIST or not os.path.isdir(workerconfigdir):
            raise

    settings = (
        ("hubport", self.options.pbport),
        ("username", self.workerUsername),
        ("password", self.workerPassword),
        ("logseverity", self.options.logseverity),
        ("zodb-cachesize", self.options.zodb_cachesize),
        ("calllimit", self.options.worker_call_limit),
    )
    with open(self.workerconfig, 'w') as workerfd:
        workerfd.writelines("%s %s\n" % pair for pair in settings)
782
def createWorker(self):
    """Start a worker subprocess

    Does nothing when the configured number of worker processes is already
    running.  The protocol attached to the new process restarts a worker
    when the process ends, unless the hub is shutting down.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from the calls ``self.createWorker()`` /
    ``self.parent.createWorker()``.

    @return: None
    """
    if len(self.worker_processes) >= self.options.workers:
        self.log.info("already at maximum number of worker processes, no worker will be created")
        return
    exe = zenPath('bin', 'zenhubworker')

    class WorkerRunningProtocol(protocol.ProcessProtocol):
        """Relay a worker's output to the hub log and track its lifetime."""

        def __init__(self, parent):
            self._pid = 0
            self.parent = parent
            self.log = parent.log

        @property
        def pid(self):
            return self._pid

        def connectionMade(self):
            self._pid = self.transport.pid
            reactor.callLater(1, self.parent.giveWorkToWorkers)

        def outReceived(self, data):
            self.log.debug("Worker (%d) reports %s" % (self.pid, data.rstrip(),))

        def errReceived(self, data):
            self.log.info("Worker (%d) reports %s" % (self.pid, data.rstrip(),))

        def processEnded(self, reason):
            self.parent.worker_processes.discard(self)
            self.parent.workerprocessmap.pop(self.pid, None)
            # exitCode can be None (e.g. when the process was ended by a
            # signal), so format it with %s rather than %d.
            exitCode = getattr(reason.value, 'exitCode', None)
            self.log.warning("Worker (%d) exited with status: %s (%s)",
                             self.pid,
                             exitCode,
                             getExitMessage(exitCode))
            # This is the one place that replaces dead workers.
            if not self.parent.shutdown:
                self.log.info("Starting new zenhubworker")
                self.parent.createWorker()

    args = (exe, 'run', '-C', self.workerconfig)
    self.log.debug("Starting %s", ' '.join(args))
    prot = WorkerRunningProtocol(self)
    proc = reactor.spawnProcess(prot, exe, args, os.environ)
    self.workerprocessmap[proc.pid] = proc
    self.worker_processes.add(prot)
834
def heartbeat(self):
    """
    Since we don't do anything on a regular basis, just
    push heartbeats regularly.

    Each call also reschedules itself, publishes the hub's statistics as
    events, saves the counters and runs the IHubHeartBeatCheck hook.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from ``reactor.callLater(seconds, self.heartbeat)``.

    @return: None
    """
    seconds = 30
    evt = EventHeartbeat(self.options.monitor, self.name, 3*seconds)
    self.zem.sendEvent(evt)
    self.niceDoggie(seconds)
    reactor.callLater(seconds, self.heartbeat)

    r = self.rrdStats
    # Time spent in calls, summed over all loaded services; distinct from
    # self.totalTime reported below.
    totalCallTime = sum(s.callTime for s in self.services.values())
    events = r.counter('totalTime', seconds, int(self.totalTime * 1000))
    events += r.counter('totalEvents', seconds, self.totalEvents)
    events += r.gauge('services', seconds, len(self.services))
    events += r.counter('totalCallTime', seconds, totalCallTime)
    events += r.gauge('workListLength', seconds, len(self.workList))
    for name, value in self.counters.items():
        events += r.counter(name, seconds, value)
    self.zem.sendEvents(events)

    self.saveCounters()
    try:
        hbcheck = IHubHeartBeatCheck(self)
        hbcheck.check()
    except Exception:
        # A failing hook must not break the heartbeat; unlike a bare
        # "except:", this lets SystemExit/KeyboardInterrupt through.
        self.log.exception("Error processing heartbeat hook")
865
def saveCounters(self):
    """
    Pickle self.counters into var/zenhub_counters.pickle (resolved with
    zenPath) using atomicWrite with raiseException=False.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from the calls ``self.saveCounters()`` /
    ``z.saveCounters()``.

    @return: None
    """
    atomicWrite(
        zenPath('var/zenhub_counters.pickle'),
        pickle.dumps(self.counters),
        raiseException=False,
    )
872
def loadCounters(self):
    """
    Restore self.counters from var/zenhub_counters.pickle (resolved with
    zenPath).  Any failure leaves the current counters untouched.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from the call ``z.loadCounters()``.

    @return: None
    """
    try:
        # The context manager closes the file handle the original leaked.
        with open(zenPath('var/zenhub_counters.pickle'), 'rb') as fd:
            self.counters = pickle.load(fd)
    except Exception:
        # Best effort: keep the existing counters and leave a debug trace.
        self.log.debug("Unable to load saved counters", exc_info=True)
878
def main(self):
    """
    Start the main event loop.

    Once the reactor stops, flag the hub as shutting down, kill the worker
    processes, remove the worker configuration file and close the event
    publisher.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from the call ``z.main()``.

    @return: None
    """
    if self.options.cycle:
        self.heartbeat()
    reactor.run()

    # Set before killing the workers so that their exit handlers do not
    # start replacements.
    self.shutdown = True
    for proc in list(self.workerprocessmap.values()):
        try:
            proc.signalProcess('KILL')
        except ProcessExitedAlready:
            pass
        except Exception:
            # Best effort during shutdown; keep going with the others.
            self.log.debug("Unable to kill worker process", exc_info=True)

    workerconfig = getattr(self, 'workerconfig', None)
    if workerconfig and os.path.exists(workerconfig):
        os.unlink(workerconfig)
    getUtility(IEventPublisher).close()
898
def buildOptions(self):
    """
    Adds our command line options to ZCmdBase command line options.

    After the options are registered a ParserReadyForOptionsEvent is
    published with the parser.

    NOTE(review): the ``def`` line was missing from the extracted listing;
    the name comes from the ``ZCmdBase.buildOptions(self)`` call below.
    """
    ZCmdBase.buildOptions(self)
    add = self.parser.add_option
    add('--xmlrpcport', '-x', dest='xmlrpcport',
        type='int', default=XML_RPC_PORT,
        help='Port to use for XML-based Remote Procedure Calls (RPC)')
    add('--pbport', dest='pbport',
        type='int', default=PB_PORT,
        help="Port to use for Twisted's pb service")
    add('--passwd', dest='passwordfile',
        type='string', default=zenPath('etc', 'hubpasswd'),
        help='File where passwords are stored')
    add('--monitor', dest='monitor',
        default='localhost',
        help='Name of the distributed monitor this hub runs on')
    add('--workers', dest='workers',
        type='int', default=2,
        help="Number of worker instances to handle requests")
    add('--prioritize', dest='prioritize',
        action='store_true', default=False,
        help="Run higher priority jobs before lower priority ones")
    add('--anyworker', dest='anyworker',
        action='store_true', default=False,
        help='Allow any priority job to run on any worker')
    add('--logworkerstats', dest='logworkerstats',
        action='store_true', default=False,
        help='Log current worker state to $ZENHOME/log/workerstats')
    add('--no-graph-proxy', dest='graph_proxy',
        action='store_false', default=True,
        help="Don't listen to proxy graph requests to zenrender")
    add('--workers-reserved-for-events', dest='workersReservedForEvents',
        type='int', default=1,
        help="Number of worker instances to reserve for handling events")
    add('--worker-call-limit', dest='worker_call_limit',
        type='int', default=200,
        help="Maximum number of remote calls a worker can run before restarting")

    notify(ParserReadyForOptionsEvent(self.parser))
939
950
960
961
if __name__ == '__main__':
    # Use the class from the importable module path rather than the one
    # defined in __main__.
    from Products.ZenHub.zenhub import ZenHub
    z = ZenHub()

    # Pick up the counters saved by a previous run.
    z.loadCounters()

    try:
        z.main()
    finally:
        # Persist the counters even if main() exits with an exception.
        z.saveCounters()
973