1
2
3
4
5
6
7
8
9
10
11
12
13
14 import sys
15 import os
16 import time
17 import socket
18 import ip
19 import icmp
20 import errno
21 import logging
22 log = logging.getLogger("zen.Ping")
23
24 from twisted.internet import reactor, defer
25 from twisted.spread import pb
26
28 """Not permitted to access resource."""
29
31 """Pinging two jobs simultaneously with different hostnames but the same IP"""
32
33 -class PingJob(pb.Copyable, pb.RemoteCopy):
34 """
35 Class representing a single target to be pinged.
36 """
37 - def __init__(self, ipaddr, hostname="", status=0, unused_cycle=60):
43
44
46 self.deferred = defer.Deferred()
47 self.rrt = 0
48 self.start = 0
49 self.sent = 0
50 self.message = ""
51 self.severity = 5
52 self.inprocess = False
53 self.pathcheck = 0
54 self.eventState = 0
55
56
58 if self.parent:
59 return self.parent.checkpath()
60
61
63 if self.parent:
64 return self.parent.routerpj()
65
66 pb.setUnjellyableForClass(PingJob, PingJob)
67
68
69 plog = logging.getLogger("zen.Ping")
71 """
72 Class that provides asyncronous icmp ping.
73 """
74
75 - def __init__(self, tries=2, timeout=2, sock=None):
81
82
86
87
89 """make an ICMP socket to use for sending and receiving pings"""
90 socketargs = socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP
91 if sock is None:
92 try:
93 s = socket
94 self.pingsocket = s.socket(*socketargs)
95 except socket.error, e:
96 err, msg = e.args
97 if err == errno.EACCES:
98 raise PermissionError("must be root to send icmp.")
99 raise e
100 else:
101 self.pingsocket = socket.fromfd(sock, *socketargs)
102 os.close(sock)
103 self.pingsocket.setblocking(0)
104 reactor.addReader(self)
105
107 return self.pingsocket.fileno()
108
111
113 reactor.removeReader(self)
114 self.pingsocket.close()
115
118
120 """Take a pingjob and send an ICMP packet for it"""
121
122 try:
123 pkt = icmp.Echo(self.procId, pingJob.sent, self.pktdata)
124 buf = icmp.assemble(pkt)
125 pingJob.start = time.time()
126 plog.debug("send icmp to '%s'", pingJob.ipaddr)
127 self.pingsocket.sendto(buf, (pingJob.ipaddr, 0))
128 reactor.callLater(self.timeout, self.checkTimeout, pingJob)
129 pingJob.sent += 1
130 current = self.jobqueue.get(pingJob.ipaddr, None)
131 if current:
132 if pingJob.hostname != current.hostname:
133 raise IpConflict("Host %s and %s are both using ip %s" %
134 (pingJob.hostname,
135 current.hostname,
136 pingJob.ipaddr))
137 self.jobqueue[pingJob.ipaddr] = pingJob
138 except (SystemExit, KeyboardInterrupt): raise
139 except Exception, e:
140 pingJob.rtt = -1
141 pingJob.message = "%s sendto error %s" % (pingJob.ipaddr, e)
142 self.reportPingJob(pingJob)
143
144
146 """receive a packet and decode its header"""
147 while reactor.running:
148 try:
149 data, (host, port) = self.pingsocket.recvfrom(1024)
150 if not data: return
151 ipreply = ip.disassemble(data)
152 try:
153 icmppkt = icmp.disassemble(ipreply.data)
154 except ValueError:
155 plog.debug("checksum failure on packet %r", ipreply.data)
156 try:
157 icmppkt = icmp.disassemble(ipreply.data, 0)
158 except ValueError:
159 continue
160 except Exception, ex:
161 plog.debug("Unable to decode reply packet payload %s", ex)
162 continue
163 sip = ipreply.src
164 if (icmppkt.get_type() == icmp.ICMP_ECHOREPLY and
165 icmppkt.get_id() == self.procId and
166 self.jobqueue.has_key(sip)):
167 plog.debug("echo reply pkt %s %s", sip, icmppkt)
168 self.pingJobSucceed(self.jobqueue[sip])
169 elif icmppkt.get_type() == icmp.ICMP_UNREACH:
170 try:
171 origpkt = icmppkt.get_embedded_ip()
172 dip = origpkt.dst
173 if (origpkt.data.find(self.pktdata) > -1
174 and self.jobqueue.has_key(dip)):
175 self.pingJobFail(self.jobqueue[dip])
176 except ValueError, ex:
177 plog.warn("failed to parse host unreachable packet")
178 else:
179 plog.debug("unexpected pkt %s %s", sip, icmppkt)
180 except (SystemExit, KeyboardInterrupt): raise
181 except socket.error, err:
182 errnum, errmsg = err.args
183 if errnum == errno.EAGAIN:
184 return
185 raise err
186 except Exception, ex:
187 log.exception("receiving packet error: %s" % ex)
188
189
191 """PingJob completed successfully.
192 """
193 plog.debug("pj succeed for %s", pj.ipaddr)
194 pj.rtt = time.time() - pj.start
195 pj.message = "ip %s is up" % (pj.ipaddr)
196 self.reportPingJob(pj)
197
198
200 """PingJob has failed remove from jobqueue.
201 """
202 plog.debug("pj fail for %s", pj.ipaddr)
203 pj.rtt = -1
204 pj.message = "ip %s is down" % (pj.ipaddr)
205 self.reportPingJob(pj)
206
207
209 try:
210 del self.jobqueue[pj.ipaddr]
211 except KeyError:
212 pass
213
214 if pj.rtt < 0:
215 pj.deferred.errback(pj)
216 else:
217 pj.deferred.callback(pj)
218
219
221 if self.jobqueue.has_key(pj.ipaddr):
222 now = time.time()
223 if now - pj.start > self.timeout:
224 if pj.sent >= self.tries:
225 plog.debug("pj timeout for %s", pj.ipaddr)
226 self.pingJobFail(pj)
227 else:
228 self.sendPacket(pj)
229 else:
230 plog.debug("calling checkTimeout needlessly for %s", pj.ipaddr)
231
233 return len(self.jobqueue)
234
235 - def ping(self, ip):
240
241
243 good = [pj for s, pj in results if s and pj.rtt >= 0]
244 bad = [pj for s, pj in results if s and pj.rtt < 0]
245 if good: print "Good ips: %s" % " ".join([g.ipaddr for g in good])
246 if bad: print "Bad ips: %s" % " ".join([b.ipaddr for b in bad])
247 print "Tested %d ips in %.1f seconds" % (len(results), time.time() - start)
248 reactor.stop()
249
250 if __name__ == "__main__":
251 ping = Ping()
252 logging.basicConfig()
253 log = logging.getLogger()
254 log.setLevel(10)
255 if len(sys.argv) > 1: targets = sys.argv[1:]
256 else: targets = ("127.0.0.1",)
257 lst = defer.DeferredList(map(ping.ping, targets), consumeErrors=True)
258 lst.addCallback(_printResults, time.time())
259 reactor.run()
260