Package ZenEvents :: Module EventServer
[hide private]
[frames] | no frames]

Source Code for Module ZenEvents.EventServer

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13  #! /usr/bin/env python  
 14   
 15  __doc__='''EventServer 
 16   
 17  Base class for ZenXEvent and ZenTrap 
 18   
 19  $Id$ 
 20  ''' 
 21   
 22  __version__ = "$Revision$"[11:-2] 
 23   
 24  from twisted.python import threadable 
 25  threadable.init() 
 26   
 27  from Queue import Queue 
 28  from threading import Lock 
 29  import time 
 30  import socket 
 31   
 32  import Globals 
 33   
 34  from Products.ZenUtils.ZCmdBase import ZCmdBase 
 35   
 36  from Event import Event, EventHeartbeat 
 37   
 38  from ZenEventClasses import App_Start, App_Stop 
 39  from twisted.internet import reactor, defer 
 40   
41 -class Stats:
42 totalTime = 0. 43 totalEvents = 0 44 maxTime = 0. 45
46 - def __init__(self):
47 self.lock = Lock()
48
49 - def add(self, moreTime):
50 self.lock.acquire() 51 self.totalEvents += 1 52 self.totalTime += moreTime 53 self.maxTime = max(self.maxTime, moreTime) 54 self.lock.release()
55
56 - def report(self):
57 try: 58 self.lock.acquire() 59 return self.totalTime, self.totalEvents, self.maxTime 60 finally: 61 self.lock.release()
62
63 -class EventServer(ZCmdBase):
64 'Base class for a daemon whose primary job is to post events' 65 66 name = 'EventServer' 67
68 - def __init__(self):
69 ZCmdBase.__init__(self, keeproot=True) 70 self.stats = Stats() 71 self.zem = self.dmd.ZenEventManager 72 self.myfqdn = socket.getfqdn() 73 self.sendEvent(Event(device=self.myfqdn, 74 eventClass=App_Start, 75 summary="%s started" % self.name, 76 severity=0, 77 component=self.name)) 78 self.q = Queue() 79 self.log.info("started") 80 self.heartbeat() 81 self.reportCycle()
82
83 - def useUdpFileDescriptor(self, fd):
84 from twisted.internet import udp 85 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_DGRAM) 86 import os 87 os.close(fd) 88 port = s.getsockname()[1] 89 transport = udp.Port(port, self) 90 s.setblocking(0) 91 transport.socket = s 92 transport.fileno = s.fileno 93 transport.connected = 1 94 transport._realPortNumber = port 95 self.transport = transport 96 # hack around startListening not being called 97 self.numPorts = 1 98 transport.startReading()
99
100 - def useTcpFileDescriptor(self, fd, factory):
101 import os, socket 102 for i in range(19800, 19999): 103 try: 104 p = reactor.listenTCP(i, factory) 105 os.dup2(fd, p.socket.fileno()) 106 p.socket.listen(p.backlog) 107 p.socket.setblocking(False) 108 p.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 109 os.close(fd) 110 return p 111 except socket.error: 112 pass 113 raise socket.error("Unable to find an open socket to listen on")
114
115 - def reportCycle(self):
116 if self.options.statcycle: 117 self.report() 118 reactor.callLater(self.options.statcycle, self.reportCycle)
119
120 - def run(self):
121 'method to process events in a thread' 122 try: 123 while 1: 124 args = self.q.get() 125 if args is None: 126 break 127 try: 128 if isinstance(args, Event): 129 self.sendEvent(args) 130 else: 131 self.doHandleRequest(*args) 132 diff = time.time() - args[-1] 133 self.stats.add(diff) 134 except Exception, ex: 135 self.log.exception(ex) 136 self.syncdb() 137 finally: 138 if reactor.running: 139 reactor.stop()
140 141
142 - def sendEvent(self, evt):
143 "wrapper for sending an event" 144 self.zem._p_jar.sync() 145 if type(evt) == dict: 146 evt['manager'] = self.myfqdn 147 else: 148 evt.manager = self.myfqdn 149 self.zem.sendEvent(evt)
150 151
152 - def sendEvents(self, evts):
153 """Send multiple events to database syncing only one time. 154 """ 155 self.zem._p_jar.sync() 156 for e in evts: 157 self.zem.sendEvent(e)
158 159
160 - def heartbeat(self):
161 """Since we don't do anything on a regular basis, just 162 push heartbeats regularly""" 163 seconds = 60 164 evt = EventHeartbeat(self.myfqdn, self.name, 3*seconds) 165 self.q.put(evt) 166 reactor.callLater(seconds, self.heartbeat)
167 168
169 - def sigTerm(self, signum, frame):
170 'controlled shutdown of main loop on interrupt' 171 try: 172 ZCmdBase.sigTerm(self, signum, frame) 173 except SystemExit: 174 if reactor.running: 175 reactor.stop()
176
177 - def report(self):
178 'report some simple diagnostics at shutdown' 179 totalTime, totalEvents, maxTime = self.stats.report() 180 self.log.info("%d events processed in %.2f seconds", 181 totalEvents, 182 totalTime) 183 if totalEvents > 0: 184 self.log.info("%.5f average seconds per event", 185 (totalTime / totalEvents)) 186 self.log.info("Maximum processing time for one event was %.5f", 187 maxTime)
188
189 - def finish(self):
190 'things to do at shutdown: thread cleanup, logs and events' 191 self.q.put(None) 192 self.report() 193 self.sendEvent(Event(device=self.myfqdn, 194 eventClass=App_Stop, 195 summary="%s stopped" % self.name, 196 severity=4, 197 component=self.name))
198
199 - def buildOptions(self):
200 ZCmdBase.buildOptions(self) 201 self.parser.add_option('--statcycle', 202 dest='statcycle', 203 type='int', 204 help='Number of seconds between the writing of statistics', 205 default=0)
206 209
210 - def main(self):
211 reactor.callInThread(self.run) 212 reactor.addSystemEventTrigger('before', 'shutdown', self.finish) 213 self._wakeUpReactorAndHandleSignals() 214 reactor.run(installSignalHandlers=False)
215