| Trees | Indices | Help |
|
|---|
|
|
1 ###########################################################################
2 #
3 # This program is part of Zenoss Core, an open source monitoring platform.
4 # Copyright (C) 2007, 2011 Zenoss Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License version 2 or (at your
8 # option) any later version as published by the Free Software Foundation.
9 #
10 # For complete information please visit: http://www.zenoss.com/oss/
11 #
12 ###########################################################################
13 #! /usr/bin/env python
14 # Notes: database wants events in UTC time
15 # Events page shows local time, as determined on the server where zenoss runs
16
17 __doc__ = """zenpop3
18
19 Turn email messages obtained from POP3 accounts into events.
20
21 """
22
23 import logging
24 import socket
25
26 import Globals
27 import zope.interface
28
29 from twisted.mail.pop3client import POP3Client
30 from twisted.internet.ssl import ClientContextFactory
31 from twisted.internet import reactor, protocol, defer, error
32
33 from Products.ZenCollector.daemon import CollectorDaemon
34 from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\
35 IEventService, \
36 IScheduledTask
37 from Products.ZenCollector.tasks import NullTaskSplitter,\
38 BaseTask, TaskStates
39
40 from Products.ZenEvents.MailProcessor import POPProcessor
41
42
# Daemon name; reused below for the logger name, the collector preferences
# and the component/summary of the error event.
43 COLLECTOR_NAME = 'zenpop3'
# Module-level logger named "zen.<COLLECTOR_NAME>".
44 log = logging.getLogger("zen.%s" % COLLECTOR_NAME)
45
46
# Class-level declaration that the enclosing preferences class implements
# ICollectorPreferences (the class header line is not part of this listing).
48 zope.interface.implements(ICollectorPreferences)
49
"""
Construct a new mail-collector preferences instance and provide
default values for the attributes it exposes.
"""
self.collectorName = COLLECTOR_NAME
self.defaultRRDCreateCommand = None
self.cycleInterval = 5 * 60  # seconds

# The original body assigned configCycleInterval twice (first 20, commented
# as minutes, then 20*60); only the last assignment ever took effect, so
# that effective value is the one kept here.
# NOTE(review): confirm which unit the collector framework expects.
self.configCycleInterval = 20 * 60

# The configurationService attribute is the fully qualified class-name
# of our configuration service that runs within ZenHub
self.configurationService = 'Products.ZenHub.services.NullConfig'

# Will be filled in based on buildOptions
self.options = None
68
72
"""
Register the command-line options supported by this daemon on the
given option parser.
"""
# Prefer the POP3 port listed in the local services database; fall back
# to the well-known port 110 when the lookup fails.
try:
    defaultPopPort = socket.getservbyname('pop3', 'tcp')
except socket.error:
    defaultPopPort = 110

parser.add_option('--usessl',
                  dest='usessl',
                  default=False,
                  action="store_true",
                  help="Use SSL when connecting to POP server")
parser.add_option('--nodelete',
                  dest='nodelete',
                  default=False,
                  action="store_true",
                  help="Leave messages on POP server")
parser.add_option('--pophost',
                  dest='pophost',
                  default="pop.zenoss.com",
                  help="POP server from which emails are to be read")
parser.add_option('--popport',
                  dest='popport',
                  default=defaultPopPort,
                  type="int",
                  help="POP port from which emails are to be read")
parser.add_option('--popuser',
                  dest='popuser',
                  default="zenoss",
                  help="POP user")
parser.add_option('--poppass',
                  dest='poppass',
                  default="zenoss",
                  help="POP password")
parser.add_option('--cycletime',
                  dest='cycletime',
                  type="int",
                  default=60,
                  help="Frequency (in seconds) to poll the POP server")
# The default is given as an int (it used to be the string "2") so it
# matches the declared option type like the other integer options do.
parser.add_option('--eventseverity',
                  dest='eventseverity',
                  default=2,
                  type="int",
                  help="Severity for events created")
120
123
124
126 """
127 Protocol that is responsible for conversing with a POP server
128 after a connection has been established. Downloads messages (and
129 deletes them by default), and passes the messages back up to the
130 factory to process and turn into events.
131 """
132
# NOTE(review): allowInsecureLogin and timeout look like settings consumed by
# the imported POP3Client base class -- confirm against the Twisted docs.
133 allowInsecureLogin = True
134 timeout = 15
# Message count of the current scan; overwritten with the length of the
# size list once the server has reported it.
135 totalMessages = 0
136
# The server has greeted us: authenticate with the credentials stored on
# the factory.
log.debug('Server greeting received: Logging in...')

credentials = (self.factory.user, self.factory.passwd)
loginDeferred = self.login(*credentials)
# Success continues with _loggedIn; any failure is passed on to the
# factory's deferred.
loginDeferred.addCallback(self._loggedIn).addErrback(
    self.factory.deferred.errback)
143
147
153
155 self.totalMessages = len(sizes)
156 log.info('Messages to retrieve: %d', self.totalMessages)
157
158 self.sizes = sizes
159
160 retreivers = []
161 for i in range(len(sizes)):
162 log.debug('Retrieving message #%d...' % i)
163 d = self.retrieve(i)
164 d.addCallback(self._gotMessageLines)
165 retreivers.append(d)
166
167 deferreds = defer.DeferredList(retreivers)
168 deferreds.addCallback(self._delete)
169 return deferreds.addCallback(self.scanComplete)
170
# Re-assemble the retrieved lines into one CRLF-separated string and give
# it to the factory.
log.debug('Passing message up to factory')
rawMessage = "\r\n".join(messageLines)
self.factory.handleMessage(rawMessage)
174
176 deleters = []
177 if not self.factory.nodelete:
178 for index in range(len(self.sizes)):
179 log.info('Deleting message #%d...' % index)
180 d = self.delete(index)
181 deleters.append(d)
182
183 deferreds = defer.DeferredList(deleters)
184 return deferreds
185
189
190
192 """
193 Factory that stores the configuration the protocol uses to do
194 its job.
195 """
# Protocol class associated with this factory.
# NOTE(review): presumably instantiated per connection by the Twisted
# factory base class -- the class header is not visible in this listing.
196 protocol = POPProtocol
197
# Keep the credentials and collaborators that the protocol reads back
# through self.factory.
self.user, self.passwd = user, passwd
self.processor = processor
self.nodelete = nodelete
# Deferred through which login and connection failures are reported.
self.deferred = defer.Deferred()
204
# Pass the message text on to the processor supplied at construction time.
206 self.processor.process(messageData)
207
# Report the failure through the factory's deferred so that the errbacks
# registered on it run.
209 self.deferred.errback(reason)
210
211
# Class-level declaration that the enclosing task class implements
# IScheduledTask.
213 zope.interface.implements(IScheduledTask)
214
# Extra task state assigned to self.state when a collection run starts.
215 STATE_COLLECTING = 'COLLECTING'
216
# The visible module imports only bring in zope.interface, so the
# component-lookup helpers are imported explicitly here rather than relying
# on some other module having loaded zope.component first.
from zope.component import getUtility, queryUtility

BaseTask.__init__(self, taskName, configId,
                  scheduleIntervalSeconds, taskConfig)
self.log = log

# Needed for interface
self.name = taskName
self.configId = configId
self.state = TaskStates.STATE_IDLE

self._daemon = getUtility(ICollector)
# NOTE(review): queryUtility returns None when no IEventService is
# registered, which would make the sendEvent lookup below fail.
self._eventService = queryUtility(IEventService)
# The original first stored taskConfig here and then overwrote it with the
# daemon; only the effective (last) value is kept.
self._preferences = self._daemon

self.options = self._daemon.options

# This will take a bit to catch up, but....
self.interval = self.options.cycletime

# Allow MailProcessor to work unmodified
self.sendEvent = self._eventService.sendEvent

self._daemon.changeUser()
self.processor = POPProcessor(self, self.options.eventseverity)
self._connection = None
243
247
# Build a fresh factory from the current options and route failures
# reported on its deferred to handleError.
opts = self.options
self.factory = POPFactory(
    opts.popuser,
    opts.poppass,
    self.processor,
    opts.nodelete,
)
self.factory.deferred.addErrback(self.handleError)
252
254 self.state = MailCollectingTask.STATE_COLLECTING
255
256 self.makeFactory()
257 if self.options.usessl:
258 log.debug("Connecting to server %s:%s using SSL as %s",
259 self.options.pophost, self.options.popport, self.options.popuser)
260 self._connection = reactor.connectSSL(self.options.pophost, self.options.popport,
261 self.factory, ClientContextFactory())
262 else:
263 log.debug("Connecting to server %s:%s using plaintext as %s",
264 self.options.pophost, self.options.popport, self.options.popuser)
265 self._connection = reactor.connectTCP(self.options.pophost, self.options.popport,
266 self.factory)
267 return defer.succeed("Connected to server %s:%s" % (
268 self.options.pophost, self.options.popport))
269
271 if self._connection:
272 self._connection.disconnect()
273 if self.factory:
274 message = "Last retrieved %d messages" % self.factory.protocl.totalMessages
275 else:
276 message = "Completed"
277 return defer.succeed(message)
278
# Turn a failure into a human-readable message, log it and return it in an
# already-fired deferred. `err` exposes .type and .getErrorMessage().
# NOTE(review): indentation was lost in this listing, so it cannot be told
# from here whether the sendEvent call and the state change below belong
# only to the final else branch or run for every error -- check the
# original source before restructuring.
# The exception class is compared with ==, so subclasses of the listed
# error types fall through to the final else branch.
280 if err.type == error.TimeoutError:
281 message = "Timed out connecting to %s:%d" % (
282 self.options.pophost, self.options.popport)
283
284 elif err.type == error.ConnectionRefusedError:
285 message = "Connection refused by %s:%d" % (
286 self.options.pophost, self.options.popport)
287
288 elif err.type == error.ConnectError:
289 message = "Connection failed to %s:%d" % (
290 self.options.pophost, self.options.popport)
291 else:
# Any other failure: use the failure's own error message.
292 message = err.getErrorMessage()
# Emit a severity-5 event against this host, with the daemon as component.
293 self.sendEvent(dict(
294 device=socket.getfqdn(),
295 component=COLLECTOR_NAME,
296 severity=5,
297 summary="Fatal error in %s" % COLLECTOR_NAME,
298 message=message,
299 ))
300
301 # Force the task to quit
302 self.state = TaskStates.STATE_COMPLETED
303
304 log.error(message)
305 return defer.succeed(message)
306
# Delegate to _finished(); its return value is discarded here.
308 self._finished()
309
310
if __name__ == '__main__':
    # Script entry point: wire the mail preferences and a null task
    # splitter into a collector daemon and run it.
    daemon = CollectorDaemon(MailPreferences(), NullTaskSplitter())
    daemon.run()
316
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1.1812 on Tue Oct 11 12:51:41 2011 | http://epydoc.sourceforge.net |