Package ZenStatus :: Module AsyncPing
[hide private]
[frames] | no frames]

Source Code for Module ZenStatus.AsyncPing

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  import sys 
 15  import os 
 16  import types 
 17  import time 
 18  import select 
 19  import socket 
 20  import ip 
 21  import icmp 
 22  import errno 
 23  import pprint 
 24  import logging 
 25  log = logging.getLogger("zen.Ping") 
 26   
 27  from twisted.internet import reactor, defer 
 28  from sets import Set 
 29   
class PermissionError(Exception):
    """
    The caller lacks the privileges needed to access a resource
    (for example, opening the raw ICMP socket).
    """
    pass

class IpConflict(Exception):
    """
    Two ping jobs with different hostnames are being pinged at the same
    time against the same IP address.
    """
    pass


class PingJob:
    """
    Class representing a single target to be pinged.

    A job carries the target address, the hostname it belongs to, and the
    per-attempt bookkeeping (start time, packets sent, round-trip time,
    result message) that Ping fills in.  The outcome is delivered through
    ``self.deferred``.
    """

    def __init__(self, ipaddr, hostname="", status=0, unused_cycle=60):
        """
        @param ipaddr: IP address to ping
        @param hostname: name of the host that owns ipaddr
        @param status: initial status value stored on the job
        @param unused_cycle: accepted for interface compatibility; not used
        """
        self.parent = False
        self.ipaddr = ipaddr
        self.hostname = hostname
        self.status = status
        self.reset()

    def reset(self):
        """
        Return the job to its pristine state so it can be pinged again.

        A fresh Deferred is created because a Deferred can only fire once.
        """
        self.deferred = defer.Deferred()
        # Round-trip time.  The original code initialised the misspelled
        # attribute 'rrt', so 'rtt' (the name every reader uses) was never
        # reset and did not exist on a job that had not been pinged yet.
        self.rtt = 0
        self.rrt = 0  # legacy misspelling, kept for any external reader
        self.start = 0
        self.sent = 0
        self.message = ""
        self.severity = 5
        self.inprocess = False
        self.pathcheck = 0
        self.eventState = 0

    def checkpath(self):
        """Delegate to the parent's checkpath(); returns None without a parent."""
        if self.parent:
            return self.parent.checkpath()

    def routerpj(self):
        """Delegate to the parent's routerpj(); returns None without a parent."""
        if self.parent:
            return self.parent.routerpj()


# Logger used by the Ping class for per-packet debug output.
plog = logging.getLogger("zen.Ping")
class Ping(object):
    """
    Class that provides asynchronous icmp ping.

    One raw ICMP socket is registered with the Twisted reactor as a reader.
    Outstanding jobs are tracked in ``self.jobqueue``, keyed by IP address,
    and each job's result is delivered through its ``deferred``.
    """

    def __init__(self, tries=2, timeout=2, sock=None):
        """
        @param tries: number of packets to send before a job is failed
        @param timeout: seconds to wait for a reply to each packet
        @param sock: optional file descriptor of an already-open ICMP
            socket; when None a new raw socket is created
        """
        self.tries = tries
        self.timeout = timeout
        # NOTE(review): procId is used as the ICMP echo identifier; this
        # presumably has to fit the 16-bit id field -- confirm against
        # icmp.Echo for hosts whose pids exceed 65535.
        self.procId = os.getpid()
        self.jobqueue = {}
        self.pktdata = 'zenping %s %s' % (socket.getfqdn(), self.procId)
        self.createPingSocket(sock)

    def createPingSocket(self, sock):
        """
        Make an ICMP socket to use for sending and receiving pings.

        @param sock: file descriptor to adopt, or None to open a new socket
        @raise PermissionError: the process may not open a raw socket
        """
        socketargs = socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP
        if sock is None:
            try:
                self.pingsocket = socket.socket(*socketargs)
            except socket.error as e:
                # Linux reports a missing raw-socket privilege as EPERM,
                # other platforms as EACCES; treat both as "not permitted".
                if e.args and e.args[0] in (errno.EACCES, errno.EPERM):
                    raise PermissionError("must be root to send icmp.")
                raise  # bare raise keeps the original traceback
        else:
            # fromfd() duplicates the descriptor, so the one handed to us
            # is closed once the socket object owns its own copy.
            self.pingsocket = socket.fromfd(sock, *socketargs)
            os.close(sock)
        self.pingsocket.setblocking(0)
        reactor.addReader(self)

    def fileno(self):
        """Return the ping socket's file descriptor (reactor reader API)."""
        return self.pingsocket.fileno()

    def doRead(self):
        """Called by the reactor when the socket is readable."""
        self.recvPackets()

    def connectionLost(self, unused):
        """Unregister from the reactor and close the socket."""
        reactor.removeReader(self)
        self.pingsocket.close()

    def logPrefix(self):
        """Reactor reader API; this reader has no log prefix."""
        return None

    def sendPacket(self, pingJob):
        """
        Take a pingjob and send an ICMP packet for it.

        Any failure (bad address, send error, IP conflict) marks the job as
        failed and reports it through its deferred instead of raising.
        """
        try:
            # Check for a conflicting job *before* touching the wire, so a
            # rejected job neither sends a stray packet nor arms a timer.
            current = self.jobqueue.get(pingJob.ipaddr, None)
            if current is not None and pingJob.hostname != current.hostname:
                raise IpConflict("Host %s and %s are both using ip %s" %
                                 (pingJob.hostname,
                                  current.hostname,
                                  pingJob.ipaddr))
            pkt = icmp.Echo(self.procId, pingJob.sent, self.pktdata)
            buf = icmp.assemble(pkt)
            pingJob.start = time.time()
            plog.debug("send icmp to '%s'", pingJob.ipaddr)
            #### sockets with bad addresses fail
            self.pingsocket.sendto(buf, (pingJob.ipaddr, 0))
            reactor.callLater(self.timeout, self.checkTimeout, pingJob)
            pingJob.sent += 1
            self.jobqueue[pingJob.ipaddr] = pingJob
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception as e:
            pingJob.rtt = -1
            pingJob.message = "%s sendto error %s" % (pingJob.ipaddr, e)
            self.reportPingJob(pingJob)

    def recvPackets(self):
        """
        Receive packets until the socket would block, decoding each header.

        Echo replies carrying our id complete the matching job; host
        unreachable packets that embed our payload fail the matching job.
        """
        while 1:
            try:
                data, _sender = self.pingsocket.recvfrom(1024)
                if not data:
                    return
                ipreply = ip.disassemble(data)
                try:
                    icmppkt = icmp.disassemble(ipreply.data)
                except ValueError:
                    plog.debug("checksum failure on packet %r", ipreply.data)
                    # second argument 0: retry the decode after the checksum
                    # failure (presumably disables the check -- confirm in icmp)
                    icmppkt = icmp.disassemble(ipreply.data, 0)
                sip = ipreply.src
                if (icmppkt.get_type() == icmp.ICMP_ECHOREPLY and
                        icmppkt.get_id() == self.procId and
                        sip in self.jobqueue):
                    plog.debug("echo reply pkt %s %s", sip, icmppkt)
                    self.pingJobSucceed(self.jobqueue[sip])
                elif icmppkt.get_type() == icmp.ICMP_UNREACH:
                    try:
                        origpkt = icmppkt.get_embedded_ip()
                        dip = origpkt.dst
                        if (origpkt.data.find(self.pktdata) > -1
                                and dip in self.jobqueue):
                            self.pingJobFail(self.jobqueue[dip])
                    except ValueError:
                        plog.warning("failed to parse host unreachable packet")
                else:
                    plog.debug("unexpected pkt %s %s", sip, icmppkt)
            except (SystemExit, KeyboardInterrupt):
                raise
            except socket.error as err:
                # Nothing left to read on the non-blocking socket.
                if err.args and err.args[0] in (errno.EAGAIN,
                                                errno.EWOULDBLOCK):
                    return
                # The original rebound 'err' to the errno and then did
                # 'raise err', raising an int; re-raise the real exception.
                raise
            except Exception as ex:
                log.exception("receiving packet error: %s" % ex)

    def pingJobSucceed(self, pj):
        """PingJob completed successfully.
        """
        plog.debug("pj succeed for %s", pj.ipaddr)
        pj.rtt = time.time() - pj.start
        pj.message = "%s ip %s is up" % (pj.hostname, pj.ipaddr)
        self.reportPingJob(pj)

    def pingJobFail(self, pj):
        """PingJob has failed remove from jobqueue.
        """
        plog.debug("pj fail for %s", pj.ipaddr)
        pj.rtt = -1
        pj.message = "%s ip %s is down" % (pj.hostname, pj.ipaddr)
        self.reportPingJob(pj)

    def reportPingJob(self, pj):
        """
        Drop the job from the queue and fire its deferred.

        A negative rtt fires the errback, anything else the callback; the
        job itself is passed as the result in both cases.
        """
        # Only remove the entry if it really is this job: a job rejected
        # for an IP conflict shares its key with the job still in flight,
        # and must not evict it.
        if self.jobqueue.get(pj.ipaddr) is pj:
            del self.jobqueue[pj.ipaddr]
        # also free the deferred from further reporting
        if pj.rtt < 0:
            pj.deferred.errback(pj)
        else:
            pj.deferred.callback(pj)

    def checkTimeout(self, pj):
        """
        Timer callback: resend or fail a job that has not been answered.

        Does nothing beyond a debug message when the job's address is no
        longer queued (it has already been reported).
        """
        if pj.ipaddr in self.jobqueue:
            now = time.time()
            if now - pj.start > self.timeout:
                if pj.sent >= self.tries:
                    plog.debug("pj timeout for %s", pj.ipaddr)
                    self.pingJobFail(pj)
                else:
                    self.sendPacket(pj)
        else:
            plog.debug("calling checkTimeout needlessly for %s", pj.ipaddr)

    def jobCount(self):
        """Return the number of jobs currently awaiting a reply."""
        return len(self.jobqueue)

    def ping(self, ip):
        "Ping the ip and return the result in a deferred"
        pj = PingJob(ip)
        self.sendPacket(pj)
        return pj.deferred


def _printResults(results, start):
    """
    Print a summary of a finished ping run and stop the reactor.

    @param results: DeferredList result, a list of (success, value) pairs.
        For a successful ping the value is the PingJob; for a failed one
        it is the Failure produced by the job's errback, which carries
        the PingJob as its ``value`` (per the Twisted Failure API).
    @param start: time.time() stamp taken when the run began
    """
    good = []
    bad = []
    for succeeded, outcome in results:
        if succeeded and outcome.rtt >= 0:
            good.append(outcome)
        else:
            # The original only looked at successful entries, so failed
            # pings (delivered as Failures) were never listed as bad.
            bad.append(getattr(outcome, "value", outcome))
    if good:
        print("Good ips: %s" % " ".join([g.ipaddr for g in good]))
    if bad:
        # fall back to the object itself if a failure is not a ping job
        print("Bad ips: %s" % " ".join(
            [str(getattr(b, "ipaddr", b)) for b in bad]))
    print("Tested %d ips in %.1f seconds" % (len(results), time.time() - start))
    reactor.stop()


if __name__ == "__main__":
    # Command-line smoke test: ping every address given as an argument
    # (loopback when none is given) and print a summary when all are done.
    ping = Ping()
    logging.basicConfig()
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)
    targets = sys.argv[1:] or ("127.0.0.1",)
    pending = [ping.ping(target) for target in targets]
    lst = defer.DeferredList(pending, consumeErrors=True)
    lst.addCallback(_printResults, time.time())
    reactor.run()