Package ZenHub :: Module PBDaemon
[hide private]
[frames] | [no frames]

Source Code for Module ZenHub.PBDaemon

  1  ########################################################################## 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__ = """PBDaemon 
 15   
 16  Base for daemons that connect to zenhub 
 17   
 18  """ 
 19   
 20  import sys 
 21  import traceback 
 22   
 23  import Globals 
 24   
 25  from Products.ZenUtils.ZenDaemon import ZenDaemon 
 26  from Products.ZenEvents.ZenEventClasses import Heartbeat 
 27  from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory 
 28  from Products.ZenUtils.DaemonStats import DaemonStats 
 29  from Products.ZenUtils.Driver import drive 
 30  from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop, \ 
 31                                                  Clear, Warning 
 32   
 33  from twisted.cred import credentials 
 34  from twisted.internet import reactor, defer 
 35  from twisted.internet.error import ConnectionLost 
 36  from twisted.spread import pb 
 37  from twisted.python.failure import Failure 
 38   
 39  from ZODB.POSException import ConflictError 
 40   
class RemoteException(Exception, pb.Copyable, pb.RemoteCopy):
    """
    Exception that can cross the PB barrier.

    Carries the formatted traceback of the original error in
    C{self.traceback}; str() appends it to the message.

    @parameter msg: exception message
    @parameter tb: formatted traceback text; optional so that callers
                   without a traceback (e.g. RemoteBadMonitor) can still
                   construct the exception
    """
    def __init__(self, msg, tb=None):
        Exception.__init__(self, msg)
        self.traceback = tb

    def __str__(self):
        # getattr: a RemoteCopy rebuilt from copied state may lack the
        # attribute; "or ''" tolerates a missing (None) traceback.
        return Exception.__str__(self) + (getattr(self, 'traceback', None) or '')

pb.setUnjellyableForClass(RemoteException, RemoteException)

# ZODB conflicts
class RemoteConflictError(RemoteException): pass
pb.setUnjellyableForClass(RemoteConflictError, RemoteConflictError)

# Invalid monitor specified
class RemoteBadMonitor(RemoteException): pass
# Registered like its siblings so it can also be unjellied if sent over PB.
pb.setUnjellyableForClass(RemoteBadMonitor, RemoteBadMonitor)
def translateError(callable):
    """
    Decorator function to wrap remote exceptions into something
    understandable by our daemon.

    Any exception raised by the wrapped call is re-raised as a
    RemoteException (RemoteConflictError for a ZODB ConflictError) whose
    message names the original exception class and text and which carries
    the formatted traceback.

    @parameter callable: function to wrap
    @type callable: function
    @return: function's return or an exception
    @rtype: various
    """
    import functools

    # wraps() keeps the wrapped function's __name__/__doc__ for logging
    # and introspection.
    @functools.wraps(callable)
    def inner(*args, **kw):
        """
        Interior decorator
        """
        try:
            return callable(*args, **kw)
        except ConflictError as ex:
            raise RemoteConflictError(
                'Remote exception: %s: %s' % (ex.__class__, ex),
                traceback.format_exc())
        except Exception as ex:
            raise RemoteException(
                'Remote exception: %s: %s' % (ex.__class__, ex),
                traceback.format_exc())
    return inner


PB_PORT = 8789

# Event templates; PBDaemon.__init__ copies these per instance before
# filling in component/device.
startEvent = {
    'eventClass': App_Start,
    'summary': 'started',
    'severity': Clear,
    }

stopEvent = {
    'eventClass':App_Stop,
    'summary': 'stopped',
    'severity': Warning,
    }


# Defaults for the --hub* / --monitor command-line options.
DEFAULT_HUB_HOST = 'localhost'
DEFAULT_HUB_PORT = PB_PORT
DEFAULT_HUB_USERNAME = 'admin'
DEFAULT_HUB_PASSWORD = 'zenoss'
DEFAULT_HUB_MONITOR = 'localhost'
class HubDown(Exception):
    """Failure reason used by FakeRemote when ZenHub is unavailable."""


class FakeRemote:
    """
    Placeholder remote reference whose every call fails with HubDown.

    Calls return an already-failed Deferred, so callers can use the same
    errback handling as for a real remote call.
    """
    def callRemote(self, *unused, **unusedKw):
        # Accept keyword arguments too, so a kwargs-style call still gets
        # a failed Deferred rather than a synchronous TypeError.
        return defer.fail(HubDown("ZenHub is down"))
class PBDaemon(ZenDaemon, pb.Referenceable):
    """
    Base class for daemons that connect to ZenHub over Perspective Broker.

    run() logs in to ZenHub and loads the services named in
    initialServices. Once they are loaded, it queues a start event, starts
    the periodic event-flush loop and calls the connected() hook. Events
    queued with sendEvent() are flushed to the hub's EventService in chunks.
    """

    # Used as the event 'agent' and as the 'component' of start/stop/
    # heartbeat events.
    name = 'pbdaemon'
    # Services fetched from ZenHub on every (re)connect.
    initialServices = ['EventService']
    # Class-level template; __init__ gives each instance its own copy.
    heartbeatEvent = {'eventClass':Heartbeat}
    # Seconds; passed as the heartbeat event's timeout.
    heartbeatTimeout = 60*3
    _customexitcode = 0
    # True while pushEvents() is flushing, to prevent overlapping flushes.
    _sendingEvents = False

    def __init__(self, noopts=0, keeproot=False):
        try:
            ZenDaemon.__init__(self, noopts, keeproot)

        except IOError:
            # traceback is imported at module level.
            self.log.critical( traceback.format_exc( 0 ) )
            sys.exit(1)

        self.rrdStats = DaemonStats()
        self.perspective = None
        self.services = {}
        self.eventQueue = []
        # Work on per-instance copies: update() below must not mutate the
        # module-level templates or the class-level heartbeatEvent, which
        # is shared by every instance and subclass.
        self.startEvent = startEvent.copy()
        self.stopEvent = stopEvent.copy()
        self.heartbeatEvent = self.heartbeatEvent.copy()
        details = dict(component=self.name, device=self.options.monitor)
        for evt in self.startEvent, self.stopEvent, self.heartbeatEvent:
            evt.update(details)
        # Fired (via gotPerspective) once the first set of services loads.
        self.initialConnect = defer.Deferred()
        self.stopped = False
        # (device, component, eventKey, eventClass) -> last severity queued
        self._eventStatus = {}

    def gotPerspective(self, perspective):
        """
        This gets called every time we reconnect.

        Stores the perspective and (re)loads the initial services. On the
        first connection only, the result of that load is chained into
        self.initialConnect.

        @parameter perspective: Twisted perspective object
        @type perspective: Twisted perspective object
        """
        self.log.info("Connected to ZenHub")
        self.perspective = perspective
        d2 = self.getInitialServices()
        if self.initialConnect:
            self.log.debug('Chaining getInitialServices with d2')
            # Clear initialConnect first so later reconnects don't chain.
            self.initialConnect, d = None, self.initialConnect
            d2.chainDeferred(d)

    def connect(self):
        """
        Start logging in to ZenHub.

        Returns self.initialConnect, which fires once the initial services
        have been loaded. If it has not fired after --initialHubTimeout
        seconds, an error is logged; the timeout only logs and cancels
        nothing.
        """
        factory = ReconnectingPBClientFactory()
        self.log.info("Connecting to %s:%d" % (self.options.hubhost,
                                                self.options.hubport))
        reactor.connectTCP(self.options.hubhost, self.options.hubport, factory)
        username = self.options.hubusername
        password = self.options.hubpassword
        self.log.debug("Logging in as %s" % username)
        c = credentials.UsernamePassword(username, password)
        factory.gotPerspective = self.gotPerspective
        factory.startLogin(c)
        def timeout(d):
            if not d.called:
                self.log.error('Timeout connecting to zenhub: is it running?')
        reactor.callLater(self.options.hubtimeout, timeout, self.initialConnect)
        return self.initialConnect

    def eventService(self):
        """Return the EventService, or a FakeRemote if it is not loaded."""
        return self.getServiceNow('EventService')

    def getServiceNow(self, svcName):
        """
        Return the already-loaded service svcName without waiting.

        If the service is not loaded, a warning is logged and a FakeRemote
        is returned, whose calls fail with HubDown.
        """
        if svcName not in self.services:
            self.log.warning('No service %s named: ZenHub may be disconnected' % svcName)
        return self.services.get(svcName, None) or FakeRemote()

    def getService(self, serviceName, serviceListeningInterface=None):
        """
        Attempt to get a service from zenhub. Returns a deferred.
        When service is retrieved it is stashed in self.services with
        serviceName as the key. When getService is called it will first
        check self.services and if serviceName is already there it will return
        the entry from self.services wrapped in a defer.succeed

        The cached entry is removed again when the remote reference
        disconnects or the lookup fails.
        """
        if serviceName in self.services:
            return defer.succeed(self.services[serviceName])

        def removeService(ignored):
            self.log.debug('Removing service %s' % serviceName)
            if serviceName in self.services:
                del self.services[serviceName]

        def callback(result, serviceName):
            self.log.debug('Loaded service %s from zenhub' % serviceName)
            self.services[serviceName] = result
            result.notifyOnDisconnect(removeService)
            return result

        def errback(error, serviceName):
            self.log.debug('errback after getting service %s' % serviceName)
            self.log.error('Could not retrieve service %s' % serviceName)
            if serviceName in self.services:
                del self.services[serviceName]
            return error

        d = self.perspective.callRemote('getService',
                                        serviceName,
                                        self.options.monitor,
                                        serviceListeningInterface or self)
        d.addCallback(callback, serviceName)
        d.addErrback(errback, serviceName)
        return d

    def getInitialServices(self):
        """
        After connecting to zenhub, gather our initial list of services.

        Returns a DeferredList over getService() for each name in
        self.initialServices. Any failure is treated as an invalid
        --monitor: it is logged as critical, the reactor is stopped, and
        the failure is replaced by a RemoteBadMonitor.
        """
        def errback(error):
            if isinstance(error, Failure):
                self.log.critical( "Invalid monitor: %s" % self.options.monitor)
                reactor.stop()
                # Pass the original failure's traceback: RemoteException
                # takes one, and it keeps the real cause visible.
                return defer.fail(RemoteBadMonitor(
                           "Invalid monitor: %s" % self.options.monitor,
                           error.getTraceback()))
            return error

        self.log.debug('Setting up initial services: %s' % \
                       ', '.join(self.initialServices))
        d = defer.DeferredList(
            [self.getService(name) for name in self.initialServices],
            fireOnOneErrback=True, consumeErrors=True)
        d.addErrback(errback)
        return d

    def connected(self):
        """Hook called once the initial services are loaded; no-op here."""
        pass

    def run(self):
        """
        Connect to ZenHub and run the reactor until it is stopped.

        Once connected, the start event is queued, the event flush loop is
        started and connected() is called. After the reactor returns, exits
        with the code given to setExitCode() if it is non-zero.
        """
        self.log.debug('Starting PBDaemon initialization')
        d = self.connect()
        def callback(result):
            self.sendEvent(self.startEvent)
            self.pushEventsLoop()
            self.log.debug('Calling connected.')
            self.connected()
            return result
        d.addCallback(callback)
        reactor.run()
        self.log.info('%s shutting down' % self.name)
        if self._customexitcode:
            sys.exit(self._customexitcode)

    def sigTerm(self, signum=None, frame=None):
        """Delegate to ZenDaemon.sigTerm, swallowing any SystemExit it raises."""
        try:
            ZenDaemon.sigTerm(self, signum, frame)
        except SystemExit:
            pass

    def setExitCode(self, exitcode):
        """Set the exit code run() uses once the reactor stops (0 = none)."""
        self._customexitcode = exitcode

    def stop(self, ignored=''):
        """
        Stop the reactor, first trying to deliver a stop event to ZenHub.

        Only acts once, and only while the reactor is running. With an
        EventService loaded, the stop event is queued (unless --cycle is
        explicitly off) and the queue is flushed. The reactor then stops
        when the flush completes or after one second, whichever is first.
        Without an EventService the reactor is stopped immediately.

        @parameter ignored: unused; allows stop() to be used as a callback
        """
        def stopNow(ignored):
            if reactor.running:
                reactor.stop()
        if reactor.running and not self.stopped:
            self.stopped = True
            if 'EventService' in self.services:
                # send stop event if we don't have an implied --cycle,
                # or if --cycle has been specified
                if not hasattr(self.options, 'cycle') or \
                   getattr(self.options, 'cycle', True):
                    self.sendEvent(self.stopEvent)
                    self.log.debug( "Sent a 'stop' event" )
                # give the reactor some time to send the shutdown event
                drive(self.pushEvents).addBoth(stopNow)
                # but not too much time
                reactor.callLater(1, stopNow, True) # requires bogus arg
            else:
                self.log.debug( "No event sent as no EventService available." )
                # Nothing to flush, but the reactor must still stop;
                # self.stopped is now True, so later calls would not do it.
                stopNow(True)
        else:
            self.log.debug( "stop() called when not running" )

    def sendEvents(self, events):
        """Queue every event in events via sendEvent()."""
        # Explicit loop instead of map(): map() is lazy on Python 3, so
        # the events would never be queued.
        for event in events:
            self.sendEvent(event)

    def sendEvent(self, event, **kw):
        ''' Add event to queue of events to be sent. If we have an event
        service then process the queue.

        The event is copied and stamped with agent/manager plus any **kw.
        It is silently dropped if the reactor is not running. Unless
        --allowduplicateclears is set, it is also dropped when it is a clear
        and the previous event with the same key was a clear.
        '''
        if not reactor.running: return
        event = event.copy()
        event['agent'] = self.name
        event['manager'] = self.options.monitor
        event.update(kw)
        if not self.options.allowduplicateclears:
            statusKey = ( event['device'],
                          event.get('component', None),
                          event.get('eventKey', None),
                          event.get('eventClass', None) )
            severity = event.get('severity', None)
            status = self._eventStatus.get(statusKey, None)
            self._eventStatus[statusKey] = severity
            if severity == Clear and status == Clear:
                self.log.debug("Dropping useless clear event %r", event)
                return
        self.log.debug("Queueing event %r", event)
        self.eventQueue.append(event)
        self.log.debug("Total of %d queued events" % len(self.eventQueue))

    def pushEventsLoop(self):
        """Periodically wake up and flush events to ZenHub.

        Reschedules itself every --eventflushseconds seconds.
        """
        reactor.callLater(self.options.eventflushseconds, self.pushEventsLoop)
        drive(self.pushEvents)

    def pushEvents(self, driver):
        """Flush events to ZenHub.

        Generator run under drive(). Sends the queue to the EventService in
        chunks of --eventflushchunksize and waits for each call before
        sending the next. Does nothing if the reactor is not running or a
        flush is already in progress, and stops if the EventService is gone.
        A chunk whose send fails with ConnectionLost is put back at the
        front of the queue; other errors are logged and the chunk is lost.
        (Assumes driver.next() re-raises a failure of the last yielded
        Deferred, as the ConnectionLost handling implies.)
        """
        # are we already shutting down, or is another flush running?
        if not reactor.running or self._sendingEvents:
            return
        # try to send everything we have, serially
        self._sendingEvents = True
        try:
            while self.eventQueue:
                # are still connected to ZenHub?
                evtSvc = self.services.get('EventService', None)
                if not evtSvc:
                    break
                # send the events in large bundles, carefully reducing
                # the eventQueue in case we get in here more than once
                chunkSize = self.options.eventflushchunksize
                events = self.eventQueue[:chunkSize]
                self.eventQueue = self.eventQueue[chunkSize:]
                # send the events and wait for the response
                yield evtSvc.callRemote('sendEvents', events)
                try:
                    driver.next()
                except ConnectionLost as ex:
                    self.log.error('Error sending event: %s' % ex)
                    self.eventQueue = events + self.eventQueue
                    break
        except Exception as ex:
            self.log.exception(ex)
        finally:
            # Single reset point for every exit path after setting the flag.
            self._sendingEvents = False

    def heartbeat(self):
        'if cycling, send a heartbeat, else, shutdown'
        if not self.options.cycle:
            self.stop()
            return
        self.sendEvent(self.heartbeatEvent, timeout=self.heartbeatTimeout)
        # heartbeat is normally 3x cycle time; floor division keeps this
        # an integer on Python 3 as it was on Python 2.
        self.niceDoggie(self.heartbeatTimeout // 3)

    def remote_getName(self):
        """Return this daemon's name to ZenHub."""
        return self.name

    def remote_shutdown(self, unused):
        """Shut down at ZenHub's request: stop(), then sigTerm()."""
        self.stop()
        self.sigTerm()

    def remote_setPropertyItems(self, items):
        """Accept property items from ZenHub; ignored by this base class."""
        pass

    @translateError
    def remote_updateThresholdClasses(self, classes):
        """Import each threshold class path in classes, logging failures."""
        from Products.ZenUtils.Utils import importClass
        self.log.debug("Loading classes %s", classes)
        for c in classes:
            try:
                importClass(c)
            except ImportError:
                self.log.exception("Unable to import class %s", c)

    def buildOptions(self):
        """Add the ZenHub connection and event-flushing command-line options."""
        self.parser.add_option('--hubhost',
                               dest='hubhost',
                               default=DEFAULT_HUB_HOST,
                               help='Host of zenhub daemon.'
                                    ' Default is %s.' % DEFAULT_HUB_HOST)
        self.parser.add_option('--hubport',
                               dest='hubport',
                               type='int',
                               default=DEFAULT_HUB_PORT,
                               help='Port zenhub listens on.'
                                    ' Default is %s.' % DEFAULT_HUB_PORT)
        self.parser.add_option('--hubusername',
                               dest='hubusername',
                               default=DEFAULT_HUB_USERNAME,
                               help='Username for zenhub login.'
                                    ' Default is %s.' % DEFAULT_HUB_USERNAME)
        self.parser.add_option('--hubpassword',
                               dest='hubpassword',
                               default=DEFAULT_HUB_PASSWORD,
                               help='Password for zenhub login.'
                                    ' Default is %s.' % DEFAULT_HUB_PASSWORD)
        self.parser.add_option('--monitor',
                               dest='monitor',
                               default=DEFAULT_HUB_MONITOR,
                               help='Name of monitor instance to use for'
                                    ' configuration. Default is %s.'
                                    % DEFAULT_HUB_MONITOR)
        self.parser.add_option('--initialHubTimeout',
                               dest='hubtimeout',
                               type='int',
                               default=30,
                               help='Initial time to wait for a ZenHub '
                                    'connection')
        self.parser.add_option('--allowduplicateclears',
                               dest='allowduplicateclears',
                               default=False,
                               action='store_true',
                               help='Send clear events even when the most '
                                    'recent event was also a clear event.')

        self.parser.add_option('--eventflushseconds',
                               dest='eventflushseconds',
                               default=5.,
                               type='float',
                               help='Seconds between attempts to flush '
                                    'events to ZenHub.')

        self.parser.add_option('--eventflushchunksize',
                               dest='eventflushchunksize',
                               default=50,
                               type='int',
                               help='Number of events to send to ZenHub'
                                    ' at one time')

        ZenDaemon.buildOptions(self)
451