Package Products :: Package ZenEvents :: Module zenpop3
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenEvents.zenpop3

  1  #! /usr/bin/env python  
  2  ############################################################################## 
  3  #  
  4  # Copyright (C) Zenoss, Inc. 2007, 2011, all rights reserved. 
  5  #  
  6  # This content is made available according to terms specified in 
  7  # License.zenoss under the directory where your Zenoss product is installed. 
  8  #  
  9  ############################################################################## 
 10   
 11   
 12  # Notes: database wants events in UTC time 
 13  # Events page shows local time, as determined on the server where zenoss runs 
 14   
 15  __doc__ = """zenpop3 
 16   
 17  Turn email messages obtained from POP3 accounts into events. 
 18   
 19  """ 
 20   
 21  import logging 
 22  import socket 
 23   
 24  import Globals 
 25  import zope.interface 
 26   
 27  from twisted.mail.pop3client import POP3Client 
 28  from twisted.internet.ssl import ClientContextFactory 
 29  from twisted.internet import reactor, protocol, defer, error 
 30   
 31  from Products.ZenCollector.daemon import CollectorDaemon 
 32  from Products.ZenCollector.interfaces import ICollector, ICollectorPreferences,\ 
 33                                               IEventService, \ 
 34                                               IScheduledTask 
 35  from Products.ZenCollector.tasks import NullTaskSplitter,\ 
 36                                          BaseTask, TaskStates 
 37   
 38  # Invalidation issues arise if we don't import 
 39  from Products.ZenCollector.services.config import DeviceProxy 
 40   
 41  from Products.ZenEvents.MailProcessor import POPProcessor 
 42   
 43   
 44  COLLECTOR_NAME = 'zenpop3' 
 45  log = logging.getLogger("zen.%s" % COLLECTOR_NAME) 
 46   
 47   
class MailPreferences(object):
    """
    Preferences object handed to CollectorDaemon for zenpop3: default
    attribute values, the daemon's command-line options, and the single
    mail-collecting task to start.
    """
    zope.interface.implements(ICollectorPreferences)

    def __init__(self):
        """
        Constructs a new MailPreferences instance and
        provides default values for needed attributes.
        """
        self.collectorName = COLLECTOR_NAME
        self.defaultRRDCreateCommand = None
        # NOTE(review): presumably 20 minutes expressed in seconds --
        # confirm the unit the collector framework expects.
        self.configCycleInterval = 20 * 60
        self.cycleInterval = 5 * 60  # seconds

        # The configurationService attribute is the fully qualified class-name
        # of our configuration service that runs within ZenHub
        self.configurationService = 'Products.ZenHub.services.NullConfig'

        # Will be filled in based on buildOptions
        self.options = None

    def postStartupTasks(self):
        """
        Yield the one MailCollectingTask this daemon runs; both its task
        name and its configId are the collector name.
        """
        task = MailCollectingTask(COLLECTOR_NAME, configId=COLLECTOR_NAME)
        yield task

    def buildOptions(self, parser):
        """
        Command-line options to be supported
        """
        # Ask the system services database for the POP3 port and fall
        # back to the well-known port when the lookup fails.
        try:
            default_port = socket.getservbyname('pop3', 'tcp')
        except socket.error:
            default_port = 110

        parser.add_option('--usessl',
                          dest='usessl',
                          default=False,
                          action="store_true",
                          help="Use SSL when connecting to POP server")
        parser.add_option('--nodelete',
                          dest='nodelete',
                          default=False,
                          action="store_true",
                          help="Leave messages on POP server")
        parser.add_option('--pophost',
                          dest='pophost',
                          default="pop.zenoss.com",
                          help="POP server from which emails are to be read")
        parser.add_option('--popport',
                          dest='popport',
                          default=default_port,
                          type="int",
                          help="POP port from which emails are to be read")
        parser.add_option('--popuser',
                          dest='popuser',
                          default="zenoss",
                          help="POP user")
        parser.add_option('--poppass',
                          dest='poppass',
                          default="zenoss",
                          help="POP password")
        parser.add_option('--cycletime',
                          dest='cycletime',
                          type="int",
                          default=60,
                          help="Frequency (in seconds) to poll the POP server")
        # The default is given as an int to match type="int".
        parser.add_option('--eventseverity',
                          dest='eventseverity',
                          default=2,
                          type="int",
                          help="Severity for events created")

    def postStartup(self):
        """
        Nothing to do after startup.
        """
        pass
124 125
class POPProtocol(POP3Client):
    """
    Protocol that is responsible for conversing with a POP server
    after a connection has been established.  Downloads messages and
    passes them back up to the factory to process and turn into events.
    Messages that were handled without error are then deleted (unless
    the factory's nodelete flag is set); messages whose retrieval or
    processing failed are left on the server.
    """

    allowInsecureLogin = True
    timeout = 15
    totalMessages = 0

    def serverGreeting(self, unused):
        """
        Log in with the factory's credentials and start the scan.
        """
        log.debug('Server greeting received: Logging in...')

        login = self.login(self.factory.user, self.factory.passwd)
        login.addCallback(self._loggedIn)
        # Added after the callback, so a failure of the login or of the
        # chain returned by _loggedIn reaches the factory's deferred.
        login.addErrback(self.factory.deferred.errback)

    def _loggedIn(self, unused):
        """
        Login succeeded: continue with retrieving the mailbox contents.
        """
        log.debug('Logged in')
        return self.retrieveAndParse()

    def retrieveAndParse(self):
        """
        Request the message sizes and retrieve the messages once they
        are known.  Returns the Deferred of the whole scan.
        """
        d = self.listSize()
        d.addCallback(self._gotMessageSizes)

        return d

    def _gotMessageSizes(self, sizes):
        """
        Retrieve every listed message, then delete and quit.

        sizes is the result of listSize(); only its length is used here.
        """
        self.totalMessages = len(sizes)
        log.info('Messages to retrieve: %d', self.totalMessages)

        self.sizes = sizes

        retrievers = []
        for index in range(self.totalMessages):
            log.debug('Retrieving message #%d...', index)
            d = self.retrieve(index)
            d.addCallback(self._gotMessageLines)
            retrievers.append(d)

        # A DeferredList never fails by default; it reports one
        # (success, value) pair per message.  consumeErrors=True hands
        # the failures to _delete instead of leaving them unhandled.
        deferreds = defer.DeferredList(retrievers, consumeErrors=True)
        deferreds.addCallback(self._delete)
        return deferreds.addCallback(self.scanComplete)

    def _gotMessageLines(self, messageLines):
        """
        Join the retrieved lines with CRLF and hand the message to the
        factory.
        """
        log.debug('Passing message up to factory')
        self.factory.handleMessage("\r\n".join(messageLines))

    def _delete(self, results):
        """
        Delete the messages that were retrieved and processed without
        error; log and keep the ones that failed.

        results is the output of the retrieval DeferredList: one
        (success, value) pair per message, in message-index order.
        Nothing is deleted when the factory's nodelete flag is set.
        Returns a DeferredList of the issued delete commands.
        """
        deleters = []
        for index, (success, outcome) in enumerate(results):
            if not success:
                # Deleting here would silently lose the message.
                log.error('Message #%d could not be retrieved or processed; '
                          'leaving it on the server: %s',
                          index, outcome.getErrorMessage())
                continue
            if self.factory.nodelete:
                continue
            log.info('Deleting message #%d...', index)
            deleters.append(self.delete(index))

        return defer.DeferredList(deleters)

    def scanComplete(self, unused):
        """
        All work for this session is done: send QUIT.
        """
        log.debug("Scan complete")
        self.quit()
190 191
class POPFactory(protocol.ClientFactory):
    """
    Client factory carrying the settings a POPProtocol needs: the
    account credentials, the message processor and the nodelete flag.
    Connection failures are reported through self.deferred.
    """
    protocol = POPProtocol

    def __init__(self, user, passwd, processor, nodelete):
        """
        Store the login credentials, the object whose process() method
        receives each message, and whether messages are kept on the
        server; create the deferred used to report errors.
        """
        self.deferred = defer.Deferred()
        self.user, self.passwd = user, passwd
        self.processor = processor
        self.nodelete = nodelete

    def handleMessage(self, messageData):
        """
        Pass one complete message on to the processor.
        """
        self.processor.process(messageData)

    def clientConnectionFailed(self, unused, reason):
        """
        Fire the factory's deferred with the reason the connection failed.
        """
        self.deferred.errback(reason)
211 212
class MailCollectingTask(BaseTask):
    """
    Scheduled task that, each time it runs, opens a connection to the
    configured POP server using a fresh POPFactory.  Retrieved messages
    go to a POPProcessor, which sends events through this task's
    sendEvent attribute.
    """
    zope.interface.implements(IScheduledTask)

    STATE_COLLECTING = 'COLLECTING'

    def __init__(self, taskName, configId,
                 scheduleIntervalSeconds=60, taskConfig=None):
        """
        Look up the collector daemon and event service utilities, take
        the daemon's options, and build the message processor.
        """
        BaseTask.__init__(self, taskName, configId,
                          scheduleIntervalSeconds, taskConfig)
        # zope.component is used below but is not among the module's
        # visible imports; import it here rather than rely on another
        # module having loaded it.
        import zope.component

        self.log = log

        # Needed for interface
        self.name = taskName
        self.configId = configId
        self.state = TaskStates.STATE_IDLE
        self._daemon = zope.component.getUtility(ICollector)
        self._eventService = zope.component.queryUtility(IEventService)
        self._preferences = self._daemon

        self.options = self._daemon.options

        # This will take a bit to catch up, but....
        self.interval = self.options.cycletime

        # Allow MailProcessor to work unmodified
        self.sendEvent = self._eventService.sendEvent

        self._daemon.changeUser()
        self.processor = POPProcessor(self, self.options.eventseverity)
        self._connection = None
        # Created by makeFactory(); defined here so that cleanup() is
        # safe to call on a task that never ran.
        self.factory = None

    def doTask(self):
        """
        Run one collection cycle; always returns a Deferred.
        """
        return defer.maybeDeferred(self.checkForMessages)

    def makeFactory(self):
        """
        Create a new POPFactory from the current options and route its
        errors to handleError.
        """
        self.factory = POPFactory(self.options.popuser, self.options.poppass,
                                  self.processor, self.options.nodelete)
        self.factory.deferred.addErrback(self.handleError)

    def checkForMessages(self):
        """
        Start a connection (SSL or plaintext, per the usessl option) to
        the POP server.  The returned Deferred has already fired: it
        reports that the connection attempt was started, not that the
        mailbox scan has finished.
        """
        self.state = MailCollectingTask.STATE_COLLECTING

        self.makeFactory()
        host = self.options.pophost
        port = self.options.popport
        if self.options.usessl:
            log.debug("Connecting to server %s:%s using SSL as %s",
                      host, port, self.options.popuser)
            self._connection = reactor.connectSSL(host, port,
                                                  self.factory,
                                                  ClientContextFactory())
        else:
            log.debug("Connecting to server %s:%s using plaintext as %s",
                      host, port, self.options.popuser)
            self._connection = reactor.connectTCP(host, port,
                                                  self.factory)
        return defer.succeed("Connected to server %s:%s" % (host, port))

    def _finished(self, result=None):
        """
        Disconnect the current connection, if any, and return a fired
        Deferred carrying a short status message.
        """
        if self._connection:
            self._connection.disconnect()
        if self.factory:
            # NOTE(review): factory.protocol is the POPProtocol class, so
            # this reads the class-level totalMessages (0), not the count
            # a protocol instance stored on itself.
            message = "Last retrieved %d messages" % self.factory.protocol.totalMessages
        else:
            message = "Completed"
        return defer.succeed(message)

    def handleError(self, err):
        """
        Errback for the factory's deferred.  Connection timeouts,
        refusals and generic connect errors are only logged; any other
        failure also sends a severity-5 event and marks the task
        completed.  Returns a fired Deferred carrying the message.
        """
        if err.type == error.TimeoutError:
            message = "Timed out connecting to %s:%d" % (
                self.options.pophost, self.options.popport)

        elif err.type == error.ConnectionRefusedError:
            message = "Connection refused by %s:%d" % (
                self.options.pophost, self.options.popport)

        elif err.type == error.ConnectError:
            message = "Connection failed to %s:%d" % (
                self.options.pophost, self.options.popport)
        else:
            # NOTE(review): the event and the state change are taken to
            # belong to this catch-all branch only -- confirm.
            message = err.getErrorMessage()
            self.sendEvent(dict(
                device=socket.getfqdn(),
                component=COLLECTOR_NAME,
                severity=5,
                summary="Fatal error in %s" % COLLECTOR_NAME,
                message=message,
            ))

            # Force the task to quit
            self.state = TaskStates.STATE_COMPLETED

        log.error(message)
        return defer.succeed(message)

    def cleanup(self):
        """
        Release the connection held by this task.
        """
        self._finished()
if __name__ == '__main__':
    # Script entry point: run zenpop3 as a collector daemon built from
    # the mail preferences and a null task splitter.
    daemon = CollectorDaemon(MailPreferences(), NullTaskSplitter())
    daemon.run()