1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """PBDaemon
15
16 Base for daemons that connect to zenhub
17
18 """
19
20 import sys
21 import traceback
22
23 import Globals
24
25 from Products.ZenUtils.ZenDaemon import ZenDaemon
26 from Products.ZenEvents.ZenEventClasses import Heartbeat
27 from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory
28 from Products.ZenUtils.DaemonStats import DaemonStats
29 from Products.ZenUtils.Driver import drive
30 from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop, \
31 Clear, Warning
32
33 from twisted.cred import credentials
34 from twisted.internet import reactor, defer
35 from twisted.internet.error import ConnectionLost
36 from twisted.spread import pb
37 from twisted.python.failure import Failure
38
39 from ZODB.POSException import ConflictError
40
42 "Exception that can cross the PB barrier"
44 Exception.__init__(self, msg)
45 self.traceback = tb
47 return Exception.__str__(self) + self.traceback
48
49 pb.setUnjellyableForClass(RemoteException, RemoteException)
50
51
53 pb.setUnjellyableForClass(RemoteConflictError, RemoteConflictError)
54
55
57
59 """
60 Decorator function to wrap remote exceptions into something
61 understandable by our daemon.
62
63 @parameter callable: function to wrap
64 @type callable: function
65 @return: function's return or an exception
66 @rtype: various
67 """
68 def inner(*args, **kw):
69 """
70 Interior decorator
71 """
72 try:
73 return callable(*args, **kw)
74 except ConflictError, ex:
75 raise RemoteConflictError(
76 'Remote exception: %s: %s' % (ex.__class__, ex),
77 traceback.format_exc())
78 except Exception, ex:
79 raise RemoteException(
80 'Remote exception: %s: %s' % (ex.__class__, ex),
81 traceback.format_exc())
82 return inner
83
84
85 PB_PORT = 8789
86
87 startEvent = {
88 'eventClass': App_Start,
89 'summary': 'started',
90 'severity': Clear,
91 }
92
93 stopEvent = {
94 'eventClass':App_Stop,
95 'summary': 'stopped',
96 'severity': Warning,
97 }
98
99
100 DEFAULT_HUB_HOST = 'localhost'
101 DEFAULT_HUB_PORT = PB_PORT
102 DEFAULT_HUB_USERNAME = 'admin'
103 DEFAULT_HUB_PASSWORD = 'zenoss'
104 DEFAULT_HUB_MONITOR = 'localhost'
105
107
111
112 -class PBDaemon(ZenDaemon, pb.Referenceable):
113
114 name = 'pbdaemon'
115 initialServices = ['EventService']
116 heartbeatEvent = {'eventClass':Heartbeat}
117 heartbeatTimeout = 60*3
118 _customexitcode = 0
119 _sendingEvents = False
120
121 - def __init__(self, noopts=0, keeproot=False):
142
144 """
145 This gets called every time we reconnect.
146
147 @parameter perspective: Twisted perspective object
148 @type perspective: Twisted perspective object
149 """
150 self.log.info("Connected to ZenHub")
151 self.perspective = perspective
152 d2 = self.getInitialServices()
153 if self.initialConnect:
154 self.log.debug('Chaining getInitialServices with d2')
155 self.initialConnect, d = None, self.initialConnect
156 d2.chainDeferred(d)
157
158
173 reactor.callLater(self.options.hubtimeout, timeout, self.initialConnect)
174 return self.initialConnect
175
176
179
180
185
186
def getService(self, serviceName, serviceListeningInterface=None):
    """
    Attempt to get a service from zenhub.  Returns a deferred.

    A service already present in self.services is returned straight
    away, wrapped in defer.succeed.  Otherwise the service is requested
    through self.perspective; once it arrives it is stashed in
    self.services under serviceName, and it is dropped from there again
    when the remote reference reports a disconnect or when the request
    fails.

    @parameter serviceName: name of the service to load
    @type serviceName: string
    @parameter serviceListeningInterface: object passed to the hub along
        with the request; self is passed when this is not supplied
    @type serviceListeningInterface: presumably a pb.Referenceable --
        confirm against the zenhub side
    @return: deferred firing with the service reference, or with the
        error from the remote call
    @rtype: Twisted deferred
    """
    # 'in' rather than the deprecated has_key(); it is also what the
    # rest of this method already uses for the same lookup.
    if serviceName in self.services:
        return defer.succeed(self.services[serviceName])

    def forget():
        # Shared by the disconnect hook and the errback.
        if serviceName in self.services:
            del self.services[serviceName]

    def removeService(ignored):
        self.log.debug('Removing service %s' % serviceName)
        forget()

    def callback(result):
        self.log.debug('Loaded service %s from zenhub' % serviceName)
        self.services[serviceName] = result
        # Make sure a stale reference is not handed out after a disconnect.
        result.notifyOnDisconnect(removeService)
        return result

    def errback(error):
        self.log.debug('errback after getting service %s' % serviceName)
        self.log.error('Could not retrieve service %s' % serviceName)
        forget()
        # Hand the error on so the caller's own errbacks still run.
        return error

    d = self.perspective.callRemote('getService',
                                    serviceName,
                                    self.options.monitor,
                                    serviceListeningInterface or self)
    d.addCallback(callback)
    d.addErrback(errback)
    return d
223
225 """
226 After connecting to zenhub, gather our initial list of services.
227 """
228 def errback(error):
229 if isinstance(error, Failure):
230 self.log.critical( "Invalid monitor: %s" % self.options.monitor)
231 reactor.stop()
232 return defer.fail(RemoteBadMonitor(
233 "Invalid monitor: %s" % self.options.monitor))
234 return error
235
236 self.log.debug('Setting up initial services: %s' % \
237 ', '.join(self.initialServices))
238 d = defer.DeferredList(
239 [self.getService(name) for name in self.initialServices],
240 fireOnOneErrback=True, consumeErrors=True)
241 d.addErrback(errback)
242 return d
243
244
247
257 d.addCallback(callback)
258 reactor.run()
259 self.log.info('%s shutting down' % self.name)
260 if self._customexitcode:
261 sys.exit(self._customexitcode)
262
def sigTerm(self, signum=None, frame=None):
    """
    Handle a termination signal by delegating to ZenDaemon.sigTerm.

    A SystemExit raised by the base-class handler is suppressed here, so
    this method always returns normally.

    @parameter signum: signal number, passed through unchanged
    @type signum: integer or None
    @parameter frame: stack frame, passed through unchanged
    @type frame: frame object or None
    """
    try:
        ZenDaemon.sigTerm(self, signum, frame)
    except SystemExit:
        # Deliberately swallowed -- presumably so the process leaves via
        # the normal shutdown path instead of exiting from inside the
        # signal handler (confirm against ZenDaemon.sigTerm).
        return
268
271
def stop(self, ignored=''):
    """
    Begin shutting the daemon down, sending a 'stop' event if possible.

    Nothing happens beyond a debug message when the reactor is not
    running or when self.stopped is already set.  Otherwise self.stopped
    is set and, if an EventService is loaded, the stop event is queued,
    the event queue is flushed, and the reactor is stopped once the
    flush finishes or after one second, whichever comes first.

    @parameter ignored: never read; presumably present so that stop()
        can be attached directly as a deferred callback
    @type ignored: anything
    """
    def haltReactor(unused):
        # Safe to call twice: both the flush and the timer end up here.
        if reactor.running:
            reactor.stop()

    if not reactor.running or self.stopped:
        self.log.debug( "stop() called when not running" )
        return

    self.stopped = True

    if 'EventService' not in self.services:
        # NOTE(review): this branch never stops the reactor -- confirm
        # that something else is responsible for that.
        self.log.debug( "No event sent as no EventService available." )
        return

    # Send the stop event when there is no --cycle option at all, or
    # when --cycle has been specified.
    cycleImplied = not hasattr(self.options, 'cycle')
    if cycleImplied or getattr(self.options, 'cycle', True):
        self.sendEvent(self.stopEvent)

    # Give the reactor a chance to deliver the queued events ...
    flushing = drive(self.pushEvents)
    flushing.addBoth(haltReactor)
    # ... but no longer than a second (callLater needs some argument
    # to hand to haltReactor).
    reactor.callLater(1, haltReactor, True)
    self.log.debug( "Sent a 'stop' event" )
293
296
298 ''' Add event to queue of events to be sent. If we have an event
299 service then process the queue.
300 '''
301 if not reactor.running: return
302 event = event.copy()
303 event['agent'] = self.name
304 event['manager'] = self.options.monitor
305 event.update(kw)
306 if not self.options.allowduplicateclears:
307 statusKey = ( event['device'],
308 event.get('component', None),
309 event.get('eventKey', None),
310 event.get('eventClass', None) )
311 severity = event.get('severity', None)
312 status = self._eventStatus.get(statusKey, None)
313 self._eventStatus[statusKey] = severity
314 if severity == Clear and status == Clear:
315 self.log.debug("Dropping useless clear event %r", event)
316 return
317 self.log.debug("Queueing event %r", event)
318 self.eventQueue.append(event)
319 self.log.debug("Total of %d queued events" % len(self.eventQueue))
320
326
328 """Flush events to ZenHub.
329 """
330 try:
331
332 if not reactor.running:
333 return
334 if self._sendingEvents:
335 return
336
337 self._sendingEvents = True
338 while self.eventQueue:
339
340 evtSvc = self.services.get('EventService', None)
341 if not evtSvc: break
342
343
344 chunkSize = self.options.eventflushchunksize
345 events = self.eventQueue[:chunkSize]
346 self.eventQueue = self.eventQueue[chunkSize:]
347
348 yield evtSvc.callRemote('sendEvents', events)
349 try:
350 driver.next()
351 except ConnectionLost, ex:
352 self.log.error('Error sending event: %s' % ex)
353 self.eventQueue = events + self.eventQueue
354 break
355 self._sendingEvents = False
356 except Exception, ex:
357 self._sendingEvents = False
358 self.log.exception(ex)
359
368
369
372
373
377
378
381
382
383 @translateError
385 from Products.ZenUtils.Utils import importClass
386 self.log.debug("Loading classes %s", classes)
387 for c in classes:
388 try:
389 importClass(c)
390 except ImportError:
391 self.log.exception("Unable to import class %s", c)
392
393
# Register the command-line options used to reach ZenHub (host, port,
# credentials, monitor name, initial connection timeout) and to control
# event handling (duplicate clears, flush interval, flush chunk size),
# then let ZenDaemon add its own options.
#
# The help texts are built from adjacent string literals, so every
# continuation fragment must start with its own separating space.
self.parser.add_option('--hubhost',
    dest='hubhost',
    default=DEFAULT_HUB_HOST,
    help='Host of zenhub daemon.'
         ' Default is %s.' % DEFAULT_HUB_HOST)
self.parser.add_option('--hubport',
    dest='hubport',
    type='int',
    default=DEFAULT_HUB_PORT,
    help='Port zenhub listens on.'
         ' Default is %s.' % DEFAULT_HUB_PORT)
self.parser.add_option('--hubusername',
    dest='hubusername',
    default=DEFAULT_HUB_USERNAME,
    help='Username for zenhub login.'
         ' Default is %s.' % DEFAULT_HUB_USERNAME)
self.parser.add_option('--hubpassword',
    dest='hubpassword',
    default=DEFAULT_HUB_PASSWORD,
    help='Password for zenhub login.'
         ' Default is %s.' % DEFAULT_HUB_PASSWORD)
self.parser.add_option('--monitor',
    dest='monitor',
    default=DEFAULT_HUB_MONITOR,
    help='Name of monitor instance to use for'
         ' configuration.  Default is %s.'
         % DEFAULT_HUB_MONITOR)
self.parser.add_option('--initialHubTimeout',
    dest='hubtimeout',
    type='int',
    default=30,
    help='Initial time to wait for a ZenHub '
         'connection')
self.parser.add_option('--allowduplicateclears',
    dest='allowduplicateclears',
    default=False,
    action='store_true',
    help='Send clear events even when the most '
         'recent event was also a clear event.')

self.parser.add_option('--eventflushseconds',
    dest='eventflushseconds',
    default=5.,
    type='float',
    help='Seconds between attempts to flush '
         'events to ZenHub.')

self.parser.add_option('--eventflushchunksize',
    dest='eventflushchunksize',
    default=50,
    type='int',
    help='Number of events to send to ZenHub'
         ' at one time')

ZenDaemon.buildOptions(self)
451