Skip to content

Instantly share code, notes, and snippets.

@nitori
Last active January 8, 2023 12:51
Show Gist options
  • Save nitori/39ccd4b3aed4b1c46dbc61a84dba6734 to your computer and use it in GitHub Desktop.
Save nitori/39ccd4b3aed4b1c46dbc61a84dba6734 to your computer and use it in GitHub Desktop.
import socket
import threading
import queue
import time
from dataclasses import dataclass
from typing import Any
@dataclass
class Client:
    """Bookkeeping record for one remote UDP peer.

    The field order is part of the constructor signature:
    ``Client(q, t, last_seen)``.
    """

    # Queue feeding this peer's worker thread; ``None`` is the stop sentinel.
    q: queue.Queue
    # Worker thread that consumes ``q``.
    t: threading.Thread
    # ``time.monotonic()`` reading taken when the peer last sent a datagram.
    last_seen: float
def udp_worker(q: queue.Queue) -> None:
    """Drain *q*, printing every ``(data, addr)`` pair until ``None`` arrives.

    Blocks on ``q.get()`` between items. A ``None`` item is the shutdown
    sentinel: it is consumed and the function returns.
    """
    item = q.get()
    while item is not None:
        datagram, sender = item
        print(datagram, sender)
        item = q.get()
def udp_receiver(host: str, port: int, timeout_after: float = 15) -> None:
    """Receive UDP datagrams and dispatch each sender to its own worker thread.

    Every distinct source address gets a ``Client`` record holding a queue
    and a thread running ``udp_worker``. Datagrams are forwarded to the
    sender's queue as ``(data, addr)`` tuples. A client that has not sent
    anything for more than ``timeout_after`` seconds is sent the ``None``
    sentinel, joined, and forgotten.

    The function loops until an exception (e.g. ``KeyboardInterrupt``)
    propagates out of it; on the way out it closes the socket and shuts
    down every remaining worker.

    Args:
        host: Local interface address to bind to.
        port: Local UDP port to bind to.
        timeout_after: Seconds of silence after which a client is dropped.
            Idle clients are checked at least once per poll interval
            (1 second), so removal may lag by up to that long.
    """
    # The receive call wakes up this often even with no traffic, so that
    # idle clients are reaped without needing another datagram to arrive.
    poll_interval = 1.0

    clients: dict[Any, Client] = {}
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind((host, port))
        sock.settimeout(poll_interval)
        while True:
            try:
                data, addr = sock.recvfrom(1 << 15)
            except socket.timeout:
                # Nothing arrived within the poll interval; fall through
                # to the idle sweep below.
                pass
            else:
                client = clients.get(addr)
                if client is None:
                    print('New client', addr)
                    q = queue.Queue()
                    t = threading.Thread(target=udp_worker, args=(q,))
                    # Start before registering: if start() fails, the
                    # cleanup below must not try to join an unstarted thread.
                    t.start()
                    client = Client(q, t, time.monotonic())
                    clients[addr] = client
                client.q.put((data, addr))
                client.last_seen = time.monotonic()

            now = time.monotonic()
            expired = [
                peer
                for peer, entry in clients.items()
                if now - entry.last_seen > timeout_after
            ]
            for peer in expired:
                print('Client timeout', peer)
                entry = clients[peer]
                entry.q.put(None)
                entry.t.join()
                # Remove only after the join, so an interrupt during the
                # join still leaves the client visible to the final cleanup.
                del clients[peer]
    finally:
        sock.close()
        # Signal every worker first, then join, so they wind down in parallel.
        for client in clients.values():
            client.q.put_nowait(None)
        for addr, client in clients.items():
            client.t.join()
            print('Client thread for', addr, 'closed')
def main():
    """Run the UDP receiver on all IPv4 interfaces, port 12345."""
    listen_host = '0.0.0.0'
    listen_port = 12345
    udp_receiver(listen_host, listen_port)
# Script entry point: start the receiver only when run directly.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # A KeyboardInterrupt (Ctrl-C) ends the program without a traceback.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment