1
2
3
4
5
6
7
8
9
10
11
12
13
14
__doc__ = '''zenhub

Listen for object changes in the ZEO database and push configuration
updates to connected collectors over Twisted's Perspective Broker; also
expose an XML-RPC interface for collectors written in other languages.

$Id$
'''
21
22 __version__ = "$Revision$"[11:-2]
23
24 from socket import getfqdn
25 import os
26
27
28 from twisted.cred import portal, checkers, error, credentials
29 from twisted.spread import pb
30
31 from twisted.internet import reactor, defer
32 from twisted.python import failure
33 from twisted.web import server, xmlrpc
34 from zope.interface import implements
35
36 import Globals
37
38 from Products.ZenUtils.ZCmdBase import ZCmdBase
39 from Products.ZenUtils.Utils import zenPath
40 from Products.ZenEvents.Event import Event, EventHeartbeat
41 from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop
42 import transaction
43 from zExceptions import NotFound
44
45 from XmlRpcService import XmlRpcService
46
47 import logging
48 log = logging.getLogger('zenhub')
49
50 XML_RPC_PORT = 8081
51 PB_PORT = 8789
52
54 "Provide some level of authentication for XML/RPC calls"
55
59
60
# NOTE(review): the `def` line is absent from this listing; the name and the
# argument order are reconstructed from the
# `d.addCallback(self.doRender, request)` call site -- confirm.
def doRender(self, avatar, request):
    """
    Call the inherited render engine after authentication succeeds.
    See @L{XmlRpcService.XmlRpcService.Render}.

    @param avatar: the result delivered by the credentials check (unused)
    @type request: HTTPRequest
    @param request: the request for this xmlrpc call.
    @return: whatever the inherited C{render} returns
    """
    return XmlRpcService.render(self, request)
67
68
# NOTE(review): the `def` line is absent from this listing; the signature is
# reconstructed from the `self.unauthorized(request)` call sites -- confirm.
def unauthorized(self, request):
    """
    Render an XMLRPC error indicating an authentication failure.

    @type request: HTTPRequest
    @param request: the request for this xmlrpc call.
    @return: None
    """
    fault = xmlrpc.Fault(self.FAILURE, "Unauthorized")
    self._cbRender(fault, request)
77
78
# NOTE(review): the `def` line is absent from this listing; `render(self,
# request)` is reconstructed from the inherited
# `XmlRpcService.render(self, request)` call and the docstring -- confirm.
def render(self, request):
    """
    Unpack the authorization header and check the credentials.

    Only the 'Basic' scheme is accepted.  A missing header, any other
    scheme, a malformed header value, or a failed credentials check all
    end in the "Unauthorized" fault; a successful check hands the request
    to C{doRender}.

    @type request: HTTPRequest
    @param request: the request for this xmlrpc call.
    @return: NOT_DONE_YET
    """
    auth = request.received_headers.get('authorization', None)
    if not auth:
        self.unauthorized(request)
        return server.NOT_DONE_YET
    try:
        scheme, encoded = auth.split()
        if scheme != 'Basic':
            self.unauthorized(request)
        else:
            # Split on the first colon only: a password may itself
            # contain ':' characters.
            user, passwd = encoded.decode('base64').split(':', 1)
            creds = credentials.UsernamePassword(user, passwd)
            d = self.checker.requestAvatarId(creds)
            d.addCallback(self.doRender, request)

            def denied(reason, request):
                self.unauthorized(request)
            d.addErrback(denied, request)
    except Exception:
        # The request must be passed along; calling unauthorized() with
        # no argument raised TypeError instead of answering the client.
        self.unauthorized(request)
    return server.NOT_DONE_YET
105
106
108 "Connect collectors to their configuration Services"
109
112
# NOTE(review): the `def` line of this method is missing from the listing, so
# its name and any parameter defaults cannot be confirmed from here.
117 """
118 Allow a collector to find a Hub service by name. It also
119 associates the service with a collector so that changes can be
120 pushed back out to collectors.
121
122 @type serviceName: string
123 @param serviceName: a name, like 'EventService'
124 @type instance: string
125 @param instance: the collector's instance name, like 'localhost'
126 @type listener: a remote reference to the collector
127 @param listener: the callback interface to the collector
128 @return a remote reference to a service
129 """
# Look the service up on the hub for this (serviceName, instance) pair.
130 service = self.hub.getService(serviceName, instance)
# A listener is registered only when a truthy one was supplied.
131 if listener:
132 service.addListener(listener)
133 return service
134
135
137 """
138 Following the Twisted authentication framework.
139 See http://twistedmatrix.com/projects/core/documentation/howto/cred.html
140 """
141 implements(portal.IRealm)
142
145
150
151
153 """
154 Listen for changes to objects in the Zeo database and update the
155 collectors' configuration.
156
157 The remote collectors connect the ZenHub and request configuration
158 information and stay connected. When changes are detected in the
159 Zeo database configuration updates are sent out to collectors
160 asynchronously. In this way, changes made in the web GUI can
161 affect collection immediately, instead of waiting for a
162 configuration cycle.
163
164 Each collector uses a different, pluggable service within ZenHub
165 to translate objects into configuration and data. ZenPacks can
166 add services for their collectors. Collectors communicate using
167 Twisted's Perspective Broker, which provides authenticated,
168 asynchronous, bidirectional method invocation.
169
170 ZenHub also provides an XmlRPC interface to some common services
171 to support collectors written in other languages.
172 """
173
174 totalTime = 0.
175 totalEvents = 0
176 maxTime = 0.
177 name = 'zenhub'
178
201
202
# NOTE(review): the `def` line of this method is missing from the listing;
# the body below is unchanged.
204 """
205 Override the kind of zeo connection we have so we can listen
206 to Zeo object updates. Updates comes as OID invalidations.
207
208 @return: None
209 """
210 from ZEO.cache import ClientCache as ClientCacheBase
# Subclass the ZEO cache so each invalidation is also recorded locally.
211 class ClientCache(ClientCacheBase):
# `s` is the cache instance; `self` is captured from the enclosing method.
212 def invalidate(s, oid, version, tid):
# Newest oid goes to the front of self.changes.
213 self.changes.insert(0, oid)
214 ClientCacheBase.invalidate(s, oid, version, tid)
215
216 from ZEO.ClientStorage import ClientStorage as ClientStorageBase
# Point the storage at the recording cache class defined above.
217 class ClientStorage(ClientStorageBase):
218 ClientCacheClass = ClientCache
219
220
221
# Fall back to 'zenhub' as the persistent cache name when none was given.
222 if self.options.pcachename is None:
223 self.options.pcachename = 'zenhub'
# NOTE(review): pcachesize is scaled by 1024*1024 -- presumably an option in
# megabytes converted to bytes; confirm against the option definition.
224 storage = ClientStorage((self.options.host, self.options.port),
225 client=self.options.pcachename,
226 var=self.options.pcachedir,
227 cache_size=self.options.pcachesize*1024*1024)
228 from ZODB import DB
229 self.db = DB(storage, cache_size=self.options.cachesize)
230
231
244
245
# NOTE(review): the enclosing `def` line is absent from this listing; only
# the docstring and body are restored here, at the listing's column 0.
"""
Perform one cycle of update notifications.

Drains C{self.changes} from the end of the list and tells every
registered service whether each changed object was updated or deleted.

@return: None
"""
while self.changes:
    oid = self.changes.pop()
    self.log.debug("Got oid %r", oid)
    obj = self.dmd._p_jar[oid]
    self.log.debug("Object %r changed", obj)
    try:
        obj = obj.__of__(self.dmd).primaryAq()
        self.log.debug("Noticing object %s changed",
                       obj.getPrimaryUrlPath())
    except AttributeError:
        # An object that cannot be wrapped or resolved to its primary
        # path is reported to the services as deleted.
        self.log.debug("Noticing object %s ", obj)
        for s in self.services.values():
            s.deleted(obj)
    else:
        for s in self.services.values():
            s.update(obj)
267
268
# NOTE(review): the enclosing `def` line is absent from this listing; only
# the docstring and body are restored here, at the listing's column 0.
"""
Useful method for posting events to the EventManager.

Missing 'device' and 'component' values default to this host's fully
qualified name and to C{self.name}.  A failure to send is logged and
not re-raised.

@type kw: keywords (dict)
@param kw: the values for an event: device, summary, etc.
@return: None
"""
if 'device' not in kw:
    kw['device'] = getfqdn()
if 'component' not in kw:
    kw['component'] = self.name
try:
    self.zem.sendEvent(Event(**kw))
except Exception:
    # Best effort: log and carry on, but let SystemExit and
    # KeyboardInterrupt propagate (a bare except swallowed them).
    self.log.exception("Unable to send an event")
285
286
# NOTE(review): the `def` line of this method is missing from the listing;
# the body below is unchanged.
288 """
289 Load the password file
290
291 @return: an object satisfying the ICredentialsChecker
292 interface using a password file or an empty list if the file
293 is not available. Uses the file specified in the --passwd
294 command line option.
295 """
296 try:
297 return checkers.FilePasswordDB(self.options.passwordfile)
# NOTE(review): `except Exception, ex` is Python-2-only syntax and `ex` is
# never used; `except Exception:` would be equivalent.
298 except Exception, ex:
299 self.log.exception("Unable to load %s", self.options.passwordfile)
# NOTE(review): the fallback is a plain empty list, not a credentials
# checker -- confirm that callers cope with that value.
300 return []
301
302
# NOTE(review): the `def` line is absent from this listing; the signature is
# reconstructed from `self.hub.getService(serviceName, instance)` and the
# documented parameters -- confirm.
def getService(self, name, instance):
    """
    Helper method to load services dynamically for a collector.
    Returned instances are cached: reconnecting collectors will
    get the same service object.

    @type name: string
    @param name: the dotted-name of the module to load
    (uses @L{Products.ZenUtils.Utils.importClass})
    @type instance: string
    @param instance: each service serves only one specific collector
    instance (like 'localhost'). instance defines the collector's
    instance name.
    @return: a service loaded from ZenHub/services or one of the zenpacks.
    """
    try:
        return self.services[name, instance]
    except KeyError:
        pass  # not cached yet: load it below

    from Products.ZenUtils.Utils import importClass
    try:
        ctor = importClass(name)
    except ImportError:
        # Fall back to the services package shipped with ZenHub.
        ctor = importClass('Products.ZenHub.services.%s' % name, name)
    svc = ctor(self.dmd, instance)
    self.services[name, instance] = svc
    return svc
327
328
# NOTE(review): the `def` line is absent from this listing; `heartbeat(self)`
# is reconstructed from the `reactor.callLater(seconds, self.heartbeat)`
# self-reference -- confirm.
def heartbeat(self):
    """
    Since we don't do anything on a regular basis, just
    push heartbeats regularly.

    Sends one heartbeat event and re-schedules itself every 30 seconds.

    @return: None
    """
    seconds = 30
    # Re-arm before sending, so an exception raised while sending does not
    # end the heartbeat cycle for good.
    reactor.callLater(seconds, self.heartbeat)
    evt = EventHeartbeat(getfqdn(), self.name, 3*seconds)
    self.zem.sendEvent(evt)
340
341
358
359
# NOTE(review): the `def` line is absent from this listing; `main(self)` is
# reconstructed from the `z.main()` call in the main guard -- confirm.
def main(self):
    """
    Start the main event loop.

    The reactor is started with installSignalHandlers=False, so it does
    not install its own signal handlers.

    @return: None
    """
    reactor.run(installSignalHandlers=False)
367
368
# NOTE(review): the `def` line is absent from this listing; the signature is
# reconstructed from the `ZCmdBase.buildOptions(self)` call -- confirm.
def buildOptions(self):
    """
    Adds our command line options to ZCmdBase command line options.

    Registers --xport/-x (XML-RPC port), --pbport (Perspective Broker
    port) and --passwd (password file) after the inherited options.

    @return: None
    """
    ZCmdBase.buildOptions(self)
    add_option = self.parser.add_option
    add_option('--xport', '-x',
               dest='xmlrpcport',
               type='int',
               help='Port to use for XML-based Remote Procedure Calls (RPC)',
               default=XML_RPC_PORT)
    add_option('--pbport',
               dest='pbport',
               type='int',
               help="Port to use for Twisted's pb service",
               default=PB_PORT)
    add_option('--passwd',
               dest='passwordfile',
               type='string',
               help='File where passwords are stored',
               default=zenPath('etc', 'hubpasswd'))
392
393
394 if __name__ == '__main__':
395 z = ZenHub()
396 z.main()
397