Package Products :: Package ZenHub :: Module PBDaemon
[hide private]
[frames] | [no frames]

Source Code for Module Products.ZenHub.PBDaemon

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2007, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  __doc__ = """PBDaemon 
 12   
 13  Base for daemons that connect to zenhub 
 14   
 15  """ 
 16   
 17  import cPickle as pickle 
 18  import collections 
 19  import sys 
 20  import time 
 21  import traceback 
 22   
 23  import Globals 
 24   
 25  from Products.ZenUtils.ZenDaemon import ZenDaemon 
 26  from Products.ZenEvents.ZenEventClasses import Heartbeat 
 27  from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory 
 28  from Products.ZenUtils.DaemonStats import DaemonStats 
 29  from Products.ZenUtils.Utils import zenPath, atomicWrite 
 30  from Products.ZenUtils.Driver import drive 
 31  from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop, \ 
 32                                                  Clear, Warning 
 33   
 34  from twisted.cred import credentials 
 35  from twisted.internet import reactor, defer 
 36  from twisted.internet.error import ConnectionLost, ReactorNotRunning 
 37  from twisted.spread import pb 
 38  from twisted.python.failure import Failure 
 39  import twisted.python.log 
 40   
 41  from ZODB.POSException import ConflictError 
class RemoteException(Exception, pb.Copyable, pb.RemoteCopy):
    """
    Exception that can cross the PB barrier.

    Carries the formatted traceback text captured where the error was
    raised, so the receiving side can log where the failure originated.
    """

    def __init__(self, msg, tb):
        """
        @param msg: human-readable error message
        @type msg: string
        @param tb: formatted traceback text (may be empty)
        @type tb: string
        """
        Exception.__init__(self, msg)
        self.traceback = tb

    def __str__(self):
        message = Exception.__str__(self)
        # getattr: an instance rebuilt via RemoteCopy state may lack the
        # attribute; an empty/None traceback must not break str().
        tb = getattr(self, 'traceback', None)
        if not tb:
            return message
        # Put the traceback on its own line instead of gluing it onto the
        # message text.
        return "%s\n%s" % (message, tb)


pb.setUnjellyableForClass(RemoteException, RemoteException)
# ZODB conflicts
class RemoteConflictError(RemoteException):
    """RemoteException flavour used for ZODB ConflictError failures."""


pb.setUnjellyableForClass(RemoteConflictError, RemoteConflictError)
# Invalid monitor specified
class RemoteBadMonitor(RemoteException):
    """RemoteException flavour signalling an invalid monitor name."""
def translateError(callable):
    """
    Decorator function to wrap remote exceptions into something
    understandable by our daemon.

    A ZODB ConflictError raised by the wrapped function is re-raised as
    RemoteConflictError; any other Exception is re-raised as
    RemoteException. Both carry the formatted traceback.

    @parameter callable: function to wrap (name shadows the builtin but is
        kept for backward compatibility with keyword callers)
    @type callable: function
    @return: wrapper returning the wrapped function's result
    @rtype: function
    """
    import functools

    # wraps() keeps the wrapped function's __name__/__doc__ so decorated
    # remote_* methods stay introspectable.
    @functools.wraps(callable)
    def inner(*args, **kw):
        try:
            return callable(*args, **kw)
        except ConflictError as ex:
            raise RemoteConflictError(
                'Remote exception: %s: %s' % (ex.__class__, ex),
                traceback.format_exc())
        except Exception as ex:
            raise RemoteException(
                'Remote exception: %s: %s' % (ex.__class__, ex),
                traceback.format_exc())
    return inner


PB_PORT = 8789

# Event templates; PBDaemon copies these per instance before adding
# component/device details.
startEvent = {
    'eventClass': App_Start,
    'summary': 'started',
    'severity': Clear,
    }

stopEvent = {
    'eventClass': App_Stop,
    'summary': 'stopped',
    'severity': Warning,
    }


# Defaults for the --hub* / --monitor command-line options.
DEFAULT_HUB_HOST = 'localhost'
DEFAULT_HUB_PORT = PB_PORT
DEFAULT_HUB_USERNAME = 'admin'
DEFAULT_HUB_PASSWORD = 'zenoss'
DEFAULT_HUB_MONITOR = 'localhost'
class HubDown(Exception):
    """Raised (via a failed Deferred) when ZenHub cannot be reached."""
class FakeRemote:
    """
    Placeholder used in place of a remote service reference that is not
    available (see PBDaemon.getServiceNow).
    """

    def callRemote(self, *unused, **unusedKw):
        """
        Fail every call with HubDown.

        Accepts the same shape of arguments as a real remote reference's
        callRemote (positional and keyword), so callers always get a failed
        Deferred rather than a synchronous TypeError.

        @return: an already-failed Deferred carrying HubDown
        @rtype: Deferred
        """
        return defer.fail(HubDown("ZenHub is down"))
class PBDaemon(ZenDaemon, pb.Referenceable):
    """
    Base for daemons that connect to zenhub.

    Manages the (re)connecting PB client, looks up zenhub services, and
    queues events (regular, heartbeat and performance) that are flushed to
    the EventService in chunks.
    """

    name = 'pbdaemon'
    initialServices = ['EventService']
    heartbeatEvent = {'eventClass': Heartbeat}
    heartbeatTimeout = 60 * 3
    _customexitcode = 0
    _sendingEvents = False

    def __init__(self, noopts=0, keeproot=False, name=None):
        # if we were provided our collector name via the constructor instead
        # of via code, be sure to store it correctly.
        if name is not None:
            self.name = name
            self.mname = name

        try:
            ZenDaemon.__init__(self, noopts, keeproot)
        except IOError:
            self.log.critical(traceback.format_exc(0))
            sys.exit(1)

        self.rrdStats = DaemonStats()
        self.lastStats = 0
        self.perspective = None
        self.services = {}
        self.eventQueue = []
        self.startEvent = startEvent.copy()
        self.stopEvent = stopEvent.copy()
        # Copy the class-level heartbeat template as well: updating it in
        # place would write this instance's component/device into the
        # attribute shared by the class and every other instance.
        self.heartbeatEvent = self.heartbeatEvent.copy()
        details = dict(component=self.name, device=self.options.monitor)
        for evt in self.startEvent, self.stopEvent, self.heartbeatEvent:
            evt.update(details)
        self.initialConnect = defer.Deferred()
        self.stopped = False
        self._eventStatus = {}
        self._eventStatusCount = collections.defaultdict(int)
        self.counters = collections.Counter()
        self.loadCounters()
        self._heartbeatEvent = None
        self._performanceEventsQueue = None
        self._pingedZenhub = None

    def connecting(self):
        """
        Called when about to connect to zenhub
        """
        self.log.info("Attempting to connect to zenhub")

    def gotPerspective(self, perspective):
        """
        This gets called every time we reconnect.

        @parameter perspective: Twisted perspective object
        @type perspective: Twisted perspective object
        """
        self.log.info("Connected to ZenHub")
        self.perspective = perspective
        d2 = self.getInitialServices()
        if self.initialConnect:
            # Only the first connection fires initialConnect.
            self.log.debug('Chaining getInitialServices with d2')
            self.initialConnect, d = None, self.initialConnect
            d2.chainDeferred(d)

    def connect(self):
        """
        Start the reconnecting PB client and log in to zenhub.

        @return: Deferred fired once the initial services are loaded
        @rtype: Deferred
        """
        # Grab it now: gotPerspective resets self.initialConnect to None.
        initialConnect = self.initialConnect
        pingInterval = self.options.zhPingInterval
        factory = ReconnectingPBClientFactory(connectTimeout=60,
                                              pingPerspective=True,
                                              pingInterval=pingInterval,
                                              pingtimeout=pingInterval * 5)
        self.log.info("Connecting to %s:%d",
                      self.options.hubhost, self.options.hubport)
        factory.connectTCP(self.options.hubhost, self.options.hubport)
        username = self.options.hubusername
        password = self.options.hubpassword
        self.log.debug("Logging in as %s", username)
        c = credentials.UsernamePassword(username, password)
        factory.gotPerspective = self.gotPerspective
        factory.connecting = self.connecting
        factory.startLogin(c)

        def timeout(d):
            # Complain only if the first connection has not completed yet.
            if not d.called:
                self.connectTimeout()
        reactor.callLater(self.options.hubtimeout, timeout, initialConnect)
        return initialConnect

    def connectTimeout(self):
        """Called when the initial connection to zenhub takes too long."""
        self.log.error('Timeout connecting to zenhub: is it running?')

    def eventService(self):
        """Return the EventService reference (or a FakeRemote)."""
        return self.getServiceNow('EventService')

    def getServiceNow(self, svcName):
        """
        Return an already-loaded service, or a FakeRemote whose calls fail
        with HubDown when the service is not loaded.
        """
        if svcName not in self.services:
            self.log.warning('No service named %r: ZenHub may be disconnected'
                             % svcName)
        return self.services.get(svcName, None) or FakeRemote()

    def getService(self, serviceName, serviceListeningInterface=None):
        """
        Attempt to get a service from zenhub. Returns a deferred.
        When service is retrieved it is stashed in self.services with
        serviceName as the key. When getService is called it will first
        check self.services and if serviceName is already there it will return
        the entry from self.services wrapped in a defer.succeed
        """
        if serviceName in self.services:
            return defer.succeed(self.services[serviceName])

        def removeService(ignored):
            self.log.debug('Removing service %s' % serviceName)
            if serviceName in self.services:
                del self.services[serviceName]

        def callback(result, serviceName):
            self.log.debug('Loaded service %s from zenhub' % serviceName)
            self.services[serviceName] = result
            # Drop the cached reference when the connection goes away.
            result.notifyOnDisconnect(removeService)
            return result

        def errback(error, serviceName):
            self.log.debug('errback after getting service %s' % serviceName)
            self.log.error('Could not retrieve service %s' % serviceName)
            if serviceName in self.services:
                del self.services[serviceName]
            return error

        d = self.perspective.callRemote('getService',
                                        serviceName,
                                        self.options.monitor,
                                        serviceListeningInterface or self)
        d.addCallback(callback, serviceName)
        d.addErrback(errback, serviceName)
        return d

    def getInitialServices(self):
        """
        After connecting to zenhub, gather our initial list of services.
        """
        def errback(error):
            if isinstance(error, Failure):
                self.log.critical("Invalid monitor: %s" % self.options.monitor)
                reactor.stop()
                return defer.fail(RemoteBadMonitor(
                    "Invalid monitor: %s" % self.options.monitor, ''))
            return error

        self.log.debug('Setting up initial services: %s' %
                       ', '.join(self.initialServices))
        d = defer.DeferredList(
            [self.getService(name) for name in self.initialServices],
            fireOnOneErrback=True, consumeErrors=True)
        d.addErrback(errback)
        return d

    def connected(self):
        """Hook called once the initial connection is established."""
        pass

    def run(self):
        """Connect to zenhub and run the reactor until it stops."""
        self.rrdStats.config(self.options.monitor, self.name, [])
        self.log.debug('Starting PBDaemon initialization')
        d = self.connect()

        def callback(result):
            self.sendEvent(self.startEvent)
            self.pushEventsLoop()
            self.log.debug('Calling connected.')
            self.connected()
            return result
        d.addCallback(callback)
        d.addErrback(twisted.python.log.err)
        reactor.run()
        if self._customexitcode:
            sys.exit(self._customexitcode)

    def sigTerm(self, signum=None, frame=None):
        try:
            ZenDaemon.sigTerm(self, signum, frame)
        except SystemExit:
            pass

    def setExitCode(self, exitcode):
        self._customexitcode = exitcode

    def stop(self, ignored=''):
        """
        Send the stop event (when possible), flush events and stop the
        reactor; the reactor is stopped after at most ~1 second regardless.
        """
        def stopNow(ignored):
            if reactor.running:
                try:
                    self.saveCounters()
                    reactor.stop()
                except ReactorNotRunning:
                    self.log.debug("Tried to stop reactor that was stopped")

        if reactor.running and not self.stopped:
            self.stopped = True
            if 'EventService' in self.services:
                # send stop event if we don't have an implied --cycle,
                # or if --cycle has been specified
                if not hasattr(self.options, 'cycle') or \
                        getattr(self.options, 'cycle', True):
                    self.sendEvent(self.stopEvent)
                # give the reactor some time to send the shutdown event
                drive(self.pushEvents).addBoth(stopNow)
                self.log.debug("Sent a 'stop' event")
            else:
                self.log.debug("No event sent as no EventService available.")
            # but not too much time
            reactor.callLater(1, stopNow, True)  # requires bogus arg
        else:
            self.log.debug("stop() called when not running")

    def sendEvents(self, events):
        """Queue each event in *events* via sendEvent."""
        # Explicit loop: map() used only for side effects is a no-op when
        # map is lazy.
        for event in events:
            self.sendEvent(event)

    def sendEvent(self, event, **kw):
        ''' Add event to queue of events to be sent. If we have an event
        service then process the queue.
        '''
        generatedEvent = self.generateEvent(event, **kw)
        if generatedEvent:
            self.eventQueue.append(generatedEvent)
            self.counters['eventCount'] += 1
            self.log.debug("Queued event (total of %d) %r",
                           len(self.eventQueue),
                           event)

        # keep the queue in check, but don't trim it all the time
        self._trimEventQueue(maxOver=self.options.eventflushchunksize)

    def generateEvent(self, event, **kw):
        '''
        Build the event dict that will be queued for zenhub.

        Returns a copy of *event* stamped with agent/monitor/manager and
        updated with **kw, or None when the reactor is not running or when
        the event is a duplicate Clear suppressed by the
        --allowduplicateclears / --duplicateclearinterval options.
        '''
        if not reactor.running:
            return
        event = event.copy()
        event['agent'] = self.name
        event['monitor'] = self.options.monitor
        event['manager'] = self.fqdn
        event.update(kw)
        if not self.options.allowduplicateclears or \
                self.options.duplicateclearinterval > 0:
            statusKey = (event['device'],
                         event.get('component', ''),
                         event.get('eventKey', ''),
                         event.get('eventClass', ''))
            severity = event.get('severity', -1)
            status = self._eventStatus.get(statusKey, -1)
            if severity != -1:
                # Count consecutive events with the same severity per key.
                if severity != status:
                    self._eventStatusCount[statusKey] = 0
                else:
                    self._eventStatusCount[statusKey] += 1
                self._eventStatus[statusKey] = severity
                if severity == Clear and status == Clear:
                    if not self.options.allowduplicateclears:
                        self.log.debug("allowduplicateclears dropping useless clear event %r", event)
                        return
                    if self.options.duplicateclearinterval > 0 \
                            and self._eventStatusCount[statusKey] % self.options.duplicateclearinterval != 0:
                        self.log.debug("duplicateclearinterval dropping useless clear event %r", event)
                        return
        return event

    def _trimEventQueue(self, maxOver=0):
        """
        Drop the oldest queued events once the queue exceeds
        maxqueuelen + maxOver, trimming back down to maxqueuelen.
        """
        queueLen = len(self.eventQueue)
        if queueLen > (self.options.maxqueuelen + maxOver):
            diff = queueLen - self.options.maxqueuelen
            self.log.error(
                'Discarding oldest %d events because maxqueuelen was '
                'exceeded: %d/%d',
                diff, queueLen, self.options.maxqueuelen)
            self.counters['discardedEvents'] += diff
            self.eventQueue = self.eventQueue[diff:]

    @property
    def _performanceEvents(self):
        """Lazily created bounded deque of performance events."""
        if self._performanceEventsQueue is None:
            self._performanceEventsQueue = collections.deque(
                maxlen=self.options.maxqueuelen)
        return self._performanceEventsQueue

    def _getPerformanceEventsChunk(self):
        """
        Pop up to eventflushchunksize events from the right end of the
        performance queue (new events are added on the left).
        """
        queue = self._performanceEvents
        count = min(len(queue), self.options.eventflushchunksize)
        return [queue.pop() for _ in xrange(count)]

    def pushEventsLoop(self):
        """Periodially, wake up and flush events to ZenHub.
        """
        reactor.callLater(self.options.eventflushseconds, self.pushEventsLoop)
        drive(self.pushEvents)

        # Record the number of events in the queue every 5 minutes.
        now = time.time()
        if self.rrdStats.name and now >= (self.lastStats + 300):
            self.lastStats = now
            events = self.rrdStats.gauge('eventQueueLength',
                                         300, len(self.eventQueue))
            self._performanceEvents.extendleft(events)

    def pushEvents(self, driver):
        """Flush events to ZenHub.

        Generator driven by Products.ZenUtils.Driver.drive. Sends queued,
        heartbeat and performance events in chunks. If the connection to
        zenhub is lost mid-send, the unsent chunk is put back on its queues.
        """
        # are we already shutting down?
        if not reactor.running:
            return
        if self._sendingEvents:
            return
        try:
            # try to send everything we have, serially
            self._sendingEvents = True
            while len(self.eventQueue) or self._heartbeatEvent \
                    or len(self._performanceEvents):

                # are still connected to ZenHub?
                evtSvc = self.services.get('EventService', None)
                if not evtSvc:
                    self.log.error("No event service: %r", evtSvc)
                    break
                # send the events in large bundles, carefully reducing
                # the eventQueue in case we get in here more than once
                chunkSize = self.options.eventflushchunksize
                events = self.eventQueue[:chunkSize]
                self.eventQueue = self.eventQueue[chunkSize:]

                performanceEvents = self._getPerformanceEventsChunk()

                # send the events and wait for the response
                heartBeat = [self._heartbeatEvent] if self._heartbeatEvent else []

                self.log.debug("Sending %d events, %d perfevents, %d heartbeats.",
                               len(events), len(performanceEvents), len(heartBeat))
                # maybeDeferred: a stale PB reference may raise
                # DeadReferenceError synchronously from callRemote; turn that
                # into a failure so the requeue logic below sees it.
                yield defer.maybeDeferred(evtSvc.callRemote, 'sendEvents',
                                          events + heartBeat + performanceEvents)
                try:
                    driver.next()
                except (ConnectionLost, pb.PBConnectionLost,
                        pb.DeadReferenceError) as ex:
                    # Connection-level failure: put the chunk back so it is
                    # retried after reconnecting instead of being lost.
                    self.log.error('Error sending event: %s' % ex)
                    self.eventQueue = events + self.eventQueue
                    # Chunk was popped oldest-first from the right end;
                    # reverse so the oldest ends up back at the right end.
                    performanceEvents.reverse()
                    self._performanceEvents.extend(performanceEvents)
                    break
                self.log.debug("Events sent")
                self._heartbeatEvent = None
        except Exception as ex:
            self.log.exception(ex)
        finally:
            self._sendingEvents = False

    def heartbeat(self):
        'if cycling, send a heartbeat, else, shutdown'
        if not self.options.cycle:
            self.stop()
            return
        self._heartbeatEvent = self.generateEvent(
            self.heartbeatEvent, timeout=self.heartbeatTimeout)
        # heartbeat is normally 3x cycle time
        self.niceDoggie(self.heartbeatTimeout / 3)

        events = []
        # save daemon counter stats
        for name, value in self.counters.items():
            self.log.info("Counter %s, value %d", name, value)
            events += self.rrdStats.counter(name, 300, value)
        self.sendEvents(events)

        # persist counters values
        self.saveCounters()

    def saveCounters(self):
        """Persist self.counters to var/<name>_counters.pickle (best effort)."""
        atomicWrite(
            zenPath('var/%s_counters.pickle' % self.name),
            pickle.dumps(self.counters),
            raiseException=False,
        )

    def loadCounters(self):
        """
        Restore counters written by saveCounters, if possible.

        Best effort: on any error (e.g. no file yet) self.counters is left
        unchanged.
        """
        try:
            path = zenPath('var/%s_counters.pickle' % self.name)
            # NOTE: unpickling is only acceptable because this file is
            # produced by saveCounters; never point it at untrusted data.
            with open(path, 'rb') as f:
                self.counters = pickle.load(f)
        except Exception:
            self.log.debug("Unable to load saved counters for %s", self.name)

    def remote_getName(self):
        return self.name

    def remote_shutdown(self, unused):
        self.stop()
        self.sigTerm()

    def remote_setPropertyItems(self, items):
        pass

    @translateError
    def remote_updateThresholdClasses(self, classes):
        """Import each threshold class named in *classes*, logging failures."""
        from Products.ZenUtils.Utils import importClass
        self.log.debug("Loading classes %s", classes)
        for c in classes:
            try:
                importClass(c)
            except ImportError:
                self.log.error("Unable to import class %s", c)

    def buildOptions(self):
        """Register zenhub connection and event-queue command-line options."""
        self.parser.add_option('--hubhost',
                               dest='hubhost',
                               default=DEFAULT_HUB_HOST,
                               help='Host of zenhub daemon.'
                               ' Default is %s.' % DEFAULT_HUB_HOST)
        self.parser.add_option('--hubport',
                               dest='hubport',
                               type='int',
                               default=DEFAULT_HUB_PORT,
                               help='Port zenhub listens on.'
                               ' Default is %s.' % DEFAULT_HUB_PORT)
        self.parser.add_option('--hubusername',
                               dest='hubusername',
                               default=DEFAULT_HUB_USERNAME,
                               help='Username for zenhub login.'
                               ' Default is %s.' % DEFAULT_HUB_USERNAME)
        self.parser.add_option('--hubpassword',
                               dest='hubpassword',
                               default=DEFAULT_HUB_PASSWORD,
                               help='Password for zenhub login.'
                               ' Default is %s.' % DEFAULT_HUB_PASSWORD)
        self.parser.add_option('--monitor',
                               dest='monitor',
                               default=DEFAULT_HUB_MONITOR,
                               help='Name of monitor instance to use for'
                               ' configuration.  Default is %s.'
                               % DEFAULT_HUB_MONITOR)
        self.parser.add_option('--initialHubTimeout',
                               dest='hubtimeout',
                               type='int',
                               default=30,
                               help='Initial time to wait for a ZenHub '
                                    'connection')
        self.parser.add_option('--allowduplicateclears',
                               dest='allowduplicateclears',
                               default=False,
                               action='store_true',
                               help='Send clear events even when the most '
                               'recent event was also a clear event.')

        self.parser.add_option('--duplicateclearinterval',
                               dest='duplicateclearinterval',
                               default=0,
                               type='int',
                               help=('Send a clear event every [DUPLICATECLEARINTERVAL] '
                                     'events.'))

        self.parser.add_option('--eventflushseconds',
                               dest='eventflushseconds',
                               default=5.,
                               type='float',
                               help='Seconds between attempts to flush '
                               'events to ZenHub.')

        self.parser.add_option('--eventflushchunksize',
                               dest='eventflushchunksize',
                               default=50,
                               type='int',
                               help='Number of events to send to ZenHub '
                               'at one time')

        self.parser.add_option('--maxqueuelen',
                               dest='maxqueuelen',
                               default=5000,
                               type='int',
                               help='Maximum number of events to queue')

        self.parser.add_option('--zenhubpinginterval',
                               dest='zhPingInterval',
                               default=30,
                               type='int',
                               help='How often to ping zenhub')

        ZenDaemon.buildOptions(self)