Package ZenEvents :: Module EventServer
[hide private]
[frames | no frames]

Source Code for Module ZenEvents.EventServer

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__='''EventServer 
 15   
 16  Base class for ZenSyslog, ZenTrap and others 
 17   
 18  $Id$ 
 19  ''' 
 20   
 21  __version__ = "$Revision$"[11:-2] 
 22   
 23  import socket 
 24   
 25  import Globals 
 26   
 27  from Products.ZenHub.PBDaemon import PBDaemon, FakeRemote 
 28  from Products.ZenUtils.DaemonStats import DaemonStats 
 29  from Products.ZenUtils.Driver import drive 
 30   
 31  from Products.ZenEvents.ZenEventClasses import App_Start 
 32  from twisted.internet import reactor 
 33   
 34  import time 
 35   
class Stats:
    """Accumulate simple timing statistics, one sample per event.

    Tracks the sum of all recorded times, the number of samples recorded,
    and the largest single sample seen so far.
    """

    # Class-level defaults; add() rebinds these names on the instance.
    totalTime = 0.
    totalEvents = 0
    maxTime = 0.

    def add(self, moreTime):
        """Record one more event whose measured time was moreTime."""
        self.totalEvents = self.totalEvents + 1
        self.totalTime = self.totalTime + moreTime
        # Keep the running peak; an equal or smaller sample leaves it as is.
        if moreTime > self.maxTime:
            self.maxTime = moreTime
        else:
            self.maxTime = self.maxTime

    def report(self):
        """Return the tuple (totalTime, totalEvents, maxTime)."""
        summary = (self.totalTime, self.totalEvents, self.maxTime)
        return summary
48
class EventServer(PBDaemon):
    """Base class for a daemon whose primary job is to post events.

    Adds to PBDaemon: a startup event, retrieval of RRD/threshold
    configuration from the 'EventService' hub service, periodic heartbeats
    carrying event counters, simple timing statistics, and helpers for
    adopting already-open UDP/TCP file descriptors.
    """

    name = 'EventServer'

    def __init__(self):
        """Initialise the daemon (keeping root) and the statistics holders."""
        PBDaemon.__init__(self, keeproot=True)
        self.stats = Stats()
        self.rrdStats = DaemonStats()

    def connected(self):
        """Send an App_Start event, log the start, then fetch configuration."""
        self.sendEvent(dict(device=self.options.monitor,
                            eventClass=App_Start,
                            summary="%s started" % self.name,
                            severity=0,
                            component=self.name))
        self.log.info("started")
        self.configure()

    def model(self):
        """Return the 'EventService' remote, or a FakeRemote if it is absent."""
        return self.services.get('EventService', FakeRemote())

    def configure(self):
        """Fetch RRD and threshold configuration, then start the periodic work.

        The three remote calls run in sequence under drive(); once they have
        completed, the heartbeat and report cycles are started.

        Returns the object produced by drive(); failures are logged through
        an errback rather than propagated.
        """
        def inner(driver):
            self.log.info("fetching default RRDCreateCommand")
            yield self.model().callRemote('getDefaultRRDCreateCommand')
            createCommand = driver.next()

            self.log.info("getting threshold classes")
            yield self.model().callRemote('getThresholdClasses')
            self.remote_updateThresholdClasses(driver.next())

            self.log.info("getting collector thresholds")
            yield self.model().callRemote('getCollectorThresholds')
            self.rrdStats.config(self.options.monitor, self.name,
                                 driver.next(), createCommand)
            self.heartbeat()
            self.reportCycle()

        def error(result):
            self.log.error("Unexpected error in configure: %s" % result)

        d = drive(inner)
        d.addErrback(error)
        return d

    def sendEvent(self, event, **kw):
        """Post an event through PBDaemon, recording its age in the stats.

        If the event carries a 'firstTime' value, the time elapsed since
        then is added to self.stats before the event is forwarded.
        """
        # FIXME: get real event processing stats
        if 'firstTime' in event:
            # Clamp at zero so a 'firstTime' slightly in the future cannot
            # produce a negative sample.  (This used min(), which forced
            # every sample to be <= 0 and made the totals meaningless.)
            self.stats.add(max(time.time() - event['firstTime'], 0))
        PBDaemon.sendEvent(self, event, **kw)

    def useUdpFileDescriptor(self, fd):
        """Adopt an already-open UDP socket descriptor as self.transport.

        A socket object is built from fd (which duplicates the descriptor),
        the original fd is closed, and a twisted udp.Port is populated by
        hand around that socket and set reading.
        """
        import os
        from twisted.internet import udp

        s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_DGRAM)
        os.close(fd)
        port = s.getsockname()[1]
        transport = udp.Port(port, self)
        s.setblocking(0)
        transport.socket = s
        transport.fileno = s.fileno
        transport.connected = 1
        transport._realPortNumber = port
        self.transport = transport
        # hack around startListening not being called
        self.numPorts = 1
        transport.startReading()

    def useTcpFileDescriptor(self, fd, factory):
        """Adopt an already-open TCP listening descriptor for factory.

        A listener is opened on the first usable port in 19800..19998 to
        obtain a reactor port object; fd is then dup2()'d over that
        listener's descriptor so the port object serves the inherited
        socket, and the original fd is closed.

        Returns the listening port object.
        Raises socket.error if no port in the range could be used.
        """
        import os
        from twisted.internet.error import CannotListenError

        for candidate in range(19800, 19999):
            try:
                p = reactor.listenTCP(candidate, factory)
                os.dup2(fd, p.socket.fileno())
                p.socket.listen(p.backlog)
                p.socket.setblocking(False)
                p.socket.setsockopt(socket.SOL_SOCKET,
                                    socket.SO_REUSEADDR, 1)
                os.close(fd)
                return p
            except (socket.error, CannotListenError):
                # listenTCP reports a busy port as CannotListenError, not
                # socket.error; catch both so the scan moves on to the
                # next candidate instead of aborting.
                pass
        raise socket.error("Unable to find an open socket to listen on")

    def reportCycle(self):
        """Log statistics now and reschedule, if --statcycle is non-zero."""
        if self.options.statcycle:
            self.report()
            reactor.callLater(self.options.statcycle, self.reportCycle)

    def heartbeat(self):
        """Since we don't do anything on a regular basis, just
        push heartbeats regularly"""
        seconds = self.heartbeatTimeout / 3
        reactor.callLater(seconds, self.heartbeat)
        PBDaemon.heartbeat(self)
        totalTime, totalEvents, _ = self.stats.report()
        # rrdStats.counter() returns events to send; totalTime is
        # multiplied by 1000 and truncated to an int before being recorded.
        pending = (self.rrdStats.counter('events', seconds, totalEvents) +
                   self.rrdStats.counter('totalTime', seconds,
                                         int(totalTime * 1000)))
        for ev in pending:
            self.sendEvent(ev)

    def report(self):
        """Log some simple diagnostics: event count, total/average/max time."""
        totalTime, totalEvents, maxTime = self.stats.report()
        self.log.info("%d events processed in %.2f seconds",
                      totalEvents,
                      totalTime)
        if totalEvents > 0:
            self.log.info("%.5f average seconds per event",
                          (totalTime / totalEvents))
            self.log.info("Maximum processing time for one event was %.5f",
                          maxTime)

    def buildOptions(self):
        """Add the --statcycle option to the options defined by PBDaemon."""
        PBDaemon.buildOptions(self)
        self.parser.add_option('--statcycle',
                               dest='statcycle',
                               type='int',
                               help='Number of seconds between the writing of statistics',
                               default=0)
180