Package Products :: Package ZenRRD :: Module zenprocess
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenRRD.zenprocess

  1  #! /usr/bin/env python 
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, 2009 Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14  __doc__= """zenprocess 
 15   
 16  Gets SNMP process data from a device's HOST-RESOURCES-MIB 
 17  and store process performance in RRD files. 
 18  """ 
 19   
 20  import Globals 
 21  import logging 
 22  import sys 
 23   
 24  import zope.component 
 25  import zope.interface 
 26   
 27  from Products.ZenCollector.daemon import CollectorDaemon 
 28  from Products.ZenCollector.interfaces import ICollectorPreferences,\ 
 29      IScheduledTask, IEventService, IDataService, IConfigurationListener 
 30  from Products.ZenCollector.tasks import SimpleTaskFactory, SimpleTaskSplitter,\ 
 31      TaskStates 
 32  from Products.ZenEvents import Event 
 33  from Products.ZenEvents.ZenEventClasses import Status_Snmp, Status_OSProcess, \ 
 34          Status_Perf 
 35  from Products.ZenUtils.observable import ObservableMixin 
 36  from Products.ZenUtils.Chain import Chain 
 37   
 38  # We retrieve our configuration data remotely via a Twisted PerspectiveBroker 
 39  # connection. To do so, we need to import the class that will be used by the 
 40  # configuration service to send the data over, i.e. DeviceProxy. 
 41  from Products.ZenUtils.Utils import unused 
 42  from Products.ZenCollector.services.config import DeviceProxy 
 43  unused(DeviceProxy) 
 44  from Products.ZenHub.services.ProcessConfig import ProcessProxy 
 45  unused(ProcessProxy) 
 46  from Products.ZenHub.services.PerformanceConfig import SnmpConnInfo 
 47  unused(SnmpConnInfo) 
 48   
 49  from twisted.internet import defer, error 
 50  from twisted.python.failure import Failure 
 51   
 52  log = logging.getLogger("zen.zenprocess") 
 53   
 54  # HOST-RESOURCES-MIB OIDs used 
 55  HOSTROOT  ='.1.3.6.1.2.1.25' 
 56  RUNROOT   = HOSTROOT + '.4' 
 57  NAMETABLE = RUNROOT + '.2.1.2' 
 58  PATHTABLE = RUNROOT + '.2.1.4' 
 59  ARGSTABLE = RUNROOT + '.2.1.5' 
 60  PERFROOT  = HOSTROOT + '.5' 
 61  CPU       = PERFROOT + '.1.1.1.'        # note trailing dot 
 62  MEM       = PERFROOT + '.1.1.2.'        # note trailing dot 
 63   
 64  # Max size for CPU numbers 
 65  WRAP = 0xffffffffL 
 66   
 67  # Create an implementation of the ICollectorPreferences interface so that the 
 68  # ZenCollector framework can configure itself from our preferences. 
69 -class ZenProcessPreferences(object):
70 zope.interface.implements(ICollectorPreferences) 71
72 - def __init__(self):
73 """ 74 Constructs a new ZenProcessPreferences instance and provide default 75 values for needed attributes. 76 """ 77 self.collectorName = "zenprocess" 78 self.defaultRRDCreateCommand = None 79 self.configCycleInterval = 20 # minutes 80 81 #will be updated based on Performance Config property of same name 82 self.processCycleInterval = 3 * 60 83 84 #will be filled in based on buildOptions 85 self.options = None 86 87 # the configurationService attribute is the fully qualified class-name 88 # of our configuration service that runs within ZenHub 89 self.configurationService = 'Products.ZenHub.services.ProcessConfig'
90 91 @property
92 - def cycleInterval(self):
93 """ 94 defined as a property since it is needed by the interface 95 """ 96 return self.processCycleInterval
97
98 - def buildOptions(self, parser):
99 """ 100 Build a list of command-line options 101 """ 102 parser.add_option('--showprocs', 103 dest='showprocs', 104 action="store_true", 105 default=False, 106 help="Show the list of processes found." \ 107 "For debugging purposes only.") 108 parser.add_option('--showrawtables', 109 dest='showrawtables', 110 action="store_true", 111 default=False, 112 help="Show the raw SNMP processes data returned " \ 113 "from the device. For debugging purposes only.")
114
115 - def postStartup(self):
116 pass
117
118 -class DeviceStats:
119 - def __init__(self, deviceProxy):
120 self.config = deviceProxy 121 # map pid number to ProcessStats object 122 self._pidToProcess = {} 123 # map ProcessProxy id to ProcessStats object 124 self._processes = {} 125 for id, process in deviceProxy.processes.items(): 126 self._processes[id] = ProcessStats(process)
127
128 - def update(self, deviceProxy):
129 unused = set(self._processes.keys()) 130 for id, process in deviceProxy.processes.items(): 131 unused.discard(id) 132 if self._processes.get(id): 133 self._processes[id].update(process) 134 else: 135 self._processes[id] = ProcessStats(process) 136 137 #delete the left overs 138 for id in unused: 139 del self.processes[id]
140 141 @property
142 - def processStats(self):
143 """ 144 The ProcessStats: processes configured to be monitored 145 """ 146 return self._processes.values()
147 148 @property
149 - def pids(self):
150 """ 151 returns the pids from being tracked 152 """ 153 return self._pidToProcess.keys()
154 155 @property
156 - def monitoredProcs(self):
157 """ 158 returns ProcessStats for which we have a pid 159 """ 160 return self._pidToProcess.values()
161
162 -class ProcessStats:
163 - def __init__(self, processProxy):
164 self._pids={} 165 self._config = processProxy 166 self.cpu = 0
167
168 - def update(self, processProxy):
169 self._config = processProxy
170
171 - def __str__(self):
172 """ 173 Override the Python default to represent ourselves as a string 174 """ 175 return str(self._config.name)
176 __repr__ = __str__ 177
178 - def match(self, name, args):
179 """ 180 Perform exact comparisons on the process names. 181 182 @parameter name: name of a process to compare 183 @type name: string 184 @parameter args: argument list of the process 185 @type args: string 186 @return: does the name match this process's info? 187 @rtype: Boolean 188 """ 189 if self._config.name is None: 190 return False 191 if self._config.ignoreParameters or not args: 192 return self._config.originalName == name 193 return self._config.originalName == '%s %s' % (name, args)
194
195 - def updateCpu(self, pid, value):
196 """ 197 """ 198 pid = self._pids.setdefault(pid, Pid()) 199 cpu = pid.updateCpu(value) 200 if cpu is not None: 201 self.cpu += cpu 202 self.cpu %= WRAP
203
204 - def getCpu(self):
205 """ 206 """ 207 return self.cpu
208
209 - def updateMemory(self, pid, value):
210 """ 211 """ 212 self._pids.setdefault(pid, Pid()).memory = value
213
214 - def getMemory(self):
215 """ 216 """ 217 return sum([x.memory for x in self._pids.values() 218 if x.memory is not None])
219
220 - def discardPid(self, pid):
221 """ 222 """ 223 if pid in self._pids: 224 del self._pids[pid]
225
226 -class Pid:
227 """ 228 Helper class to track process id information 229 """ 230 cpu = None 231 memory = None 232
233 - def updateCpu(self, n):
234 """ 235 """ 236 if n is not None: 237 try: 238 n = int(n) 239 except ValueError: 240 log.warning("Bad value for CPU: '%s'", n) 241 242 if self.cpu is None or n is None: 243 self.cpu = n 244 return None 245 diff = n - self.cpu 246 if diff < 0: 247 # don't provide a value when the counter falls backwards 248 n = None 249 diff = None 250 self.cpu = n 251 return diff
252
253 - def updateMemory(self, n):
254 """ 255 """ 256 self.memory = n
257
258 - def __str__(self):
259 """ 260 Override the Python default to represent ourselves as a string 261 """ 262 return '<Pid> memory: %s cpu: %s' % (self.memory, self.cpu)
263 __repr__ = __str__
264
265 -class ConfigListener(object):
266 zope.interface.implements(IConfigurationListener) 267
268 - def deleted(self, configurationId):
269 """ 270 Called when a configuration is deleted from the collector 271 """ 272 log.debug('ConfigListener: configuration %s deleted' % configurationId) 273 ZenProcessTask.DEVICE_STATS.pop(configurationId, None)
274
275 - def added(self, configuration):
276 """ 277 Called when a configuration is added to the collector 278 """ 279 log.debug('ConfigListener: configuration %s added' % configuration)
280 281
282 - def updated(self, newConfiguration):
283 """ 284 Called when a configuration is updated in collector 285 """ 286 log.debug('ConfigListener: configuration %s updated' % newConfiguration)
287 288 # Create an implementation of the IScheduledTask interface that will perform 289 # the actual collection work needed by this collector.
290 -class ZenProcessTask(ObservableMixin):
291 """ 292 A scheduled task that finds instances of configure processes and collects 293 metrics on the processes 294 """ 295 zope.interface.implements(IScheduledTask) 296 297 #Keep state about process stats across task updates 298 DEVICE_STATS = {} 299 300 #counter to keep track of total restarted and missing processes 301 RESTARTED = 0 302 MISSING = 0 303 304 STATE_CONNECTING = 'CONNECTING' 305 STATE_SCANNING_PROCS = 'SCANNING_PROCESSES' 306 STATE_FETCH_PERF = 'FETCH_PERF_DATA' 307 STATE_STORE_PERF = 'STORE_PERF_DATA' 308 309 statusEvent = { 'eventClass' : Status_OSProcess, 310 'eventGroup' : 'Process' } 311
312 - def __init__(self, 313 deviceId, 314 taskName, 315 scheduleIntervalSeconds, 316 taskConfig):
317 super(ZenProcessTask, self).__init__() 318 319 #needed for interface 320 self.name = taskName 321 self.configId = deviceId 322 self.interval = scheduleIntervalSeconds 323 self.state = TaskStates.STATE_IDLE 324 325 #the task config corresponds to a DeviceProxy 326 self._device = taskConfig 327 self._devId = self._device.name 328 self._manageIp = self._device.manageIp 329 self._maxOidsPerRequest = self._device.zMaxOIDPerRequest 330 331 self._dataService = zope.component.queryUtility(IDataService) 332 self._eventService = zope.component.queryUtility(IEventService) 333 self._preferences = zope.component.queryUtility(ICollectorPreferences, 334 "zenprocess") 335 self.snmpProxy = None 336 self.snmpConnInfo = self._device.snmpConnInfo 337 338 self._deviceStats = ZenProcessTask.DEVICE_STATS.get(self._devId) 339 if self._deviceStats: 340 self._deviceStats.update(self._device) 341 else: 342 self._deviceStats = DeviceStats(self._device) 343 ZenProcessTask.DEVICE_STATS[self._devId] = self._deviceStats
344
345 - def _failure(self, reason):
346 """ 347 Twisted errBack to log the exception for a single device. 348 349 @parameter reason: explanation of the failure 350 @type reason: Twisted error instance 351 """ 352 msg = 'Unable to read processes on device %s' % self._devId 353 if isinstance(reason.value, error.TimeoutError): 354 log.debug('Timeout on device %s' % self._devId) 355 msg = '%s; Timeout on device' % msg 356 else: 357 msg = '%s; error: %s' % (msg, reason.getErrorMessage()) 358 log.error('Error on device %s; %s' % (self._devId, 359 reason.getErrorMessage())) 360 361 self._eventService.sendEvent(self.statusEvent, 362 eventClass=Status_Snmp, 363 component="process", 364 device=self._devId, 365 summary=msg, 366 severity=Event.Error) 367 return reason
368 369
370 - def _connectCallback(self, result):
371 """ 372 Callback called after a successful connect to the remote device. 373 """ 374 log.debug("Connected to %s [%s]", self._devId, self._manageIp) 375 return result
376
377 - def _collectCallback(self, result):
378 """ 379 Callback called after a connect or previous collection so that another 380 collection can take place. 381 """ 382 log.debug("scanning for processes from %s [%s]", 383 self._devId, self._manageIp) 384 385 self.state = ZenProcessTask.STATE_SCANNING_PROCS 386 tables = [NAMETABLE, PATHTABLE, ARGSTABLE] 387 d = self._getTables(tables) 388 d.addCallbacks(self._storeProcessNames, self._failure) 389 d.addCallback(self._fetchPerf) 390 return d
391
392 - def _finished(self, result):
393 """ 394 Callback activated when the task is complete 395 """ 396 if not isinstance(result, Failure): 397 log.debug("Device %s [%s] scanned successfully", 398 self._devId, self._manageIp) 399 else: 400 log.debug("Device %s [%s] scanned failed, %s", 401 self._devId, self._manageIp, result.getErrorMessage()) 402 403 try: 404 self._close() 405 except Exception, e: 406 log.warn("Failed to close device %s: error %s" % 407 (self._devId, str(e))) 408 # give the result to the rest of the callback/errchain so that the 409 # ZenCollector framework can keep track of the success/failure rate 410 return result
411
412 - def cleanup(self):
413 return self._close()
414
415 - def doTask(self):
416 """ 417 Contact to one device and return a deferred which gathers data from 418 the device. 419 420 @return: job to scan a device 421 @rtype: Twisted deferred object 422 """ 423 424 # see if we need to connect first before doing any collection 425 d = defer.maybeDeferred(self._connect) 426 d.addCallbacks(self._connectCallback, self._failure) 427 d.addCallback(self._collectCallback) 428 429 # Add the _finished callback to be called in both success and error 430 # scenarios. 431 d.addBoth(self._finished) 432 433 # returning a Deferred will keep the framework from assuming the task 434 # is done until the Deferred actually completes 435 return d
436
437 - def _storeProcessNames(self, results):
438 """ 439 Parse the process tables and reconstruct the list of processes 440 that are on the device. 441 442 @parameter results: results of SNMP table gets 443 @type results: dictionary of dictionaries 444 @parameter device: proxy connection object 445 @type device: Device object 446 """ 447 448 if not results or not results[NAMETABLE]: 449 summary = 'Device %s does not publish HOST-RESOURCES-MIB' % \ 450 self._devId 451 resolution="Verify with snmpwalk -v1 -c community %s %s" % \ 452 (self._devId, NAMETABLE ) 453 454 self._eventService.sendEvent(self.statusEvent, 455 device=self._devId, 456 summary=summary, 457 resolution=resolution, 458 severity=Event.Error) 459 log.info(summary) 460 return defer.fail(summary) 461 462 summary = 'Process table up for device %s' % self._devId 463 self._clearSnmpError(summary) 464 465 showrawtables = self._preferences.options.showrawtables 466 args, procs = mapResultsToDicts(showrawtables, results) 467 if self._preferences.options.showprocs: 468 self._showProcessList( procs ) 469 470 # look for changes in processes 471 beforePids = set(self._deviceStats.pids) 472 afterPidToProcessStats = {} 473 for pStats in self._deviceStats.processStats: 474 for pid, (name, args) in procs: 475 if pStats.match(name, args): 476 log.debug("Found process %d on %s" % (pid, 477 pStats._config.name)) 478 afterPidToProcessStats[pid] = pStats 479 afterPids = set(afterPidToProcessStats.keys()) 480 afterByConfig = reverseDict(afterPidToProcessStats) 481 newPids = afterPids - beforePids 482 deadPids = beforePids - afterPids 483 484 # report pid restarts 485 restarted = {} 486 for pid in deadPids: 487 procStats = self._deviceStats._pidToProcess[pid] 488 procStats.discardPid(pid) 489 if procStats in afterByConfig: 490 ZenProcessTask.RESTARTED += 1 491 pConfig = procStats._config 492 if pConfig.restart: 493 restarted[procStats] = True 494 495 summary = 'Process restarted: %s' % pConfig.originalName 496 497 self._eventService.sendEvent(self.statusEvent, 498 device=self._devId, 499 summary=summary, 500 component=pConfig.originalName, 501 severity=pConfig.severity) 502 log.info(summary) 503 504 # report alive processes 505 for processStat in afterByConfig.keys(): 506 if processStat in restarted: continue 507 summary = "Process up: %s" % processStat._config.originalName 508 self._eventService.sendEvent(self.statusEvent, 509 device=self._devId, 510 summary=summary, 511 component=processStat._config.originalName, 512 severity=Event.Clear) 513 log.debug(summary) 514 515 for pid in newPids: 516 log.debug("Found new %s pid %d on %s" % ( 517 afterPidToProcessStats[pid]._config.originalName, pid, 518 self._devId)) 519 520 self._deviceStats._pidToProcess = afterPidToProcessStats 521 522 # Look for missing processes 523 for procStat in self._deviceStats.processStats: 524 if procStat not in afterByConfig: 525 procConfig = procStat._config 526 ZenProcessTask.MISSING += 1 527 summary = 'Process not running: %s' % procConfig.originalName 528 self._eventService.sendEvent(self.statusEvent, 529 device=self._devId, 530 summary=summary, 531 component=procConfig.originalName, 532 severity=procConfig.severity) 533 log.warning(summary) 534 535 # Store per-device, per-process statistics 536 pidCounts = dict([(p, 0) for p in self._deviceStats.processStats]) 537 for procStat in self._deviceStats.monitoredProcs: 538 pidCounts[procStat] += 1 539 for procName, count in pidCounts.items(): 540 self._save(procName, 'count_count', count, 'GAUGE') 541 return results
542
543 - def _fetchPerf(self, results):
544 """ 545 Get performance data for all the monitored processes on a device 546 547 @parameter device: proxy object to the remote computer 548 @type device: Device object 549 """ 550 self.state = ZenProcessTask.STATE_FETCH_PERF 551 552 oids = [] 553 for pid in self._deviceStats.pids: 554 oids.extend([CPU + str(pid), MEM + str(pid)]) 555 if not oids: 556 return defer.succeed(([])) 557 d = Chain(self._get, iter(chunk(oids, self._maxOidsPerRequest))).run() 558 d.addCallback(self._storePerfStats) 559 d.addErrback(self._failure) 560 return d
561
562 - def _storePerfStats(self, results):
563 """ 564 Save the process performance data in RRD files 565 566 @parameter results: results of SNMP table gets 567 @type results: list of (success, result) tuples 568 @parameter device: proxy object to the remote computer 569 @type device: Device object 570 """ 571 self.state = ZenProcessTask.STATE_STORE_PERF 572 for success, result in results: 573 if not success: 574 #return the failure 575 return result 576 self._clearSnmpError('Process table up for device %s' % self._devId) 577 parts = {} 578 for success, values in results: 579 if success: 580 parts.update(values) 581 results = parts 582 byConf = reverseDict(self._deviceStats._pidToProcess) 583 for procStat, pids in byConf.items(): 584 if len(pids) != 1: 585 log.info("There are %d pids by the name %s", 586 len(pids), procStat._config.name) 587 procName = procStat._config.name 588 for pid in pids: 589 cpu = results.get(CPU + str(pid), None) 590 mem = results.get(MEM + str(pid), None) 591 procStat.updateCpu(pid, cpu) 592 procStat.updateMemory(pid, mem) 593 self._save(procName, 'cpu_cpu', procStat.getCpu(), 594 'DERIVE', min=0) 595 self._save(procName, 'mem_mem', 596 procStat.getMemory() * 1024, 'GAUGE') 597 return results
598
599 - def _getTables(self, oids):
600 """ 601 Perform SNMP getTable for specified OIDs 602 @parameter oids: OIDs to gather 603 @type oids: list of strings 604 @return: Twisted deferred 605 @rtype: Twisted deferred 606 """ 607 repetitions = self._maxOidsPerRequest / len(oids) 608 t = self.snmpProxy.getTable(oids, 609 timeout=self.snmpConnInfo.zSnmpTimeout, 610 retryCount=self.snmpConnInfo.zSnmpTries, 611 maxRepetitions=repetitions, 612 limit=sys.maxint) 613 return t
614
615 - def _get(self, oids):
616 """ 617 Perform SNMP get for specified OIDs 618 619 @parameter oids: OIDs to gather 620 @type oids: list of strings 621 @return: Twisted deferred 622 @rtype: Twisted deferred 623 """ 624 return self.snmpProxy.get(oids, 625 self.snmpConnInfo.zSnmpTimeout, 626 self.snmpConnInfo.zSnmpTries)
627
628 - def _connect(self):
629 """ 630 Create a connection to the remote device 631 """ 632 self.state = ZenProcessTask.STATE_CONNECTING 633 if (self.snmpProxy is None or 634 self.snmpProxy.snmpConnInfo != self.snmpConnInfo): 635 self.snmpProxy = self.snmpConnInfo.createSession() 636 self.snmpProxy.open()
637
638 - def _close(self):
639 """ 640 Close down the connection to the remote device 641 """ 642 if self.snmpProxy: 643 self.snmpProxy.close() 644 self.snmpProxy = None
645 646
647 - def _showProcessList(self, procs):
648 """ 649 Display the processes in a sane manner. 650 651 @parameter procs: list of (pid, (name, args)) 652 @type procs: list of tuples 653 """ 654 device_name = self._devId 655 proc_list = [ '%s %s %s' % (pid, name, args) for pid, (name, args) \ 656 in sorted(procs)] 657 proc_list.append('') 658 log.info("#===== Processes on %s:\n%s", device_name, '\n'.join(proc_list))
659
660 - def _clearSnmpError(self, message):
661 """ 662 Send an event to clear other events. 663 664 @parameter name: device for which the event applies 665 @type name: string 666 @parameter message: clear text 667 @type message: string 668 """ 669 self._eventService.sendEvent(self.statusEvent, 670 eventClass=Status_Snmp, 671 component="process", 672 device=self._devId, 673 summary=message, 674 agent='zenprocess', 675 severity=Event.Clear)
676
677 - def _save(self, pidName, statName, value, rrdType, min='U'):
678 """ 679 Save a value into an RRD file 680 681 @param pidName: process id of the monitored process 682 @type pidName: string 683 @param statName: metric name 684 @type statName: string 685 @param value: data to be stored 686 @type value: number 687 @param rrdType: RRD data type (eg ABSOLUTE, DERIVE, COUNTER) 688 @type rrdType: string 689 """ 690 deviceName = self._devId 691 path = 'Devices/%s/os/processes/%s/%s' % (deviceName, pidName, statName) 692 try: 693 self._dataService.writeRRD(path, value, rrdType, min=min) 694 except Exception, ex: 695 summary= "Unable to save data for process-monitor RRD %s" % \ 696 path 697 log.critical( summary ) 698 699 message= "Data was value= %s, type=%s" % \ 700 ( value, rrdType ) 701 log.critical( message ) 702 log.exception( ex ) 703 704 import traceback 705 trace_info= traceback.format_exc() 706 707 self._eventService.sendEvent(dict( 708 dedupid="%s|%s" % (self._preferences.options.monitor, 709 'RRD write failure'), 710 severity=Event.Critical, 711 device=self._preferences.options.monitor, 712 eventClass=Status_Perf, 713 component="RRD", 714 pidName=pidName, 715 statName=statName, 716 path=path, 717 message=message, 718 traceback=trace_info, 719 summary=summary))
720
721 -def mapResultsToDicts(showrawtables, results):
722 """ 723 Parse the process tables and reconstruct the list of processes 724 that are on the device. 725 726 @parameter results: results of SNMP table gets ie (OID + pid, value) 727 @type results: dictionary of dictionaries 728 @return: maps relating names and pids to each other 729 @rtype: dictionary, list of tuples 730 """ 731 def extract(dictionary, oid, value): 732 """ 733 Helper function to extract SNMP table data. 734 """ 735 pid = int(oid.split('.')[-1]) 736 dictionary[pid] = value
737 738 names, paths, args = {}, {}, {} 739 if showrawtables: 740 log.info("NAMETABLE = %r", results[NAMETABLE]) 741 for row in results[NAMETABLE].items(): 742 extract(names, *row) 743 744 if showrawtables: 745 log.info("PATHTABLE = %r", results[PATHTABLE]) 746 for row in results[PATHTABLE].items(): 747 extract(paths, *row) 748 749 if showrawtables: 750 log.info("ARGSTABLE = %r", results[ARGSTABLE]) 751 for row in results[ARGSTABLE].items(): 752 extract(args, *row) 753 754 procs = [] 755 for pid, name in names.items(): 756 path = paths.get(pid, '') 757 if path and path.find('\\') == -1: 758 name = path 759 arg = args.get(pid, '') 760 procs.append( (pid, (name, arg) ) ) 761 762 return args, procs 763
764 -def reverseDict(d):
765 """ 766 Return a dictionary with keys and values swapped: 767 all values are lists to handle the different keys mapping to the same value 768 """ 769 result = {} 770 for a, v in d.items(): 771 result.setdefault(v, []).append(a) 772 return result
773
774 -def chunk(lst, n):
775 """ 776 Break lst into n-sized chunks 777 """ 778 return [lst[i:i+n] for i in range(0, len(lst), n)]
779 780 # Collector Daemon Main entry point 781 # 782 if __name__ == '__main__': 783 myPreferences = ZenProcessPreferences() 784 785 myTaskFactory = SimpleTaskFactory(ZenProcessTask) 786 myTaskSplitter = SimpleTaskSplitter(myTaskFactory) 787 daemon = CollectorDaemon(myPreferences, myTaskSplitter, ConfigListener()) 788 daemon.run() 789