Package ZenStatus :: Module AsyncPing
[hide private]
[frames] | no frames]

Source Code for Module ZenStatus.AsyncPing

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import sys 
 15  import os 
 16  import time 
 17  import socket 
 18  import ip 
 19  import icmp 
 20  import errno 
 21  import logging 
 22  log = logging.getLogger("zen.Ping") 
 23   
 24  from twisted.internet import reactor, defer 
 25  from twisted.spread import pb 
 26   
27 -class PermissionError(Exception):
28 """Not permitted to access resource."""
29
30 -class IpConflict(Exception):
31 """Pinging two jobs simultaneously with different hostnames but the same IP"""
32
33 -class PingJob(pb.Copyable, pb.RemoteCopy):
34 """ 35 Class representing a single target to be pinged. 36 """
37 - def __init__(self, ipaddr, hostname="", status=0, unused_cycle=60):
38 self.parent = False 39 self.ipaddr = ipaddr 40 self.hostname = hostname 41 self.status = status 42 self.reset()
43 44
45 - def reset(self):
46 self.deferred = defer.Deferred() 47 self.rrt = 0 48 self.start = 0 49 self.sent = 0 50 self.message = "" 51 self.severity = 5 52 self.inprocess = False 53 self.pathcheck = 0 54 self.eventState = 0
55 56
57 - def checkpath(self):
58 if self.parent: 59 return self.parent.checkpath()
60 61
62 - def routerpj(self):
63 if self.parent: 64 return self.parent.routerpj()
65 66 pb.setUnjellyableForClass(PingJob, PingJob) 67 68 69 plog = logging.getLogger("zen.Ping")
70 -class Ping(object):
71 """ 72 Class that provides asyncronous icmp ping. 73 """ 74
75 - def __init__(self, tries=2, timeout=2, sock=None):
76 self.tries = tries 77 self.timeout = timeout 78 self.procId = os.getpid() 79 self.jobqueue = {} 80 self.pktdata = 'zenping %s %s' % (socket.getfqdn(), self.procId) 81 self.createPingSocket(sock)
82 83
84 - def createPingSocket(self, sock):
85 """make an ICMP socket to use for sending and receiving pings""" 86 socketargs = socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP 87 if sock is None: 88 try: 89 s = socket 90 self.pingsocket = s.socket(*socketargs) 91 except socket.error, e: 92 err, msg = e.args 93 if err == errno.EACCES: 94 raise PermissionError("must be root to send icmp.") 95 raise e 96 else: 97 self.pingsocket = socket.fromfd(sock, *socketargs) 98 os.close(sock) 99 self.pingsocket.setblocking(0) 100 reactor.addReader(self)
101
102 - def fileno(self):
103 return self.pingsocket.fileno()
104
105 - def doRead(self):
106 self.recvPackets()
107
108 - def connectionLost(self, unused):
109 reactor.removeReader(self) 110 self.pingsocket.close()
111
112 - def logPrefix(self):
113 return None
114
115 - def sendPacket(self, pingJob):
116 """Take a pingjob and send an ICMP packet for it""" 117 #### sockets with bad addresses fail 118 try: 119 pkt = icmp.Echo(self.procId, pingJob.sent, self.pktdata) 120 buf = icmp.assemble(pkt) 121 pingJob.start = time.time() 122 plog.debug("send icmp to '%s'", pingJob.ipaddr) 123 self.pingsocket.sendto(buf, (pingJob.ipaddr, 0)) 124 reactor.callLater(self.timeout, self.checkTimeout, pingJob) 125 pingJob.sent += 1 126 current = self.jobqueue.get(pingJob.ipaddr, None) 127 if current: 128 if pingJob.hostname != current.hostname: 129 raise IpConflict("Host %s and %s are both using ip %s" % 130 (pingJob.hostname, 131 current.hostname, 132 pingJob.ipaddr)) 133 self.jobqueue[pingJob.ipaddr] = pingJob 134 except (SystemExit, KeyboardInterrupt): raise 135 except Exception, e: 136 pingJob.rtt = -1 137 pingJob.message = "%s sendto error %s" % (pingJob.ipaddr, e) 138 self.reportPingJob(pingJob)
139 140
141 - def recvPackets(self):
142 """receive a packet and decode its header""" 143 while reactor.running: 144 try: 145 data, (host, port) = self.pingsocket.recvfrom(1024) 146 if not data: return 147 ipreply = ip.disassemble(data) 148 try: 149 icmppkt = icmp.disassemble(ipreply.data) 150 except ValueError: 151 plog.debug("checksum failure on packet %r", ipreply.data) 152 try: 153 icmppkt = icmp.disassemble(ipreply.data, 0) 154 except ValueError: 155 continue # probably Unknown type 156 except Exception, ex: 157 plog.debug("Unable to decode reply packet payload %s", ex) 158 continue 159 sip = ipreply.src 160 if (icmppkt.get_type() == icmp.ICMP_ECHOREPLY and 161 icmppkt.get_id() == self.procId and 162 self.jobqueue.has_key(sip)): 163 plog.debug("echo reply pkt %s %s", sip, icmppkt) 164 self.pingJobSucceed(self.jobqueue[sip]) 165 elif icmppkt.get_type() == icmp.ICMP_UNREACH: 166 try: 167 origpkt = icmppkt.get_embedded_ip() 168 dip = origpkt.dst 169 if (origpkt.data.find(self.pktdata) > -1 170 and self.jobqueue.has_key(dip)): 171 self.pingJobFail(self.jobqueue[dip]) 172 except ValueError, ex: 173 plog.warn("failed to parse host unreachable packet") 174 else: 175 plog.debug("unexpected pkt %s %s", sip, icmppkt) 176 except (SystemExit, KeyboardInterrupt): raise 177 except socket.error, err: 178 errnum, errmsg = err.args 179 if errnum == errno.EAGAIN: 180 return 181 raise err 182 except Exception, ex: 183 log.exception("receiving packet error: %s" % ex)
184 185
186 - def pingJobSucceed(self, pj):
187 """PingJob completed successfully. 188 """ 189 plog.debug("pj succeed for %s", pj.ipaddr) 190 pj.rtt = time.time() - pj.start 191 pj.message = "ip %s is up" % (pj.ipaddr) 192 self.reportPingJob(pj)
193 194
195 - def pingJobFail(self, pj):
196 """PingJob has failed remove from jobqueue. 197 """ 198 plog.debug("pj fail for %s", pj.ipaddr) 199 pj.rtt = -1 200 pj.message = "ip %s is down" % (pj.ipaddr) 201 self.reportPingJob(pj)
202 203
204 - def reportPingJob(self, pj):
205 try: 206 del self.jobqueue[pj.ipaddr] 207 except KeyError: 208 pass 209 # also free the deferred from further reporting 210 if pj.rtt < 0: 211 pj.deferred.errback(pj) 212 else: 213 pj.deferred.callback(pj)
214 215
216 - def checkTimeout(self, pj):
217 if self.jobqueue.has_key(pj.ipaddr): 218 now = time.time() 219 if now - pj.start > self.timeout: 220 if pj.sent >= self.tries: 221 plog.debug("pj timeout for %s", pj.ipaddr) 222 self.pingJobFail(pj) 223 else: 224 self.sendPacket(pj) 225 else: 226 plog.debug("calling checkTimeout needlessly for %s", pj.ipaddr)
227
228 - def jobCount(self):
229 return len(self.jobqueue)
230
231 - def ping(self, ip):
232 "Ping the ip and return the result in a deferred" 233 pj = PingJob(ip) 234 self.sendPacket(pj) 235 return pj.deferred
236 237
238 -def _printResults(results, start):
239 good = [pj for s, pj in results if s and pj.rtt >= 0] 240 bad = [pj for s, pj in results if s and pj.rtt < 0] 241 if good: print "Good ips: %s" % " ".join([g.ipaddr for g in good]) 242 if bad: print "Bad ips: %s" % " ".join([b.ipaddr for b in bad]) 243 print "Tested %d ips in %.1f seconds" % (len(results), time.time() - start) 244 reactor.stop()
245 246 if __name__ == "__main__": 247 ping = Ping() 248 logging.basicConfig() 249 log = logging.getLogger() 250 log.setLevel(10) 251 if len(sys.argv) > 1: targets = sys.argv[1:] 252 else: targets = ("127.0.0.1",) 253 lst = defer.DeferredList(map(ping.ping, targets), consumeErrors=True) 254 lst.addCallback(_printResults, time.time()) 255 reactor.run() 256