Package ZenHub :: Module zenhub
[hide private]
[frames | no frames]

Source Code for Module ZenHub.zenhub

  1  #! /usr/bin/env python  
  2  ########################################################################### 
  3  # 
  4  # This program is part of Zenoss Core, an open source monitoring platform. 
  5  # Copyright (C) 2007, Zenoss Inc. 
  6  # 
  7  # This program is free software; you can redistribute it and/or modify it 
  8  # under the terms of the GNU General Public License version 2 as published by 
  9  # the Free Software Foundation. 
 10  # 
 11  # For complete information please visit: http://www.zenoss.com/oss/ 
 12  # 
 13  ########################################################################### 
 14   
 15  __doc__='''zenxevent 
 16   
 17  Creates events from xml rpc calls. 
 18   
 19  $Id$ 
 20  ''' 
 21   
 22  __version__ = "$Revision$"[11:-2] 
 23   
 24  from socket import getfqdn 
 25  import os 
 26   
 27   
 28  from twisted.cred import portal, checkers, error, credentials 
 29  from twisted.spread import pb 
 30   
 31  from twisted.internet import reactor, defer 
 32  from twisted.python import failure 
 33  from twisted.web import server, xmlrpc 
 34  from zope.interface import implements 
 35   
 36  import Globals 
 37   
 38  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 39  from Products.ZenUtils.Utils import zenPath 
 40  from Products.ZenEvents.Event import Event, EventHeartbeat 
 41  from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop 
 42  import transaction 
 43  from zExceptions import NotFound 
 44   
 45  from XmlRpcService import XmlRpcService 
 46   
 47  import logging 
 48  log = logging.getLogger('zenhub') 
 49   
 50  XML_RPC_PORT = 8081 
 51  PB_PORT = 8789 
 52   
class AuthXmlRpcService(XmlRpcService):
    """
    Provide some level of authentication for XML/RPC calls.

    A request is only handed to the inherited render engine when it
    carries an HTTP Basic C{authorization} header whose username and
    password are accepted by the supplied checker.  Every other request
    is answered with an XML-RPC "Unauthorized" fault.
    """

    def __init__(self, dmd, checker):
        """
        @param dmd: passed through unchanged to C{XmlRpcService.__init__}.
        @param checker: object whose C{requestAvatarId} method validates
            username/password credentials and returns a deferred.
        """
        XmlRpcService.__init__(self, dmd)
        self.checker = checker

    def doRender(self, avatar, request):
        """
        Call the inherited render engine after authentication succeeds.
        See L{XmlRpcService.render}.

        @param avatar: the result of the credentials check (unused).
        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: whatever the inherited C{render} returns.
        """
        return XmlRpcService.render(self, request)

    def unauthorized(self, request):
        """
        Render an XMLRPC error indicating an authentication failure.

        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: None
        """
        self._cbRender(xmlrpc.Fault(self.FAILURE, "Unauthorized"), request)

    def render(self, request):
        """
        Unpack the authorization header and check the credentials.

        @type request: HTTPRequest
        @param request: the request for this xmlrpc call.
        @return: NOT_DONE_YET
        """
        auth = request.received_headers.get('authorization', None)
        if not auth:
            self.unauthorized(request)
            return server.NOT_DONE_YET
        try:
            scheme, encoded = auth.split()
            # HTTP authentication scheme names are case-insensitive.
            if scheme.lower() != 'basic':
                self.unauthorized(request)
            else:
                # Split on the first colon only: the password part of a
                # Basic credential is allowed to contain colons itself.
                user, passwd = encoded.decode('base64').split(':', 1)
                creds = credentials.UsernamePassword(user, passwd)
                d = self.checker.requestAvatarId(creds)
                d.addCallback(self.doRender, request)

                def denied(reason, request):
                    self.unauthorized(request)
                d.addErrback(denied, request)
        except Exception:
            # Malformed header, undecodable credentials or a checker
            # that could not be called: refuse rather than crash.
            self.unauthorized(request)
        return server.NOT_DONE_YET


class HubAvitar(pb.Avatar):
    """
    Connect collectors to their configuration Services.

    One instance wraps the hub and exposes its service lookup to
    remote callers.
    """

    def __init__(self, hub):
        """
        @param hub: the object whose C{getService} method is used to
            resolve service names.
        """
        self.hub = hub

    def perspective_getService(self,
                               serviceName,
                               instance=None,
                               listener=None):
        """
        Look up a Hub service by name on behalf of a collector and,
        when a listener is supplied, register it with that service so
        changes can be pushed back out to the collector.

        @type serviceName: string
        @param serviceName: a name, like 'EventService'
        @type instance: string
        @param instance: the collector's instance name, like 'localhost'
        @type listener: a remote reference to the collector
        @param listener: the callback interface to the collector
        @return: a remote reference to a service
        """
        svc = self.hub.getService(serviceName, instance)
        if not listener:
            return svc
        svc.addListener(listener)
        return svc


class HubRealm(object):
    """
    Realm for the Twisted authentication framework.
    See http://twistedmatrix.com/projects/core/documentation/howto/cred.html

    A single HubAvitar is created up front and handed to every
    caller that asks for a Perspective Broker perspective.
    """
    implements(portal.IRealm)

    def __init__(self, hub):
        """
        @param hub: the hub object wrapped by the shared avatar.
        """
        self.hubAvitar = HubAvitar(hub)

    def requestAvatar(self, collName, mind, *interfaces):
        """
        Hand out the shared avatar for Perspective Broker logins.

        @param collName: unused.
        @param mind: unused.
        @param interfaces: the interfaces the caller is willing to accept.
        @return: a tuple of (pb.IPerspective, the shared avatar, a
            logout callable that does nothing).
        @raise NotImplementedError: if pb.IPerspective was not requested.
        """
        if pb.IPerspective in interfaces:
            logout = lambda: None
            return pb.IPerspective, self.hubAvitar, logout
        raise NotImplementedError


class ZenHub(ZCmdBase):
    """
    Listen for changes to objects in the Zeo database and update the
    collectors' configuration.

    The remote collectors connect the ZenHub and request configuration
    information and stay connected.  When changes are detected in the
    Zeo database configuration updates are sent out to collectors
    asynchronously.  In this way, changes made in the web GUI can
    affect collection immediately, instead of waiting for a
    configuration cycle.

    Each collector uses a different, pluggable service within ZenHub
    to translate objects into configuration and data.  ZenPacks can
    add services for their collectors.  Collectors communicate using
    Twisted's Perspective Broker, which provides authenticated,
    asynchronous, bidirectional method invocation.

    ZenHub also provides an XmlRPC interface to some common services
    to support collectors written in other languages.
    """

    totalTime = 0.
    totalEvents = 0
    maxTime = 0.
    name = 'zenhub'

    def __init__(self):
        """
        Hook ourselves up to the Zeo database and wait for collectors
        to connect.
        """
        # Created before the base constructor runs; presumably
        # ZCmdBase.__init__ calls zeoConnect(), whose cache subclass
        # records invalidated oids in this list -- TODO confirm.
        self.changes = []
        ZCmdBase.__init__(self)
        self.zem = self.dmd.ZenEventManager
        # Cache of service objects keyed by (name, instance).
        self.services = {}

        # Perspective Broker listener for the collectors.
        er = HubRealm(self)
        checker = self.loadChecker()
        pt = portal.Portal(er, [checker])
        reactor.listenTCP(self.options.pbport, pb.PBServerFactory(pt))

        # XML-RPC listener, authenticated with the same checker.
        xmlsvc = AuthXmlRpcService(self.dmd, checker)
        reactor.listenTCP(self.options.xmlrpcport, server.Site(xmlsvc))

        self.sendEvent(eventClass=App_Start,
                       summary="%s started" % self.name,
                       severity=0)
        reactor.callLater(5, self.processQueue)

    def zeoConnect(self):
        """
        Override the kind of zeo connection we have so we can listen
        to Zeo object updates.  Updates comes as OID invalidations.

        @return: None
        """
        from ZEO.cache import ClientCache as ClientCacheBase

        class ClientCache(ClientCacheBase):
            # 'self' below is the enclosing ZenHub (closure); 'cache'
            # is the ClientCache instance being invalidated.
            def invalidate(cache, oid, version, tid):
                # Newest oid goes to the front; doProcessQueue() pops
                # from the end, so oids are handled oldest first.
                self.changes.insert(0, oid)
                ClientCacheBase.invalidate(cache, oid, version, tid)

        from ZEO.ClientStorage import ClientStorage as ClientStorageBase

        class ClientStorage(ClientStorageBase):
            ClientCacheClass = ClientCache

        # the cache needs to be persistent to get changes
        # made when it was not running
        if self.options.pcachename is None:
            self.options.pcachename = 'zenhub'
        storage = ClientStorage((self.options.host, self.options.port),
                                client=self.options.pcachename,
                                var=self.options.pcachedir,
                                cache_size=self.options.pcachesize*1024*1024)
        from ZODB import DB
        self.db = DB(storage, cache_size=self.options.cachesize)

    def processQueue(self):
        """
        Periodically (once a second) process database changes

        @return: None
        """
        try:
            # syncdb() sits inside the try so that a failure while
            # syncing is logged and retried instead of escaping before
            # the next run has been scheduled.
            self.syncdb()       # reads the object invalidations
            self.doProcessQueue()
        except Exception:
            self.log.exception("Unable to process database changes")
        reactor.callLater(1, self.processQueue)

    def doProcessQueue(self):
        """
        Perform one cycle of update notifications.

        @return: None
        """
        while self.changes:
            oid = self.changes.pop()
            self.log.debug("Got oid %r", oid)
            obj = self.dmd._p_jar[oid]
            self.log.debug("Object %r changed", obj)
            try:
                obj = obj.__of__(self.dmd).primaryAq()
                self.log.debug("Noticing object %s changed",
                               obj.getPrimaryUrlPath())
            except AttributeError:
                # The object could not be resolved to its primary
                # path, so it is reported to the services as deleted.
                self.log.debug("Noticing object %s ", obj)
                for s in self.services.values():
                    s.deleted(obj)
            else:
                for s in self.services.values():
                    s.update(obj)

    def sendEvent(self, **kw):
        """
        Useful method for posting events to the EventManager.
        Failures are logged and never propagated to the caller.

        @type kw: keywords (dict)
        @param kw: the values for an event: device, summary, etc.
            'device' defaults to this host's fully qualified name and
            'component' to the daemon name.
        @return: None
        """
        if 'device' not in kw:
            kw['device'] = getfqdn()
        if 'component' not in kw:
            kw['component'] = self.name
        try:
            self.zem.sendEvent(Event(**kw))
        except Exception:
            self.log.exception("Unable to send an event")

    def loadChecker(self):
        """
        Load the password file

        @return: an object satisfying the ICredentialsChecker
        interface using a password file or an empty list if the file
        is not available.  Uses the file specified in the --passwd
        command line option.
        """
        try:
            return checkers.FilePasswordDB(self.options.passwordfile)
        except Exception:
            self.log.exception("Unable to load %s", self.options.passwordfile)
        # NOTE(review): an empty list is not a credentials checker, and
        # __init__ passes this value straight to portal.Portal and
        # AuthXmlRpcService -- confirm callers cope with it.
        return []

    def getService(self, name, instance):
        """
        Helper method to load services dynamically for a collector.
        Returned instances are cached: reconnecting collectors will
        get the same service object.

        @type name: string
        @param name: the dotted-name of the module to load
        (uses L{Products.ZenUtils.Utils.importClass})
        @type instance: string
        @param instance: each service serves only one specific collector
        instances (like 'localhost').  instance defines the collector's
        instance name.
        @return: a service loaded from ZenHub/services or one of the zenpacks.
        """
        try:
            return self.services[name, instance]
        except KeyError:
            from Products.ZenUtils.Utils import importClass
            try:
                ctor = importClass(name)
            except ImportError:
                # Not importable as given: fall back to the services
                # package shipped with ZenHub.
                ctor = importClass('Products.ZenHub.services.%s' % name, name)
            svc = ctor(self.dmd, instance)
            self.services[name, instance] = svc
            return svc

    def heartbeat(self):
        """
        Since we don't do anything on a regular basis, just
        push heartbeats regularly.

        @return: None
        """
        # NOTE(review): nothing visible in this class schedules the
        # first call -- confirm heartbeats are started elsewhere.
        seconds = 30
        # The heartbeat is sent with a timeout of three intervals.
        evt = EventHeartbeat(getfqdn(), self.name, 3*seconds)
        self.zem.sendEvent(evt)
        reactor.callLater(seconds, self.heartbeat)

    def sigTerm(self, signum, frame):
        """
        Start a controlled shutdown of main loop on interrupt.

        @param signum: unused.
        @param frame: unused.
        @return: None
        """
        try:
            ZCmdBase.sigTerm(self, signum, frame)
        except SystemExit:
            # Swallow the exit request: post the stop event, then stop
            # the reactor a second later instead of exiting right here.
            self.sendEvent(eventClass=App_Stop,
                           summary="%s stopped" % self.name,
                           severity=4)
            if reactor.running:
                reactor.callLater(1, reactor.stop)

    def main(self):
        """
        Start the main event loop.

        @return: None
        """
        reactor.run(installSignalHandlers=False)

    def buildOptions(self):
        """
        Adds our command line options to ZCmdBase command line options.

        @return: None
        """
        ZCmdBase.buildOptions(self)
        self.parser.add_option('--xport',
                               '-x',
                               dest='xmlrpcport',
                               type='int',
                               help='Port to use for XML-based Remote Procedure Calls (RPC)',
                               default=XML_RPC_PORT)
        self.parser.add_option('--pbport',
                               dest='pbport',
                               type='int',
                               help="Port to use for Twisted's pb service",
                               default=PB_PORT)
        self.parser.add_option('--passwd',
                               dest='passwordfile',
                               type='string',
                               help='File where passwords are stored',
                               default=zenPath('etc', 'hubpasswd'))


if __name__ == '__main__':
    # Script entry point: build the hub, then hand control to its
    # main loop.
    hub = ZenHub()
    hub.main()