Package Products :: Package ZenStatus :: Module AsyncPing
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenStatus.AsyncPing

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import sys 
 15  import os 
 16  import time 
 17  import socket 
 18  import ip 
 19  import icmp 
 20  import errno 
 21  import logging 
 22  log = logging.getLogger("zen.Ping") 
 23   
 24  from twisted.internet import reactor, defer 
 25  from twisted.spread import pb 
 26   
class PermissionError(Exception):
    """
    The caller is not permitted to access the requested resource.

    Ping.createPingSocket() raises this when the operating system refuses
    to create the raw ICMP socket.
    """
    pass

class IpConflict(Exception):
    """
    Two ping jobs with different hostnames are being pinged at the same
    time using the same IP address.
    """
    pass

class PingJob(pb.Copyable, pb.RemoteCopy):
    """
    Class representing a single target to be pinged.

    A job carries the target address, the bookkeeping used by Ping while
    the job is in flight (start, sent) and the outcome (rtt, message).
    The outcome is delivered through the `deferred` attribute, which is
    recreated by every call to reset().
    """

    def __init__(self, ipaddr, hostname="", status=0, unused_cycle=60):
        """
        ipaddr       -- address to ping; Ping uses it as the job queue key
        hostname     -- name associated with the address (default "")
        status       -- initial status value stored on the job
        unused_cycle -- accepted for interface compatibility and ignored
        """
        self.parent = False
        self.ipaddr = ipaddr
        self.hostname = hostname
        self.status = status
        self.reset()

    def reset(self):
        """Give the job a fresh deferred and clear all per-cycle state."""
        self.deferred = defer.Deferred()
        # Ping reads and writes `rtt` (negative means failure), so it must
        # exist on a fresh job.  The original code only initialised the
        # misspelled `rrt`; that name is kept as a legacy alias in case
        # external code refers to it.
        self.rtt = 0
        self.rrt = 0
        self.start = 0
        self.sent = 0
        self.message = ""
        self.severity = 5
        self.inprocess = False
        self.pathcheck = 0
        self.eventState = 0

    def checkpath(self):
        """Delegate to the parent job's checkpath(); None without a parent."""
        if self.parent:
            return self.parent.checkpath()

    def routerpj(self):
        """Delegate to the parent job's routerpj(); None without a parent."""
        if self.parent:
            return self.parent.routerpj()


# Allow PingJob instances to be copied across a Perspective Broker link.
pb.setUnjellyableForClass(PingJob, PingJob)


plog = logging.getLogger("zen.Ping")
class Ping(object):
    """
    Class that provides asynchronous icmp ping.

    An instance owns one raw ICMP socket and registers itself with the
    Twisted reactor as a reader (see createPingSocket); fileno(), doRead(),
    connectionLost() and logPrefix() are the methods the reactor calls on
    it.  Jobs that are in flight are kept in `jobqueue`, keyed by IP
    address, so at most one job per address is tracked at any time.
    """

    def __init__(self, tries=2, timeout=2, sock=None):
        """
        tries   -- number of packets sent to a target before it is failed
        timeout -- seconds to wait for a reply to each packet
        sock    -- file descriptor of an already opened raw ICMP socket, or
                   None to create a new socket
        """
        self.reconfigure(tries, timeout)
        self.procId = os.getpid()
        self.jobqueue = {}
        # Payload of every echo request; it is also searched for inside
        # "unreachable" replies to recognise packets sent by this process.
        # NOTE(review): procId is used as the ICMP echo id, which is a
        # 16-bit field -- confirm how the icmp module treats larger pids.
        self.pktdata = 'zenping %s %s' % (socket.getfqdn(), self.procId)
        self.createPingSocket(sock)

    def reconfigure(self, tries=2, timeout=2):
        """Change the retry count and per-packet timeout (in seconds)."""
        self.tries = tries
        self.timeout = timeout

    def createPingSocket(self, sock):
        """
        Make an ICMP socket to use for sending and receiving pings.

        When `sock` is None a new raw socket is created; otherwise `sock`
        is a file descriptor that is duplicated into a socket object and
        then closed.  The socket is made non-blocking and this object is
        added to the reactor as a reader.

        Raises PermissionError when the OS denies creating the raw socket;
        any other socket error is re-raised unchanged.
        """
        socketargs = socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP
        if sock is None:
            try:
                self.pingsocket = socket.socket(*socketargs)
            except socket.error as e:
                # Linux reports EPERM for unprivileged raw sockets; EACCES
                # is what the original code tested for.
                if e.args and e.args[0] in (errno.EACCES, errno.EPERM):
                    raise PermissionError("must be root to send icmp.")
                raise
        else:
            self.pingsocket = socket.fromfd(sock, *socketargs)
            # fromfd() duplicates the descriptor, so the original is closed
            os.close(sock)
        self.pingsocket.setblocking(0)
        reactor.addReader(self)

    def fileno(self):
        """Return the file descriptor of the ping socket."""
        return self.pingsocket.fileno()

    def doRead(self):
        """Called when the socket is readable: drain pending packets."""
        self.recvPackets()

    def connectionLost(self, unused):
        """Stop reading from the socket and close it."""
        reactor.removeReader(self)
        self.pingsocket.close()

    def logPrefix(self):
        """Return the log prefix for this reader (always None)."""
        return None

    def sendPacket(self, pingJob):
        """
        Take a pingjob and send an ICMP packet for it.

        On success the job is stored in the job queue and a timeout check
        is scheduled.  Any error (including an IpConflict with a job that
        is already in flight for the same address) marks the job as failed
        and reports it through its deferred; nothing is raised to the
        caller except SystemExit and KeyboardInterrupt.
        """
        #### sockets with bad addresses fail
        try:
            # Detect the conflict before anything is sent or scheduled, so
            # a rejected job leaves no pending timeout behind it.
            current = self.jobqueue.get(pingJob.ipaddr, None)
            if current is not None and pingJob.hostname != current.hostname:
                raise IpConflict("Host %s and %s are both using ip %s" %
                                 (pingJob.hostname,
                                  current.hostname,
                                  pingJob.ipaddr))
            pkt = icmp.Echo(self.procId, pingJob.sent, self.pktdata)
            buf = icmp.assemble(pkt)
            pingJob.start = time.time()
            plog.debug("send icmp to '%s'", pingJob.ipaddr)
            self.pingsocket.sendto(buf, (pingJob.ipaddr, 0))
            reactor.callLater(self.timeout, self.checkTimeout, pingJob)
            pingJob.sent += 1
            self.jobqueue[pingJob.ipaddr] = pingJob
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception as e:
            pingJob.rtt = -1
            pingJob.message = "%s sendto error %s" % (pingJob.ipaddr, e)
            self.reportPingJob(pingJob)

    def recvPackets(self):
        """
        Receive packets and decode their headers.

        Reads until the non-blocking socket has nothing left (EAGAIN) or
        the reactor stops.  Echo replies carrying this process' id complete
        the matching queued job successfully; "unreachable" replies that
        embed our payload fail the job for the embedded destination.
        Socket errors other than "would block" are re-raised; any other
        error is logged and reading continues.
        """
        while reactor.running:
            try:
                data, _addr = self.pingsocket.recvfrom(1024)
                if not data:
                    return
                ipreply = ip.disassemble(data)
                try:
                    icmppkt = icmp.disassemble(ipreply.data)
                except ValueError:
                    plog.debug("checksum failure on packet %r", ipreply.data)
                    # second attempt, passing 0 as the extra argument
                    # (presumably disables checksum validation -- confirm
                    # against the icmp module)
                    try:
                        icmppkt = icmp.disassemble(ipreply.data, 0)
                    except ValueError:
                        continue            # probably Unknown type
                except Exception as ex:
                    plog.debug("Unable to decode reply packet payload %s", ex)
                    continue
                sip = ipreply.src
                if (icmppkt.get_type() == icmp.ICMP_ECHOREPLY and
                        icmppkt.get_id() == self.procId and
                        sip in self.jobqueue):
                    plog.debug("echo reply pkt %s %s", sip, icmppkt)
                    self.pingJobSucceed(self.jobqueue[sip])
                elif icmppkt.get_type() == icmp.ICMP_UNREACH:
                    try:
                        origpkt = icmppkt.get_embedded_ip()
                        dip = origpkt.dst
                        if (origpkt.data.find(self.pktdata) > -1
                                and dip in self.jobqueue):
                            self.pingJobFail(self.jobqueue[dip])
                    except ValueError:
                        plog.warning("failed to parse host unreachable packet")
                else:
                    plog.debug("unexpected pkt %s %s", sip, icmppkt)
            except (SystemExit, KeyboardInterrupt):
                raise
            except socket.error as err:
                # nothing left to read on the non-blocking socket
                if err.args and err.args[0] in (errno.EAGAIN,
                                                errno.EWOULDBLOCK):
                    return
                raise
            except Exception as ex:
                log.exception("receiving packet error: %s", ex)

    def pingJobSucceed(self, pj):
        """PingJob completed successfully.
        """
        plog.debug("pj succeed for %s", pj.ipaddr)
        # A negative rtt means "failed" to reportPingJob(), so a clock that
        # stepped backwards must not turn a reply into a failure.
        pj.rtt = max(0.0, time.time() - pj.start)
        pj.message = "ip %s is up" % (pj.ipaddr)
        self.reportPingJob(pj)

    def pingJobFail(self, pj):
        """PingJob has failed remove from jobqueue.
        """
        plog.debug("pj fail for %s", pj.ipaddr)
        pj.rtt = -1
        pj.message = "ip %s is down" % (pj.ipaddr)
        self.reportPingJob(pj)

    def reportPingJob(self, pj):
        """
        Remove the job from the queue and fire its deferred.

        The deferred's errback is called with the job when pj.rtt is
        negative, its callback otherwise.
        """
        # Only drop the queue entry when it really is this job: a job that
        # was rejected in sendPacket() shares its address with another job
        # that is still waiting for a reply.
        if self.jobqueue.get(pj.ipaddr) is pj:
            del self.jobqueue[pj.ipaddr]
        # also free the deferred from further reporting
        if pj.rtt < 0:
            pj.deferred.errback(pj)
        else:
            pj.deferred.callback(pj)

    def checkTimeout(self, pj):
        """
        Timer callback scheduled by sendPacket().

        If the job is still queued and its last packet is older than the
        timeout, either send another packet or, once `tries` packets have
        been sent, fail the job.  Timers of jobs that are no longer the
        queued job for their address are ignored.
        """
        # The entry may belong to a newer job for the same address; a stale
        # timer of a finished job must not touch it.
        if self.jobqueue.get(pj.ipaddr) is not pj:
            return
        now = time.time()
        if now - pj.start > self.timeout:
            if pj.sent >= self.tries:
                plog.debug("pj timeout for %s", pj.ipaddr)
                self.pingJobFail(pj)
            else:
                self.sendPacket(pj)
        else:
            plog.debug("calling checkTimeout needlessly for %s", pj.ipaddr)

    def jobCount(self):
        """Return the number of jobs currently in flight."""
        return len(self.jobqueue)

    def ping(self, ip):
        "Ping the ip and return the result in a deferred"
        pj = PingJob(ip)
        self.sendPacket(pj)
        return pj.deferred


def _printResults(results, start):
    """
    Print a summary of a finished batch of pings and stop the reactor.

    results -- list of (success, value) pairs as delivered by a
               DeferredList; `value` is the PingJob for a successful ping
    start   -- time.time() value taken when the batch was started
    """
    good = []
    bad = []
    for success, value in results:
        if not success:
            # Ping.reportPingJob() fails a job with errback(pj); Twisted
            # hands that to us wrapped in a Failure whose `value` is the
            # job.  The original filter required success to be true, so
            # failed jobs were never listed.
            value = getattr(value, "value", value)
        rtt = getattr(value, "rtt", None)
        if rtt is None:
            continue    # not a ping job (some unrelated error)
        if success and rtt >= 0:
            good.append(value)
        else:
            bad.append(value)
    if good:
        print("Good ips: %s" % " ".join([g.ipaddr for g in good]))
    if bad:
        print("Bad ips: %s" % " ".join([b.ipaddr for b in bad]))
    print("Tested %d ips in %.1f seconds" % (len(results), time.time() - start))
    reactor.stop()


# Command line entry point: ping every address given as an argument
# (127.0.0.1 when none is given), print a summary and exit.
if __name__ == "__main__":
    ping = Ping()
    logging.basicConfig()
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)
    targets = sys.argv[1:] or ("127.0.0.1",)
    pending = [ping.ping(target) for target in targets]
    lst = defer.DeferredList(pending, consumeErrors=True)
    lst.addCallback(_printResults, time.time())
    reactor.run()