1
2
3
4
5
6
7
8
9
10
11
12
13
14 import sys
15 import os
16 import types
17 import time
18 import select
19 import socket
20 import ip
21 import icmp
22 import errno
23 import pprint
24 import logging
25 log = logging.getLogger("zen.Ping")
26
27 from twisted.internet import reactor, defer
28 from sets import Set
29
31 """Not permitted to access resource."""
32
34 """Pinging two jobs simultaneously with different hostnames but the same IP"""
35
36
38 """
39 Class representing a single target to be pinged.
40 """
41 - def __init__(self, ipaddr, hostname="", status=0, unused_cycle=60):
47
48
50 self.deferred = defer.Deferred()
51 self.rrt = 0
52 self.start = 0
53 self.sent = 0
54 self.message = ""
55 self.severity = 5
56 self.inprocess = False
57 self.pathcheck = 0
58 self.eventState = 0
59
60
62 if self.parent:
63 return self.parent.checkpath()
64
65
67 if self.parent:
68 return self.parent.routerpj()
69
70
71 plog = logging.getLogger("zen.Ping")
73 """
74 Class that provides asyncronous icmp ping.
75 """
76
84
85
87 """make an ICMP socket to use for sending and receiving pings"""
88 socketargs = socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP
89 if sock is None:
90 try:
91 s = socket
92 self.pingsocket = s.socket(*socketargs)
93 except socket.error, e:
94 err, msg = e.args
95 if err == errno.EACCES:
96 raise PermissionError("must be root to send icmp.")
97 raise e
98 else:
99 self.pingsocket = socket.fromfd(sock, *socketargs)
100 os.close(sock)
101 self.pingsocket.setblocking(0)
102 reactor.addReader(self)
103
105 return self.pingsocket.fileno()
106
109
111 reactor.removeReader(self)
112 self.pingsocket.close()
113
116
118 """Take a pingjob and send an ICMP packet for it"""
119
120 try:
121 pkt = icmp.Echo(self.procId, pingJob.sent, self.pktdata)
122 buf = icmp.assemble(pkt)
123 pingJob.start = time.time()
124 plog.debug("send icmp to '%s'", pingJob.ipaddr)
125 self.pingsocket.sendto(buf, (pingJob.ipaddr, 0))
126 reactor.callLater(self.timeout, self.checkTimeout, pingJob)
127 pingJob.sent += 1
128 current = self.jobqueue.get(pingJob.ipaddr, None)
129 if current:
130 if pingJob.hostname != current.hostname:
131 raise IpConflict("Host %s and %s are both using ip %s" %
132 (pingJob.hostname,
133 current.hostname,
134 pingJob.ipaddr))
135 self.jobqueue[pingJob.ipaddr] = pingJob
136 except (SystemExit, KeyboardInterrupt): raise
137 except Exception, e:
138 pingJob.rtt = -1
139 pingJob.message = "%s sendto error %s" % (pingJob.ipaddr, e)
140 self.reportPingJob(pingJob)
141
142
144 """receive a packet and decode its header"""
145 while 1:
146 try:
147 data, (host, port) = self.pingsocket.recvfrom(1024)
148 if not data: return
149 ipreply = ip.disassemble(data)
150 try:
151 icmppkt = icmp.disassemble(ipreply.data)
152 except ValueError:
153 plog.debug("checksum failure on packet %r", ipreply.data)
154 icmppkt = icmp.disassemble(ipreply.data, 0)
155 sip = ipreply.src
156 if (icmppkt.get_type() == icmp.ICMP_ECHOREPLY and
157 icmppkt.get_id() == self.procId and
158 self.jobqueue.has_key(sip)):
159 plog.debug("echo reply pkt %s %s", sip, icmppkt)
160 self.pingJobSucceed(self.jobqueue[sip])
161 elif icmppkt.get_type() == icmp.ICMP_UNREACH:
162 try:
163 origpkt = icmppkt.get_embedded_ip()
164 dip = origpkt.dst
165 if (origpkt.data.find(self.pktdata) > -1
166 and self.jobqueue.has_key(dip)):
167 self.pingJobFail(self.jobqueue[dip])
168 except ValueError, ex:
169 plog.warn("failed to parse host unreachable packet")
170 else:
171 plog.debug("unexpected pkt %s %s", sip, icmppkt)
172 except (SystemExit, KeyboardInterrupt): raise
173 except socket.error, err:
174 err, errmsg = err.args
175 if err == errno.EAGAIN:
176 return
177 raise err
178 except Exception, ex:
179 log.exception("receiving packet error: %s" % ex)
180
181
183 """PingJob completed successfully.
184 """
185 plog.debug("pj succeed for %s", pj.ipaddr)
186 pj.rtt = time.time() - pj.start
187 pj.message = "%s ip %s is up" % (pj.hostname, pj.ipaddr)
188 self.reportPingJob(pj)
189
190
192 """PingJob has failed remove from jobqueue.
193 """
194 plog.debug("pj fail for %s", pj.ipaddr)
195 pj.rtt = -1
196 pj.message = "%s ip %s is down" % (pj.hostname, pj.ipaddr)
197 self.reportPingJob(pj)
198
199
201 try:
202 del self.jobqueue[pj.ipaddr]
203 except KeyError:
204 pass
205
206 if pj.rtt < 0:
207 pj.deferred.errback(pj)
208 else:
209 pj.deferred.callback(pj)
210
211
223
225 return len(self.jobqueue)
226
227 - def ping(self, ip):
232
233
235 good = [pj for s, pj in results if s and pj.rtt >= 0]
236 bad = [pj for s, pj in results if s and pj.rtt < 0]
237 if good: print "Good ips: %s" % " ".join([g.ipaddr for g in good])
238 if bad: print "Bad ips: %s" % " ".join([b.ipaddr for b in bad])
239 print "Tested %d ips in %.1f seconds" % (len(results), time.time() - start)
240 reactor.stop()
241
242 if __name__ == "__main__":
243 ping = Ping()
244 logging.basicConfig()
245 log = logging.getLogger()
246 log.setLevel(10)
247 if len(sys.argv) > 1: targets = sys.argv[1:]
248 else: targets = ("127.0.0.1",)
249 lst = defer.DeferredList(map(ping.ping, targets), consumeErrors=True)
250 lst.addCallback(_printResults, time.time())
251 reactor.run()
252