#!/usr/bin/env python

# ping a list of hosts with threads for increased speed
# use standard linux /bin/ping utility

from threading import Thread
import subprocess
try:
    import queue
except ImportError:
    import Queue as queue
import re

# some global vars
num_threads = 15
ips_q = queue.Queue()
out_q = queue.Queue()

# build IP array
ips = []
for i in range(1, 200):
    ips.append("192.168.0." + str(i))

# thread code : wraps system ping command
def thread_pinger(i, q):
    """Pings hosts in queue"""
    while True:
        # get an IP item from queue
        ip = q.get()
        # ping it
        args = ['/bin/ping', '-c', '1', '-W', '1', str(ip)]
        p_ping = subprocess.Popen(args,
                                  shell=False,
                                  stdout=subprocess.PIPE)
        # save ping stdout
        p_ping_out = str(p_ping.communicate()[0])
        if (p_ping.wait() == 0):
            # rtt min/avg/max/mdev = 22.293/22.293/22.293/0.000 ms
            search = re.search(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms',
                               p_ping_out, re.M | re.I)
            ping_rtt = search.group(2)
            out_q.put("OK " + str(ip) + " rtt= " + ping_rtt)
        # update queue : this ip is processed
        q.task_done()

# start the thread pool
for i in range(num_threads):
    worker = Thread(target=thread_pinger, args=(i, ips_q))
    worker.setDaemon(True)
    worker.start()

# fill queue
for ip in ips:
    ips_q.put(ip)

# wait until worker threads are done to exit
ips_q.join()

# print result
while True:
    try:
        msg = out_q.get_nowait()
    except queue.Empty:
        break
    print(msg)
#!/usr/bin/env python

# ping a list of hosts with threads for increased speed
# designed to use data from/to an SQL database
# use standard linux /bin/ping utility

from threading import Thread
import mysql.connector
import subprocess
try:
    import queue
except ImportError:
    import Queue as queue
import time
import re

# some global vars
num_threads = 30
ips_q = queue.Queue()
out_q = queue.Queue()

# thread code : wraps system ping command
def thread_pinger(i, q):
    """Pings hosts in queue"""
    while True:
        # get an IP item from queue
        item = q.get()
        # ping it
        args = ['/bin/ping', '-c', '1', '-W', str(item['timeout']),
                str(item['ip'])]
        p_ping = subprocess.Popen(args,
                                  shell=False,
                                  stdout=subprocess.PIPE)
        # save ping stdout
        p_ping_out = str(p_ping.communicate()[0])
        # ping returns 0 if the host is up
        if (p_ping.wait() == 0):
            # rtt min/avg/max/mdev = 22.293/22.293/22.293/0.000 ms
            search = re.search(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms',
                               p_ping_out, re.M | re.I)
            item['up'] = True
            item['rtt'] = search.group(2)
        else:
            item['up'] = False
        # update output queue
        out_q.put(item)
        # update queue : this ip is processed
        q.task_done()

# start the thread pool
for i in range(num_threads):
    worker = Thread(target=thread_pinger, args=(i, ips_q))
    worker.setDaemon(True)
    worker.start()

# build IP array
ips = []
for i in range(1, 200):
    ips.append("192.168.0." + str(i))

# main loop
while True:
    # retrieve data from DB
    # add SQL here
    # test start time
    start = time.time()
    # fill queue
    for ip in ips:
        ips_q.put({'ip': ip, 'timeout': 1})
    # wait until worker threads are done to exit
    ips_q.join()
    # display result
    print("next:")
    while True:
        try:
            msg = out_q.get_nowait()
        except queue.Empty:
            break
        if msg['up']:
            print(msg)
    # test end time
    end = time.time()
    loop_time = round(end - start, 2)
    print("loop time: %s" % (loop_time))
    # update DB
    # add SQL here
    # wait 5s before next cycle
    time.sleep(5.0)
Any idea where to find the Queue package?
Queue is in the Python standard library. In Python 3 the module is named "queue"; in Python 2 it was "Queue".
"th_pinger.py" is an older script which initially ran only on Python 2. I've just updated this gist so it runs on both Python 2 and 3.
Hello, I want to save the output into a .txt file, but it stays empty. Do you have any idea how to do it? The pinged.txt file is still empty:
# save ping stdout
p_ping_out = str(p_ping.communicate()[0])
f = open("/home/pavollorenc/CODE/PYTHON/subnet_ping/pinged.txt", "a")
f.write(str(ip) + '\t' + 'UP' + '\n')
f.close
if (p_ping.wait() == 0):
    # rtt min/avg/max/mdev = 22.293/22.293/22.293/0.000 ms
    search = re.search(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms',
                       p_ping_out, re.M|re.I)
    ping_rtt = search.group(2)
    out_q.put("OK " + str(ip) + " rtt= " + ping_rtt)
# update queue : this ip is processed
q.task_done()
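Two things to check (hedged, since I can't test your exact setup): f.close without parentheses only references the method and never actually calls close(), so nothing is flushed explicitly; and the write runs inside every worker thread, so several threads append to the same file at once. A simpler sketch is to write from the main thread after ips_q.join(), once out_q holds all the results (the path is just an example):

# sketch: drain out_q after ips_q.join() and append the results to a file
with open("pinged.txt", "a") as f:
    while True:
        try:
            msg = out_q.get_nowait()
        except queue.Empty:
            break
        f.write(msg + '\n')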
will this work on Windows?
no, it will not work on Windows
hello, how would I go about modifying it for Windows?
also, what is the difference in using queue here rather than async?
thanks
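On the queue-vs-async question: here queue.Queue just hands IPs to a pool of OS threads that block on /bin/ping and collects their results. The same scan can be written with asyncio instead of threads; a minimal sketch, assuming Python 3.7+ and the same Linux /bin/ping (not part of the original gist):

import asyncio

async def ping(ip):
    # run one ping and report (ip, up) based on the exit code
    proc = await asyncio.create_subprocess_exec(
        '/bin/ping', '-c', '1', '-W', '1', str(ip),
        stdout=asyncio.subprocess.DEVNULL)
    return ip, (await proc.wait()) == 0

async def main():
    ips = ["192.168.0." + str(i) for i in range(1, 200)]
    # launches all pings at once; an asyncio.Semaphore can cap concurrency for big ranges
    results = await asyncio.gather(*(ping(ip) for ip in ips))
    for ip, up in results:
        if up:
            print("OK " + ip)

asyncio.run(main())

Both versions spend their time waiting on /bin/ping, so the speed is comparable; the thread + queue version also runs on Python 2, which asyncio does not.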
Thanks for this script @sourceperl
For those asking about Windows, you can change the subprocess args like this:
args = ["PING.EXE", "-n", "1", "-w", "1", str(ip)]
Also replace this block of code with the following:
if p_ping.wait() == 0:
    # Minimum = 1ms, Maximum = 1ms, Average = 1ms
    search = re.search(
        "Minimum = (.*)ms, Maximum = (.*)ms, Average = (.*)ms",
        p_ping_out,
        re.M | re.I,
    )
    ping_rtt = search.group(3)
    out_q.put(f"OK {str(ip)} rtt (avg)={ping_rtt}ms")
Thanks @adrianyorke. I resorted to using: https://github.com/digineo/go-ping/tree/master/cmd/ping-test for Windows.
Hello @sourceperl, thanks for the code. I'm coding this
from threading import Thread, Lock
_db_lock = Lock()
import threading, time
import subprocess
import queue
import re
from redistimeseries.client import Client
from redis import StrictRedis, ConnectionError
import json
import sys
import os
import time, json

# some global vars
num_threads = 15
ips_q = queue.Queue()
out_qUp = queue.Queue()
out_qLow = queue.Queue()
ipRedis = '192.168.1.100'

def decode_redis(src):
    if isinstance(src, list):
        rv = list()
        for key in src:
            rv.append(decode_redis(key))
        return rv
    elif isinstance(src, dict):
        rv = dict()
        for key in src:
            rv[key.decode()] = decode_redis(src[key])
        return rv
    elif isinstance(src, bytes):
        return src.decode()
    else:
        raise Exception("type not handled: " + str(type(src)))

def doQuerySUBS():
    global r
    subs = []
    json_datos = {}
    band = 0
    print("Get data subscribers...")
    data = decode_redis(r.hgetall('infraYI'))
    if data == {}:
        data = 'NO'
    else:
        data = json.loads(data['data'])
        pass
    for sub in range(len(data)):
        if data[sub][0] != '' and data[sub][9] != '' and data[sub][10] != '' and data[sub][11] != '':
            subs.append((data[sub][0], data[sub][1], data[sub][2], data[sub][3],
                         data[sub][4], data[sub][5], data[sub][6], data[sub][7],
                         data[sub][8], data[sub][9], data[sub][10], data[sub][11],
                         data[sub][12], data[sub][13], data[sub][14], data[sub][15]))
        else:
            pass
    return subs

try:
    rts = Client(host=ipRedis, port=6379, socket_keepalive=True, retry_on_timeout=True)
except Exception as e:
    print(e)
try:
    r = StrictRedis(host=ipRedis, port=6379, db=0, health_check_interval=30, socket_keepalive=True)
except Exception as e:
    print(e)

# thread code : wraps system ping command
def thread_pinger(i, q):
    """Pings hosts in queue"""
    p_ping_outS = []
    while True:
        # get an IP item from queue
        ip = q.get()
        # ping it
        args = ['/bin/ping', '-c', '1', '-W', '1', str(ip)]
        p_ping = subprocess.Popen(args, close_fds=True, shell=False, stdout=subprocess.PIPE)
        # save ping stdout
        p_ping_out = p_ping.communicate()[0].decode('utf-8')
        ######## DEVICES UP ########
        if (p_ping.wait() == 0):
            search = re.search(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms', p_ping_out, re.M|re.I)
            ping_rtt = search.group(2)
            out_qUp.put("UP " + str(ip) + " rtt= " + ping_rtt + ' ms')
            try:
                rts.add(str(ip), int(time.time()), float(ping_rtt))
                print(out_qUp.get_nowait())
            except Exception as e:
                print("[ERROR IP UP]########### rts.create " + str(e))
            #print(out_qUp.get_nowait())
        ######## DEVICES DOWN ########
        if (p_ping.wait() != 0):
            p_ping_outS = p_ping_out.split(' ')
            try:
                rts.add(str(p_ping_outS[1]), int(time.time()), float(0.0))
                print("DOWN " + str(p_ping_outS[1]) + ' 0.0' + ' ms')
            except Exception as e:
                if len(p_ping_outS) > 1:
                    print("[ERROR IP DOWN]########### rts.create " + str(e))
                else:
                    pass
            time.sleep(1)
        # update queue : this ip is processed
        q.task_done()

class Listener1(threading.Thread):
    def __init__(self, r, channels):
        threading.Thread.__init__(self)
        self.redis, self.init = r, 0
        self.pubsub = self.redis.pubsub()
        print('Listener1...')
        try:
            self.pubsub.subscribe(channels)
        except Exception as e:
            print(e)

    def work(self):
        try:
            ips = []
            ipSubs = doQuerySUBS()
            for x in range(0, len(ipSubs)):
                ips.append(ipSubs[x][9])
            # start the thread pool
            for i in range(num_threads):
                worker = Thread(target=thread_pinger, args=(i, ips_q))
                worker.setDaemon(True)
                worker.start()
            # fill queue
            for ip in ips:
                ips_q.put(ip)
            # wait until worker threads are done to exit
            ips_q.join()
        except Exception as e:
            print(e)
        time.sleep(5)

    def run(self):
        while True:
            try:
                self.work()
            except ConnectionError:
                print('[lost connection]')
                while True:
                    print('trying to reconnect...')
                    try:
                        self.redis.ping()
                    except ConnectionError:
                        time.sleep(10)
                    else:
                        self.pubsub.subscribe(['last_session'])
                        break
            time.sleep(0.001)  # be nice to the system :)

client = Listener1(r, ['last_session', 'LT01TP0LT'])
client.start()
.... to make your example th_pinger.py run every 5 seconds, but after a few minutes this error appears:

OSError: [Errno 24] Too many open files

  File "/usr/lib/python3.8/subprocess.py", line 1605, in _execute_child
    errpipe_read, errpipe_write = os.pipe()
OSError: [Errno 24] Too many open files

.... do you know why this occurs?
Unfortunately not. Do you have the same problem with basic code like this one: https://gist.github.com/sourceperl/0ef3719e8fef2c95d98c590ff1e7cefd ?
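One detail in the pasted code that may be related (an observation only, not a confirmed diagnosis): work() starts num_threads new daemon threads every time it runs, and run() calls it in a loop, so pinger threads and the pipes of their in-flight Popen calls keep accumulating. Starting the pool once, the way the original scripts do, keeps the count constant; roughly:

# sketch: start the thread pool once, before Listener1's main loop,
# and let work() only fill ips_q and join() it on each cycle
for i in range(num_threads):
    worker = Thread(target=thread_pinger, args=(i, ips_q))
    worker.setDaemon(True)
    worker.start()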
nice