Package Products :: Package ZenStatus :: Module PingService
[hide private]
[frames | no frames]

Source Code for Module Products.ZenStatus.PingService

  1  ############################################################################## 
  2  #  
  3  # Copyright (C) Zenoss, Inc. 2007, 2011, all rights reserved. 
  4  #  
  5  # This content is made available according to terms specified in 
  6  # License.zenoss under the directory where your Zenoss product is installed. 
  7  #  
  8  ############################################################################## 
  9   
 10   
 11  __doc__ = """PingService 
 12  Class that provides a way to asynchronously ping (ICMP packets) IP addresses. 
 13  """ 
 14   
 15  import sys 
 16  import os 
 17  import time 
 18  import socket 
 19  import errno 
 20  import logging 
 21  log = logging.getLogger("zen.PingService") 
 22   
 23  # Zenoss custom ICMP library 
 24  from icmpecho.Ping import Ping4, Ping6 
 25   
 26  from twisted.internet import reactor, defer 
 27  from twisted.python.failure import Failure 
 28   
 29  import Globals 
 30  from Products.ZenStatus.PingJob import PingJob 
 31   
class PermissionError(Exception):
    """
    Raised when access to a resource is not permitted.
    """
34
class IpConflict(Exception):
    """
    Two ping jobs with different hostnames are pinging the same IP
    address at the same time.
    """
37
class PingJobError(Exception):
    """
    Error for a ping job that did not succeed; remembers the IP address
    the job was pinging.
    """

    def __init__(self, error_message, ipaddr):
        """
        @param error_message: text passed on to Exception as the message
        @param ipaddr: IP address of the failed job, stored as self.ipaddr
        """
        super(PingJobError, self).__init__(error_message)
        self.ipaddr = ipaddr
42
class PingService(object):
    """
    Asynchronously ping IP addresses through a protocol object, using the
    Twisted reactor for reads and timeouts.

    The instance registers itself with reactor.addReader() and exposes
    fileno(), doRead(), logPrefix() and connectionLost() for that purpose.
    Jobs that are waiting for a reply are kept in self.jobqueue, a dict
    keyed by IP address.
    """

    def __init__(self, protocol, timeout=2, defaultTries=2):
        """
        @param protocol: object used to talk ICMP; this class calls its
            send(), receive(), fileno() and close() methods
        @param timeout: seconds after a send before checkTimeout() runs
        @param defaultTries: maxtries for PingJobs that ping() creates
        """
        self.reconfigure(timeout)
        self.procId = os.getpid()
        self.defaultTries = defaultTries
        self.jobqueue = {}
        self.pktdata = 'zenping %s %s' % (socket.getfqdn(), self.procId)

        self._protocol = protocol
        reactor.addReader(self)

    def reconfigure(self, timeout=2):
        """
        Set the reply timeout (in seconds) used for subsequent pings.
        """
        self.timeout = timeout

    def fileno(self):
        """
        The reactor will do reads only if we support a file-like interface
        """
        return self._protocol.fileno()

    def logPrefix(self):
        """
        Log prefix for this reader; this service does not supply one and
        always returns None.
        """
        return None

    def connectionLost(self, unused):
        """
        Unregister from the reactor and close the underlying protocol.
        """
        reactor.removeReader(self)
        self._protocol.close()

    def ping(self, ip):
        """
        Ping the IP address and return the result in a deferred

        @param ip: either a PingJob, used as-is, or a value handed to
            PingJob() together with maxtries=self.defaultTries
        @return: the job's deferred; pingJobSucceed() fires it with the
            PingJob, pingJobFail() errbacks it with a PingJobError
        """
        if isinstance(ip, PingJob):
            pj = ip
        else:
            pj = PingJob(ip, maxtries=self.defaultTries)
        self._ping(pj)
        return pj.deferred

    def _ping(self, pingJob):
        """
        Take a pingjob and send an ICMP packet for it

        Any exception (conflict, bad address, send error) fails the job
        through pingJobFail() instead of propagating.
        """
        try:
            # Reject a conflicting job *before* sending anything or arming
            # a timer: otherwise the rejected job would leave a timeout
            # behind that could re-ping it after its deferred has fired.
            current = self.jobqueue.get(pingJob.ipaddr, None)
            if current and pingJob.hostname != current.hostname:
                raise IpConflict("Host %s and %s are both using IP %s" %
                                 (pingJob.hostname,
                                  current.hostname,
                                  pingJob.ipaddr))

            family, sockaddr, echo_kwargs, socket_kwargs = \
                pingJob.pingArgs()
            pingJob.start = self._protocol.send(sockaddr,
                                                socket_kwargs,
                                                echo_kwargs)
            pingJob.sent += 1

            reactor.callLater(self.timeout, self.checkTimeout, pingJob)
            self.jobqueue[pingJob.ipaddr] = pingJob
        except Exception as e:  # Note: sockets with bad addresses fail
            log.debug("%s sendto error %s", pingJob.ipaddr, e)
            self.pingJobFail(pingJob)

    def _processPacket(self, reply):
        """
        Examine the parsed reply and determine what to do with it.

        @param reply: mapping with at least the keys 'address' and
            'alive'; 'data_size' and 'sequence' are read for logging when
            the reply is alive
        """
        sourceIp = reply['address']
        pj = self.jobqueue.get(sourceIp)
        if not pj:
            # Packet for an address nobody is waiting on: ignore quietly.
            return

        if reply['alive']:
            pj.rcvCount += 1
            pj.rtt = time.time() - pj.start
            pj.results.append(pj.rtt)
            log.debug("%d bytes from %s: icmp_seq=%d time=%0.3f ms",
                      reply['data_size'], sourceIp, reply['sequence'],
                      pj.rtt * 1000)

            # Keep pinging until enough samples have been collected.
            if pj.rcvCount >= pj.sampleSize:
                self.pingJobSucceed(pj)
            else:
                self._ping(pj)
        else:
            log.debug("ICMP unreachable message for %s", pj.ipaddr)
            self.pingJobFail(pj)

    def doRead(self):
        """
        Receive packets from the socket and process them.

        The name is required by the reactor select() functionality

        EAGAIN from the socket ends the read quietly; any other socket
        error is re-raised; other exceptions are logged and swallowed.
        """
        try:
            for reply, sockaddr in self._protocol.receive():
                if not reactor.running:
                    return
                self._processPacket(reply)
        except socket.error as err:
            # Socket errors do not always carry (errno, message); only
            # look at the first argument so odd shapes are re-raised as
            # the original error rather than as an unpacking ValueError.
            errnum = err.args[0] if err.args else None
            if errnum == errno.EAGAIN:
                return
            raise
        except Exception as ex:
            log.exception("Error while receiving packet: %s", ex)

    def pingJobSucceed(self, pj):
        """
        PingJob completed successfully.

        Marks the job as up, removes it from the queue and fires its
        deferred with the job unless the deferred was already called.
        """
        pj.message = "IP %s is up" % pj.ipaddr
        pj.severity = 0
        self.dequePingJob(pj)
        if not pj.deferred.called:
            pj.deferred.callback(pj)

    def pingJobFail(self, pj):
        """
        PingJob has failed -- remove from jobqueue.

        Sets rtt to -1 and errbacks the job's deferred with a
        PingJobError unless the deferred was already called.
        """
        pj.rtt = -1
        pj.message = "IP %s is down" % pj.ipaddr
        self.dequePingJob(pj)
        if not pj.deferred.called:
            pj.deferred.errback(Failure(PingJobError(pj.message, pj.ipaddr)))

    def dequePingJob(self, pj):
        """
        Remove pj from the job queue if it is the job queued for its IP.

        The queue is keyed by IP address, so a different job on the same
        address must be left alone; removing it would strand that job with
        a deferred that never fires. A job that is not queued is ignored.
        """
        if self.jobqueue.get(pj.ipaddr) is pj:
            del self.jobqueue[pj.ipaddr]

    def checkTimeout(self, pj):
        """
        Timer callback scheduled by _ping(): count a loss when the job has
        waited longer than the timeout, then retry or fail it.
        """
        if pj.ipaddr in self.jobqueue:
            runtime = time.time() - pj.start
            # pj.start is refreshed on every send, so a timer left over
            # from an earlier send sees a short runtime and does nothing.
            if runtime > self.timeout:
                pj.loss += 1
                log.debug("%s pingjob timeout on attempt %d (timeout=%ss, max tries=%s)",
                          pj.ipaddr, pj.loss, self.timeout, pj.maxtries)
                if pj.loss >= pj.maxtries:
                    self.pingJobFail(pj)
                else:
                    self._ping(pj)
        else:
            log.debug("Calling checkTimeout needlessly for %s", pj.ipaddr)

    def jobCount(self):
        """
        Return the number of jobs currently in the queue.
        """
        return len(self.jobqueue)
196 197
def _printResults(results, start):
    """
    Print which IPs answered and which did not, then stop the reactor.

    @param results: list of (success, value) pairs, as delivered by the
        DeferredList built in this module's __main__ block. For a
        successful entry the value is read for .rtt and .ipaddr; for a
        failed entry the address is taken from value.value.ipaddr when
        present (pingJobFail() errbacks with a PingJobError, which carries
        ipaddr).
    @param start: time.time() reading taken when the pings were started
    """
    good = []
    bad = []
    for success, outcome in results:
        if success:
            if outcome.rtt >= 0:
                good.append(outcome.ipaddr)
            else:
                bad.append(outcome.ipaddr)
        else:
            # Failed pings arrive as (False, failure); without this branch
            # unreachable hosts would never be reported at all.
            ipaddr = getattr(getattr(outcome, "value", None), "ipaddr", None)
            if ipaddr is not None:
                bad.append(ipaddr)

    if good:
        print("Good IPs: %s" % " ".join(good))
    if bad:
        print("Bad IPs: %s" % " ".join(bad))
    print("Tested %d IPs in %.2f seconds" % (len(results), time.time() - start))
    reactor.stop()
if __name__ == "__main__":
    # Sockets are injected into the main module by pyraw
    # pyraw PingService.py [ip_addresses]

    protocol = Ping4(IPV4_SOCKET)
    ping = PingService(protocol)

    # Send everything, debug output included, through the root logger.
    logging.basicConfig()
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)

    # Ping the addresses given on the command line, or localhost if none.
    targets = sys.argv[1:] if len(sys.argv) > 1 else ("127.0.0.1",)

    pending = [ping.ping(target) for target in targets]
    lst = defer.DeferredList(pending, consumeErrors=True)
    lst.addCallback(_printResults, time.time())
    reactor.run()