Package ZenHub :: Module PBDaemon
[hide private]
[frames | no frames]

Source Code for Module ZenHub.PBDaemon

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  #!/usr/bin/python 
 14   
 15  __doc__='''PBDaemon 
 16   
 17  Base for daemons that connect to zenhub 
 18   
 19  ''' 
 20   
 21  import Globals 
 22  from Products.ZenUtils.ZenDaemon import ZenDaemon 
 23  #from Products.ZenUtils.Step import Step 
 24  import Products.ZenEvents.Event as Event 
 25  from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory 
 26   
 27  import socket 
 28   
 29  from twisted.internet import reactor, defer 
 30  from twisted.cred import credentials 
 31  from twisted.spread import pb 
 32   
 33  from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop, \ 
 34                                                  Clear, Warning 
 35   
 36  from socket import getfqdn 
 37   
 38  PB_PORT = 8789 
 39   
 40  startEvent = { 
 41      'eventClass': App_Start,  
 42      'summary': 'started', 
 43      'severity': Clear, 
 44      } 
 45   
 46  stopEvent = { 
 47      'eventClass':App_Stop,  
 48      'summary': 'stopped', 
 49      'severity': Warning, 
 50      } 
 51   
 52   
 53  DEFAULT_HUB_HOST = 'localhost' 
 54  DEFAULT_HUB_PORT = PB_PORT 
 55  DEFAULT_HUB_USERNAME = 'admin' 
 56  DEFAULT_HUB_PASSWORD = 'zenoss' 
 57  DEFAULT_HUB_MONITOR = 'localhost' 
 58   
class HubDown(Exception):
    '''Raised (as a Deferred failure) when ZenHub cannot be reached.'''
60
class FakeRemote:
    '''Stand-in for a remote service reference while ZenHub is unavailable.

    PBDaemon.getServiceNow hands one of these out when the requested service
    is not in its cache, so callers can still call callRemote() and attach
    callbacks/errbacks instead of having to check for a missing service.
    '''

    def callRemote(self, name, *args, **kw):
        '''Pretend to invoke remote method `name`; always fails with HubDown.

        Positional and keyword arguments are accepted and ignored so that any
        call that would be valid against a real remote reference is also
        valid here.

        @return: a Deferred that has already failed with HubDown
        '''
        # `defer` is imported at module level from twisted.internet.
        return defer.fail(HubDown("ZenHub is down"))
65
class PBDaemon(ZenDaemon, pb.Referenceable):
    '''Base class for daemons that talk to zenhub over Perspective Broker.

    Subclasses normally override `name`, extend `initialServices` with the
    hub services they need, and override connected() to start their work
    once the initial services have been retrieved.

    Instance attributes set by __init__:
      perspective    -- the perspective handed to gotPerspective, else None
      services       -- cache mapping service name -> remote service
      eventQueue     -- list of events waiting to be sent to EventService
      startEvent     -- per-instance copy of the module-level startEvent
      stopEvent      -- per-instance copy of the module-level stopEvent
      initialConnect -- Deferred returned by connect(); set to None once
                        the first login has been chained onto it
      stopped        -- True once stop() has begun shutting down
    '''

    name = 'pbdaemon'
    initialServices = ['EventService']

    def __init__(self, noopts=0, keeproot=False):
        '''Initialise the daemon state; no connection is made here.

        noopts and keeproot are passed straight through to
        ZenDaemon.__init__.
        '''
        ZenDaemon.__init__(self, noopts, keeproot)
        self.perspective = None
        self.services = {}
        self.eventQueue = []
        self.startEvent = startEvent.copy()
        self.stopEvent = stopEvent.copy()
        # Resolve the host name once and stamp it on both lifecycle events.
        fqdn = getfqdn()
        for evt in self.startEvent, self.stopEvent:
            evt.update(dict(component=self.name, device=fqdn))
        self.initialConnect = defer.Deferred()
        self.stopped = False

    def gotPerspective(self, perspective):
        ''' This gets called every time we reconnect.

        Stores the perspective, (re)fetches the initial services and, on the
        first call only, chains that result onto the Deferred that connect()
        returned.
        '''
        self.log.warning("Reconnected to ZenHub")
        self.perspective = perspective
        d2 = self.getInitialServices()
        if self.initialConnect is not None:
            self.log.debug('chaining getInitialServices with d2')
            # Clear initialConnect first so later reconnects skip this branch.
            self.initialConnect, d = None, self.initialConnect
            d2.chainDeferred(d)

    def connect(self):
        '''Start connecting and logging in to zenhub.

        Host, port, username and password are read from self.options.

        @return: self.initialConnect, the Deferred that gotPerspective chains
                 the initial-services result onto
        '''
        factory = ReconnectingPBClientFactory()
        self.log.debug("Connecting to %s", self.options.hubhost)
        reactor.connectTCP(self.options.hubhost, self.options.hubport, factory)
        username = self.options.username
        password = self.options.password
        self.log.debug("Logging in as %s", username)
        c = credentials.UsernamePassword(username, password)
        factory.gotPerspective = self.gotPerspective
        factory.startLogin(c)
        return self.initialConnect

    def eventService(self):
        '''Return the cached EventService, or a FakeRemote if it is missing.'''
        return self.getServiceNow('EventService')

    def getServiceNow(self, svcName):
        '''Return the cached service svcName without contacting zenhub.

        If the service is not cached (or the cached value is falsy) an error
        is logged where applicable and a FakeRemote is returned, whose
        callRemote() always fails with HubDown.
        '''
        if svcName not in self.services:
            self.log.error('getServiceNow returning FakeRemote for %s' % svcName)
        return self.services.get(svcName, None) or FakeRemote()

    def getService(self, serviceName, serviceListeningInterface=None):
        ''' Attempt to get a service from zenhub.  Returns a deferred.
        When service is retrieved it is stashed in self.services with
        serviceName as the key.  When getService is called it will first
        check self.services and if serviceName is already there it will return
        the entry from self.services wrapped in a defer.succeed

        serviceListeningInterface is the object passed to zenhub as the
        listener for the service; this daemon itself is used when it is
        omitted.  If no perspective is available yet, or zenhub cannot
        provide the service, the returned deferred fails.
        '''
        if serviceName in self.services:
            return defer.succeed(self.services[serviceName])
        if self.perspective is None:
            # Not logged in yet: honour the "returns a deferred" contract
            # instead of raising AttributeError on None.callRemote.
            return defer.fail(HubDown("ZenHub is down"))

        def removeService(ignored):
            # Disconnect notification: drop the stale cache entry.
            self.log.debug('removing service %s' % serviceName)
            if serviceName in self.services:
                del self.services[serviceName]

        def callback(result, serviceName):
            self.log.debug('callback after getting service %s' % serviceName)
            self.services[serviceName] = result
            result.notifyOnDisconnect(removeService)
            return result

        def errback(error, serviceName):
            self.log.debug('errback after getting service %s' % serviceName)
            self.log.error('Could not retrieve service %s' % serviceName)
            if serviceName in self.services:
                del self.services[serviceName]
            # Pass the failure on so callers (e.g. the DeferredList built by
            # getInitialServices) see the real cause of the error.
            return error

        d = self.perspective.callRemote('getService',
                                        serviceName,
                                        self.options.monitor,
                                        serviceListeningInterface or self)
        d.addCallback(callback, serviceName)
        d.addErrback(errback, serviceName)
        return d

    def getInitialServices(self):
        '''Request every service named in self.initialServices.

        @return: a DeferredList over the individual getService() deferreds;
                 it errbacks as soon as any one of them fails
        '''
        self.log.debug('setting up services %s' %
                       ', '.join(self.initialServices))
        d = defer.DeferredList(
            [self.getService(name) for name in self.initialServices],
            fireOnOneErrback=True, consumeErrors=True)
        return d

    def connected(self):
        '''Hook called by run() once the initial connect has succeeded.

        Does nothing here; intended to be overridden by subclasses.
        '''
        pass

    def run(self):
        '''Connect to zenhub and run the reactor until it is stopped.

        On a successful initial connect the start event is sent and
        connected() is called; on failure the error is logged and the daemon
        is stopped.
        '''
        self.log.debug('run')
        d = self.connect()

        def callback(result):
            self.log.debug('Calling connected.')
            self.log.debug('connected')
            self.sendEvent(self.startEvent)
            self.connected()
            return result

        def errback(error):
            self.log.error('Unable to connect to zenhub: \n%s' % error)
            self.stop()

        d.addCallbacks(callback, errback)
        reactor.run()
        self.log.info('%s shutting down' % self.name)

    def sigTerm(self, *unused):
        '''Run ZenDaemon.sigTerm but suppress any SystemExit it raises.'''
        try:
            ZenDaemon.sigTerm(self, *unused)
        except SystemExit:
            # NOTE(review): presumably swallowed so the reactor can finish
            # shutting down cleanly -- confirm against ZenDaemon.sigTerm.
            pass

    def stop(self):
        '''Stop the reactor, sending the stop event first when possible.

        Does nothing if the reactor is not running or stop() already ran.
        '''
        if reactor.running and not self.stopped:
            self.stopped = True
            if 'EventService' in self.services:
                self.sendEvent(self.stopEvent)
                # give the reactor some time to send the shutdown event
                # we could get more creative an add callbacks for event
                # sends, which would mean we could wait longer, only as long
                # as it took to send
                reactor.callLater(1, reactor.stop)
            else:
                reactor.stop()

    def sendEvent(self, event, **kw):
        ''' Add event to queue of events to be sent.  If we have an event
        service then process the queue.

        The caller's event is not modified: a copy is tagged with 'agent'
        (this daemon's name) and 'manager' (the configured monitor) and then
        updated with any keyword arguments before being queued.
        '''
        event = event.copy()
        event['agent'] = self.name
        event['manager'] = self.options.monitor
        event.update(kw)
        self.log.debug("Sending event %r", event)

        def errback(error, event):
            # If we get an error when sending an event we add it back to the
            # queue.  This is great if the eventservice is just temporarily
            # unavailable.  This is not so good if there is a problem with
            # this event in particular, in which case we'll repeatedly
            # attempt to send it.  We need to do some analysis of the error
            # before sticking event back in the queue.
            #
            # Maybe this is overkill and if we have an operable
            # event service we should just log events that don't get sent
            # and then drop them.
            self.log.error('Error sending event: %s' % error)
            self.eventQueue.append(event)

        # The event always carries at least 'agent' and 'manager' by now, so
        # it is queued unconditionally.
        self.eventQueue.append(event)
        evtSvc = self.services.get('EventService', None)
        if evtSvc:
            # Drain only the events queued at this moment: an errback that
            # fires immediately re-appends its event, and a fixed count keeps
            # this loop from chasing those re-queued events forever.
            for _ in range(len(self.eventQueue)):
                queued = self.eventQueue.pop(0)
                d = evtSvc.callRemote('sendEvent', queued)
                d.addErrback(errback, queued)

    def remote_getName(self):
        '''Remotely callable: return this daemon's name.'''
        return self.name

    def remote_shutdown(self, result):
        '''Remotely callable: stop the daemon, then run sigTerm handling.

        The `result` argument is ignored.
        '''
        self.stop()
        self.sigTerm()

    def buildOptions(self):
        '''Register the zenhub connection options, then ZenDaemon's own.'''
        self.parser.add_option('--hub-host',
                               dest='hubhost',
                               default=DEFAULT_HUB_HOST,
                               help='Host of zenhub daemon.'
                               ' Default is %s.' % DEFAULT_HUB_HOST)
        # type='int' so a port given on the command line reaches
        # reactor.connectTCP as an integer, just like the default does.
        self.parser.add_option('--hub-port',
                               dest='hubport',
                               type='int',
                               default=DEFAULT_HUB_PORT,
                               help='Port zenhub listens on.'
                               ' Default is %s.' % DEFAULT_HUB_PORT)
        self.parser.add_option('--username',
                               dest='username',
                               default=DEFAULT_HUB_USERNAME,
                               help='Username for zenhub login.'
                               ' Default is %s.' % DEFAULT_HUB_USERNAME)
        self.parser.add_option('--password',
                               dest='password',
                               default=DEFAULT_HUB_PASSWORD,
                               help='Password for zenhub login.'
                               ' Default is %s.' % DEFAULT_HUB_PASSWORD)
        self.parser.add_option('--monitor',
                               dest='monitor',
                               default=DEFAULT_HUB_MONITOR,
                               help='Name of monitor instance to use for'
                               ' configuration.  Default is %s.'
                               % DEFAULT_HUB_MONITOR)

        ZenDaemon.buildOptions(self)
269