如何利用Python实现快速Ping一个IP网段地址?

描述

ping 命令是我们检查网络中最常用的命令,作为网络人员,基本上每天都会用到,可以很好地帮助我们分析和判定网络故障;如果有 10 设备,100 台设备,1000 台设备怎么办?一个个 ping 过去人都要疯掉了,这种情况在大型网络中我们有可能遇到,那怎么办呢?我们今天来看下如何用 python 来实现批量 ping 测试主机。代码如下:

#!/usr/bin/python3# -*- coding: utf-8 -*-import os

import argparse

import socket

import struct

import select

import time

ICMP_ECHO_REQUEST = 8 # Platform specific

DEFAULT_TIMEOUT = 0.1

DEFAULT_COUNT = 4class Pinger(object):

“”“ Pings to a host -- the Pythonic way”“”

def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):

self.target_host = target_host

self.count = count

self.timeout = timeout

def do_checksum(self, source_string):

“”“ Verify the packet integritity ”“”

sum = 0

max_count = (len(source_string)/2)*2

count = 0

while count 《 max_count:

val = source_string[count + 1]*256 + source_string[count]

sum = sum + val

sum = sum & 0xffffffff

count = count + 2

if max_count《len(source_string):

sum = sum + ord(source_string[len(source_string) - 1])

sum = sum & 0xffffffff

sum = (sum 》》 16) + (sum & 0xffff)

sum = sum + (sum 》》 16)

answer = ~sum

answer = answer & 0xffff

answer = answer 》》 8 | (answer 《《 8 & 0xff00)

return answer

def receive_pong(self, sock, ID, timeout):

“”“

Receive ping from the socket.

”“”

time_remaining = timeout

while True:

start_time = time.time()

readable = select.select([sock], [], [], time_remaining)

time_spent = (time.time() - start_time)

if readable[0] == []: # Timeout

return

time_received = time.time()

recv_packet, addr = sock.recvfrom(1024)

icmp_header = recv_packet[20:28]

type, code, checksum, packet_ID, sequence = struct.unpack(

“bbHHh”, icmp_header

if packet_ID == ID:

bytes_In_double = struct.calcsize(“d”)

time_sent = struct.unpack(“d”, recv_packet[28:28 + bytes_In_double])[0]

return time_received - time_sent

time_remaining = time_remaining - time_spent

if time_remaining 《= 0:

return

def send_ping(self, sock, ID):

“”“

Send ping to the target host

”“”

target_addr = socket.gethostbyname(self.target_host)

my_checksum = 0

# Create a dummy heder with a 0 checksum.

header = struct.pack(“bbHHh”, ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)

bytes_In_double = struct.calcsize(“d”)

data = (192 - bytes_In_double) * “Q”

data = struct.pack(“d”, time.time()) + bytes(data.encode(‘utf-8’))

# Get the checksum on the data and the dummy header.

my_checksum = self.do_checksum(header + data)

header = struct.pack(

“bbHHh”, ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1

packet = header + data

sock.sendto(packet, (target_addr, 1))

def ping_once(self):

“”“

Returns the delay (in seconds) or none on timeout.

”“”

icmp = socket.getprotobyname(“icmp”)

try:

sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)

except socket.error as e:

if e.errno == 1:

# Not superuser, so operation not permitted

e.msg += “ICMP messages can only be sent from root user processes”

raise socket.error(e.msg)

except Exception as e:

print(“Exception: %s” %(e))

my_ID = os.getpid() & 0xFFFF

self.send_ping(sock, my_ID)

delay = self.receive_pong(sock, my_ID, self.timeout)

sock.close()

return delay

def ping(self):

“”“

Run the ping process

”“”

for i in range(self.count):

print (“Ping to %s.。。” % self.target_host,)

try:

delay = self.ping_once()

except socket.gaierror as e:

print (“Ping failed. (socket error: ‘%s’)” % e[1])

break

if delay == None:

print (“Ping failed. (timeout within %ssec.)” % self.timeout)

else:

delay = delay * 1000

print(“Get pong in %0.4fms” % delay)

if __name__ == ‘__main__’:

alive = []

host_prefix = ‘192.168.242.’

for i in range(1, 255):

host = host_prefix + str(i)

pinger = Pinger(target_host=host)

delay = pinger.ping_once()

if delay == None:

print(“Ping %s 失败,超时2秒” % host)

else:

print(“ping %s = %s ms” % (host, round(delay * 1000, 4)))

alive.append(host)

# time.sleep(0.5)

测试如下:

代码

原文链接:www.yjsec.com/2020/11/07
编辑:jq

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分