1
2
3
4
5
6
7
8
9
10
11
12 __doc__ = """zensyslog
13
14 Turn syslog messages into events.
15
16 """
17
18 import time
19 import socket
20 import os
21 import logging
22
23 from twisted.internet.protocol import DatagramProtocol
24 from twisted.internet import reactor, defer, udp
25 from twisted.python import failure
26
27 import Globals
28 import zope.interface
29 import zope.component
30
31
32 from Products.ZenCollector.daemon import CollectorDaemon
33 from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\
34 IEventService, \
35 IScheduledTask
36 from Products.ZenCollector.tasks import SimpleTaskFactory,\
37 SimpleTaskSplitter,\
38 BaseTask, TaskStates
39 from Products.ZenUtils.observable import ObservableMixin
40
41 from Products.ZenEvents.SyslogProcessing import SyslogProcessor
42
43 from Products.ZenUtils.Utils import zenPath
44 from Products.ZenUtils.IpUtil import asyncNameLookup
45
46 from Products.ZenEvents.EventServer import Stats
47 from Products.ZenUtils.Utils import unused
48 from Products.ZenCollector.services.config import DeviceProxy
49 unused(Globals, DeviceProxy)
50
51 COLLECTOR_NAME = 'zensyslog'
52 log = logging.getLogger("zen.%s" % COLLECTOR_NAME)
53
54
56 zope.interface.implements(ICollectorPreferences)
57
76
80
82 """
83 Command-line options to be supported
84 """
85 SYSLOG_PORT = 514
86 try:
87 SYSLOG_PORT = socket.getservbyname('syslog', 'udp')
88 except socket.error:
89 pass
90
91 parser.add_option('--parsehost', dest='parsehost',
92 action='store_true', default=False,
93 help='Try to parse the hostname part of a syslog HEADER'
94 )
95 parser.add_option('--stats', dest='stats',
96 action='store_true', default=False,
97 help='Print statistics to log every 2 secs')
98 parser.add_option('--logorig', dest='logorig',
99 action='store_true', default=False,
100 help='Log the original message')
101 parser.add_option('--logformat', dest='logformat',
102 default='human',
103 help='Human-readable (/var/log/messages) or raw (wire)'
104 )
105 parser.add_option('--minpriority', dest='minpriority',
106 default=6, type='int',
107 help='Minimum priority message that zensyslog will accept'
108 )
109 parser.add_option('--syslogport', dest='syslogport',
110 default=SYSLOG_PORT, type='int',
111 help='Port number to use for syslog events'
112 )
113 parser.add_option('--listenip', dest='listenip',
114 default='0.0.0.0',
115 help='IP address to listen on. Default is %default'
116 )
117 parser.add_option('--useFileDescriptor',
118 dest='useFileDescriptor', type='int',
119 help='Read from an existing connection rather opening a new port.'
120 , default=None)
121 parser.add_option('--noreverseLookup', dest='noreverseLookup',
122 action='store_true', default=False,
123 help="Don't convert the remote device's IP address to a hostname."
124 )
125
126 - def postStartup(self):
127 daemon = zope.component.getUtility(ICollector)
128 daemon.defaultPriority = 1
129
130
132 """
133 Listen for syslog messages and turn them into events
134 Connects to the TrapService service in zenhub.
135 """
136 zope.interface.implements(IScheduledTask)
137
138 SYSLOG_DATE_FORMAT = '%b %d %H:%M:%S'
139 SAMPLE_DATE = 'Apr 10 15:19:22'
140
141 - def __init__(self, taskName, configId,
142 scheduleIntervalSeconds=3600, taskConfig=None):
143 BaseTask.__init__(self, taskName, configId,
144 scheduleIntervalSeconds, taskConfig)
145 self.log = log
146
147
148 self.name = taskName
149 self.configId = configId
150 self.state = TaskStates.STATE_IDLE
151 self.interval = scheduleIntervalSeconds
152 self._preferences = taskConfig
153 self._daemon = zope.component.getUtility(ICollector)
154 self._eventService = zope.component.queryUtility(IEventService)
155 self._preferences = self._daemon
156
157 self.options = self._daemon.options
158
159 self.stats = Stats()
160
161 if not self.options.useFileDescriptor\
162 and self.options.syslogport < 1024:
163 self._daemon.openPrivilegedPort('--listen', '--proto=udp',
164 '--port=%s:%d'
165 % (self.options.listenip,
166 self.options.syslogport))
167 self._daemon.changeUser()
168 self.minpriority = self.options.minpriority
169 self.processor = None
170
171 if self.options.logorig:
172 self.olog = logging.getLogger('origsyslog')
173 self.olog.setLevel(20)
174 self.olog.propagate = False
175 lname = zenPath('log/origsyslog.log')
176 hdlr = logging.FileHandler(lname)
177 hdlr.setFormatter(logging.Formatter('%(message)s'))
178 self.olog.addHandler(hdlr)
179
180 if self.options.useFileDescriptor is not None:
181 self.useUdpFileDescriptor(int(self.options.useFileDescriptor))
182 else:
183 reactor.listenUDP(self.options.syslogport, self,
184 interface=self.options.listenip)
185
186
187 self.processor = SyslogProcessor(self._eventService.sendEvent,
188 self.options.minpriority, self.options.parsehost,
189 self.options.monitor, self._daemon.defaultPriority)
190
192 """
193 This is a wait-around task since we really are called
194 asynchronously.
195 """
196 return defer.succeed("Waiting for syslog messages...")
197
199 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_DGRAM)
200 os.close(fd)
201 port = s.getsockname()[1]
202 transport = udp.Port(port, self)
203 s.setblocking(0)
204 transport.socket = s
205 transport.fileno = s.fileno
206 transport.connected = 1
207 transport._realPortNumber = port
208 self.transport = transport
209
210 self.numPorts = 1
211 transport.startReading()
212
213 - def expand(self, msg, client_address):
259
261 """
262 Consume the network packet
263
264 @param msg: syslog message
265 @type msg: string
266 @param client_address: IP info of the remote device (ipaddr, port)
267 @type client_address: tuple of (string, number)
268 """
269 (ipaddr, port) = client_address
270 if self.options.logorig:
271 if self.options.logformat == 'human':
272 message = self.expand(msg, client_address)
273 else:
274 message = msg
275 self.olog.info(message)
276
277 if self.options.noreverseLookup:
278 d = defer.succeed(ipaddr)
279 else:
280 d = asyncNameLookup(ipaddr)
281 d.addBoth(self.gotHostname, (msg, ipaddr, time.time()))
282
284 """
285 Send the resolved address, if possible, and the event via the thread
286
287 @param response: Twisted response
288 @type response: Twisted response
289 @param data: (msg, ipaddr, rtime)
290 @type data: tuple of (string, string, datetime object)
291 """
292 (msg, ipaddr, rtime) = data
293 if isinstance(response, failure.Failure):
294 host = ipaddr
295 else:
296 host = response
297 if self.processor:
298 self.processor.process(msg, ipaddr, host, rtime)
299
311
315
316
318 """
319 Receive a configuration object containing the default priority
320 """
321 zope.interface.implements(IScheduledTask)
322
323 - def __init__(self, taskName, configId,
324 scheduleIntervalSeconds=3600, taskConfig=None):
336
338 return defer.succeed("Already updated default syslog priority...")
339
342
343
347
348
349 if __name__=='__main__':
350 myPreferences = SyslogPreferences()
351 myTaskFactory = SimpleTaskFactory(SyslogConfigTask)
352 myTaskSplitter = SimpleTaskSplitter(myTaskFactory)
353 daemon = SyslogDaemon(myPreferences, myTaskSplitter)
354 daemon.run()
355