Skip to content

Instantly share code, notes, and snippets.

@tobyxdd
Created October 1, 2023 19:50
Show Gist options
  • Save tobyxdd/4dd532a725d04de90291141780b149ae to your computer and use it in GitHub Desktop.
Save tobyxdd/4dd532a725d04de90291141780b149ae to your computer and use it in GitHub Desktop.
Python UDP test server & client
import argparse
import socket
import time
import random
import threading
PACKET_SIZE_MIN, PACKET_SIZE_MAX = 200, 2000
PACKET_PER_SECOND = 100
def server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((host, port))
print(f"Listening on {host}:{port} for UDP packets")
packet_count = 0
def recv():
nonlocal packet_count
while True:
data, addr = s.recvfrom(PACKET_SIZE_MAX)
packet_count += 1
recv_thread = threading.Thread(target=recv)
recv_thread.daemon = True
recv_thread.start()
try:
while True:
time.sleep(1)
print(f"Packets received: {packet_count}")
packet_count = 0
except KeyboardInterrupt:
s.close()
def client(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
interval = 1.0 / PACKET_PER_SECOND
packet_sent_count = 0
def send():
nonlocal packet_sent_count
while True:
start_time = time.time()
size = random.randint(PACKET_SIZE_MIN, PACKET_SIZE_MAX)
content = random.randbytes(size)
s.sendto(content, (host, port))
packet_sent_count += 1
elapsed_time = time.time() - start_time
if elapsed_time < interval:
time.sleep(interval - elapsed_time)
send_thread = threading.Thread(target=send)
send_thread.daemon = True
send_thread.start()
try:
while True:
time.sleep(1)
print(f"Packets sent: {packet_sent_count}")
packet_sent_count = 0
except KeyboardInterrupt:
s.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="UDP test script")
parser.add_argument(
"mode", choices=["client", "server"], help="Operate in client or server mode"
)
parser.add_argument(
"host", help="IP address to bind (for server) or send to (for client)"
)
parser.add_argument(
"port", type=int, help="Port to bind (for server) or send to (for client)"
)
args = parser.parse_args()
if args.mode == "server":
server(args.host, args.port)
else:
client(args.host, args.port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment