Package ZenHub :: Module PBDaemon
[hide private]
[frames] | [no frames]

Source Code for Module ZenHub.PBDaemon

  1  ########################################################################## 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """PBDaemon 
 15   
 16  Base for daemons that connect to zenhub 
 17   
 18  """ 
 19   
 20  import sys 
 21  import traceback 
 22   
 23  import Globals 
 24   
 25  from Products.ZenUtils.ZenDaemon import ZenDaemon 
 26  from Products.ZenEvents.ZenEventClasses import Heartbeat 
 27  from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory 
 28  from Products.ZenUtils.DaemonStats import DaemonStats 
 29  from Products.ZenUtils.Driver import drive 
 30  from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop, \ 
 31                                                  Clear, Warning 
 32   
 33  from twisted.cred import credentials 
 34  from twisted.internet import reactor, defer 
 35  from twisted.internet.error import ConnectionLost 
 36  from twisted.spread import pb 
 37  from twisted.python.failure import Failure 
 38   
 39  from ZODB.POSException import ConflictError 
 40   
class RemoteException(Exception, pb.Copyable, pb.RemoteCopy):
    """
    Exception that can cross the PB barrier.

    Being both pb.Copyable and pb.RemoteCopy (and registered below with
    pb.setUnjellyableForClass), instances can be sent across a Perspective
    Broker connection.  Each one carries, in its C{traceback} attribute,
    the formatted traceback text from where the error originally occurred.
    """

    def __init__(self, msg, tb=''):
        """
        @param msg: human-readable error message
        @type msg: string
        @param tb: formatted traceback text; optional, defaults to an empty
            string when no traceback is available
        @type tb: string
        """
        Exception.__init__(self, msg)
        # Normalise a missing traceback so __str__ can always concatenate.
        if tb is None:
            tb = ''
        self.traceback = tb

    def __str__(self):
        """
        Return the exception message immediately followed by the traceback
        text.
        """
        return Exception.__str__(self) + self.traceback

pb.setUnjellyableForClass(RemoteException, RemoteException)

# ZODB conflicts
class RemoteConflictError(RemoteException):
    """
    RemoteException raised by translateError in place of a ZODB
    ConflictError, so callers can tell write conflicts apart from other
    remote failures.
    """

pb.setUnjellyableForClass(RemoteConflictError, RemoteConflictError)

# Invalid monitor specified
class RemoteBadMonitor(RemoteException):
    """
    RemoteException used by PBDaemon.getInitialServices (wrapped in a
    failed Deferred) when the initial ZenHub services cannot be fetched;
    that failure is reported as an invalid --monitor.
    """

# Register like the sibling RemoteException classes so an instance that
# arrives over PB can be unjellied instead of being refused as insecure.
pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)

58 -def translateError(callable):
59 """ 60 Decorator function to wrap remote exceptions into something 61 understandable by our daemon. 62 63 @parameter callable: function to wrap 64 @type callable: function 65 @return: function's return or an exception 66 @rtype: various 67 """ 68 def inner(*args, **kw): 69 """ 70 Interior decorator 71 """ 72 try: 73 return callable(*args, **kw) 74 except ConflictError, ex: 75 raise RemoteConflictError( 76 'Remote exception: %s: %s' % (ex.__class__, ex), 77 traceback.format_exc()) 78 except Exception, ex: 79 raise RemoteException( 80 'Remote exception: %s: %s' % (ex.__class__, ex), 81 traceback.format_exc())
82 return inner 83 84 85 PB_PORT = 8789 86 87 startEvent = { 88 'eventClass': App_Start, 89 'summary': 'started', 90 'severity': Clear, 91 } 92 93 stopEvent = { 94 'eventClass':App_Stop, 95 'summary': 'stopped', 96 'severity': Warning, 97 } 98 99 100 DEFAULT_HUB_HOST = 'localhost' 101 DEFAULT_HUB_PORT = PB_PORT 102 DEFAULT_HUB_USERNAME = 'admin' 103 DEFAULT_HUB_PASSWORD = 'zenoss' 104 DEFAULT_HUB_MONITOR = 'localhost' 105
class HubDown(Exception):
    """Failure reason used by FakeRemote when ZenHub is not reachable."""

class FakeRemote:
    """
    Stand-in for a remote service reference while ZenHub is unavailable.

    Every callRemote() immediately returns a failed Deferred carrying a
    HubDown exception, so callers follow their normal errback path instead
    of having to special-case a missing service.
    """

    def callRemote(self, *unused, **unusedKw):
        """
        Accept any method name and positional or keyword arguments (as a
        real PB remote reference does) and fail.

        @return: a Deferred that has already failed with HubDown
        """
        return defer.fail(HubDown("ZenHub is down"))

112 -class PBDaemon(ZenDaemon, pb.Referenceable):
113 114 name = 'pbdaemon' 115 initialServices = ['EventService'] 116 heartbeatEvent = {'eventClass':Heartbeat} 117 heartbeatTimeout = 60*3 118 _customexitcode = 0 119 _sendingEvents = False 120
121 - def __init__(self, noopts=0, keeproot=False):
122 try: 123 ZenDaemon.__init__(self, noopts, keeproot) 124 125 except IOError: 126 import traceback 127 self.log.critical( traceback.format_exc( 0 ) ) 128 sys.exit(1) 129 130 self.rrdStats = DaemonStats() 131 self.perspective = None 132 self.services = {} 133 self.eventQueue = [] 134 self.startEvent = startEvent.copy() 135 self.stopEvent = stopEvent.copy() 136 details = dict(component=self.name, device=self.options.monitor) 137 for evt in self.startEvent, self.stopEvent, self.heartbeatEvent: 138 evt.update(details) 139 self.initialConnect = defer.Deferred() 140 self.stopped = False 141 self._eventStatus = {}
142
143 - def gotPerspective(self, perspective):
144 """ 145 This gets called every time we reconnect. 146 147 @parameter perspective: Twisted perspective object 148 @type perspective: Twisted perspective object 149 """ 150 self.log.info("Connected to ZenHub") 151 self.perspective = perspective 152 d2 = self.getInitialServices() 153 if self.initialConnect: 154 self.log.debug('Chaining getInitialServices with d2') 155 self.initialConnect, d = None, self.initialConnect 156 d2.chainDeferred(d)
157 158
159 - def connect(self):
160 factory = ReconnectingPBClientFactory() 161 self.log.info("Connecting to %s:%d" % (self.options.hubhost, 162 self.options.hubport)) 163 reactor.connectTCP(self.options.hubhost, self.options.hubport, factory) 164 username = self.options.hubusername 165 password = self.options.hubpassword 166 self.log.debug("Logging in as %s" % username) 167 c = credentials.UsernamePassword(username, password) 168 factory.gotPerspective = self.gotPerspective 169 factory.startLogin(c) 170 def timeout(d): 171 if not d.called: 172 self.log.error('Timeout connecting to zenhub: is it running?')
173 reactor.callLater(self.options.hubtimeout, timeout, self.initialConnect) 174 return self.initialConnect
175 176
177 - def eventService(self):
178 return self.getServiceNow('EventService')
179 180
181 - def getServiceNow(self, svcName):
182 if not self.services.has_key(svcName): 183 self.log.warning('No service %s named: ZenHub may be disconnected' % svcName) 184 return self.services.get(svcName, None) or FakeRemote()
185 186
187 - def getService(self, serviceName, serviceListeningInterface=None):
188 """ 189 Attempt to get a service from zenhub. Returns a deferred. 190 When service is retrieved it is stashed in self.services with 191 serviceName as the key. When getService is called it will first 192 check self.services and if serviceName is already there it will return 193 the entry from self.services wrapped in a defer.succeed 194 """ 195 if self.services.has_key(serviceName): 196 return defer.succeed(self.services[serviceName]) 197 198 def removeService(ignored): 199 self.log.debug('Removing service %s' % serviceName) 200 if serviceName in self.services: 201 del self.services[serviceName]
202 203 def callback(result, serviceName): 204 self.log.debug('Loaded service %s from zenhub' % serviceName) 205 self.services[serviceName] = result 206 result.notifyOnDisconnect(removeService) 207 return result 208 209 def errback(error, serviceName): 210 self.log.debug('errback after getting service %s' % serviceName) 211 self.log.error('Could not retrieve service %s' % serviceName) 212 if serviceName in self.services: 213 del self.services[serviceName] 214 return error 215 216 d = self.perspective.callRemote('getService', 217 serviceName, 218 self.options.monitor, 219 serviceListeningInterface or self) 220 d.addCallback(callback, serviceName) 221 d.addErrback(errback, serviceName) 222 return d 223
224 - def getInitialServices(self):
225 """ 226 After connecting to zenhub, gather our initial list of services. 227 """ 228 def errback(error): 229 if isinstance(error, Failure): 230 self.log.critical( "Invalid monitor: %s" % self.options.monitor) 231 reactor.stop() 232 return defer.fail(RemoteBadMonitor( 233 "Invalid monitor: %s" % self.options.monitor)) 234 return error
235 236 self.log.debug('Setting up initial services: %s' % \ 237 ', '.join(self.initialServices)) 238 d = defer.DeferredList( 239 [self.getService(name) for name in self.initialServices], 240 fireOnOneErrback=True, consumeErrors=True) 241 d.addErrback(errback) 242 return d 243 244
245 - def connected(self):
246 pass
247
248 - def run(self):
249 self.log.debug('Starting PBDaemon initialization') 250 d = self.connect() 251 def callback(result): 252 self.sendEvent(self.startEvent) 253 self.pushEventsLoop() 254 self.log.debug('Calling connected.') 255 self.connected() 256 return result
257 d.addCallback(callback) 258 reactor.run() 259 self.log.info('%s shutting down' % self.name) 260 if self._customexitcode: 261 sys.exit(self._customexitcode) 262
263 - def sigTerm(self, signum=None, frame=None):
264 try: 265 ZenDaemon.sigTerm(self, signum, frame) 266 except SystemExit: 267 pass
268
269 - def setExitCode(self, exitcode):
270 self._customexitcode = exitcode
271
272 - def stop(self, ignored=''):
273 def stopNow(ignored): 274 if reactor.running: 275 reactor.stop()
276 if reactor.running and not self.stopped: 277 self.stopped = True 278 if 'EventService' in self.services: 279 # send stop event if we don't have an implied --cycle, 280 # or if --cycle has been specified 281 if not hasattr(self.options, 'cycle') or \ 282 getattr(self.options, 'cycle', True): 283 self.sendEvent(self.stopEvent) 284 # give the reactor some time to send the shutdown event 285 drive(self.pushEvents).addBoth(stopNow) 286 # but not too much time 287 reactor.callLater(1, stopNow, True) # requires bogus arg 288 self.log.debug( "Sent a 'stop' event" ) 289 else: 290 self.log.debug( "No event sent as no EventService available." ) 291 else: 292 self.log.debug( "stop() called when not running" ) 293
294 - def sendEvents(self, events):
295 map(self.sendEvent, events)
296
297 - def sendEvent(self, event, **kw):
298 ''' Add event to queue of events to be sent. If we have an event 299 service then process the queue. 300 ''' 301 if not reactor.running: return 302 event = event.copy() 303 event['agent'] = self.name 304 event['manager'] = self.options.monitor 305 event.update(kw) 306 if not self.options.allowduplicateclears: 307 statusKey = ( event['device'], 308 event.get('component', None), 309 event.get('eventKey', None), 310 event.get('eventClass', None) ) 311 severity = event.get('severity', None) 312 status = self._eventStatus.get(statusKey, None) 313 self._eventStatus[statusKey] = severity 314 if severity == Clear and status == Clear: 315 self.log.debug("Dropping useless clear event %r", event) 316 return 317 self.log.debug("Queueing event %r", event) 318 self.eventQueue.append(event) 319 self.log.debug("Total of %d queued events" % len(self.eventQueue))
320
321 - def pushEventsLoop(self):
322 """Periodially, wake up and flush events to ZenHub. 323 """ 324 reactor.callLater(self.options.eventflushseconds, self.pushEventsLoop) 325 drive(self.pushEvents)
326
327 - def pushEvents(self, driver):
328 """Flush events to ZenHub. 329 """ 330 try: 331 # Set a maximum size on the eventQueue to avoid consuming all RAM. 332 queueLen = len(self.eventQueue) 333 if queueLen > self.options.maxqueuelen: 334 self.log.warn('Queue exceeded maximum length: %d/%d. Trimming', 335 queueLen, self.options.maxqueuelen) 336 diff = queueLen - self.options.maxqueuelen 337 self.eventQueue = self.eventQueue[diff:] 338 339 # are we already shutting down? 340 if not reactor.running: 341 return 342 if self._sendingEvents: 343 return 344 # try to send everything we have, serially 345 self._sendingEvents = True 346 while self.eventQueue: 347 # are still connected to ZenHub? 348 evtSvc = self.services.get('EventService', None) 349 if not evtSvc: break 350 # send the events in large bundles, carefully reducing 351 # the eventQueue in case we get in here more than once 352 chunkSize = self.options.eventflushchunksize 353 events = self.eventQueue[:chunkSize] 354 self.eventQueue = self.eventQueue[chunkSize:] 355 # send the events and wait for the response 356 yield evtSvc.callRemote('sendEvents', events) 357 try: 358 driver.next() 359 except ConnectionLost, ex: 360 self.log.error('Error sending event: %s' % ex) 361 self.eventQueue = events + self.eventQueue 362 break 363 self._sendingEvents = False 364 except Exception, ex: 365 self._sendingEvents = False 366 self.log.exception(ex)
367
368 - def heartbeat(self):
369 'if cycling, send a heartbeat, else, shutdown' 370 if not self.options.cycle: 371 self.stop() 372 return 373 self.sendEvent(self.heartbeatEvent, timeout=self.heartbeatTimeout) 374 # heartbeat is normally 3x cycle time 375 self.niceDoggie(self.heartbeatTimeout / 3)
376 377
378 - def remote_getName(self):
379 return self.name
380 381
382 - def remote_shutdown(self, unused):
383 self.stop() 384 self.sigTerm()
385 386
387 - def remote_setPropertyItems(self, items):
388 pass
389 390 391 @translateError
392 - def remote_updateThresholdClasses(self, classes):
393 from Products.ZenUtils.Utils import importClass 394 self.log.debug("Loading classes %s", classes) 395 for c in classes: 396 try: 397 importClass(c) 398 except ImportError: 399 self.log.exception("Unable to import class %s", c)
400 401
402 - def buildOptions(self):
403 self.parser.add_option('--hubhost', 404 dest='hubhost', 405 default=DEFAULT_HUB_HOST, 406 help='Host of zenhub daemon.' 407 ' Default is %s.' % DEFAULT_HUB_HOST) 408 self.parser.add_option('--hubport', 409 dest='hubport', 410 type='int', 411 default=DEFAULT_HUB_PORT, 412 help='Port zenhub listens on.' 413 'Default is %s.' % DEFAULT_HUB_PORT) 414 self.parser.add_option('--hubusername', 415 dest='hubusername', 416 default=DEFAULT_HUB_USERNAME, 417 help='Username for zenhub login.' 418 ' Default is %s.' % DEFAULT_HUB_USERNAME) 419 self.parser.add_option('--hubpassword', 420 dest='hubpassword', 421 default=DEFAULT_HUB_PASSWORD, 422 help='Password for zenhub login.' 423 ' Default is %s.' % DEFAULT_HUB_PASSWORD) 424 self.parser.add_option('--monitor', 425 dest='monitor', 426 default=DEFAULT_HUB_MONITOR, 427 help='Name of monitor instance to use for' 428 ' configuration. Default is %s.' 429 % DEFAULT_HUB_MONITOR) 430 self.parser.add_option('--initialHubTimeout', 431 dest='hubtimeout', 432 type='int', 433 default=30, 434 help='Initial time to wait for a ZenHub ' 435 'connection') 436 self.parser.add_option('--allowduplicateclears', 437 dest='allowduplicateclears', 438 default=False, 439 action='store_true', 440 help='Send clear events even when the most ' 441 'recent event was also a clear event.') 442 443 self.parser.add_option('--eventflushseconds', 444 dest='eventflushseconds', 445 default=5., 446 type='float', 447 help='Seconds between attempts to flush ' 448 'events to ZenHub.') 449 450 self.parser.add_option('--eventflushchunksize', 451 dest='eventflushchunksize', 452 default=50, 453 type='int', 454 help='Number of events to send to ZenHub' 455 'at one time') 456 457 self.parser.add_option('--maxqueuelen', 458 dest='maxqueuelen', 459 default=5000, 460 type='int', 461 help='Maximum number of events to queue') 462 463 464 ZenDaemon.buildOptions(self)
465