1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """PBDaemon
15
16 Base for daemons that connect to zenhub
17
18 """
19
20 import sys
21 import traceback
22
23 import Globals
24
25 from Products.ZenUtils.ZenDaemon import ZenDaemon
26 from Products.ZenEvents.ZenEventClasses import Heartbeat
27 from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory
28 from Products.ZenUtils.DaemonStats import DaemonStats
29 from Products.ZenUtils.Driver import drive
30 from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop, \
31 Clear, Warning
32
33 from twisted.cred import credentials
34 from twisted.internet import reactor, defer
35 from twisted.internet.error import ConnectionLost
36 from twisted.spread import pb
37 from twisted.python.failure import Failure
38
39 from ZODB.POSException import ConflictError
40
42 "Exception that can cross the PB barrier"
44 Exception.__init__(self, msg)
45 self.traceback = tb
47 return Exception.__str__(self) + self.traceback
48
49 pb.setUnjellyableForClass(RemoteException, RemoteException)
50
51
53 pb.setUnjellyableForClass(RemoteConflictError, RemoteConflictError)
54
55
57
59 """
60 Decorator function to wrap remote exceptions into something
61 understandable by our daemon.
62
63 @parameter callable: function to wrap
64 @type callable: function
65 @return: function's return or an exception
66 @rtype: various
67 """
68 def inner(*args, **kw):
69 """
70 Interior decorator
71 """
72 try:
73 return callable(*args, **kw)
74 except ConflictError, ex:
75 raise RemoteConflictError(
76 'Remote exception: %s: %s' % (ex.__class__, ex),
77 traceback.format_exc())
78 except Exception, ex:
79 raise RemoteException(
80 'Remote exception: %s: %s' % (ex.__class__, ex),
81 traceback.format_exc())
82 return inner
83
84
# Port zenhub listens on; also used as the default for --hubport.
PB_PORT = 8789

# Template event for daemon start-up (App_Start class, Clear severity).
startEvent = dict(
    eventClass=App_Start,
    summary='started',
    severity=Clear,
)

# Template event for daemon shutdown (App_Stop class, Warning severity).
stopEvent = dict(
    eventClass=App_Stop,
    summary='stopped',
    severity=Warning,
)

# Defaults for the --hubhost/--hubport/--hubusername/--hubpassword/--monitor
# command-line options.
DEFAULT_HUB_HOST = 'localhost'
DEFAULT_HUB_PORT = PB_PORT
DEFAULT_HUB_USERNAME = 'admin'
# NOTE(review): well-known default credential; deployments are expected to
# override it via --hubpassword.
DEFAULT_HUB_PASSWORD = 'zenoss'
DEFAULT_HUB_MONITOR = 'localhost'
105
107
111
112 -class PBDaemon(ZenDaemon, pb.Referenceable):
113
114 name = 'pbdaemon'
115 initialServices = ['EventService']
116 heartbeatEvent = {'eventClass':Heartbeat}
117 heartbeatTimeout = 60*3
118 _customexitcode = 0
119 _sendingEvents = False
120
121 - def __init__(self, noopts=0, keeproot=False):
142
144 """
145 This gets called every time we reconnect.
146
147 @parameter perspective: Twisted perspective object
148 @type perspective: Twisted perspective object
149 """
150 self.log.info("Connected to ZenHub")
151 self.perspective = perspective
152 d2 = self.getInitialServices()
153 if self.initialConnect:
154 self.log.debug('Chaining getInitialServices with d2')
155 self.initialConnect, d = None, self.initialConnect
156 d2.chainDeferred(d)
157
158
173 reactor.callLater(self.options.hubtimeout, timeout, self.initialConnect)
174 return self.initialConnect
175
176
179
180
185
186
def getService(self, serviceName, serviceListeningInterface=None):
    """
    Attempt to get a service from zenhub.  Returns a deferred.

    When the service is retrieved it is stashed in self.services with
    serviceName as the key.  If serviceName is already cached, the cached
    entry is returned wrapped in defer.succeed and no remote call is made.

    @parameter serviceName: name of the zenhub service to fetch
    @type serviceName: string
    @parameter serviceListeningInterface: object handed to zenhub along with
        the request; defaults to this daemon
    @return: deferred firing with the service's remote reference, or
        failing with the original error if the lookup fails
    @rtype: Deferred
    """
    if serviceName in self.services:
        return defer.succeed(self.services[serviceName])

    def callback(result, serviceName):
        self.log.debug('Loaded service %s from zenhub', serviceName)
        self.services[serviceName] = result

        def removeService(ignored):
            # Only forget the reference that actually disconnected: a newer
            # reference cached after a reconnect must not be evicted by a
            # late disconnect notification from the stale one.
            if self.services.get(serviceName) is result:
                self.log.debug('Removing service %s', serviceName)
                del self.services[serviceName]

        result.notifyOnDisconnect(removeService)
        return result

    def errback(error, serviceName):
        self.log.debug('errback after getting service %s', serviceName)
        self.log.error('Could not retrieve service %s: %s',
                       serviceName, error)
        self.services.pop(serviceName, None)
        # Propagate the failure to the caller's errback chain.
        return error

    d = self.perspective.callRemote('getService',
                                    serviceName,
                                    self.options.monitor,
                                    serviceListeningInterface or self)
    d.addCallback(callback, serviceName)
    d.addErrback(errback, serviceName)
    return d
223
225 """
226 After connecting to zenhub, gather our initial list of services.
227 """
228 def errback(error):
229 if isinstance(error, Failure):
230 self.log.critical( "Invalid monitor: %s" % self.options.monitor)
231 reactor.stop()
232 return defer.fail(RemoteBadMonitor(
233 "Invalid monitor: %s" % self.options.monitor))
234 return error
235
236 self.log.debug('Setting up initial services: %s' % \
237 ', '.join(self.initialServices))
238 d = defer.DeferredList(
239 [self.getService(name) for name in self.initialServices],
240 fireOnOneErrback=True, consumeErrors=True)
241 d.addErrback(errback)
242 return d
243
244
247
257 d.addCallback(callback)
258 reactor.run()
259 self.log.info('%s shutting down' % self.name)
260 if self._customexitcode:
261 sys.exit(self._customexitcode)
262
def sigTerm(self, signum=None, frame=None):
    """
    Termination-signal hook that delegates to ZenDaemon.sigTerm.

    Any SystemExit raised by the parent implementation is swallowed, so the
    exit request does not propagate out of this method.

    @parameter signum: signal number, forwarded unchanged
    @parameter frame: interrupted stack frame, forwarded unchanged
    """
    parentHandler = ZenDaemon.sigTerm
    try:
        parentHandler(self, signum, frame)
    except SystemExit:
        # NOTE(review): presumably shutdown is then completed by the reactor
        # rather than by an immediate interpreter exit -- confirm.
        return
268
271
def stop(self, ignored=''):
    """
    Stop the daemon: queue a stop event if possible, then stop the reactor.

    Only the first call while the reactor is running has any effect; later
    calls just log.  The argument is ignored (presumably so stop can be
    chained on a Deferred).  When an EventService is cached, the stop event
    is queued (unless options.cycle exists and is false) and pushEvents is
    driven, with the reactor stopped once that finishes.  In every case a
    1-second fallback timer stops the reactor, so the daemon also shuts
    down when no EventService was ever obtained.

    @parameter ignored: unused
    """
    def stopNow(ignored):
        # Guard so the drive() completion and the fallback timer can both
        # call this without stopping an already-stopped reactor.
        if reactor.running:
            reactor.stop()

    if not reactor.running or self.stopped:
        self.log.debug( "stop() called when not running" )
        return
    self.stopped = True

    if 'EventService' in self.services:
        # Send the stop event unless a 'cycle' option exists and is off.
        if getattr(self.options, 'cycle', True):
            self.sendEvent(self.stopEvent)
        # Flush queued events, then stop the reactor.
        drive(self.pushEvents).addBoth(stopNow)
        self.log.debug( "Sent a 'stop' event" )
    else:
        self.log.debug( "No event sent as no EventService available." )

    # Always arm the fallback; without it the reactor would keep running
    # (with self.stopped already set) when no EventService is available.
    reactor.callLater(1, stopNow, True)
293
296
298 ''' Add event to queue of events to be sent. If we have an event
299 service then process the queue.
300 '''
301 if not reactor.running: return
302 event = event.copy()
303 event['agent'] = self.name
304 event['manager'] = self.options.monitor
305 event.update(kw)
306 if not self.options.allowduplicateclears:
307 statusKey = ( event['device'],
308 event.get('component', None),
309 event.get('eventKey', None),
310 event.get('eventClass', None) )
311 severity = event.get('severity', None)
312 status = self._eventStatus.get(statusKey, None)
313 self._eventStatus[statusKey] = severity
314 if severity == Clear and status == Clear:
315 self.log.debug("Dropping useless clear event %r", event)
316 return
317 self.log.debug("Queueing event %r", event)
318 self.eventQueue.append(event)
319 self.log.debug("Total of %d queued events" % len(self.eventQueue))
320
326
328 """Flush events to ZenHub.
329 """
330 try:
331
332 queueLen = len(self.eventQueue)
333 if queueLen > self.options.maxqueuelen:
334 self.log.warn('Queue exceeded maximum length: %d/%d. Trimming',
335 queueLen, self.options.maxqueuelen)
336 diff = queueLen - self.options.maxqueuelen
337 self.eventQueue = self.eventQueue[diff:]
338
339
340 if not reactor.running:
341 return
342 if self._sendingEvents:
343 return
344
345 self._sendingEvents = True
346 while self.eventQueue:
347
348 evtSvc = self.services.get('EventService', None)
349 if not evtSvc: break
350
351
352 chunkSize = self.options.eventflushchunksize
353 events = self.eventQueue[:chunkSize]
354 self.eventQueue = self.eventQueue[chunkSize:]
355
356 yield evtSvc.callRemote('sendEvents', events)
357 try:
358 driver.next()
359 except ConnectionLost, ex:
360 self.log.error('Error sending event: %s' % ex)
361 self.eventQueue = events + self.eventQueue
362 break
363 self._sendingEvents = False
364 except Exception, ex:
365 self._sendingEvents = False
366 self.log.exception(ex)
367
376
377
380
381
385
386
389
390
391 @translateError
393 from Products.ZenUtils.Utils import importClass
394 self.log.debug("Loading classes %s", classes)
395 for c in classes:
396 try:
397 importClass(c)
398 except ImportError:
399 self.log.exception("Unable to import class %s", c)
400
401
403 self.parser.add_option('--hubhost',
404 dest='hubhost',
405 default=DEFAULT_HUB_HOST,
406 help='Host of zenhub daemon.'
407 ' Default is %s.' % DEFAULT_HUB_HOST)
408 self.parser.add_option('--hubport',
409 dest='hubport',
410 type='int',
411 default=DEFAULT_HUB_PORT,
412 help='Port zenhub listens on.'
413 'Default is %s.' % DEFAULT_HUB_PORT)
414 self.parser.add_option('--hubusername',
415 dest='hubusername',
416 default=DEFAULT_HUB_USERNAME,
417 help='Username for zenhub login.'
418 ' Default is %s.' % DEFAULT_HUB_USERNAME)
419 self.parser.add_option('--hubpassword',
420 dest='hubpassword',
421 default=DEFAULT_HUB_PASSWORD,
422 help='Password for zenhub login.'
423 ' Default is %s.' % DEFAULT_HUB_PASSWORD)
424 self.parser.add_option('--monitor',
425 dest='monitor',
426 default=DEFAULT_HUB_MONITOR,
427 help='Name of monitor instance to use for'
428 ' configuration. Default is %s.'
429 % DEFAULT_HUB_MONITOR)
430 self.parser.add_option('--initialHubTimeout',
431 dest='hubtimeout',
432 type='int',
433 default=30,
434 help='Initial time to wait for a ZenHub '
435 'connection')
436 self.parser.add_option('--allowduplicateclears',
437 dest='allowduplicateclears',
438 default=False,
439 action='store_true',
440 help='Send clear events even when the most '
441 'recent event was also a clear event.')
442
443 self.parser.add_option('--eventflushseconds',
444 dest='eventflushseconds',
445 default=5.,
446 type='float',
447 help='Seconds between attempts to flush '
448 'events to ZenHub.')
449
450 self.parser.add_option('--eventflushchunksize',
451 dest='eventflushchunksize',
452 default=50,
453 type='int',
454 help='Number of events to send to ZenHub'
455 'at one time')
456
457 self.parser.add_option('--maxqueuelen',
458 dest='maxqueuelen',
459 default=5000,
460 type='int',
461 help='Maximum number of events to queue')
462
463
464 ZenDaemon.buildOptions(self)
465