Skip to content

Instantly share code, notes, and snippets.

@James-E-A
Last active September 11, 2025 02:09
Show Gist options
  • Save James-E-A/3828775d68ea04a3e2906b7197d83778 to your computer and use it in GitHub Desktop.
Save James-E-A/3828775d68ea04a3e2906b7197d83778 to your computer and use it in GitHub Desktop.
Python UDP socket.recv() iterator supporting KeyboardInterrupt and Multicast
#!/usr/bin/env python3
# Authored by James Edington Administrator in 2025; placed in the Public Domain.
__all__ = ['udp_listen_sync', 'udp_listen_async']
"""USAGE:
from udp_helper import *
# Simple iterator that responds instantly to Ctrl+C / SIGINT
for packet in udp_listen_sync(("10.0.0.69", 4200)):
print("Got packet:", packet.hex())
# Supports Multicast addresses seamlessly with the same syntax!
for packet in udp_listen_sync(("224.0.0.42", 4200)):
print("Got packet:", packet.hex())
# Also supports async, if you're into that!
async for packet in udp_listen_async(("224.0.0.42", 4200)):
print("Got packet:", packet.hex())
# Optional: constrain Multicast to *only* listening on a certain interface
for packet in udp_listen_sync(("224.0.0.42", 4200), mcast_bind_only_host="10.0.0.69"):
# only listening on NIC with IP 10.0.0.69
print("Got packet:", packet.hex())
# Optional: constrain Multicast to *only* listening on a certain interface
for packet in udp_listen_sync(("ff05::2a", 4200), mcast_bind_only_ifindex=2):
# only listening on first physical NIC
print("Got packet:", packet.hex())
TODO:
- Support discriminating between a packet addressed to the machine's LAN IP
and a packet addressed to a Multicast group the machine was subscribed to,
pending https://github.com/python/cpython/issues/80398#issuecomment-2302533639
- Fix support on binding IPv4 address by ifindex on Unix
https://ko-fi.com/E_Administrator
"""
import asyncio
from collections import Counter
from collections.abc import Sequence, Set
from contextlib import ExitStack, contextmanager, closing
import ctypes
from ctypes import sizeof
import enum
from functools import cache
import ipaddress
from itertools import chain, islice
import logging
from math import ceil, floor
from operator import index
import os
import platform
import re
import select
import signal
import socket
from socket import (
AF_INET,
AF_INET6,
INADDR_ANY,
IPPROTO_IP,
IPPROTO_IPV6,
IPV6_JOIN_GROUP,
IP_ADD_MEMBERSHIP,
SOCK_DGRAM,
inet_aton,
inet_pton,
)
import struct
import sys
import uuid
import warnings
try:
import threading
except ImportError:
threading = None
_platform_system = platform.system()
_IFINDEX_ANY = {
AF_INET: 0,
AF_INET6: 0
}
_HOST_ALL = {
AF_INET: str(ipaddress.IPv4Address(INADDR_ANY)),
AF_INET6: str(ipaddress.IPv6Address(INADDR_ANY))
}
_PY_HOST_ALL = '' # a lot of the Python standard library expects this, so we should expect it, too
# pack an integer (ifindex) into the format needed by mreq/mreq6
def _inet_iton(i):
# NOTE: this uses machine endianness, **not** network byte order
# as it's used for packing an integer into a struct
# for the setsockopt system library call.
return struct.pack('@I', i)
try:
# Linux/Unix
from resource import getpagesize
except ImportError:
try:
# Windows
from ctypes import oledll, wintypes
except ImportError:
try:
# generic fallback
import mmap
except ImportError:
# last resort fallback
def getpagesize():
return 512
getallocationgranularity = getpagesize
else:
# generic fallback (cont'd)
def getpagesize():
return mmap.PAGESIZE
getallocationgranularity = getpagesize
else:
# Windows (cont'd)
@cache
def getpagesize():
return GetSystemInfo()['pageSize']
@cache
def getallocationgranularity():
return GetSystemInfo()['allocationGranularity']
wintypes_UINT_PTR = wintypes.UINT_PTR if hasattr(wintypes, 'UINT_PTR') else ctypes.c_size_t
wintypes_DWORD_PTR = wintypes.DWORD_PTR if hasattr(wintypes, 'DWORD_PTR') else max([wintypes.DWORD, wintypes_UINT_PTR], key=sizeof)
class wintypes_SYSTEM_INFO(ctypes.Structure):
class _1(ctypes.Union):
class _2(ctypes.Structure):
_fields_ = [
('wProcessorArchitecture', wintypes.WORD),
('wReserved', wintypes.WORD),
]
_anonymous_ = ['_2']
_fields_ = [
('dwOemId', wintypes.DWORD),
('_2', _2),
]
_anonymous_ = ['_1']
_fields_ = [
('_1', _1),
('dwPageSize', wintypes.DWORD),
('lpMinimumApplicationAddress', wintypes.LPVOID),
('lpMaximumApplicationAddress', wintypes.LPVOID),
('dwActiveProcessorMask', wintypes_DWORD_PTR),
('dwNumberOfProcessors', wintypes.DWORD),
('dwProcessorType', wintypes.DWORD),
('dwAllocationGranularity', wintypes.DWORD),
('wProcessorLevel', wintypes.WORD),
('wProcessorRevision', wintypes.WORD),
]
@property
def value(self):
return {
'pageSize': self.dwPageSize,
'minimumApplicationAddress': ctypes.c_void_p(self.lpMinimumApplicationAddress),
'maximumApplicationAddress': ctypes.c_void_p(self.lpMaximumApplicationAddress),
'activeProcessorMask': self.dwActiveProcessorMask,
'_activeProcessors': frozenset(i for i in range(self.dwNumberOfProcessors) if (self.dwActiveProcessorMask & (1<<i))),
'numberOfProcessors': self.dwNumberOfProcessors,
'allocationGranularity': self.dwAllocationGranularity,
'processorLevel': self.wProcessorLevel,
'processorRevision': self.wProcessorRevision,
'processorArchitecture': self.wProcessorArchitecture,
'oemId': self.dwOemId
}
_GetNativeSystemInfo = oledll.Kernel32['GetNativeSystemInfo']
_GetNativeSystemInfo.argtypes = [
wintypes.LPCVOID,
]
def GetSystemInfo():
result = wintypes_SYSTEM_INFO()
_GetNativeSystemInfo(ctypes.byref(result))
return result.value
else:
# Linux/Unix (cont'd)
try:
from resource import getallocationgranularity
except ImportError:
getallocationgranularity = getpagesize
try:
from socket import IP_PKTINFO
except ImportError:
# https://github.com/python/cpython/issues/75386
if 'DEFINE_IP_PKTINFO' in os.environ:
IP_PKTINFO = int(os.environ['DEFINE_IP_PKTINFO'])
else:
if _platform_system == 'Windows':
# https://learn.microsoft.com/en-us/windows/win32/winsock/ip-pktinfo#socket-option-value
IP_PKTINFO = 19
elif _platform_system == 'Darwin':
# https://github.com/apple-oss-distributions/xnu/blob/xnu-7195.81.3/bsd/netinet/in.h#L485
IP_PKTINFO = 26
elif _platform_system == 'Linux':
# https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/in.h?h=v6.10&id=0c3836482481200ead7b416ca80c68a29cfdaabd#n104
IP_PKTINFO = 8
else:
pass
# http://mail-index.netbsd.org/tech-net/2017/12/29/msg006583.html
try:
# https://cgit.freebsd.org/src/tree/sys/netinet/in.h?h=release/14.2.0-p2#n444
# https://cvsweb.openbsd.org/src/sys/netinet/in.h?rev=1.148&content-type=text/x-cvsweb-markup&only_with_tag=OPENBSD_7_6#:~:text=%23define%09IP_RECVDSTADDR
# https://cvsweb.netbsd.org/bsdweb.cgi/src/sys/netinet/in.h?rev=1.114;content-type=text%2Fx-cvsweb-markup;only_with_tag=netbsd-10-1-RELEASE#:~:text=%23define%09IP_RECVDSTADDR
# https://github.com/python/cpython/commit/3fdf58b5cc56cf9837670f31dceb3ea4954bed8f#diff-412f0023cab7bc5669c6c1e710a6f37e7c3c387d299794b5fe82268a10223b29R1392
from socket import IP_RECVDSTADDR
except ImportError:
pass
# simple iterator to listen on a UDP socket
def udp_listen_sync(
address=(_PY_HOST_ALL, 0),
family=None,
flags=0, *,
sockopts=None,
bufsize=None,
auto_mcast=True,
mcast_bind_only_host=None,
mcast_bind_only_ifindex=None,
use_wakeup=True,
wakeup_socket=None,
wakeup_timeout=None,
_force_allow_busy_select=False,
_force_allow_duplicate_mreq=False,
_force_allow_mreqn=False,
_force_allow_nondaemon_nonmain_thread=False
):
# argument normalization and validation
if bufsize is None:
if platform.python_implementation() == 'CPython':
# https://github.com/python/cpython/blob/v3.13.2/Modules/socketmodule.c#L3718
bufsize = max(getallocationgranularity() - sys.getsizeof(b''), 512)
else:
bufsize = getpagesize()
if wakeup_timeout == 0.0:
if not _force_allow_busy_select:
raise ValueError("Value of 0 seconds provided for wakeup_timeout; this would execute a continual busy poll which would waste a tremendous amount of CPU time for no benefit.\nTo disable the timeout, pass wakeup_timeout=None instead.")
else:
warnings.warn("Value of 0 seconds provided for wakeup_timeout; this will execute a continual busy poll which will waste a tremendous amount of CPU time for no benefit.\nTo disable the timeout, pass wakeup_timeout=None instead.", RuntimeWarning)
if use_wakeup:
if (wakeup_socket is False) and (wakeup_timeout is None):
raise ValueError("use_wakeup was enabled but wakeup_socket and wakeup_timeout were both disabled. Either omit wakeup_socket=False, pass a finite wakeup_timeout=..., or pass use_wakeup=False.")
else:
if wakeup_socket:
raise ValueError("use_wakeup was disabled but wakeup_socket was set. Either omit wakeup_socket=..., or omit use_wakeup=False.")
if wakeup_timeout is not None:
raise ValueError("use_wakeup was disabled but wakeup_timeout was set. Either omit wakeup_timeout=..., or omit use_wakeup=False.")
# body
with ExitStack() as ctx:
# make UDP socket
sock = ctx.enter_context(_make_udp_socket(
address,
family,
flags,
sockopts=sockopts,
auto_mcast=auto_mcast,
mcast_bind_only_host=mcast_bind_only_host,
mcast_bind_only_ifindex=mcast_bind_only_ifindex,
_force_allow_duplicate_mreq=_force_allow_duplicate_mreq,
_force_allow_mreqn=_force_allow_mreqn
))
rlist = [sock]
# make Ctrl-C work without any delay
if use_wakeup:
parent_wakeup_fd = None
if wakeup_socket is None:
# default codepath, caller wants regular behavior and did not give us a wakeup socket
if (threading is None) or (threading.current_thread() is threading.main_thread()):
# This signal wakeup FD fix only works if we're "the main thread of the main interpreter"
# but how are we supposed to check that???
# https://github.com/python/cpython/blob/v3.13.2/Modules/signalmodule.c#L505
# https://peps.python.org/pep-0734/
# 1. Create internal socketpair
wakeup_socket, wakeup_socket_2 = socket.socketpair()
# 2. Attach one end of the socket to the signal handler
parent_wakeup_fd = ctx.enter_context(_wakeup_fd_ctx(wakeup_socket_2.fileno(), 'report_restore'))
# 3. Listen on the other end of it, to wake this thread up when a signal is received
# even though recv would otherwise block signal handling
rlist.insert(0, wakeup_socket)
else:
# When the main script is killed with ctrl-c or whatever,
# Python automatically harvests all "daemon" threads.
# Since we're running on a non-main thread,
# we should just ensure that we're running as a daemon,
# and we'll get the expected behavior.
if (not threading.current_thread().daemon) and (not _force_allow_nondaemon_nonmain_thread):
raise ValueError("thread must be started with daemon=true to listen synchronously on UDP")
elif wakeup_socket is not False:
# caller-provided wakeup socket
# we are going to listen on it, but we are NOT going to
# register it as a signal handler since that could interfere
# with the caller's actual usage (e.g. could be a pipe from
# another thread used in some not-literally-POSIX-signals IPC scheme);
# if the caller actually wants to attach it as the wakeup handler,
# they are of course perfectly free to do that themselves
rlist.insert(0, wakeup_socket)
else:
# caller passed False singleton to DISABLE wakeup socket
# and ONLY use the "dumb" periodic wakeup
assert wakeup_timeout is not None
# main loop
sock_recv = sock.recv
for ready in _iter_readable_socket_fds_forever(rlist, timeout=wakeup_timeout):
if ready is sock:
yield sock_recv(bufsize, flags)
elif ready is wakeup_socket:
if parent_wakeup_fd is not None:
# NOTE: receiving potentially more than 1 byte
# because we aren't guaranteed that the caller's
# simply using a signal wakeup socket
# and we want to make this passthrough work reasonably well
os.write(parent_wakeup_fd, wakeup_socket.recv(bufsize))
else:
# NOTE: We actually don't need to handle SIGINT or CTRL_C_EVENT;
# if one of those was the signal in question, then
# the Python runtime will have already raised KeyboardInterrupt
# out of the select.select() call return boundary
# and this block won't even be reached...
# TODO: double-check that "ignoring" is reasonable behavior here,
# or if our hooking into the wakeup socket has secretly suppressed
# some other logic the caller still expects from the Python runtime
# NOTE: receiving just 1 byte
# because we are guaranteed that this is simply a signal wakeup
# and if many signals arrived in quick succession
# then we'll just enter this loop multiple times anyway so no problem
logging.debug(f"ignoring: %r", signal.Signals(wakeup_socket.recv(1)[0]))
elif __debug__:
raise RuntimeError(f'ready = {ready!r}')
# simple async iterator to listen on a UDP socket
async def udp_listen_async(
address=(_PY_HOST_ALL, 0),
family=None,
flags=0, *,
sockopts=None,
auto_mcast=True,
mcast_bind_only_host=None,
mcast_bind_only_ifindex=None,
_force_allow_duplicate_mreq=False,
_force_allow_mreqn=False
):
with ExitStack() as ctx:
loop = asyncio.get_running_loop()
sock = _make_udp_socket(
address,
family,
flags,
sockopts=sockopts,
auto_mcast=auto_mcast,
mcast_bind_only_host=mcast_bind_only_host,
mcast_bind_only_ifindex=mcast_bind_only_ifindex,
_force_allow_duplicate_mreq=_force_allow_duplicate_mreq,
_force_allow_mreqn=_force_allow_mreqn
)
try:
t, proto = await loop.create_datagram_endpoint(
_SimpleDgramListenIterProto,
sock=sock
)
except Exception:
# we own the socket
sock.close()
raise
else:
# transport owns the socket
ctx.callback(t.close)
# "async yield from" does not exist yet -- https://discuss.python.org/t/yield-from-in-async-functions/47050
async for data in proto:
yield data
# helper class to glue the asyncio callbacks into a simple (async) iterable
class _SimpleDgramListenIterProto(asyncio.DatagramProtocol):
__slots__ = ('__queue', '_transport')
def __init__(self):
# FIXME: Is there any way to do away with this queue??
self.__queue = asyncio.Queue()
def connection_made(self, transport):
# FIXME: Is this actually needed for SOCK_DGRAM??
self._transport = transport
def datagram_received(self, data, addr):
self.__queue.put_nowait(data)
def __aiter__(self):
return self
async def __anext__(self):
# FIXME: what's the right way to handle KeyboardInterrupt?
# catching CancelledError and transmuting it to StopAsyncIteration
# seems to suppress the KeyboardInterrupt entirely...
return await self.__queue.get()
# Creates, applies sockopts to, binds, and returns a UDP socket.
def _make_udp_socket(
address,
family,
flags=0, *,
sockopts=frozenset(),
auto_mcast,
mcast_bind_only_host,
mcast_bind_only_ifindex,
_force_allow_duplicate_mreq,
_force_allow_mreqn
):
if (not auto_mcast) and (mcast_bind_only_host) or (mcast_bind_only_ifindex):
raise ValueError("mreq construction was disabled but one of mcast_bind_only_host or mcast_bind_only_ifindex was set. Either omit mcast_bind_only_XXXXX=..., or omit auto_mcast=False.")
# allow string addresses instead of socket module tuples for convenience
if isinstance(address, str):
address = _parse_ip_uri(address, expect_scheme='udp', require_scheme=False)
if address[1] is None:
address = (address[0], 0, *address[2:])
# best practice - new list i/c/o mutation
if sockopts is None:
sockopts = []
# how annoying it is to be required to specify this
# when it COULD HAVE EASILY BEEN DEDUCED from the address format...
# save the caller this nuisance, if we can
if family is None:
assert isinstance(address, (tuple, Sequence)) and len(address) >= 1 and isinstance(address[0], str)
if address[0] == _PY_HOST_ALL:
warnings.warn("Specify family=AF_INET or family=AF_INET6 when binding all hosts with implicit syntax.", DeprecationWarning)
family = AF_INET # assume
else:
family = {4: AF_INET, 6: AF_INET6}[ipaddress.ip_address(address[0]).version] # guess
# simplify interface for listening on Multicast
if auto_mcast and _is_multicast(address[0]):
if not _force_allow_duplicate_mreq:
assert not _has_mreq(sockopts)
if family == AF_INET:
if _platform_system == 'Windows':
if mcast_bind_only_host is None:
mcast_bind_only_host = _PY_HOST_ALL
if mcast_bind_only_ifindex is None:
mcast_bind_only_host_or_ifindex = _PY_HOST_ALL
else:
mcast_bind_only_host_or_ifindex = mcast_bind_only_ifindex
else:
if mcast_bind_only_ifindex is None:
mcast_bind_only_host_or_ifindex = mcast_bind_only_host
else:
# caller set mcast_bind_only_host *and* mcast_bind_only_ifindex explicitly.
# while Windows doesn't support including these both in an IPv4 mreq,
# the former constraint is going to be implicitly applied with the bind() call,
# so we'll just apply the latter constraint via the mreq
mcast_bind_only_host_or_ifindex = mcast_bind_only_ifindex
address, mcast_group = ((mcast_bind_only_host, *address[1:]), address[0])
sockopts = _iadd_or_iappend(sockopts, _make_mreq_4_win(mcast_group, mcast_bind_only_host_or_ifindex))
elif _platform_system in {'Linux', 'FreeBSD'} or _force_allow_mreqn:
# both Linux and FreeBSD accuse the basic mreq struct of being "legacy",
# so we'll give them the fancy mreqn struct
if mcast_bind_only_host is None:
mcast_bind_only_host = _PY_HOST_ALL
if mcast_bind_only_ifindex is None:
mcast_bind_only_ifindex = _IFINDEX_ANY[family]
address, mcast_group = ((mcast_bind_only_host, *address[1:]), address[0])
sockopts = _iadd_or_iappend(sockopts, _make_mreq_4_unix(mcast_group, mcast_bind_only_host, mcast_bind_only_ifindex))
else:
# generic Unix may be safely assumed to support mreq,
# but may NOT be safely assumed to support mreqn.
# OpenBSD, NetBSD, and Mac OS all describe mreq as
# the sole valid argument type for IP_ADD_MEMBERSHIP.
if mcast_bind_only_host is None:
if mcast_bind_only_ifindex is None:
# no constraints, just use default value for mcast_bind_only_host
mcast_bind_only_host = _PY_HOST_ALL
else:
# ifindex constraint only: try to support it on a best-effort basis
mcast_bind_only_host = _lookup_ip(family, mcast_bind_only_ifindex)
else:
if mcast_bind_only_ifindex is None:
# host is set and ifindex constraint is unset.
# parameters are already perfect for _make_mreq_4_unix().
pass
else:
# unlike Windows, generic Unix does not allow passing an ifindex in via the imr_interface field
# so we have no way to respect both these constraints
raise ValueError(f"cannot specify both mcast_bind_only_ifindex and mcast_bind_only_host with {family!r} on {_platform_system}")
address, mcast_group = ((mcast_bind_only_host), address[0])
sockopts = _iadd_or_iappend(sockopts, _make_mreq_4_unix(mcast_group, mcast_bind_only_host))
elif family == AF_INET6:
# thank god, at least this interface was flattened
if mcast_bind_only_host is None:
mcast_bind_only_host = _PY_HOST_ALL
if mcast_bind_only_ifindex is None:
mcast_bind_only_ifindex = _IFINDEX_ANY[family]
address, mcast_group = ((mcast_bind_only_host, *address[1:]), address[0])
sockopts = _iadd_or_iappend(sockopts, _make_mreq_6(mcast_group, mcast_bind_only_ifindex))
else:
raise NotImplementedError(f"family={family!r}")
# configure and start socket
sock = socket.socket(family, SOCK_DGRAM)
try:
for opt in sockopts:
sock.setsockopt(*opt)
sock.bind(address)
if sock.family in {AF_INET, AF_INET6} and not address[1]:
# slightly more important debugging message as the port was randomly assigned
logging.info("laddr=%r, sockopts=%r", sock.getsockname(), sockopts)
else:
logging.debug("laddr=%r, sockopts=%r", sock.getsockname(), sockopts)
except Exception:
sock.close()
raise
else:
return sock
# NOTE: check kqueue before epoll
# because some BSDs also have an epoll shim
# we don't want to accidentally use instead
if hasattr(select, 'kqueue'):
# BSDs
def _iter_readable_socket_fds_forever(rlist, timeout=None):
with closing(select.kqueue()) as kq:
kev_list = [select.kqueue.event(obj, select.KQ_FILTER_READ, select.KQ_EV_ADD) for obj in rlist]
max_ev = len(kev_list)
if (max_ev == 0) and (timeout is not None):
# bug in kqueue: timeout is ignored when max_ev = 0
# workaround h/t: https://github.com/python/cpython/blob/v3.13.2/Lib/selectors.py#L541-L545
max_ev = 1
# kqueue yields bare integers but we want to treat the caller to their familiar objects
# h/t: https://github.com/python/cpython/blob/v3.13.2/Lib/selectors.py#L556
# https://github.com/python/cpython/blob/v3.13.2/Lib/selectors.py#L248
# https://github.com/python/cpython/blob/v3.13.2/Lib/selectors.py#L37
fd_to_key = {
k: obj
for obj in rlist
for k in ([obj, int(obj.fileno())] if hasattr(obj, 'fileno') else [int(obj)])
}
fd_to_key_get = fd_to_key.__getitem__
kq_control = kq.control
kq_control(kev_list, 0, 0)
while True:
yield from (fd_to_key_get(kev.ident) for kev in kq_control(None, max_ev, timeout))
elif False:
# Windows since 3.5
# TODO: steal code from https://github.com/python/cpython/blob/v3.13.2/Lib/asyncio/windows_events.py#L488
# NOTE: adding support for this will require substantially changing the interface of this function.
# "After an instance of an open handle is associated with an I/O completion port, it cannot
# be used in the ReadFileEx or WriteFileEx function because these functions have their own
# asynchronous I/O mechanisms." (!!!)
def _iter_readable_socket_fds_forever(rlist, timeout=None):
# windll.Kernel32.CreateIoCompletionPort(None, None, 0, 0)
with select_iocp.iocp() as cp:
for (i, obj) in enumerate(rlist):
# windll.Kernel32.CreateIoCompletionPort(obj.fileno(), cp._handle, i, 0)
cp.register(obj, i, None)
cp_get_ex = cp.get_ex
rlist_get = rlist.__getitem__
while True:
# https://learn.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatusex
yield from (rlist_get(ev.completionKey) for ev in cp_get_ex())
elif hasattr(select, 'epoll'):
# Linux since 2.5
def _iter_readable_socket_fds_forever(rlist, timeout=None):
with select.epoll(sizehint=len(rlist)) as p:
for obj in rlist:
p.register(obj, select.EPOLLIN)
poll_ = p.poll
while True:
yield from (ev[0] for ev in poll_(timeout))
elif hasattr(select, 'devpoll'):
# completionist
def _iter_readable_socket_fds_forever(rlist, timeout=None):
with closing(select.devpoll()) as p:
for obj in rlist:
p.register(obj, select.POLLIN)
poll_ = p.poll
while True:
yield from (ev[0] for ev in poll_(timeout))
elif hasattr(select, 'poll'):
# reasonable fallback on most Unix
def _iter_readable_socket_fds_forever(rlist, timeout=None):
with ExitStack() as ctx:
p = select.poll()
for obj in rlist:
p.register(obj, select.POLLIN)
ctx.callback(p.unregister, obj) # Python documentation does not **guarantee** us that this is unnecessary
poll_ = p.poll
while True:
yield from (ev[0] for ev in poll_(timeout))
elif _platform_system == 'Windows' and sys.getwindowsversion() >= (6,):
# Windows since Vista
# NOTE: we do NOT need to check sys.getwindowsversion() >= (10, 0, 19041)
# because we are ONLY using this INTERNAL METHOD with incoming udp
# so the infamous Windows 8 bug #309411 that affects newly established outgoing tcp
# doesn't affect THIS codebase anyway
from ctypes import windll, wintypes
def _iter_readable_socket_fds_forever(rlist, timeout=None):
# NOTE: for the winpoll api, it is correct to "fail" to unregister file descriptors
# as the poll object does not refer to any kind of winpoll fd that ought to be closed
# nor is there any registration/unregistration logic relating to the file descriptors
# -- i.e. the Python GC, if any, is completely sufficient for correctness here
# because "unregistering" each file descriptor on unwind
# when the winpoll object is not going to be reused
# is **guaranteed** to be a waste of time
p = _winpoll(sizehint=len(rlist))
for obj in rlist:
p.register(obj, select_POLLIN)
poll_ = p._poll
timeout_ms = -1 if timeout is None else ceil(timeout * 1000) if timeout >= 0 else floor(timeout * 1000)
while True:
yield from (ev[0] for ev in poll_(timeout_ms))
wintypes_UINT_PTR = wintypes.UINT_PTR if hasattr(wintypes, 'UINT_PTR') else ctypes.c_size_t
wintypes_SOCKET = wintypes.SOCKET if hasattr(wintypes, 'SOCKET') else wintypes_UINT_PTR
# https://github.com/tpn/winsdk-10/blob/master/Include/10.0.10240.0/um/WinSock2.h#L392
wintypes_SOCKET_ERROR = wintypes.SOCKET_ERROR if hasattr(wintypes, 'SOCKET_ERROR') else -1
# https://learn.microsoft.com/en-us/windows/win32/winsock/windows-sockets-error-codes-2
wintypes_WSAEINVAL = wintypes.WSAEINVAL if hasattr(wintypes, 'WSAEINVAL') else 10022
wintypes_WSAEINTR = wintypes.WSAEINTR if hasattr(wintypes, 'WSAEINTR') else 10004
# https://github.com/tpn/winsdk-10/blob/master/Include/10.0.10240.0/um/WinSock2.h#L1589
select_POLLRDNORM = select.POLLRDNORM if hasattr(select, 'POLLRDNORM') else 0x0100
select_POLLRDBAND = select.POLLRDBAND if hasattr(select, 'POLLRDBAND') else 0x0200
select_POLLIN = select.POLLIN if hasattr(select, 'POLLIN') else (select_POLLRDNORM | select_POLLRDBAND)
select_POLLPRI = select.POLLPRI if hasattr(select, 'POLLPRI') else 0x0400
select_POLLWRNORM = select.POLLWRNORM if hasattr(select, 'POLLWRNORM') else 0x0010
select_POLLOUT = select.POLLOUT if hasattr(select, 'POLLOUT') else select_POLLWRNORM
select_POLLWRBAND = select.POLLWRBAND if hasattr(select, 'POLLWRBAND') else 0x0020
select_POLLERR = select.POLLERR if hasattr(select, 'POLLERR') else 0x0001
select_POLLHUP = select.POLLHUP if hasattr(select, 'POLLHUP') else 0x0002
select_POLLNVAL = select.POLLNVAL if hasattr(select, 'POLLNVAL') else 0x0004
class WSAPOLLFD(ctypes.Structure):
__slots__ = ()
_fields_ = [
('fd', wintypes_SOCKET),
('events', wintypes.SHORT),
('revents', wintypes.SHORT),
]
def _getfd(fileobj):
return int(fileobj.fileno()) if hasattr(fileobj, 'fileno') else index(fileobj)
# workaround -- https://github.com/python/cpython/issues/60711#issuecomment-2698306318
class _winpoll:
# * "impl" is a ctypes array of WSAPOLLFD objects
# we maintain "impl" as compact and overallocated; if the caller registers any negative or invalid FDs,
# then they will be liable for dealing with any POLLNVAL results
#
# * "fd2obj" is a dict which maps the fd value of each occupied slot in "impl" to the Python object it goes with
__slots__ = ('__impl_buf', '__fd2obj')
def __init__(self, sizehint=max(getallocationgranularity() // sizeof(WSAPOLLFD), 1)):
self.__impl_buf = ctypes.create_string_buffer(sizeof(WSAPOLLFD * sizehint))
self.__fd2obj = {}
# debug function to make sure internal invariants hold
def _check(self):
impl_buf = self.__impl_buf
fd2obj = self.__fd2obj
# INVARIANT: impl_buf must be at least big enough to hold all our allegedly registered fds
impl_view_t = WSAPOLLFD * len(fd2obj)
if sizeof(impl_view_t) > sizeof(impl_buf):
raise AssertionError("impl_buf too small")
# INVARIANT: the actual values contained within impl_buf must agree with our allegedly registered fds
impl_view = impl_view_t.from_buffer(impl_buf)
if set(impl.fd for impl in impl_view) != fd2obj.keys():
raise AssertionError("mismatch between fdArray and fd2obj registry")
# accepts optional int or float seconds, returns a list of (fileobj, eventmask)
def poll(self, timeout=None):
# we round amounts up if needed because
# it'd be incorrect to wait any *less* than the requested interval
return list(self._poll(-1 if timeout is None else ceil(timeout * 1000) if timeout >= 0 else floor(timeout * 1000)))
# accepts integer milliseconds, returns an iterator of (fileobj, eventmask)
def _poll(self, timeout_ms=-1):
impl_buf = self.__impl_buf
fd2obj = self.__fd2obj
fds = len(fd2obj)
impl_view_t = WSAPOLLFD * fds
impl_view = impl_view_t.from_buffer(impl_buf)
# call WSAPoll
while True:
ret = _WSAPoll(impl_buf, fds, timeout_ms)
# we must handle errors manually because wintypes doesn't handle them;
# we are using wintypes because oledll handles this function's errors incorrectly
if ret == wintypes_SOCKET_ERROR:
# https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsapoll#return-value
ret = _WSAGetLastError()
# https://peps.python.org/pep-0475/
# https://github.com/python/cpython/blob/v3.13.3/Modules/selectmodule.c#L675-L701
if ret == WSAEINTR:
continue
raise ctypes.WinError(ret)
assert fds >= ret >= 0, "WSAPoll return value was not zero, greater than zero but less than fds, or exactly SOCKET_ERROR"
break
impl_view = (WSAPOLLFD * fds).from_buffer(impl_buf)
fd2obj_getitem = fd2obj.__getitem__
yield from islice(
(
(fd2obj_getitem(fd), ev)
for fd, ev in ((impl.fd, impl.revents) for impl in impl_view)
if ev # don't call fd2objgetitem for objects which had no events
),
ret # islice returns early when possible
)
def register(self, fileobj, eventmask=(select_POLLIN|select_POLLPRI|select_POLLOUT)):
fd = _getfd(fileobj)
if __debug__: self._check()
impl_buf = self.__impl_buf
fd2obj = self.__fd2obj
fds = len(fd2obj)
impl_view_t = WSAPOLLFD * fds
impl_view = impl_view_t.from_buffer(impl_buf)
for impl in impl_view:
if impl.fd == fd:
# select existing slot with this fd to update existing registration
break
else:
# allocate new slot for new registration
fds_1 = fds + 1
if sizeof(WSAPOLLFD * fds_1) > sizeof(impl_buf):
# always at least double actual internal allocation size to avoid too many re-allocations if this is filled in a loop
ctypes.resize(
impl_buf,
_smallest_multiple_atleast(
getallocationgranularity(),
sizeof(WSAPOLLFD * max(fds*2, fds_1))
)
)
impl_view_t = WSAPOLLFD * fds_1
impl_view = impl_view_t.from_buffer(impl_buf)
# select newly allocated slot
impl = impl_view[-1]
impl.fd = fd
# set entry registration
impl.events = eventmask
fd2obj[fd] = fileobj
if __debug__: self._check()
def modify(self, fileobj, eventmask):
fd = _getfd(fileobj)
if __debug__: self._check()
impl_buf = self.__impl_buf
fd2obj = self.__fd2obj
fds = len(fd2obj)
impl_view_t = WSAPOLLFD * fds
impl_view = impl_view_t.from_buffer(impl_buf)
for impl in impl_view:
if impl.fd == fd:
# select existing slot to update existing registration
break
else:
raise KeyError(f"{fileobj!r} is not registered")
# set entry registration
impl.events = eventmask
fd2obj[fd] = fileobj
if __debug__: self._check()
def unregister(self, fileobj):
fd = _getfd(fileobj)
if __debug__: self._check()
impl_buf = self.__impl_buf
fd2obj = self.__fd2obj
fds = len(fd2obj)
impl_view_t = WSAPOLLFD * fds
impl_view = impl_view_t.from_buffer(impl_buf)
for i, impl in enumerate(impl_view):
if impl.fd == fd:
# select existing slot to delete existing registration
break
else:
raise KeyError(f"{fileobj!r} is not registered")
fds_1 = fds - 1
if i < fds_1:
# deleting entry in the middle of the buffer; need to re-compact array
ctypes.memmove(
ctypes.addressof(impl_view[i]),
ctypes.addressof(impl_view[i+1]),
sizeof(impl_view._type_) * (fds_1 - i)
)
# unset entry registration
del fd2obj[fd]
if __debug__: self._check()
_WSAPoll = windll.Ws2_32['WSAPoll']
_WSAPoll.argtypes = [
wintypes.LPVOID,
wintypes.ULONG,
wintypes.INT,
]
_WSAGetLastError = windll.Ws2_32['WSAGetLastError']
else:
# one of the fallbacks of all time
def _iter_readable_socket_fds_forever(rlist, timeout=None):
_empty = []
select_ = select.select
while True:
yield from select_(rlist, _empty, _empty, timeout)[0]
def _is_multicast(host):
if host == _PY_HOST_ALL:
return False
if isinstance(host, str):
host = ipaddress.ip_address(host)
return host.is_multicast
POSSIBLE_IP_ADDRESS_OR_URI = re.compile(r'(?i)^(?:((?!\d)\w+)(?::/?/?))?\[?((?<!\[)[0-9\.]+(?!\])|[0-9A-F:]+(?:%\w+?)?)\]?(?::(\d+))?$')
def _parse_ip_uri(s, *, expect_scheme=False, require_scheme=False):
m = re.match(POSSIBLE_IP_ADDRESS_OR_URI, s)
if m:
if expect_scheme is not False:
# expect_scheme is None -> expect the ABSENCE of a scheme
# isinstance(expect_scheme, str) -> expect some particular scheme, but by default allow the absence of a scheme
# expect_scheme is False -> do not expect any particular scheme
# require_scheme == True -> do NOT allow absence of an expected scheme
scheme = m.group(1).lower() if m.group(1) is not None else None
if ((scheme is not None) or require_scheme) and (scheme != expect_scheme):
raise ValueError(f"expected a URI with {expect_scheme} scheme, got a URI with {scheme} scheme")
host = m.group(2)
port = int(m.group(3)) if m.group(3) is not None else None
if __debug__: ipaddress.ip_address(host) # fail early on bad values
return (host, port)
raise ValueError(f"could not parse {s!r} as an IP or IP URI with optional port and no path")
def _lookup_ip(family, obj, *, try_str_as_ip_first=False):
if obj == _PY_HOST_ALL:
return _HOST_ALL[family]
if try_str_as_ip_first and isinstance(obj, str):
try:
ipaddress.ip_address(obj)
return obj
except ValueError:
pass
return _plat_lookup_ip(family, obj)
def _lookup_ifindex(key):
# if we were given exactly an integer, just return it.
if isinstance(key, int):
return key
else:
try:
return int(key)
except ValueError:
pass
return _plat_lookup_ifindex(key)
def _make_mreq_4_unix(mcast_group, interface_address=_PY_HOST_ALL, ifindex=None):
if interface_address == _PY_HOST_ALL:
interface_address = _HOST_ALL[AF_INET]
if ifindex is None:
# https://web.git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/in.h?h=v6.10&id=0c3836482481200ead7b416ca80c68a29cfdaabd#n178
# https://github.com/apple-oss-distributions/xnu/blob/xnu-7195.81.3/bsd/netinet/in.h#L563
# https://docs.oracle.com/en/operating-systems/solaris/oracle-solaris/11.4/prog-interfaces/receiving-ipv4-multicast-datagrams.html
return (
IPPROTO_IP,
IP_ADD_MEMBERSHIP,
(
inet_aton(mcast_group) +
inet_aton(interface_address)
)
)
else:
# https://web.git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/in.h?h=v6.10&id=0c3836482481200ead7b416ca80c68a29cfdaabd#n183
# https://cgit.freebsd.org/src/tree/share/man/man4/ip.4?h=release/14.2.0-p2#n566
return (
IPPROTO_IP,
IP_ADD_MEMBERSHIP,
(
inet_aton(mcast_group) +
inet_aton(interface_address) +
_inet_iton(ifindex)
)
)
def _make_mreq_4_win(mcast_group, interface_address_or_ifindex=_PY_HOST_ALL):
if interface_address_or_ifindex == _PY_HOST_ALL:
interface_address = _HOST_ALL[AF_INET]
elif isinstance(interface_address_or_ifindex, int):
interface_address_ = ipaddress.IPv4Address(interface_address_or_ifindex)
assert interface_address_ in ipaddress.IPv4Network("0.0.0.0/8"),\
"https://learn.microsoft.com/en-us/windows/win32/api/ws2ipdef/ns-ws2ipdef-ip_mreq#:~:text=The%20imr_interface%20member%20can%20be%20an%20interface%20index.,.%20The%200.0.0.0/8%20IPv4%20address%20block%20is%20not%20used%20(this%20range%20is%20reserved)."
interface_address = str(interface_address_)
else:
interface_address = interface_address_or_ifindex
# https://learn.microsoft.com/en-us/windows/win32/api/ws2ipdef/ns-ws2ipdef-ip_mreq#syntax
return (
IPPROTO_IP,
IP_ADD_MEMBERSHIP,
(
inet_aton(mcast_group) +
inet_aton(interface_address)
)
)
def _make_mreq_6(mcast_group, ifindex=_IFINDEX_ANY[AF_INET6]):
# https://learn.microsoft.com/en-us/windows/win32/api/ws2ipdef/ns-ws2ipdef-ipv6_mreq#syntax
# https://github.com/apple-oss-distributions/xnu/blob/xnu-7195.81.3/bsd/netinet6/in6.h#L729
# https://web.git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/in6.h?h=v6.10&id=0c3836482481200ead7b416ca80c68a29cfdaabd#n60
return (
IPPROTO_IPV6,
IPV6_JOIN_GROUP,
(
inet_pton(AF_INET6, mcast_group) +
_inet_iton(ifindex)
)
)
def _has_mreq(sockopts):
return any(
(
(opt[0] == IPPROTO_IP and opt[1] == IP_ADD_MEMBERSHIP) or
(opt[0] == IPPROTO_IPV6 and opt[1] == IPV6_JOIN_GROUP)
)
for opt in sockopts
if len(opt) >= 2
)
def _smallest_multiple_atleast(base, minimum_value):
return ((minimum_value + base - 1) // base) * base
def _ipaddress_interface_get_ip(interface):
if (interface.version == 6) and (interface.scope_id is not None):
return interface.ip.__class__(f"{interface.ip}%{interface.scope_id}")
else:
return interface.ip
if _platform_system == 'Windows':
from ctypes import windll, wintypes
def _plat_lookup_ifindex(key):
adaptersaddresses = GetAdaptersAddresses()
# if we were given exactly the name of some interface, return that interface.
d = {
record[k_]: record
for record in adaptersaddresses
for k_ in ['name', '_AdapterName', '_FriendlyName']
}
if key in d:
return d[key]['_IfIndex']
# if we were given exactly a MAC address, return the interface with that MAC address.
# accepts colon or dash separated hexadecimal octets. case-insensitive.
if isinstance(key, str):
mac = key.upper()
d = {
k: record
for record in adaptersaddresses
if '_PhysicalAddress' in record
for k in [
record['_PhysicalAddress'].upper(),
record['_PhysicalAddress'].replace('-', ':').upper()
]
}
if mac in d:
return d[mac]['_IfIndex']
# if we were given an IP which is exactly the IP of some interface, return that interface.
# accepts "naked" IP, "interface" IP-with-netmask,
# and "just a subnet" which MUST EXACTLY MATCH the subnet of some interface.
try:
if isinstance(key, ipaddress._BaseAddress):
ip = key
if hasattr(key, 'hostmask') and ((int(ip) & int(ip.hostmask)) == 0):
warnings.warn(f"Automatically converting questionable {ip.__class__.__name__} into {ip.network.__class__.__name__}", DeprecationWarning)
ip = ip.network
else:
try:
ip = ipaddress.ip_interface(key)
if (int(ip) & int(ip.hostmask)) == 0:
ip = ip.network
except ValueError as e0:
try:
ip = ipaddress.ip_address(key)
except ValueError:
raise e0 from None
except ValueError:
pass
else:
d = {
k: record
for record in adaptersaddresses
for addr in reversed(record['addrs']) # prioritize first address
for k in [addr, _ipaddress_interface_get_ip(addr), addr.network]
}
if ip in d:
return d[ip][{4: '_IfIndex', 6: '_Ipv6IfIndex'}[ip.version]]
raise KeyError(key)
def _plat_lookup_ip(family, key):
if key == _IFINDEX_ANY[family]:
return _HOST_ALL[family]
adaptersaddresses = GetAdaptersAddresses()
vers = {AF_INET: 4, AF_INET6: 6}[family]
if isinstance(key, int):
k = {AF_INET: '_IfIndex', AF_INET6: '_Ipv6IfIndex'}[family]
d = {
record[k]: _ipaddress_interface_get_ip(addr)
for record in adaptersaddresses
if (record.get(k) is not None) and (record['_OperStatus'] == IP_ADAPTER_ADDRESSES.wintypes_IF_OPER_STATUS.up)
for addr in reversed(record['addrs']) # prioritize first address
if addr.version == vers
}
elif isinstance(key, str):
k_ = ['name', '_AdapterName', '_FriendlyName', '_PhysicalAddress']
d = {
record[k]: _ipaddress_interface_get_ip(addr)
for record in adaptersaddresses
# NOTE: no need to filter this by OperStatus
for addr in reversed(record['addrs']) # prioritize first address
if addr.version == vers
for k in k_
}
else:
raise TypeError(type(key))
return str(d[key])
wintypes_IFTYPE = wintypes.IFTYPE if hasattr(wintypes, 'IFTYPE') else wintypes.UINT
wintypes_IF_INDEX = wintypes.IF_INDEX if hasattr(wintypes, 'IF_INDEX') else wintypes.ULONG
wintypes_NET_IF_COMPARTMENT_ID = wintypes.NET_IF_COMPARTMENT_ID if hasattr(wintypes, 'NET_IF_COMPARTMENT_ID') else ctypes.c_uint32
wintypes_NET_IF_CONNECTION_TYPE = wintypes.NET_IF_CONNECTION_TYPE if hasattr(wintypes, 'NET_IF_CONNECTION_TYPE') else wintypes.UINT
wintypes_NDIS_IF_MAX_STRING_SIZE = wintypes.NDIS_IF_MAX_STRING_SIZE if hasattr(wintypes, 'NDIS_IF_MAX_STRING_SIZE') else wintypes.IF_MAX_STRING_SIZE if hasattr(wintypes, 'IF_MAX_STRING_SIZE') else 256
wintypes_ADDRESS_FAMILY = wintypes.ADDRESS_FAMILY if hasattr(wintypes, 'ADDRESS_FAMILY') else wintypes.USHORT
wintypes_UINT8 = wintypes.UINT8 if hasattr(wintypes, 'UINT8') else ctypes.c_uint8
wintypes_UCHAR = wintypes.UCHAR if hasattr(wintypes, 'UCHAR') else ctypes.c_uchar if hasattr(ctypes, 'c_uchar') else ctypes.c_ubyte
wintypes_ULONG64 = wintypes.ULONG64 if hasattr(wintypes, 'ULONG64') else ctypes.c_uint64
wintypes_ULONGLONG = wintypes.ULONGLONG if hasattr(wintypes, 'ULONGLONG') else ctypes.c_ulonglong
_StringFromGUID2 = ctypes.oledll.Ole32['StringFromGUID2']
_CLSIDFromString = ctypes.oledll.Ole32['CLSIDFromString']
class wintypes_GUID(ctypes.Structure):
_fields_ = [
('Data1', ctypes.c_uint32),
('Data2', ctypes.c_uint16),
('Data3', ctypes.c_uint16),
('Data4', ctypes.c_uint8*8),
]
def __init__(self, *a, **k):
if len(a) == 1 and isinstance(value := a[0], (str, uuid.UUID)):
if isinstance(value, uuid.UUID):
value = f"{{{value!s}}}"
super().__init__(**k)
self.__load_from_str(value)
else:
super().__init__(*a, **k)
def __repr__(self):
return f"{self.__class__.__name__}({str(self)!r})"
def __str__(self):
# we need +1 wchar because StringFromGUID2 weirdly does
# both null-termination AND explicit written-length signalling
buf = ctypes.create_unicode_buffer(39)
ret = _StringFromGUID2(ctypes.byref(self), ctypes.byref(buf), len(buf))
# also, oledll weirdly neither raises nor sets GetLastError when StringFromGUID2 fails
if ret == 0: raise ctypes.WinError(0x00000008)
result = buf.value
assert (len(result) + 1 == ret) or (len(result) == ret)
return result
def __load_from_str(self, s):
# oledll raises when CLSIDFromString fails
_CLSIDFromString(s, ctypes.byref(self))
def __bool__(self):
return bool(self.Data1 or self.Data2 or self.Data3 or any(self.Data4))
@property
def value(self):
return uuid.UUID(str(self))
class IP_ADAPTER_ADDRESSES(ctypes.Union):
ERROR_BUFFER_OVERFLOW = 111
class _field_1(ctypes.Union):
class _field_2(ctypes.Structure):
_fields_ = [('Length', wintypes.ULONG), ('IfIndex', wintypes_IF_INDEX)]
_anonymous_ = ['_2']
_fields_ = [('Alignment', wintypes.ULONG), ('_2', _field_2)]
class IP_ADAPTER_ADDRESSES_LH(ctypes.Structure):
MAX_DHCPV6_DUID_LENGTH = 130
class _field_13(ctypes.Union):
class _field_2(ctypes.Structure):
_fields_ = [
('DdnsEnabled', wintypes.ULONG, 1),
('RegisterAdapterSuffix', wintypes.ULONG, 1),
('Dhcpv4Enabled', wintypes.ULONG, 1),
('ReceiveOnly', wintypes.ULONG, 1),
('NoMulticast', wintypes.ULONG, 1),
('Ipv6OtherStatefulConfig', wintypes.ULONG, 1),
('NetbiosOverTcpipEnabled', wintypes.ULONG, 1),
('Ipv4Enabled', wintypes.ULONG, 1),
('Ipv6Enabled', wintypes.ULONG, 1),
('Ipv6ManagedAddressConfigurationSupported', wintypes.ULONG, 1),
]
_anonymous_ = ['_2']
_fields_ = [('Flags', wintypes.ULONG), ('_2', _field_2)]
class NET_LUID_LH(ctypes.Union):
class _field_2(ctypes.Structure):
_fields_ = [
('_1', wintypes_ULONG64, 24),
('NetLuidIndex', wintypes_ULONG64, 24),
('IfType', wintypes_ULONG64, 16),
]
class _LUID(ctypes.Structure):
_fields_ = [
('LowPart', wintypes.ULONG),
('HighPart', wintypes.ULONG),
]
_anonymous_ = ['_2', '_LUID']
_fields_ = [('Value', wintypes_ULONG64), ('_2', _field_2), ('_LUID', _LUID)]
@property
def __value(self):
return {
'name': ConvertInterfaceLuidToNameW(self.Luid),
'addrs': self.FirstUnicastAddress.contents.value,
'_AdapterName': ctypes.string_at(self.AdapterName).decode('windows-1252'),
'_Dhcpv4Server': self.Dhcpv4Server.value.contents.value[0] if self.Dhcpv4Server.value else None,
'_Dhcpv6Server': self.Dhcpv6Server.value.contents.value[0] if self.Dhcpv6Server.value else None,
'_FriendlyName': ctypes.wstring_at(self.FriendlyName),
'_IfIndex': self.IfIndex,
'_Ipv6IfIndex': self.Ipv6IfIndex,
'_OperStatus': IP_ADAPTER_ADDRESSES.wintypes_IF_OPER_STATUS(self.OperStatus),
'_NoMulticast': bool(self.NoMulticast),
'_PhysicalAddress': bytes(
(ctypes.c_uint8*self.PhysicalAddressLength)
.from_buffer(self.PhysicalAddress)
).hex('-').upper() if self.PhysicalAddressLength > 0 else None
}
def __value_iter(self):
cur = self
yield cur.__value
while (next_ptr := cur.Next):
cur = next_ptr.contents
yield cur.__value
@property
def value(self):
return list(self.__value_iter())
class IP_ADAPTER_ADDRESSES_XP(ctypes.Structure):
def __init__(self, *a, **k):
raise NotImplementedError
MAX_ADAPTER_ADDRESS_LENGTH = 8
class wintypes_IF_OPER_STATUS(enum.IntEnum):
IfOperStatusUp = up = 1
IfOperStatusDown = down = 2
IfOperStatusTesting = testing = 3
IfOperStatusUnknown = unknown = 4
IfOperStatusDormant = dormant = 5
IfOperStatusNotPresent = notPresent = 6
IfOperStatusLowerLayerDown = lowerLayerDown = 7
class SOCKET_ADDRESS(ctypes.Structure):
class _SOCKADDR(ctypes.Union):
class SOCKADDR_IN(ctypes.Structure):
class IN_ADDR(ctypes.Structure):
class _field_1(ctypes.Union):
class _field_1(ctypes.Structure):
_fields_ = [
('s_b1', wintypes_UCHAR),
('s_b2', wintypes_UCHAR),
('s_b3', wintypes_UCHAR),
('s_b4', wintypes_UCHAR),
]
class _field_2(ctypes.Structure):
_fields_ = [
('s_w1', wintypes.USHORT),
('s_w2', wintypes.USHORT),
]
_anonymous_ = ['S_un_b', 'S_un_w']
_fields_ = [('S_un_b', _field_1), ('S_un_w', _field_2)]
_anonymous_ = ['S_un']
_fields_ = [('S_un', _field_1)]
@property
def value(self):
return ipaddress.IPv4Address(bytes(self.S_un.S_un_b))
_fields_ = [
('sin_family', wintypes.SHORT),
('sin_port', wintypes.USHORT),
('sin_addr', IN_ADDR),
('sin_zero', wintypes.CHAR*8),
]
@property
def value(self):
assert self.sin_family == AF_INET
return (
str(self.sin_addr.value),
self.sin_port
)
LPSOCKADDR_IN = ctypes.POINTER(SOCKADDR_IN)
class SOCKADDR_IN6(ctypes.Structure):
class IN6_ADDR(ctypes.Structure):
class _field_1(ctypes.Union):
_fields_ = [
('Byte', wintypes_UINT8*16),
('Word', wintypes.USHORT*8)
]
_anonymous_ = ['u']
_fields_ = [('u', _field_1)]
@property
def value(self):
return ipaddress.IPv6Address(bytes(self.Byte))
_fields_ = [
('sin6_family', wintypes.SHORT),
('sin6_port', wintypes.USHORT),
('sin6_flowinfo', wintypes.ULONG),
('sin6_addr', IN6_ADDR),
('sin6_scope_id', wintypes.ULONG)
]
@property
def value(self):
assert self.sin6_family == AF_INET6
return (
str(self.sin6_addr.value),
self.sin6_port,
self.sin6_flowinfo,
self.sin6_scope_id,
)
LPSOCKADDR_IN6 = ctypes.POINTER(SOCKADDR_IN6)
_anonymous_ = ['in_', 'in6']
_fields_ = [
('family', wintypes_ADDRESS_FAMILY),
('in_', SOCKADDR_IN),
('in6', SOCKADDR_IN6),
]
_choice = {
AF_INET: 'in_',
AF_INET6: 'in6',
}
@property
def value(self):
return getattr(self, self._choice[self.family]).value
_fields_ = [
('lpSockaddr', ctypes.POINTER(_SOCKADDR)),
('iSockaddrLength', wintypes.INT)
]
_choice = {
sizeof(_SOCKADDR.SOCKADDR_IN): _SOCKADDR.LPSOCKADDR_IN,
sizeof(_SOCKADDR.SOCKADDR_IN6): _SOCKADDR.LPSOCKADDR_IN6,
}
@property
def value(self):
return ctypes.cast(
self.lpSockaddr,
self._choice.get(
self.iSockaddrLength,
self.lpSockaddr.__class__
)
)
class IP_ADAPTER_UNICAST_ADDRESS(ctypes.Structure):
class _field_1(ctypes.Union):
class _field_2(ctypes.Structure):
_fields_ = [('Length', wintypes.ULONG), ('Flags', wintypes.DWORD)]
_anonymous_ = ['_2']
_fields_ = [('Alignment', wintypes_ULONGLONG), ('_2', _field_2)]
def __entry(self):
contents = self.Address.value.contents
if isinstance(contents, IP_ADAPTER_ADDRESSES.SOCKET_ADDRESS._SOCKADDR.SOCKADDR_IN):
host, port = contents.value
assert port == 0
return ipaddress.IPv4Interface(f"{host!s}/{self.OnLinkPrefixLength}")
elif isinstance(contents, IP_ADAPTER_ADDRESSES.SOCKET_ADDRESS._SOCKADDR.SOCKADDR_IN6):
host, port, flowinfo, scope_id = contents.value
assert port == 0
assert flowinfo == 0, f"flowinfo = {flowinfo}" # how should we export this if it's not 0???
if scope_id != 0 and not host.endswith(f"%{scope_id}"):
host = f"{host}%{scope_id}"
return ipaddress.IPv6Interface(f"{host}/{self.OnLinkPrefixLength}")
else:
raise NotImplementedError(type(self.Address.value.contents))
def __value_iter(self):
yield self.__entry()
next_ptr = self.Next
while next_ptr:
cur = next_ptr.contents
yield cur.__entry()
next_ptr = cur.Next
@property
def value(self):
return list(self.__value_iter())
IP_ADAPTER_UNICAST_ADDRESS._anonymous_ = ['_1']
IP_ADAPTER_UNICAST_ADDRESS._fields_ = [
('_1', IP_ADAPTER_UNICAST_ADDRESS._field_1),
('Next', ctypes.POINTER(IP_ADAPTER_UNICAST_ADDRESS)),
('Address', SOCKET_ADDRESS),
('PrefixOrigin', wintypes.UINT),
('SuffixOrigin', wintypes.UINT),
('DadState', wintypes.UINT),
('ValidLifetime', wintypes.ULONG),
('PreferredLifetime', wintypes.ULONG),
('LeaseLifetime', wintypes.ULONG),
('OnLinkPrefixLength', wintypes_UINT8),
]
class IP_ADAPTER_DNS_SUFFIX(ctypes.Structure):
MAX_DNS_SUFFIX_STRING_LENGTH = 256
IP_ADAPTER_DNS_SUFFIX._fields_ = [
('Next', ctypes.POINTER(IP_ADAPTER_DNS_SUFFIX)),
('String', wintypes.WCHAR*IP_ADAPTER_DNS_SUFFIX.MAX_DNS_SUFFIX_STRING_LENGTH),
]
@property
def value(self):
# "value" of a disambiguated Union type is the value of the underlying active type
length = self.Length
if length == 448:
return self.lh.value
else:
raise NotImplementedError(f"Length={length}")
IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH._anonymous_ = ['_1', '_13']
IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH._fields_ = [
('_1', IP_ADAPTER_ADDRESSES._field_1),
('Next', ctypes.POINTER(IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH)),
('AdapterName', wintypes.PCHAR),
('FirstUnicastAddress', ctypes.POINTER(IP_ADAPTER_ADDRESSES.IP_ADAPTER_UNICAST_ADDRESS)),
('FirstAnycastAddress', wintypes.LPVOID), # FIXME
('FirstMulticastAddress', wintypes.LPVOID), # FIXME
('FirstDnsServerAddress', wintypes.LPVOID), # FIXME
('DnsSuffix', wintypes.PWCHAR),
('Description', wintypes.PWCHAR),
('FriendlyName', wintypes.PWCHAR),
('PhysicalAddress', wintypes.BYTE*IP_ADAPTER_ADDRESSES.MAX_ADAPTER_ADDRESS_LENGTH),
('PhysicalAddressLength', wintypes.DWORD),
('_13', IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH._field_13),
('Mtu', wintypes.DWORD),
('IfType', wintypes_IFTYPE),
('OperStatus', wintypes.UINT),
('Ipv6IfIndex', wintypes_IF_INDEX),
('ZoneIndices', wintypes.ULONG*16),
('FirstPrefix', wintypes.LPVOID), # FIXME
('TransmitLinkSpeed', wintypes_ULONG64),
('ReceiveLinkSpeed', wintypes_ULONG64),
('FirstWinsServerAddress', wintypes.LPVOID), # FIXME
('FirstGatewayAddress', wintypes.LPVOID), # FIXME
('Ipv4Metric', wintypes.ULONG),
('Ipv6Metric', wintypes.ULONG),
('Luid', IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH.NET_LUID_LH),
('Dhcpv4Server', IP_ADAPTER_ADDRESSES.SOCKET_ADDRESS),
('CompartmentId', wintypes_NET_IF_COMPARTMENT_ID),
('NetworkGuid', wintypes_GUID),
('ConnectionType', wintypes_NET_IF_CONNECTION_TYPE),
('TunnelType', wintypes.UINT),
('Dhcpv6Server', IP_ADAPTER_ADDRESSES.SOCKET_ADDRESS),
('Dhcpv6ClientDuid', wintypes.BYTE*IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH.MAX_DHCPV6_DUID_LENGTH),
('Dhcpv6ClientDuidLength', wintypes.ULONG),
('Dhcpv6Iaid', wintypes.ULONG),
('FirstDnsSuffix', IP_ADAPTER_ADDRESSES.IP_ADAPTER_DNS_SUFFIX),
]
IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_XP._anonymous_ = ['_1']
IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_XP._fields_ = [
('_1', IP_ADAPTER_ADDRESSES._field_1),
('Next', ctypes.POINTER(IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_XP)),
('AdapterName', wintypes.PCHAR),
('FirstUnicastAddress', ctypes.POINTER(IP_ADAPTER_ADDRESSES.IP_ADAPTER_UNICAST_ADDRESS)),
('FirstAnycastAddress', wintypes.LPVOID), # FIXME
('FirstMulticastAddress', wintypes.LPVOID), # FIXME
('FirstDnsServerAddress', wintypes.LPVOID), # FIXME
('DnsSuffix', wintypes.PWCHAR),
('Description', wintypes.PWCHAR),
('FriendlyName', wintypes.PWCHAR),
('PhysicalAddress', wintypes.BYTE*IP_ADAPTER_ADDRESSES.MAX_ADAPTER_ADDRESS_LENGTH),
('PhysicalAddressLength', wintypes.DWORD),
('Flags', wintypes.DWORD),
('Mtu', wintypes.DWORD),
('IfType', wintypes_IFTYPE),
('OperStatus', wintypes.UINT),
('Ipv6IfIndex', wintypes_IF_INDEX),
('ZoneIndices', wintypes.DWORD*16),
('FirstPrefix', wintypes.LPVOID), # FIXME
]
IP_ADAPTER_ADDRESSES._anonymous_ = ['_1']
IP_ADAPTER_ADDRESSES._fields_ = [
('_1', IP_ADAPTER_ADDRESSES._field_1),
('lh', IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH),
('xp', IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_XP),
]
_ConvertInterfaceLuidToNameW = ctypes.oledll.Iphlpapi['ConvertInterfaceLuidToNameW']
_ConvertInterfaceLuidToNameW.argtypes = [
wintypes.LPVOID,
wintypes.PWCHAR,
wintypes.SIZE
]
def ConvertInterfaceLuidToNameW(luid):
buf = ctypes.create_unicode_buffer(wintypes_NDIS_IF_MAX_STRING_SIZE + 1)
ret = _ConvertInterfaceLuidToNameW(ctypes.byref(luid), buf, wintypes.SIZE(len(buf)))
assert ret == 0 # TODO: see if oledll raises or if we can omit this check
return buf.value
_GetAdaptersAddresses = ctypes.oledll.Iphlpapi['GetAdaptersAddresses']
_GetAdaptersAddresses.argtypes = [
wintypes.ULONG,
wintypes.ULONG,
wintypes.LPVOID,
wintypes.LPVOID,
wintypes.PULONG
]
def GetAdaptersAddresses(family=socket.AF_UNSPEC, flags=0, *, bufsize=1024*15):
buf = ctypes.create_string_buffer(bufsize)
ret2 = wintypes.ULONG(sizeof(buf))
ret = _GetAdaptersAddresses(family, flags, None, ctypes.byref(buf), ctypes.byref(ret2))
while ret == IP_ADAPTER_ADDRESSES.ERROR_BUFFER_OVERFLOW:
buf = ctypes.create_string_buffer(ret2.value)
assert ret2.value == sizeof(buf)
ret = _GetAdaptersAddresses(family, flags, None, ctypes.byref(buf), ctypes.byref(ret2))
if ret == 0:
return IP_ADAPTER_ADDRESSES.from_buffer(buf).value
else:
raise ctypes.WinError(ret)
else:
def _plat_lookup_ifindex(key):
##ifaddrs = getifaddrs()
raise NotImplementedError('lookup ifindex from ip or device identifier is TODO on Unix-like')
def _plat_lookup_ip(family, key):
##ifaddrs = getifaddrs()
raise NotImplementedError('lookip up ip address from ifindex or device identifier is TODO on Unix-like')
# https://git.kernel.org/pub/scm/docs/man-pages/man-pages.git/tree/man/man3/getifaddrs.3?h=man-pages-6.9.1&id=efc71944cf19b4b6accc4bb7d0228e0a8c551afa
# https://sourceware.org/git/?p=glibc.git;a=blob;f=sysdeps/unix/sysv/linux/ifaddrs.c;h=10b26d8b3c1c8f98eaad871d67a25510abe22933;hb=refs/heads/release/2.40/master#l453
# https://github.com/apple-oss-distributions/Libinfo/blob/Libinfo-542.40.3/gen.subproj/getifaddrs.3
libc = ctypes.CDLL(ctypes.util.find_library('c'))
def _iadd_or_iappend(collection, x):
if isinstance(collection, Sequence):
if isinstance(collection, tuple):
collection = list(collection)
collection.append(x)
else:
collection |= {x}
return collection
@contextmanager
def _wakeup_fd_ctx(fd, handle_overwrite=None, **k):
prev_fd = signal.set_wakeup_fd(fd, **k)
try:
if prev_fd == -1:
# Nominal case / "happy path": we did not overwrite any existing wakeup handler
logging.debug(f"signal wakeup fd set to %d.", fd)
yield None
else:
# We overwrote the existing handler
if handle_overwrite == 'report_restore':
logging.debug(f"signal wakeup fd already set (%d); overwrote it with %d; will automatically restore it after this context is exited.", prev_fd, fd)
yield prev_fd
elif handle_overwrite == 'report_norestore':
actual_prev_fd, prev_fd = prev_fd, -1
logging.debug(f"signal wakeup fd already set (%d); overwrote it with %d.", prev_fd, fd)
yield actual_prev_fd
elif handle_overwrite == 'drop':
prev_fd = -1
logging.debug(f"signal wakeup fd already set (%d); overwrote it with %d.", prev_fd, fd)
yield None
else:
raise RuntimeError(f"signal wakeup fd already set ({prev_fd}); not sure what to do about that. Restoring that wakeup fd and bailing out.")
finally:
if prev_fd is not None:
fd_removed_on_ctx_exit = signal.set_wakeup_fd(prev_fd)
if fd_removed_on_ctx_exit == fd:
if prev_fd != -1:
logging.debug(f"signal wakeup fd %d removed; previous value (%d) restored.", fd, prev_fd)
else:
logging.debug(f"signal wakeup fd %d removed.", fd)
else:
raise RuntimeError(f"entered context by setting signal wakeup fd {fd!r}, but ended up removing signal wakeup fd {fd_removed_on_ctx_exit!r} on context exit")
def _main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('address')
parser.add_argument('interface', nargs='?')
parser.add_argument('--use-async', action=argparse.BooleanOptionalAction, default=True)
params = parser.parse_args()
logging.basicConfig(level=logging.DEBUG)
addr = params.address
k = {}
if params.interface is not None:
try:
# ifindex
mcast_bind_interface = int(params.interface)
except ValueError:
# other interface specifier
mcast_bind_interface = params.interface
k['mcast_bind_interface'] = mcast_bind_interface
if params.use_async:
@(lambda f: asycio.run(f()))
async def main_async():
if hasattr(sys.stdout, 'buffer'):
async for packet in udp_listen_async(addr, **k):
# regular I/O
sys.stdout.buffer.write(packet)
sys.stdout.buffer.flush()
else:
async for packet in udp_listen_async(addr, **k):
# idlelib console
sys.stdout.write(packet.decode('utf-8', errors='surrogateescape'))
else:
if hasattr(sys.stdout, 'buffer'):
for packet in udp_listen_sync(addr, **k):
# regular I/O
sys.stdout.buffer.write(packet)
sys.stdout.buffer.flush()
else:
for packet in udp_listen_sync(addr, **k):
# idlelib console
sys.stdout.write(packet.decode('utf-8', errors='surrogateescape'))
if __name__ == '__main__':
_main()
@James-E-A
Copy link
Author

James-E-A commented Feb 28, 2025

this signal.set_wakeup_fd solution seems like the second-nastiest kludge ever for getting KeyboardInterrupt to play nice with socket.recv; is there a better way??
https://stackoverflow.com/q/78124574/1874170

at least it has the upside of instant response; the "nastiest" kludge would of course be setting a timeout on select.select and just periodically wakeing up, but still... this feels awkward

EDIT: ouch, maybe it doesn't get much better than this...
https://vorpus.org/blog/control-c-handling-in-python-and-trio/

@James-E-A
Copy link
Author

James-E-A commented Mar 6, 2025

for some reason, Ctrl-C doesn't work in IDLE on PyPy on Windows (testing on Linux TBD), though it still works in CMD on PyPy on Windows...
EDIT: lmao, I'm not alone here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment