Package Products :: Package ZenEvents :: Module zensyslog
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenEvents.zensyslog

  1  #! /usr/bin/env python 
  2  ############################################################################## 
  3  #  
  4  # Copyright (C) Zenoss, Inc. 2008, 2011, all rights reserved. 
  5  #  
  6  # This content is made available according to terms specified in 
  7  # License.zenoss under the directory where your Zenoss product is installed. 
  8  #  
  9  ############################################################################## 
 10   
 11   
 12  __doc__ = """zensyslog 
 13   
 14  Turn syslog messages into events. 
 15   
 16  """ 
 17   
 18  import time 
 19  import socket 
 20  import os 
 21  import logging 
 22   
 23  from twisted.internet.protocol import DatagramProtocol 
 24  from twisted.internet import reactor, defer, udp 
 25  from twisted.python import failure 
 26   
 27  import Globals 
 28  import zope.interface 
 29  import zope.component 
 30   
 31   
 32  from Products.ZenCollector.daemon import CollectorDaemon 
 33  from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\ 
 34                                               IEventService, \ 
 35                                               IScheduledTask 
 36  from Products.ZenCollector.tasks import SimpleTaskFactory,\ 
 37                                          SimpleTaskSplitter,\ 
 38                                          BaseTask, TaskStates 
 39  from Products.ZenUtils.observable import ObservableMixin 
 40   
 41  from Products.ZenEvents.SyslogProcessing import SyslogProcessor 
 42   
 43  from Products.ZenUtils.Utils import zenPath 
 44  from Products.ZenUtils.IpUtil import asyncNameLookup 
 45   
 46  from Products.ZenEvents.EventServer import Stats 
 47  from Products.ZenUtils.Utils import unused 
 48  from Products.ZenCollector.services.config import DeviceProxy 
 49  unused(Globals, DeviceProxy) 
 50   
 51  COLLECTOR_NAME = 'zensyslog' 
 52  log = logging.getLogger("zen.%s" % COLLECTOR_NAME) 
 53   
 54   
55 -class SyslogPreferences(object):
56 zope.interface.implements(ICollectorPreferences) 57
58 - def __init__(self):
59 """ 60 Constructs a new PingCollectionPreferences instance and 61 provides default values for needed attributes. 62 """ 63 self.collectorName = COLLECTOR_NAME 64 self.defaultRRDCreateCommand = None 65 self.configCycleInterval = 20 # minutes 66 self.cycleInterval = 5 * 60 # seconds 67 68 # The configurationService attribute is the fully qualified class-name 69 # of our configuration service that runs within ZenHub 70 self.configurationService = 'Products.ZenHub.services.SyslogConfig' 71 72 # Will be filled in based on buildOptions 73 self.options = None 74 75 self.configCycleInterval = 20*60
76
77 - def postStartupTasks(self):
78 task = SyslogTask(COLLECTOR_NAME, configId=COLLECTOR_NAME) 79 yield task
80
81 - def buildOptions(self, parser):
82 """ 83 Command-line options to be supported 84 """ 85 SYSLOG_PORT = 514 86 try: 87 SYSLOG_PORT = socket.getservbyname('syslog', 'udp') 88 except socket.error: 89 pass 90 91 parser.add_option('--parsehost', dest='parsehost', 92 action='store_true', default=False, 93 help='Try to parse the hostname part of a syslog HEADER' 94 ) 95 parser.add_option('--stats', dest='stats', 96 action='store_true', default=False, 97 help='Print statistics to log every 2 secs') 98 parser.add_option('--logorig', dest='logorig', 99 action='store_true', default=False, 100 help='Log the original message') 101 parser.add_option('--logformat', dest='logformat', 102 default='human', 103 help='Human-readable (/var/log/messages) or raw (wire)' 104 ) 105 parser.add_option('--minpriority', dest='minpriority', 106 default=6, type='int', 107 help='Minimum priority message that zensyslog will accept' 108 ) 109 parser.add_option('--syslogport', dest='syslogport', 110 default=SYSLOG_PORT, type='int', 111 help='Port number to use for syslog events' 112 ) 113 parser.add_option('--listenip', dest='listenip', 114 default='0.0.0.0', 115 help='IP address to listen on. Default is %default' 116 ) 117 parser.add_option('--useFileDescriptor', 118 dest='useFileDescriptor', type='int', 119 help='Read from an existing connection rather opening a new port.' 120 , default=None) 121 parser.add_option('--noreverseLookup', dest='noreverseLookup', 122 action='store_true', default=False, 123 help="Don't convert the remote device's IP address to a hostname." 124 )
125
126 - def postStartup(self):
127 daemon = zope.component.getUtility(ICollector) 128 daemon.defaultPriority = 1
129 130
131 -class SyslogTask(BaseTask, DatagramProtocol):
132 """ 133 Listen for syslog messages and turn them into events 134 Connects to the TrapService service in zenhub. 135 """ 136 zope.interface.implements(IScheduledTask) 137 138 SYSLOG_DATE_FORMAT = '%b %d %H:%M:%S' 139 SAMPLE_DATE = 'Apr 10 15:19:22' 140
141 - def __init__(self, taskName, configId, 142 scheduleIntervalSeconds=3600, taskConfig=None):
143 BaseTask.__init__(self, taskName, configId, 144 scheduleIntervalSeconds, taskConfig) 145 self.log = log 146 147 # Needed for interface 148 self.name = taskName 149 self.configId = configId 150 self.state = TaskStates.STATE_IDLE 151 self.interval = scheduleIntervalSeconds 152 self._preferences = taskConfig 153 self._daemon = zope.component.getUtility(ICollector) 154 self._eventService = zope.component.queryUtility(IEventService) 155 self._preferences = self._daemon 156 157 self.options = self._daemon.options 158 159 self.stats = Stats() 160 161 if not self.options.useFileDescriptor\ 162 and self.options.syslogport < 1024: 163 self._daemon.openPrivilegedPort('--listen', '--proto=udp', 164 '--port=%s:%d' 165 % (self.options.listenip, 166 self.options.syslogport)) 167 self._daemon.changeUser() 168 self.minpriority = self.options.minpriority 169 self.processor = None 170 171 if self.options.logorig: 172 self.olog = logging.getLogger('origsyslog') 173 self.olog.setLevel(20) 174 self.olog.propagate = False 175 lname = zenPath('log/origsyslog.log') 176 hdlr = logging.FileHandler(lname) 177 hdlr.setFormatter(logging.Formatter('%(message)s')) 178 self.olog.addHandler(hdlr) 179 180 if self.options.useFileDescriptor is not None: 181 self.useUdpFileDescriptor(int(self.options.useFileDescriptor)) 182 else: 183 reactor.listenUDP(self.options.syslogport, self, 184 interface=self.options.listenip) 185 186 # yield self.model().callRemote('getDefaultPriority') 187 self.processor = SyslogProcessor(self._eventService.sendEvent, 188 self.options.minpriority, self.options.parsehost, 189 self.options.monitor, self._daemon.defaultPriority)
190
191 - def doTask(self):
192 """ 193 This is a wait-around task since we really are called 194 asynchronously. 195 """ 196 return defer.succeed("Waiting for syslog messages...")
197
198 - def useUdpFileDescriptor(self, fd):
199 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_DGRAM) 200 os.close(fd) 201 port = s.getsockname()[1] 202 transport = udp.Port(port, self) 203 s.setblocking(0) 204 transport.socket = s 205 transport.fileno = s.fileno 206 transport.connected = 1 207 transport._realPortNumber = port 208 self.transport = transport 209 # hack around startListening not being called 210 self.numPorts = 1 211 transport.startReading()
212
213 - def expand(self, msg, client_address):
214 """ 215 Expands a syslog message into a string format suitable for writing 216 to the filesystem such that it appears the same as it would 217 had the message been logged by the syslog daemon. 218 219 @param msg: syslog message 220 @type msg: string 221 @param client_address: IP info of the remote device (ipaddr, port) 222 @type client_address: tuple of (string, number) 223 @return: message 224 @rtype: string 225 """ 226 # pri := facility * severity 227 stop = msg.find('>') 228 229 # check for a datestamp. default to right now if date not present 230 start = stop + 1 231 stop = start + len(SyslogTask.SAMPLE_DATE) 232 dateField = msg[start:stop] 233 try: 234 date = time.strptime(dateField, 235 SyslogTask.SYSLOG_DATE_FORMAT) 236 year = time.localtime()[0] 237 date = (year, ) + date[1:] 238 start = stop + 1 239 except ValueError: 240 241 # date not present, so use today's date 242 date = time.localtime() 243 244 # check for a hostname. default to localhost if not present 245 stop = msg.find(' ', start) 246 if msg[stop - 1] == ':': 247 hostname = client_address[0] 248 else: 249 hostname = msg[start:stop] 250 start = stop + 1 251 252 # the message content 253 body = msg[start:] 254 255 # assemble the message 256 prettyTime = time.strftime(SyslogTask.SYSLOG_DATE_FORMAT, date) 257 message = '%s %s %s' % (prettyTime, hostname, body) 258 return message
259
260 - def datagramReceived(self, msg, client_address):
261 """ 262 Consume the network packet 263 264 @param msg: syslog message 265 @type msg: string 266 @param client_address: IP info of the remote device (ipaddr, port) 267 @type client_address: tuple of (string, number) 268 """ 269 (ipaddr, port) = client_address 270 if self.options.logorig: 271 if self.options.logformat == 'human': 272 message = self.expand(msg, client_address) 273 else: 274 message = msg 275 self.olog.info(message) 276 277 if self.options.noreverseLookup: 278 d = defer.succeed(ipaddr) 279 else: 280 d = asyncNameLookup(ipaddr) 281 d.addBoth(self.gotHostname, (msg, ipaddr, time.time()))
282
283 - def gotHostname(self, response, data):
284 """ 285 Send the resolved address, if possible, and the event via the thread 286 287 @param response: Twisted response 288 @type response: Twisted response 289 @param data: (msg, ipaddr, rtime) 290 @type data: tuple of (string, string, datetime object) 291 """ 292 (msg, ipaddr, rtime) = data 293 if isinstance(response, failure.Failure): 294 host = ipaddr 295 else: 296 host = response 297 if self.processor: 298 self.processor.process(msg, ipaddr, host, rtime)
299
300 - def displayStatistics(self):
301 totalTime, totalEvents, maxTime = self.stats.report() 302 display = "%d events processed in %.2f seconds" % ( 303 totalEvents, 304 totalTime) 305 if totalEvents > 0: 306 display += """ 307 %.5f average seconds per event 308 Maximum processing time for one event was %.5f""" % ( 309 (totalTime / totalEvents), maxTime) 310 return display
311
312 - def cleanup(self):
313 status = self.displayStatistics() 314 self.log.info(status)
315 316
317 -class SyslogConfigTask(ObservableMixin):
318 """ 319 Receive a configuration object containing the default priority 320 """ 321 zope.interface.implements(IScheduledTask) 322
323 - def __init__(self, taskName, configId, 324 scheduleIntervalSeconds=3600, taskConfig=None):
325 super(SyslogConfigTask, self).__init__() 326 327 # Needed for ZCA interface contract 328 self.name = taskName 329 self.configId = configId 330 self.state = TaskStates.STATE_IDLE 331 self.interval = scheduleIntervalSeconds 332 self._preferences = taskConfig 333 self._daemon = zope.component.getUtility(ICollector) 334 335 self._daemon.defaultPriority = self._preferences.defaultPriority
336
337 - def doTask(self):
338 return defer.succeed("Already updated default syslog priority...")
339
340 - def cleanup(self):
341 pass
342 343
344 -class SyslogDaemon(CollectorDaemon):
345 346 _frameworkFactoryName = "nosip"
347 348 349 if __name__=='__main__': 350 myPreferences = SyslogPreferences() 351 myTaskFactory = SimpleTaskFactory(SyslogConfigTask) 352 myTaskSplitter = SimpleTaskSplitter(myTaskFactory) 353 daemon = SyslogDaemon(myPreferences, myTaskSplitter) 354 daemon.run() 355