Package Products :: Package ZenRRD :: Module zenprocess
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenRRD.zenprocess

  1  #! /usr/bin/env python 
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, 2009 Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14  __doc__= """zenprocess 
 15   
 16  Gets SNMP process data from a device's HOST-RESOURCES-MIB 
 17  and store process performance in RRD files. 
 18  """ 
 19   
 20  import Globals 
 21  import logging 
 22  import sys 
 23   
 24  import zope.component 
 25  import zope.interface 
 26   
 27  from Products.ZenCollector.daemon import CollectorDaemon 
 28  from Products.ZenCollector.interfaces import ICollectorPreferences,\ 
 29      IScheduledTask, IEventService, IDataService, IConfigurationListener 
 30  from Products.ZenCollector.tasks import SimpleTaskFactory, SimpleTaskSplitter,\ 
 31      TaskStates 
 32  from Products.ZenEvents import Event 
 33  from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess, \ 
 34          Status_Perf 
 35  from Products.ZenUtils.observable import ObservableMixin 
 36  from Products.ZenUtils.Chain import Chain 
 37   
 38  # We retrieve our configuration data remotely via a Twisted PerspectiveBroker 
 39  # connection. To do so, we need to import the class that will be used by the 
 40  # configuration service to send the data over, i.e. DeviceProxy. 
 41  from Products.ZenUtils.Utils import unused 
 42  from Products.ZenCollector.services.config import DeviceProxy 
 43  unused(DeviceProxy) 
 44  from Products.ZenHub.services.ProcessConfig import ProcessProxy 
 45  unused(ProcessProxy) 
 46  from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo 
 47  unused(SnmpConnInfo) 
 48   
 49  from twisted.internet import defer, error 
 50  from twisted.python.failure import Failure 
 51   
 52  log = logging.getLogger("zen.zenprocess") 
 53   
 54  # HOST-RESOURCES-MIB OIDs used 
 55  HOSTROOT  ='.1.3.6.1.2.1.25' 
 56  RUNROOT   = HOSTROOT + '.4' 
 57  NAMETABLE = RUNROOT + '.2.1.2' 
 58  PATHTABLE = RUNROOT + '.2.1.4' 
 59  ARGSTABLE = RUNROOT + '.2.1.5' 
 60  PERFROOT  = HOSTROOT + '.5' 
 61  CPU       = PERFROOT + '.1.1.1.'        # note trailing dot 
 62  MEM       = PERFROOT + '.1.1.2.'        # note trailing dot 
 63   
 64  # Max size for CPU numbers 
 65  WRAP = 0xffffffffL 
66 67 # Create an implementation of the ICollectorPreferences interface so that the 68 # ZenCollector framework can configure itself from our preferences. 69 -class ZenProcessPreferences(object):
70 zope.interface.implements(ICollectorPreferences) 71
72 - def __init__(self):
73 """ 74 Constructs a new ZenProcessPreferences instance and provide default 75 values for needed attributes. 76 """ 77 self.collectorName = "zenprocess" 78 self.defaultRRDCreateCommand = None 79 self.configCycleInterval = 20 # minutes 80 81 #will be updated based on Performance Config property of same name 82 self.processCycleInterval = 3 * 60 83 84 #will be filled in based on buildOptions 85 self.options = None 86 87 # the configurationService attribute is the fully qualified class-name 88 # of our configuration service that runs within ZenHub 89 self.configurationService = 'Products.ZenHub.services.ProcessConfig'
90 91 @property
92 - def cycleInterval(self):
93 """ 94 defined as a property since it is needed by the interface 95 """ 96 return self.processCycleInterval
97
98 - def buildOptions(self, parser):
99 """ 100 Build a list of command-line options 101 """ 102 parser.add_option('--showprocs', 103 dest='showprocs', 104 action="store_true", 105 default=False, 106 help="Show the list of processes found." \ 107 "For debugging purposes only.") 108 parser.add_option('--showrawtables', 109 dest='showrawtables', 110 action="store_true", 111 default=False, 112 help="Show the raw SNMP processes data returned " \ 113 "from the device. For debugging purposes only.")
114
115 - def postStartup(self):
116 pass
117
118 -class DeviceStats:
119 - def __init__(self, deviceProxy):
120 self.config = deviceProxy 121 # map pid number to ProcessStats object 122 self._pidToProcess = {} 123 # map ProcessProxy id to ProcessStats object 124 self._processes = {} 125 for id, process in deviceProxy.processes.items(): 126 self._processes[id] = ProcessStats(process)
127
128 - def update(self, deviceProxy):
129 unused = set(self._processes.keys()) 130 for id, process in deviceProxy.processes.items(): 131 unused.discard(id) 132 if self._processes.get(id): 133 self._processes[id].update(process) 134 else: 135 self._processes[id] = ProcessStats(process) 136 137 #delete the left overs 138 for id in unused: 139 del self.processes[id]
140 141 @property
142 - def processStats(self):
143 """ 144 The ProcessStats: processes configured to be monitored 145 """ 146 return self._processes.values()
147 148 @property
149 - def pids(self):
150 """ 151 returns the pids from being tracked 152 """ 153 return self._pidToProcess.keys()
154 155 @property
156 - def monitoredProcs(self):
157 """ 158 returns ProcessStats for which we have a pid 159 """ 160 return self._pidToProcess.values()
161
162 -class ProcessStats:
163 - def __init__(self, processProxy):
164 self._pids={} 165 self._config = processProxy 166 self.cpu = 0
167
168 - def update(self, processProxy):
169 self._config = processProxy
170
171 - def __str__(self):
172 """ 173 Override the Python default to represent ourselves as a string 174 """ 175 return str(self._config.name)
176 __repr__ = __str__ 177
178 - def match(self, name, args):
179 """ 180 Perform exact comparisons on the process names. 181 182 @parameter name: name of a process to compare 183 @type name: string 184 @parameter args: argument list of the process 185 @type args: string 186 @return: does the name match this process's info? 187 @rtype: Boolean 188 """ 189 if self._config.name is None: 190 return False 191 if self._config.ignoreParameters or not args: 192 return self._config.originalName == name 193 return self._config.originalName == '%s %s' % (name, args)
194
195 - def updateCpu(self, pid, value):
196 """ 197 """ 198 pid = self._pids.setdefault(pid, Pid()) 199 cpu = pid.updateCpu(value) 200 if cpu is not None: 201 self.cpu += cpu 202 self.cpu %= WRAP
203
204 - def getCpu(self):
205 """ 206 """ 207 return self.cpu
208
209 - def updateMemory(self, pid, value):
210 """ 211 """ 212 self._pids.setdefault(pid, Pid()).memory = value
213
214 - def getMemory(self):
215 """ 216 """ 217 return sum([x.memory for x in self._pids.values() 218 if x.memory is not None])
219
220 - def discardPid(self, pid):
221 """ 222 """ 223 if pid in self._pids: 224 del self._pids[pid]
225
226 -class Pid:
227 """ 228 Helper class to track process id information 229 """ 230 cpu = None 231 memory = None 232
233 - def updateCpu(self, n):
234 """ 235 """ 236 if n is not None: 237 try: 238 n = int(n) 239 except ValueError: 240 log.warning("Bad value for CPU: '%s'", n) 241 242 if self.cpu is None or n is None: 243 self.cpu = n 244 return None 245 diff = n - self.cpu 246 if diff < 0: 247 # don't provide a value when the counter falls backwards 248 n = None 249 diff = None 250 self.cpu = n 251 return diff
252
253 - def updateMemory(self, n):
254 """ 255 """ 256 self.memory = n
257
258 - def __str__(self):
259 """ 260 Override the Python default to represent ourselves as a string 261 """ 262 return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
263 __repr__ = __str__
264
265 -class ConfigListener(object):
266 zope.interface.implements(IConfigurationListener) 267
268 - def deleted(self, configurationId):
269 """ 270 Called when a configuration is deleted from the collector 271 """ 272 log.debug('ConfigListener: configuration %s deleted' % configurationId) 273 ZenProcessTask.DEVICE_STATS.pop(configurationId, None)
274
275 - def added(self, configuration):
276 """ 277 Called when a configuration is added to the collector 278 """ 279 log.debug('ConfigListener: configuration %s added' % configuration)
280 281
282 - def updated(self, newConfiguration):
283 """ 284 Called when a configuration is updated in collector 285 """ 286 log.debug('ConfigListener: configuration %s updated' % newConfiguration)
287
288 # Create an implementation of the IScheduledTask interface that will perform 289 # the actual collection work needed by this collector. 290 -class ZenProcessTask(ObservableMixin):
291 """ 292 A scheduled task that finds instances of configure processes and collects 293 metrics on the processes 294 """ 295 zope.interface.implements(IScheduledTask) 296 297 #Keep state about process stats across task updates 298 DEVICE_STATS = {} 299 300 #counter to keep track of total restarted and missing processes 301 RESTARTED = 0 302 MISSING = 0 303 304 STATE_CONNECTING = 'CONNECTING' 305 STATE_SCANNING_PROCS = 'SCANNING_PROCESSES' 306 STATE_FETCH_PERF = 'FETCH_PERF_DATA' 307 STATE_STORE_PERF = 'STORE_PERF_DATA' 308 309 statusEvent = { 'eventClass' : Status_OSProcess, 310 'eventGroup' : 'Process' } 311
312 - def __init__(self, 313 deviceId, 314 taskName, 315 scheduleIntervalSeconds, 316 taskConfig):
317 super(ZenProcessTask, self).__init__() 318 319 #needed for interface 320 self.name = taskName 321 self.configId = deviceId 322 self.interval = scheduleIntervalSeconds 323 self.state = TaskStates.STATE_IDLE 324 325 #the task config corresponds to a DeviceProxy 326 self._device = taskConfig 327 self._devId = self._device.name 328 self._manageIp = self._device.manageIp 329 self._maxOidsPerRequest = self._device.zMaxOIDPerRequest 330 331 self._dataService = zope.component.queryUtility(IDataService) 332 self._eventService = zope.component.queryUtility(IEventService) 333 self._preferences = zope.component.queryUtility(ICollectorPreferences, 334 "zenprocess") 335 self.snmpProxy = None 336 self.snmpConnInfo = self._device.snmpConnInfo 337 338 self._deviceStats = ZenProcessTask.DEVICE_STATS.get(self._devId) 339 if self._deviceStats: 340 self._deviceStats.update(self._device) 341 else: 342 self._deviceStats = DeviceStats(self._device) 343 ZenProcessTask.DEVICE_STATS[self._devId] = self._deviceStats
344
345 - def _failure(self, reason):
346 """ 347 Twisted errBack to log the exception for a single device. 348 349 @parameter reason: explanation of the failure 350 @type reason: Twisted error instance 351 """ 352 msg = 'Unable to read processes on device %s' % self._devId 353 if isinstance(reason.value, error.TimeoutError): 354 log.debug('Timeout on device %s' % self._devId) 355 msg = '%s; Timeout on device' % msg 356 else: 357 msg = '%s; error: %s' % (msg, reason.getErrorMessage()) 358 log.error('Error on device %s; %s' % (self._devId, 359 reason.getErrorMessage())) 360 361 self._eventService.sendEvent(self.statusEvent, 362 eventClass=Status_Snmp, 363 component="process", 364 device=self._devId, 365 summary=msg, 366 severity=Event.Error) 367 return reason
368 369
370 - def _connectCallback(self, result):
371 """ 372 Callback called after a successful connect to the remote device. 373 """ 374 log.debug("Connected to %s [%s]", self._devId, self._manageIp) 375 return result
376
377 - def _collectCallback(self, result):
378 """ 379 Callback called after a connect or previous collection so that another 380 collection can take place. 381 """ 382 log.debug("scanning for processes from %s [%s]", 383 self._devId, self._manageIp) 384 385 self.state = ZenProcessTask.STATE_SCANNING_PROCS 386 tables = [NAMETABLE, PATHTABLE, ARGSTABLE] 387 d = self._getTables(tables) 388 d.addCallbacks(self._storeProcessNames, self._failure) 389 d.addCallback(self._fetchPerf) 390 return d
391
392 - def _finished(self, result):
393 """ 394 Callback activated when the task is complete 395 """ 396 if not isinstance(result, Failure): 397 log.debug("Device %s [%s] scanned successfully", 398 self._devId, self._manageIp) 399 else: 400 log.debug("Device %s [%s] scanned failed, %s", 401 self._devId, self._manageIp, result.getErrorMessage()) 402 403 try: 404 self._close() 405 except Exception, e: 406 log.warn("Failed to close device %s: error %s" % 407 (self._devId, str(e))) 408 # give the result to the rest of the callback/errchain so that the 409 # ZenCollector framework can keep track of the success/failure rate 410 return result
411
412 - def cleanup(self):
413 return self._close()
414
415 - def doTask(self):
416 """ 417 Contact to one device and return a deferred which gathers data from 418 the device. 419 420 @return: job to scan a device 421 @rtype: Twisted deferred object 422 """ 423 424 # see if we need to connect first before doing any collection 425 d = defer.maybeDeferred(self._connect) 426 d.addCallbacks(self._connectCallback, self._failure) 427 d.addCallback(self._collectCallback) 428 429 # Add the _finished callback to be called in both success and error 430 # scenarios. 431 d.addBoth(self._finished) 432 433 # returning a Deferred will keep the framework from assuming the task 434 # is done until the Deferred actually completes 435 return d
436
437 - def _storeProcessNames(self, results):
438 """ 439 Parse the process tables and reconstruct the list of processes 440 that are on the device. 441 442 @parameter results: results of SNMP table gets 443 @type results: dictionary of dictionaries 444 @parameter device: proxy connection object 445 @type device: Device object 446 """ 447 448 if not results or not results[NAMETABLE]: 449 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % \ 450 self._devId 451 resolution="Verify with snmpwalk -v1 -c community %s %s" % \ 452 (self._devId, NAMETABLE ) 453 454 self._eventService.sendEvent(self.statusEvent, 455 device=self._devId, 456 summary=summary, 457 resolution=resolution, 458 severity=Event.Error) 459 log.info(summary) 460 return defer.fail(summary) 461 462 summary = 'Process table up for device %s' % self._devId 463 self._clearSnmpError(summary) 464 465 showrawtables = self._preferences.options.showrawtables 466 args, procs = mapResultsToDicts(showrawtables, results) 467 if self._preferences.options.showprocs: 468 self._showProcessList( procs ) 469 470 # look for changes in processes 471 beforePids = set(self._deviceStats.pids) 472 afterPidToProcessStats = {} 473 for pStats in self._deviceStats.processStats: 474 for pid, (name, args) in procs: 475 if pStats.match(name, args): 476 log.debug("Found process %d on %s" % (pid, 477 pStats._config.name)) 478 afterPidToProcessStats[pid] = pStats 479 afterPids = set(afterPidToProcessStats.keys()) 480 afterByConfig = reverseDict(afterPidToProcessStats) 481 newPids = afterPids - beforePids 482 deadPids = beforePids - afterPids 483 484 # report pid restarts 485 restarted = {} 486 for pid in deadPids: 487 procStats = self._deviceStats._pidToProcess[pid] 488 procStats.discardPid(pid) 489 if procStats in afterByConfig: 490 ZenProcessTask.RESTARTED += 1 491 pConfig = procStats._config 492 if pConfig.restart: 493 restarted[procStats] = True 494 495 summary = 'Process restarted: %s' % pConfig.originalName 496 497 self._eventService.sendEvent(self.statusEvent, 498 device=self._devId, 499 summary=summary, 500 component=pConfig.originalName, 501 eventKey=pConfig.processClass, 502 severity=pConfig.severity) 503 log.info(summary) 504 505 # report alive processes 506 for processStat in afterByConfig.keys(): 507 if processStat in restarted: continue 508 summary = "Process up: %s" % processStat._config.originalName 509 self._eventService.sendEvent(self.statusEvent, 510 device=self._devId, 511 summary=summary, 512 component=processStat._config.originalName, 513 eventKey=processStat._config.processClass, 514 severity=Event.Clear) 515 log.debug(summary) 516 517 for pid in newPids: 518 log.debug("Found new %s pid %d on %s" % ( 519 afterPidToProcessStats[pid]._config.originalName, pid, 520 self._devId)) 521 522 self._deviceStats._pidToProcess = afterPidToProcessStats 523 524 # Look for missing processes 525 for procStat in self._deviceStats.processStats: 526 if procStat not in afterByConfig: 527 procConfig = procStat._config 528 ZenProcessTask.MISSING += 1 529 summary = 'Process not running: %s' % procConfig.originalName 530 self._eventService.sendEvent(self.statusEvent, 531 device=self._devId, 532 summary=summary, 533 component=procConfig.originalName, 534 eventKey=procConfig.processClass, 535 severity=procConfig.severity) 536 log.warning(summary) 537 538 # Store per-device, per-process statistics 539 pidCounts = dict([(p, 0) for p in self._deviceStats.processStats]) 540 for procStat in self._deviceStats.monitoredProcs: 541 pidCounts[procStat] += 1 542 for procName, count in pidCounts.items(): 543 self._save(procName, 'count_count', count, 'GAUGE') 544 return results
545
546 - def _fetchPerf(self, results):
547 """ 548 Get performance data for all the monitored processes on a device 549 550 @parameter device: proxy object to the remote computer 551 @type device: Device object 552 """ 553 self.state = ZenProcessTask.STATE_FETCH_PERF 554 555 oids = [] 556 for pid in self._deviceStats.pids: 557 oids.extend([CPU + str(pid), MEM + str(pid)]) 558 if not oids: 559 return defer.succeed(([])) 560 d = Chain(self._get, iter(chunk(oids, self._maxOidsPerRequest))).run() 561 d.addCallback(self._storePerfStats) 562 d.addErrback(self._failure) 563 return d
564
565 - def _storePerfStats(self, results):
566 """ 567 Save the process performance data in RRD files 568 569 @parameter results: results of SNMP table gets 570 @type results: list of (success, result) tuples 571 @parameter device: proxy object to the remote computer 572 @type device: Device object 573 """ 574 self.state = ZenProcessTask.STATE_STORE_PERF 575 for success, result in results: 576 if not success: 577 #return the failure 578 return result 579 self._clearSnmpError('Process table up for device %s' % self._devId) 580 parts = {} 581 for success, values in results: 582 if success: 583 parts.update(values) 584 results = parts 585 byConf = reverseDict(self._deviceStats._pidToProcess) 586 for procStat, pids in byConf.items(): 587 if len(pids) != 1: 588 log.info("There are %d pids by the name %s", 589 len(pids), procStat._config.name) 590 procName = procStat._config.name 591 for pid in pids: 592 cpu = results.get(CPU + str(pid), None) 593 mem = results.get(MEM + str(pid), None) 594 procStat.updateCpu(pid, cpu) 595 procStat.updateMemory(pid, mem) 596 self._save(procName, 'cpu_cpu', procStat.getCpu(), 597 'DERIVE', min=0) 598 self._save(procName, 'mem_mem', 599 procStat.getMemory() * 1024, 'GAUGE') 600 return results
601
602 - def _getTables(self, oids):
603 """ 604 Perform SNMP getTable for specified OIDs 605 @parameter oids: OIDs to gather 606 @type oids: list of strings 607 @return: Twisted deferred 608 @rtype: Twisted deferred 609 """ 610 repetitions = self._maxOidsPerRequest / len(oids) 611 t = self.snmpProxy.getTable(oids, 612 timeout=self.snmpConnInfo.zSnmpTimeout, 613 retryCount=self.snmpConnInfo.zSnmpTries, 614 maxRepetitions=repetitions, 615 limit=sys.maxint) 616 return t
617
618 - def _get(self, oids):
619 """ 620 Perform SNMP get for specified OIDs 621 622 @parameter oids: OIDs to gather 623 @type oids: list of strings 624 @return: Twisted deferred 625 @rtype: Twisted deferred 626 """ 627 return self.snmpProxy.get(oids, 628 self.snmpConnInfo.zSnmpTimeout, 629 self.snmpConnInfo.zSnmpTries)
630
631 - def _connect(self):
632 """ 633 Create a connection to the remote device 634 """ 635 self.state = ZenProcessTask.STATE_CONNECTING 636 if (self.snmpProxy is None or 637 self.snmpProxy.snmpConnInfo != self.snmpConnInfo): 638 self.snmpProxy = self.snmpConnInfo.createSession() 639 self.snmpProxy.open()
640
641 - def _close(self):
642 """ 643 Close down the connection to the remote device 644 """ 645 if self.snmpProxy: 646 self.snmpProxy.close() 647 self.snmpProxy = None
648 649
650 - def _showProcessList(self, procs):
651 """ 652 Display the processes in a sane manner. 653 654 @parameter procs: list of (pid, (name, args)) 655 @type procs: list of tuples 656 """ 657 device_name = self._devId 658 proc_list = [ '%s %s %s' % (pid, name, args) for pid, (name, args) \ 659 in sorted(procs)] 660 proc_list.append('') 661 log.info("#===== Processes on %s:\n%s", device_name, '\n'.join(proc_list))
662
663 - def _clearSnmpError(self, message):
664 """ 665 Send an event to clear other events. 666 667 @parameter name: device for which the event applies 668 @type name: string 669 @parameter message: clear text 670 @type message: string 671 """ 672 self._eventService.sendEvent(self.statusEvent, 673 eventClass=Status_Snmp, 674 component="process", 675 device=self._devId, 676 summary=message, 677 agent='zenprocess', 678 severity=Event.Clear)
679
680 - def _save(self, pidName, statName, value, rrdType, min='U'):
681 """ 682 Save a value into an RRD file 683 684 @param pidName: process id of the monitored process 685 @type pidName: string 686 @param statName: metric name 687 @type statName: string 688 @param value: data to be stored 689 @type value: number 690 @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER) 691 @type rrdType: string 692 """ 693 deviceName = self._devId 694 path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName) 695 try: 696 self._dataService.writeRRD(path, value, rrdType, min=min) 697 except Exception, ex: 698 summary= "Unable to save data for process-monitor RRD %s" % \ 699 path 700 log.critical( summary ) 701 702 message= "Data was value= %s, type=%s" % \ 703 ( value, rrdType ) 704 log.critical( message ) 705 log.exception( ex ) 706 707 import traceback 708 trace_info= traceback.format_exc() 709 710 self._eventService.sendEvent(dict( 711 dedupid="%s|%s" % (self._preferences.options.monitor, 712 'RRD write failure'), 713 severity=Event.Critical, 714 device=self._preferences.options.monitor, 715 eventClass=Status_Perf, 716 component="RRD", 717 pidName=pidName, 718 statName=statName, 719 path=path, 720 message=message, 721 traceback=trace_info, 722 summary=summary))
723
724 -def mapResultsToDicts(showrawtables, results):
725 """ 726 Parse the process tables and reconstruct the list of processes 727 that are on the device. 728 729 @parameter results: results of SNMP table gets ie (OID + pid, value) 730 @type results: dictionary of dictionaries 731 @return: maps relating names and pids to each other 732 @rtype: dictionary, list of tuples 733 """ 734 def extract(dictionary, oid, value): 735 """ 736 Helper function to extract SNMP table data. 737 """ 738 pid = int(oid.split('.')[-1]) 739 dictionary[pid] = value
740 741 names, paths, args = {}, {}, {} 742 if showrawtables: 743 log.info("NAMETABLE = %r", results[NAMETABLE]) 744 for row in results[NAMETABLE].items(): 745 extract(names, *row) 746 747 if showrawtables: 748 log.info("PATHTABLE = %r", results[PATHTABLE]) 749 for row in results[PATHTABLE].items(): 750 extract(paths, *row) 751 752 if showrawtables: 753 log.info("ARGSTABLE = %r", results[ARGSTABLE]) 754 for row in results[ARGSTABLE].items(): 755 extract(args, *row) 756 757 procs = [] 758 for pid, name in names.items(): 759 path = paths.get(pid, '') 760 if path and path.find('\\') == -1: 761 name = path 762 arg = args.get(pid, '') 763 procs.append( (pid, (name, arg) ) ) 764 765 return args, procs 766
767 -def reverseDict(d):
768 """ 769 Return a dictionary with keys and values swapped: 770 all values are lists to handle the different keys mapping to the same value 771 """ 772 result = {} 773 for a, v in d.items(): 774 result.setdefault(v, []).append(a) 775 return result
776
777 -def chunk(lst, n):
778 """ 779 Break lst into n-sized chunks 780 """ 781 return [lst[i:i+n] for i in range(0, len(lst), n)]
782 783 # Collector Daemon Main entry point 784 # 785 if __name__ == '__main__': 786 myPreferences = ZenProcessPreferences() 787 788 myTaskFactory = SimpleTaskFactory(ZenProcessTask) 789 myTaskSplitter = SimpleTaskSplitter(myTaskFactory) 790 daemon = CollectorDaemon(myPreferences, myTaskSplitter, ConfigListener()) 791 daemon.run() 792