Skip to content

Instantly share code, notes, and snippets.

@jayvynl
Last active July 12, 2023 09:05
Show Gist options
  • Save jayvynl/9d47afd0e9e7c3b79e7761bdf7c9b19a to your computer and use it in GitHub Desktop.
Save jayvynl/9d47afd0e9e7c3b79e7761bdf7c9b19a to your computer and use it in GitHub Desktop.
Python UDP broadcast
import atexit
import fcntl
import socket
import struct
import time
import traceback
# Seconds between the start of one broadcast round and the start of the next.
INTERVAL = 5
# Open broadcast sockets, keyed by the local IPv4 address each one is bound to.
sockets = {}
def get_ip_by_name(interface):
    """Return the IPv4 address assigned to a network interface.

    The address is queried with the SIOCGIFADDR ioctl on a throw-away
    datagram socket.

    Args:
        interface: Interface name (e.g. ``'eth0'``). Only the first 15
            characters are sent to the kernel.

    Returns:
        The address in dotted-quad form, or ``None`` when the ioctl fails
        with ``OSError`` (for example, the interface has no IPv4 address).

    Raises:
        OSError: If the probe socket itself cannot be created.
    """
    # struct ifreq: interface name, NUL-padded by the '256s' format.
    request = struct.pack('256s', interface[:15].encode('utf-8'))
    # The context manager guarantees the probe socket is closed on every
    # path; previously it was left for the garbage collector to reclaim.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        try:
            reply = fcntl.ioctl(
                s.fileno(),
                0x8915,  # SIOCGIFADDR
                request,
            )
        except OSError:
            return None
    # Bytes 20..23 of the reply hold the packed IPv4 address.
    return socket.inet_ntoa(reply[20:24])
def get_all_ip():
    """Collect the IPv4 address of every interface that has one.

    Walks the interfaces reported by ``socket.if_nameindex()`` and looks up
    each one with ``get_ip_by_name``. Interfaces whose lookup raises
    ``OSError`` or yields a falsy result are left out.

    Returns:
        A list of dotted-quad address strings, in interface-index order.
    """
    found = []
    for _index, ifname in socket.if_nameindex():
        try:
            address = get_ip_by_name(ifname)
        except OSError:
            continue
        if not address:
            continue
        found.append(address)
    return found
def update_sockets():
    """Reconcile the module-level ``sockets`` map with the current addresses.

    Sockets bound to addresses that ``get_all_ip()`` no longer reports are
    closed and removed; a new broadcast-enabled UDP socket is created and
    bound (ephemeral port) for every address that has no socket yet.

    Raises:
        OSError: If a new socket cannot be created, configured or bound.
            The half-initialised socket is closed before re-raising.
    """
    ip_set = set(get_all_ip())
    # Iterate over a snapshot of the keys: popping entries while iterating
    # the dict itself raises "dictionary changed size during iteration".
    for ip in list(sockets):
        if ip not in ip_set:
            sockets.pop(ip).close()
    for ip in ip_set:
        if ip in sockets:
            continue
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.bind((ip, 0))
        except OSError:
            # Do not leak the descriptor when setup fails.
            sock.close()
            raise
        sockets[ip] = sock
def close_sockets():
    """Close every socket currently stored in the module-level ``sockets``.

    The entries themselves are left in the dictionary.
    """
    for bound_ip in sockets:
        sockets[bound_ip].close()
# Close all broadcast sockets when the interpreter exits normally.
atexit.register(close_sockets)
def broadcast():
    """Send the payload from every local address, once per ``INTERVAL``.

    Each round refreshes the socket set with ``update_sockets()``, sends
    ``b'abc'`` to the limited broadcast address on UDP port 12345 through
    every socket, then sleeps for whatever remains of the interval.

    This function never returns; it only exits by raising (for example an
    ``OSError`` from a send, or ``KeyboardInterrupt``).
    """
    while True:
        # Use the monotonic clock for pacing: wall-clock adjustments must
        # not stretch or skip the sleep between rounds.
        start = time.monotonic()
        update_sockets()
        for sock in sockets.values():
            sock.sendto(b'abc', ('<broadcast>', 12345))
        delay = start + INTERVAL - time.monotonic()
        if delay > 0:
            time.sleep(delay)
if __name__ == '__main__':
    # Supervisor loop: restart the broadcaster after any ordinary failure,
    # logging the traceback and backing off for 10 seconds first.
    while True:
        try:
            broadcast()
        except KeyboardInterrupt:
            # Ctrl-C ends the program.
            break
        except Exception:
            # Catch Exception rather than using a bare ``except:`` so that
            # SystemExit still terminates the process instead of being
            # swallowed and retried.
            traceback.print_exc()
            time.sleep(10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment