Package Products :: Package ZenEvents :: Module EventServer
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenEvents.EventServer

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2007, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  __doc__='''EventServer 
 12   
 13  Base class for ZenSyslog, ZenTrap and others 
 14   
 15  $Id$ 
 16  ''' 
 17   
 18  __version__ = "$Revision$"[11:-2] 
 19   
 20  import socket 
 21   
 22  import Globals 
 23   
 24  from Products.ZenHub.PBDaemon import PBDaemon, FakeRemote 
 25  from Products.ZenUtils.DaemonStats import DaemonStats 
 26  from Products.ZenUtils.Driver import drive 
 27   
 28  from Products.ZenEvents.ZenEventClasses import App_Start 
 29  from twisted.internet import reactor 
 30   
 31  import time 
 32   
33 -class Stats:
34 totalTime = 0. 35 totalEvents = 0 36 maxTime = 0. 37
38 - def add(self, moreTime):
39 self.totalEvents += 1 40 self.totalTime += moreTime 41 self.maxTime = max(self.maxTime, moreTime)
42
43 - def report(self):
44 return self.totalTime, self.totalEvents, self.maxTime
45
46 -class EventServer(PBDaemon):
47 'Base class for a daemon whose primary job is to post events' 48 49 name = 'EventServer' 50
51 - def __init__(self):
52 PBDaemon.__init__(self, keeproot=True) 53 self.stats = Stats() 54 self.rrdStats = DaemonStats()
55 56
57 - def connected(self):
58 self.sendEvent(dict(device=self.options.monitor, 59 eventClass=App_Start, 60 summary="%s started" % self.name, 61 severity=0, 62 component=self.name)) 63 self.log.info("started") 64 self.configure()
65 66
67 - def model(self):
68 return self.services.get('EventService', FakeRemote())
69 70
71 - def configure(self):
72 def inner(driver): 73 self.log.info("fetching default RRDCreateCommand") 74 yield self.model().callRemote('getDefaultRRDCreateCommand') 75 createCommand = driver.next() 76 77 self.log.info("getting threshold classes") 78 yield self.model().callRemote('getThresholdClasses') 79 self.remote_updateThresholdClasses(driver.next()) 80 81 self.log.info("getting collector thresholds") 82 yield self.model().callRemote('getCollectorThresholds') 83 self.rrdStats.config(self.options.monitor, self.name, 84 driver.next(), createCommand) 85 self.heartbeat() 86 self.reportCycle()
87 d = drive(inner) 88 def error(result): 89 self.log.error("Unexpected error in configure: %s" % result)
90 d.addErrback(error) 91 return d 92 93
94 - def sendEvent(self, event, **kw):
95 # FIXME: get real event processing stats 96 if 'firstTime' in event: 97 self.stats.add(min(time.time() - event['firstTime'], 0)) 98 PBDaemon.sendEvent(self, event, **kw)
99 100
101 - def useUdpFileDescriptor(self, fd):
102 from twisted.internet import udp 103 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_DGRAM) 104 import os 105 os.close(fd) 106 port = s.getsockname()[1] 107 transport = udp.Port(port, self) 108 s.setblocking(0) 109 transport.socket = s 110 transport.fileno = s.fileno 111 transport.connected = 1 112 transport._realPortNumber = port 113 self.transport = transport 114 # hack around startListening not being called 115 self.numPorts = 1 116 transport.startReading()
117 118
119 - def useTcpFileDescriptor(self, fd, factory):
120 import os, socket 121 for i in range(19800, 19999): 122 try: 123 p = reactor.listenTCP(i, factory) 124 os.dup2(fd, p.socket.fileno()) 125 p.socket.listen(p.backlog) 126 p.socket.setblocking(False) 127 p.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 128 os.close(fd) 129 return p 130 except socket.error: 131 pass 132 raise socket.error("Unable to find an open socket to listen on")
133 134
135 - def reportCycle(self):
136 if self.options.statcycle: 137 self.report() 138 reactor.callLater(self.options.statcycle, self.reportCycle)
139 140
141 - def heartbeat(self):
142 """Since we don't do anything on a regular basis, just 143 push heartbeats regularly""" 144 seconds = self.heartbeatTimeout / 3 145 reactor.callLater(self.heartbeatTimeout / 3, self.heartbeat) 146 PBDaemon.heartbeat(self) 147 totalTime, totalEvents, maxTime = self.stats.report() 148 for ev in (self.rrdStats.counter('events', 149 seconds, 150 totalEvents) + 151 self.rrdStats.counter('totalTime', 152 seconds, 153 int(totalTime * 1000))): 154 self.sendEvent(ev)
155 156
157 - def report(self):
158 'report some simple diagnostics at shutdown' 159 totalTime, totalEvents, maxTime = self.stats.report() 160 self.log.info("%d events processed in %.2f seconds", 161 totalEvents, 162 totalTime) 163 if totalEvents > 0: 164 self.log.info("%.5f average seconds per event", 165 (totalTime / totalEvents)) 166 self.log.info("Maximum processing time for one event was %.5f", 167 maxTime)
168 169
170 - def buildOptions(self):
171 PBDaemon.buildOptions(self) 172 self.parser.add_option('--statcycle', 173 dest='statcycle', 174 type='int', 175 help='Number of seconds between the writing of statistics', 176 default=0)
177