1
2
3
4
5
6
7
8
9
10
11
12
13
14 import sys
15 import os
16 import time
17 import socket
18 import ip
19 import icmp
20 import errno
21 import logging
22 log = logging.getLogger("zen.Ping")
23
24 from twisted.internet import reactor, defer
25 from twisted.spread import pb
26
28 """Not permitted to access resource."""
29
31 """Pinging two jobs simultaneously with different hostnames but the same IP"""
32
33 -class PingJob(pb.Copyable, pb.RemoteCopy):
34 """
35 Class representing a single target to be pinged.
36 """
37 - def __init__(self, ipaddr, hostname="", status=0, unused_cycle=60):
43
44
46 self.deferred = defer.Deferred()
47 self.rrt = 0
48 self.start = 0
49 self.sent = 0
50 self.message = ""
51 self.severity = 5
52 self.inprocess = False
53 self.pathcheck = 0
54 self.eventState = 0
55
56
58 if self.parent:
59 return self.parent.checkpath()
60
61
63 if self.parent:
64 return self.parent.routerpj()
65
66 pb.setUnjellyableForClass(PingJob, PingJob)
67
68
69 plog = logging.getLogger("zen.Ping")
71 """
72 Class that provides asyncronous icmp ping.
73 """
74
75 - def __init__(self, tries=2, timeout=2, sock=None):
76 self.tries = tries
77 self.timeout = timeout
78 self.procId = os.getpid()
79 self.jobqueue = {}
80 self.pktdata = 'zenping %s %s' % (socket.getfqdn(), self.procId)
81 self.createPingSocket(sock)
82
83
85 """make an ICMP socket to use for sending and receiving pings"""
86 socketargs = socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP
87 if sock is None:
88 try:
89 s = socket
90 self.pingsocket = s.socket(*socketargs)
91 except socket.error, e:
92 err, msg = e.args
93 if err == errno.EACCES:
94 raise PermissionError("must be root to send icmp.")
95 raise e
96 else:
97 self.pingsocket = socket.fromfd(sock, *socketargs)
98 os.close(sock)
99 self.pingsocket.setblocking(0)
100 reactor.addReader(self)
101
103 return self.pingsocket.fileno()
104
107
109 reactor.removeReader(self)
110 self.pingsocket.close()
111
114
116 """Take a pingjob and send an ICMP packet for it"""
117
118 try:
119 pkt = icmp.Echo(self.procId, pingJob.sent, self.pktdata)
120 buf = icmp.assemble(pkt)
121 pingJob.start = time.time()
122 plog.debug("send icmp to '%s'", pingJob.ipaddr)
123 self.pingsocket.sendto(buf, (pingJob.ipaddr, 0))
124 reactor.callLater(self.timeout, self.checkTimeout, pingJob)
125 pingJob.sent += 1
126 current = self.jobqueue.get(pingJob.ipaddr, None)
127 if current:
128 if pingJob.hostname != current.hostname:
129 raise IpConflict("Host %s and %s are both using ip %s" %
130 (pingJob.hostname,
131 current.hostname,
132 pingJob.ipaddr))
133 self.jobqueue[pingJob.ipaddr] = pingJob
134 except (SystemExit, KeyboardInterrupt): raise
135 except Exception, e:
136 pingJob.rtt = -1
137 pingJob.message = "%s sendto error %s" % (pingJob.ipaddr, e)
138 self.reportPingJob(pingJob)
139
140
142 """receive a packet and decode its header"""
143 while reactor.running:
144 try:
145 data, (host, port) = self.pingsocket.recvfrom(1024)
146 if not data: return
147 ipreply = ip.disassemble(data)
148 try:
149 icmppkt = icmp.disassemble(ipreply.data)
150 except ValueError:
151 plog.debug("checksum failure on packet %r", ipreply.data)
152 try:
153 icmppkt = icmp.disassemble(ipreply.data, 0)
154 except ValueError:
155 continue
156 except Exception, ex:
157 plog.debug("Unable to decode reply packet payload %s", ex)
158 continue
159 sip = ipreply.src
160 if (icmppkt.get_type() == icmp.ICMP_ECHOREPLY and
161 icmppkt.get_id() == self.procId and
162 self.jobqueue.has_key(sip)):
163 plog.debug("echo reply pkt %s %s", sip, icmppkt)
164 self.pingJobSucceed(self.jobqueue[sip])
165 elif icmppkt.get_type() == icmp.ICMP_UNREACH:
166 try:
167 origpkt = icmppkt.get_embedded_ip()
168 dip = origpkt.dst
169 if (origpkt.data.find(self.pktdata) > -1
170 and self.jobqueue.has_key(dip)):
171 self.pingJobFail(self.jobqueue[dip])
172 except ValueError, ex:
173 plog.warn("failed to parse host unreachable packet")
174 else:
175 plog.debug("unexpected pkt %s %s", sip, icmppkt)
176 except (SystemExit, KeyboardInterrupt): raise
177 except socket.error, err:
178 errnum, errmsg = err.args
179 if errnum == errno.EAGAIN:
180 return
181 raise err
182 except Exception, ex:
183 log.exception("receiving packet error: %s" % ex)
184
185
187 """PingJob completed successfully.
188 """
189 plog.debug("pj succeed for %s", pj.ipaddr)
190 pj.rtt = time.time() - pj.start
191 pj.message = "ip %s is up" % (pj.ipaddr)
192 self.reportPingJob(pj)
193
194
196 """PingJob has failed remove from jobqueue.
197 """
198 plog.debug("pj fail for %s", pj.ipaddr)
199 pj.rtt = -1
200 pj.message = "ip %s is down" % (pj.ipaddr)
201 self.reportPingJob(pj)
202
203
205 try:
206 del self.jobqueue[pj.ipaddr]
207 except KeyError:
208 pass
209
210 if pj.rtt < 0:
211 pj.deferred.errback(pj)
212 else:
213 pj.deferred.callback(pj)
214
215
217 if self.jobqueue.has_key(pj.ipaddr):
218 now = time.time()
219 if now - pj.start > self.timeout:
220 if pj.sent >= self.tries:
221 plog.debug("pj timeout for %s", pj.ipaddr)
222 self.pingJobFail(pj)
223 else:
224 self.sendPacket(pj)
225 else:
226 plog.debug("calling checkTimeout needlessly for %s", pj.ipaddr)
227
229 return len(self.jobqueue)
230
231 - def ping(self, ip):
236
237
239 good = [pj for s, pj in results if s and pj.rtt >= 0]
240 bad = [pj for s, pj in results if s and pj.rtt < 0]
241 if good: print "Good ips: %s" % " ".join([g.ipaddr for g in good])
242 if bad: print "Bad ips: %s" % " ".join([b.ipaddr for b in bad])
243 print "Tested %d ips in %.1f seconds" % (len(results), time.time() - start)
244 reactor.stop()
245
246 if __name__ == "__main__":
247 ping = Ping()
248 logging.basicConfig()
249 log = logging.getLogger()
250 log.setLevel(10)
251 if len(sys.argv) > 1: targets = sys.argv[1:]
252 else: targets = ("127.0.0.1",)
253 lst = defer.DeferredList(map(ping.ping, targets), consumeErrors=True)
254 lst.addCallback(_printResults, time.time())
255 reactor.run()
256