1
2
3
4
5
6
7
8
9
10
11 __doc__ = """PBDaemon
12
13 Base for daemons that connect to zenhub
14
15 """
16
17 import cPickle as pickle
18 import collections
19 import sys
20 import time
21 import traceback
22
23 import Globals
24
25 from Products.ZenUtils.ZenDaemon import ZenDaemon
26 from Products.ZenEvents.ZenEventClasses import Heartbeat
27 from Products.ZenUtils.PBUtil import ReconnectingPBClientFactory
28 from Products.ZenUtils.DaemonStats import DaemonStats
29 from Products.ZenUtils.Utils import zenPath, atomicWrite
30 from Products.ZenUtils.Driver import drive
31 from Products.ZenEvents.ZenEventClasses import App_Start, App_Stop, \
32 Clear, Warning
33
34 from twisted.cred import credentials
35 from twisted.internet import reactor, defer
36 from twisted.internet.error import ConnectionLost, ReactorNotRunning
37 from twisted.spread import pb
38 from twisted.python.failure import Failure
39 import twisted.python.log
40
41 from ZODB.POSException import ConflictError
44 "Exception that can cross the PB barrier"
46 Exception.__init__(self, msg)
47 self.traceback = tb
49 return Exception.__str__(self) + self.traceback
50
51 pb.setUnjellyableForClass(RemoteException, RemoteException)
55 pb.setUnjellyableForClass(RemoteConflictError, RemoteConflictError)
59
61 """
62 Decorator function to wrap remote exceptions into something
63 understandable by our daemon.
64
65 @parameter callable: function to wrap
66 @type callable: function
67 @return: function's return or an exception
68 @rtype: various
69 """
70 def inner(*args, **kw):
71 """
72 Interior decorator
73 """
74 try:
75 return callable(*args, **kw)
76 except ConflictError, ex:
77 raise RemoteConflictError(
78 'Remote exception: %s: %s' % (ex.__class__, ex),
79 traceback.format_exc())
80 except Exception, ex:
81 raise RemoteException(
82 'Remote exception: %s: %s' % (ex.__class__, ex),
83 traceback.format_exc())
84 return inner
85
86
# Default TCP port on which zenhub serves Perspective Broker connections.
PB_PORT = 8789

# Template for the event sent when a daemon starts; copied and completed
# per instance by PBDaemon.__init__.
startEvent = dict(
    eventClass=App_Start,
    summary='started',
    severity=Clear,
)

# Template for the event sent when a daemon stops; copied and completed
# per instance by PBDaemon.__init__.
stopEvent = dict(
    eventClass=App_Stop,
    summary='stopped',
    severity=Warning,
)


# Fallback values for the --hub* / --monitor command-line options.
DEFAULT_HUB_HOST = 'localhost'
DEFAULT_HUB_PORT = PB_PORT
DEFAULT_HUB_USERNAME = 'admin'
DEFAULT_HUB_PASSWORD = 'zenoss'
DEFAULT_HUB_MONITOR = 'localhost'
109
114
115 -class PBDaemon(ZenDaemon, pb.Referenceable):
116
117 name = 'pbdaemon'
118 initialServices = ['EventService']
119 heartbeatEvent = {'eventClass':Heartbeat}
120 heartbeatTimeout = 60*3
121 _customexitcode = 0
122 _sendingEvents = False
123
def __init__(self, noopts=0, keeproot=False, name=None):
    """
    Initialize the daemon and the per-instance state used to talk to zenhub.

    Runs ZenDaemon's setup first. If that raises IOError, the error is
    logged as critical and the process exits with status 1. Afterwards the
    service cache, the event queue, the start/stop/heartbeat event
    templates and the persisted counters are prepared.

    @parameter noopts: forwarded unchanged to ZenDaemon.__init__
    @parameter keeproot: forwarded unchanged to ZenDaemon.__init__
    @parameter name: optional daemon name; when given it overrides the
        class-level C{name} and is also stored as C{mname}
    @type name: string or None
    """
    # Applied before ZenDaemon.__init__ runs.
    # NOTE(review): presumably because the base class reads
    # self.name / self.mname during setup -- confirm.
    if name is not None:
        self.name = name
        self.mname = name

    try:
        ZenDaemon.__init__(self, noopts, keeproot)
    except IOError:
        # Uses the module-level traceback import.
        self.log.critical(traceback.format_exc(0))
        sys.exit(1)

    self.rrdStats = DaemonStats()
    self.lastStats = 0
    self.perspective = None
    self.services = {}
    self.eventQueue = []

    # Work on private copies of the event templates. The module-level
    # start/stop dicts and the class-level heartbeatEvent dict are shared
    # by every instance (and every subclass), so updating them in place
    # would leak this daemon's component/device into all the others.
    self.startEvent = startEvent.copy()
    self.stopEvent = stopEvent.copy()
    self.heartbeatEvent = self.heartbeatEvent.copy()
    details = dict(component=self.name, device=self.options.monitor)
    for evt in self.startEvent, self.stopEvent, self.heartbeatEvent:
        evt.update(details)

    # Chained to the initial getInitialServices() result on the first
    # successful connection (see gotPerspective), then reset to None.
    self.initialConnect = defer.Deferred()
    self.stopped = False
    self._eventStatus = {}
    self._eventStatusCount = collections.defaultdict(int)
    # Start with empty counters, then replace them with whatever
    # saveCounters() last persisted, if that file can be loaded.
    self.counters = collections.Counter()
    self.loadCounters()
    self._heartbeatEvent = None
    self._performanceEventsQueue = None
    self._pingedZenhub = None
158
160 """
161 Called when about to connect to zenhub
162 """
163 self.log.info("Attempting to connect to zenhub")
164
166 """
167 This gets called every time we reconnect.
168
169 @parameter perspective: Twisted perspective object
170 @type perspective: Twisted perspective object
171 """
172 self.log.info("Connected to ZenHub")
173 self.perspective = perspective
174 d2 = self.getInitialServices()
175 if self.initialConnect:
176 self.log.debug('Chaining getInitialServices with d2')
177 self.initialConnect, d = None, self.initialConnect
178 d2.chainDeferred(d)
179
180
198 reactor.callLater(self.options.hubtimeout, timeout, self.initialConnect)
199 return self.initialConnect
200
202 self.log.error('Timeout connecting to zenhub: is it running?')
203 pass
204
207
208
210 if not svcName in self.services:
211 self.log.warning('No service named %r: ZenHub may be disconnected' % svcName)
212 return self.services.get(svcName, None) or FakeRemote()
213
214
def getService(self, serviceName, serviceListeningInterface=None):
    """
    Obtain a zenhub service by name, always returning a Deferred.

    If the service is already cached in C{self.services}, the cached
    reference is returned wrapped in C{defer.succeed}. Otherwise it is
    requested through C{self.perspective}.callRemote('getService', ...):
      - On success the reference is cached under C{serviceName}, and it is
        removed from the cache again when the remote reference reports a
        disconnect.
      - On failure any cache entry for the name is removed and the failure
        is propagated.

    @parameter serviceName: name of the zenhub service to fetch
    @type serviceName: string
    @parameter serviceListeningInterface: listener object handed to zenhub;
        this daemon is used when it is omitted or falsy
    @return: Deferred firing with the remote service reference
    """
    if serviceName in self.services:
        return defer.succeed(self.services[serviceName])

    def _forget(ignored):
        # Disconnect hook: drop the stale reference from the cache.
        self.log.debug('Removing service %s' % serviceName)
        self.services.pop(serviceName, None)

    def _stash(service, name):
        self.log.debug('Loaded service %s from zenhub' % name)
        self.services[name] = service
        service.notifyOnDisconnect(_forget)
        return service

    def _failed(failure, name):
        self.log.debug('errback after getting service %s' % name)
        self.log.error('Could not retrieve service %s' % name)
        self.services.pop(name, None)
        # Returning the failure keeps the error chain going for callers.
        return failure

    remoteCall = self.perspective.callRemote
    listener = serviceListeningInterface or self
    pending = remoteCall('getService',
                         serviceName,
                         self.options.monitor,
                         listener)
    # Separate addCallback/addErrback (not addCallbacks) so _failed also
    # sees errors raised inside _stash.
    pending.addCallback(_stash, serviceName)
    pending.addErrback(_failed, serviceName)
    return pending
251
253 """
254 After connecting to zenhub, gather our initial list of services.
255 """
256 def errback(error):
257 if isinstance(error, Failure):
258 self.log.critical( "Invalid monitor: %s" % self.options.monitor)
259 reactor.stop()
260 return defer.fail(RemoteBadMonitor(
261 "Invalid monitor: %s" % self.options.monitor, ''))
262 return error
263
264 self.log.debug('Setting up initial services: %s' % \
265 ', '.join(self.initialServices))
266 d = defer.DeferredList(
267 [self.getService(name) for name in self.initialServices],
268 fireOnOneErrback=True, consumeErrors=True)
269 d.addErrback(errback)
270 return d
271
272
275
286 d.addCallback(callback)
287 d.addErrback(twisted.python.log.err)
288 reactor.run()
289 if self._customexitcode:
290 sys.exit(self._customexitcode)
291
def sigTerm(self, signum=None, frame=None):
    """
    Handle a termination request by delegating to C{ZenDaemon.sigTerm}.

    Any C{SystemExit} raised by the base implementation is swallowed, so
    this handler itself never terminates the process.

    @parameter signum: signal number, forwarded unchanged
    @parameter frame: interrupted stack frame, forwarded unchanged
    """
    parentHandler = ZenDaemon.sigTerm
    try:
        parentHandler(self, signum, frame)
    except SystemExit:
        # NOTE(review): presumably suppressed so shutdown proceeds through
        # the reactor-driven stop() path rather than exiting here -- confirm.
        return
297
300
def stop(self, ignored=''):
    """
    Shut the daemon down, announcing the stop to zenhub when possible.

    Only the first call made while the reactor is running does any work,
    and that call sets C{self.stopped}. Later calls, or calls made when the
    reactor is not running, just log a debug message.

    When an EventService is available:
      - the stop event is queued, unless a C{cycle} option exists and is
        false;
      - the event queue is flushed via C{drive(self.pushEvents)};
      - counters are saved and the reactor is stopped once the flush
        finishes.
    Regardless of that, the same stop routine is also scheduled one second
    later, so shutdown never waits indefinitely on the flush.

    @parameter ignored: unused. NOTE(review): presumably present so stop
        can be used directly as a callback -- confirm.
    """
    def stopNow(ignored):
        # May run twice (after the flush and after the 1s timer), so
        # tolerate a reactor that is already stopped.
        if not reactor.running:
            return
        try:
            self.saveCounters()
            reactor.stop()
        except ReactorNotRunning:
            self.log.debug("Tried to stop reactor that was stopped")

    if not reactor.running or self.stopped:
        self.log.debug( "stop() called when not running" )
        return

    self.stopped = True
    if 'EventService' not in self.services:
        self.log.debug( "No event sent as no EventService available." )
    else:
        # A missing 'cycle' option counts as cycling.
        if hasattr(self.options, 'cycle'):
            cycling = getattr(self.options, 'cycle', True)
        else:
            cycling = True
        if cycling:
            self.sendEvent(self.stopEvent)
        # Flush the queue (including the stop event), then stop.
        drive(self.pushEvents).addBoth(stopNow)
        self.log.debug( "Sent a 'stop' event" )

    # Upper bound on how long shutdown waits; the argument is ignored.
    reactor.callLater(1, stopNow, True)
326
329
331 ''' Add event to queue of events to be sent. If we have an event
332 service then process the queue.
333 '''
334 generatedEvent = self.generateEvent(event, **kw)
335 if generatedEvent:
336 self.eventQueue.append(generatedEvent)
337 self.counters['eventCount'] += 1
338 self.log.debug("Queued event (total of %d) %r",
339 len(self.eventQueue),
340 event)
341
342
343 self._trimEventQueue(maxOver=self.options.eventflushchunksize)
344
346 ''' Add event to queue of events to be sent. If we have an event
347 service then process the queue.
348 '''
349 if not reactor.running: return
350 event = event.copy()
351 event['agent'] = self.name
352 event['monitor'] = self.options.monitor
353 event['manager'] = self.fqdn
354 event.update(kw)
355 if not self.options.allowduplicateclears or self.options.duplicateclearinterval > 0:
356 statusKey = ( event['device'],
357 event.get('component', ''),
358 event.get('eventKey', ''),
359 event.get('eventClass', '') )
360 severity = event.get('severity', -1)
361 status = self._eventStatus.get(statusKey, -1)
362 if severity != -1:
363 if severity != status:
364 self._eventStatusCount[statusKey] = 0
365 else:
366 self._eventStatusCount[statusKey] += 1
367 self._eventStatus[statusKey] = severity
368 if severity == Clear and status == Clear:
369 if not self.options.allowduplicateclears:
370 self.log.debug("allowduplicateclears dropping useless clear event %r", event)
371 return
372 if self.options.duplicateclearinterval > 0 \
373 and self._eventStatusCount[statusKey] % self.options.duplicateclearinterval != 0:
374 self.log.debug("duplicateclearinterval dropping useless clear event %r", event)
375 return
376 return event
377
379 queueLen = len(self.eventQueue)
380 if queueLen > (self.options.maxqueuelen + maxOver):
381 diff = queueLen - self.options.maxqueuelen
382 self.log.error(
383 'Discarding oldest %d events because maxqueuelen was '
384 'exceeded: %d/%d',
385 queueLen - self.options.maxqueuelen,
386 queueLen, self.options.maxqueuelen)
387 self.counters['discardedEvents'] += diff
388 self.eventQueue = self.eventQueue[diff:]
389
390 @property
395
401
415
417 """Flush events to ZenHub.
418 """
419
420 if not reactor.running:
421 return
422 if self._sendingEvents:
423 return
424 try:
425
426 self._sendingEvents = True
427 while len(self.eventQueue) or self._heartbeatEvent or len(self._performanceEvents):
428
429
430 evtSvc = self.services.get('EventService', None)
431 if not evtSvc:
432 self.log.error("No event service: %r", evtSvc)
433 break
434
435
436 chunkSize = self.options.eventflushchunksize
437 events = self.eventQueue[:chunkSize]
438 self.eventQueue = self.eventQueue[chunkSize:]
439
440 performanceEvents = self._getPerformanceEventsChunk()
441
442
443 heartBeat = [self._heartbeatEvent] if self._heartbeatEvent else []
444
445 self.log.debug("Sending %d events, %d perfevents, %d heartbeats.", len(events), len(performanceEvents), len(heartBeat))
446 yield evtSvc.callRemote('sendEvents', events + heartBeat + performanceEvents)
447 try:
448 driver.next()
449 performanceEvents = []
450 events = []
451 except ConnectionLost, ex:
452 self.log.error('Error sending event: %s' % ex)
453 self.eventQueue = events + self.eventQueue
454 performanceEvents.reverse()
455 self._performanceEvents.extend(performanceEvents)
456 break
457 self.log.debug("Events sent")
458 self._heartbeatEvent = None
459 except Exception, ex:
460 self.log.exception(ex)
461 finally:
462 self._sendingEvents = False
463
465 'if cycling, send a heartbeat, else, shutdown'
466 if not self.options.cycle:
467 self.stop()
468 return
469 self._heartbeatEvent = self.generateEvent(self.heartbeatEvent, timeout=self.heartbeatTimeout)
470
471 self.niceDoggie(self.heartbeatTimeout / 3)
472
473 events = []
474
475 for name, value in self.counters.items():
476 self.log.info("Counter %s, value %d", name, value)
477 events += self.rrdStats.counter(name, 300, value)
478 self.sendEvents(events)
479
480
481 self.saveCounters()
482
484 atomicWrite(
485 zenPath('var/%s_counters.pickle' % self.name),
486 pickle.dumps(self.counters),
487 raiseException=False,
488 )
489
491 try:
492 self.counters = pickle.load(open(zenPath('var/%s_counters.pickle'% self.name)))
493 except Exception:
494 pass
495
498
499
503
504
507
508
509 @translateError
518
519
521 self.parser.add_option('--hubhost',
522 dest='hubhost',
523 default=DEFAULT_HUB_HOST,
524 help='Host of zenhub daemon.'
525 ' Default is %s.' % DEFAULT_HUB_HOST)
526 self.parser.add_option('--hubport',
527 dest='hubport',
528 type='int',
529 default=DEFAULT_HUB_PORT,
530 help='Port zenhub listens on.'
531 'Default is %s.' % DEFAULT_HUB_PORT)
532 self.parser.add_option('--hubusername',
533 dest='hubusername',
534 default=DEFAULT_HUB_USERNAME,
535 help='Username for zenhub login.'
536 ' Default is %s.' % DEFAULT_HUB_USERNAME)
537 self.parser.add_option('--hubpassword',
538 dest='hubpassword',
539 default=DEFAULT_HUB_PASSWORD,
540 help='Password for zenhub login.'
541 ' Default is %s.' % DEFAULT_HUB_PASSWORD)
542 self.parser.add_option('--monitor',
543 dest='monitor',
544 default=DEFAULT_HUB_MONITOR,
545 help='Name of monitor instance to use for'
546 ' configuration. Default is %s.'
547 % DEFAULT_HUB_MONITOR)
548 self.parser.add_option('--initialHubTimeout',
549 dest='hubtimeout',
550 type='int',
551 default=30,
552 help='Initial time to wait for a ZenHub '
553 'connection')
554 self.parser.add_option('--allowduplicateclears',
555 dest='allowduplicateclears',
556 default=False,
557 action='store_true',
558 help='Send clear events even when the most '
559 'recent event was also a clear event.')
560
561 self.parser.add_option('--duplicateclearinterval',
562 dest='duplicateclearinterval',
563 default=0,
564 type='int',
565 help=('Send a clear event every [DUPLICATECLEARINTEVAL] '
566 'events.')
567 )
568
569 self.parser.add_option('--eventflushseconds',
570 dest='eventflushseconds',
571 default=5.,
572 type='float',
573 help='Seconds between attempts to flush '
574 'events to ZenHub.')
575
576 self.parser.add_option('--eventflushchunksize',
577 dest='eventflushchunksize',
578 default=50,
579 type='int',
580 help='Number of events to send to ZenHub'
581 'at one time')
582
583 self.parser.add_option('--maxqueuelen',
584 dest='maxqueuelen',
585 default=5000,
586 type='int',
587 help='Maximum number of events to queue')
588
589 self.parser.add_option('--zenhubpinginterval',
590 dest='zhPingInterval',
591 default=30,
592 type='int',
593 help='How often to ping zenhub')
594
595 ZenDaemon.buildOptions(self)
596