Last active
September 11, 2025 02:09
-
-
Save James-E-A/3828775d68ea04a3e2906b7197d83778 to your computer and use it in GitHub Desktop.
Python UDP socket.recv() iterator supporting KeyboardInterrupt and Multicast
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # Authored by James Edington Administrator in 2025; placed in the Public Domain. | |
| __all__ = ['udp_listen_sync', 'udp_listen_async'] | |
| """USAGE: | |
| from udp_helper import * | |
| # Simple iterator that responds instantly to Ctrl+C / SIGINT | |
| for packet in udp_listen_sync(("10.0.0.69", 4200)): | |
| print("Got packet:", packet.hex()) | |
| # Supports Multicast addresses seamlessly with the same syntax! | |
| for packet in udp_listen_sync(("224.0.0.42", 4200)): | |
| print("Got packet:", packet.hex()) | |
| # Also supports async, if you're into that! | |
| async for packet in udp_listen_async(("224.0.0.42", 4200)): | |
| print("Got packet:", packet.hex()) | |
| # Optional: constrain Multicast to *only* listening on a certain interface | |
| for packet in udp_listen_sync(("224.0.0.42", 4200), mcast_bind_only_host="10.0.0.69"): | |
| # only listening on NIC with IP 10.0.0.69 | |
| print("Got packet:", packet.hex()) | |
| # Optional: constrain Multicast to *only* listening on a certain interface | |
| for packet in udp_listen_sync(("ff05::2a", 4200), mcast_bind_only_ifindex=2): | |
| # only listening on first physical NIC | |
| print("Got packet:", packet.hex()) | |
| TODO: | |
| - Support discriminating between a packet addressed to the machine's LAN IP | |
| and a packet addressed to a Multicast group the machine was subscribed to, | |
| pending https://github.com/python/cpython/issues/80398#issuecomment-2302533639 | |
| - Fix support for binding an IPv4 address by ifindex on Unix | |
| https://ko-fi.com/E_Administrator | |
| """ | |
| import asyncio | |
| from collections import Counter | |
| from collections.abc import Sequence, Set | |
| from contextlib import ExitStack, contextmanager, closing | |
| import ctypes | |
| from ctypes import sizeof | |
| import enum | |
| from functools import cache | |
| import ipaddress | |
| from itertools import chain, islice | |
| import logging | |
| from math import ceil, floor | |
| from operator import index | |
| import os | |
| import platform | |
| import re | |
| import select | |
| import signal | |
| import socket | |
| from socket import ( | |
| AF_INET, | |
| AF_INET6, | |
| INADDR_ANY, | |
| IPPROTO_IP, | |
| IPPROTO_IPV6, | |
| IPV6_JOIN_GROUP, | |
| IP_ADD_MEMBERSHIP, | |
| SOCK_DGRAM, | |
| inet_aton, | |
| inet_pton, | |
| ) | |
| import struct | |
| import sys | |
| import uuid | |
| import warnings | |
try:
    import threading
except ImportError:
    # no thread support available; users of this name must check for None
    threading = None

# looked up once at import time and reused for every per-OS decision below
_platform_system = platform.system()

# interface index meaning "no particular interface", per address family
_IFINDEX_ANY = dict.fromkeys((AF_INET, AF_INET6), 0)

# textual form of the wildcard ("all hosts") address, per address family
_HOST_ALL = {
    fam: str(addr_type(INADDR_ANY))
    for fam, addr_type in (
        (AF_INET, ipaddress.IPv4Address),
        (AF_INET6, ipaddress.IPv6Address),
    )
}

_PY_HOST_ALL = ''  # a lot of the Python standard library expects this, so we should expect it, too
| # pack an integer (ifindex) into the format needed by mreq/mreq6 | |
def _inet_iton(i):
    """Pack the integer *i* (an ifindex) into the bytes needed by mreq/mreq6.

    The result uses the machine's native byte order and native ``unsigned
    int`` size -- **not** network byte order -- because it fills an integer
    member of a struct handed to the setsockopt system library call.
    """
    native_unsigned_int = '@I'
    return struct.pack(native_unsigned_int, i)
# Resolve getpagesize() and getallocationgranularity() for whatever platform
# we are running on.  The import that succeeds selects the implementation:
#   1. resource            -> Linux/Unix
#   2. ctypes.windll       -> Windows (GetNativeSystemInfo)
#   3. mmap                -> generic fallback
#   4. hard-coded 512      -> last resort
try:
    # Linux/Unix
    from resource import getpagesize
except ImportError:
    try:
        # Windows
        # (oledll is still imported so the name stays available module-wide)
        from ctypes import oledll, windll, wintypes
    except ImportError:
        try:
            # generic fallback
            import mmap
        except ImportError:
            # last resort fallback
            def getpagesize():
                """Return a conservative guess when the real page size is unknowable."""
                return 512
            getallocationgranularity = getpagesize
        else:
            # generic fallback (cont'd)
            def getpagesize():
                """Return the page size reported by the mmap module."""
                return mmap.PAGESIZE
            getallocationgranularity = getpagesize
    else:
        # Windows (cont'd)
        @cache
        def getpagesize():
            """Return the system page size (queried once, then cached)."""
            return GetSystemInfo()['pageSize']

        @cache
        def getallocationgranularity():
            """Return the system allocation granularity (queried once, then cached)."""
            return GetSystemInfo()['allocationGranularity']

        wintypes_UINT_PTR = wintypes.UINT_PTR if hasattr(wintypes, 'UINT_PTR') else ctypes.c_size_t
        wintypes_DWORD_PTR = wintypes.DWORD_PTR if hasattr(wintypes, 'DWORD_PTR') else max([wintypes.DWORD, wintypes_UINT_PTR], key=sizeof)

        class wintypes_SYSTEM_INFO(ctypes.Structure):
            """ctypes layout of the Win32 SYSTEM_INFO structure."""

            class _1(ctypes.Union):
                class _2(ctypes.Structure):
                    _fields_ = [
                        ('wProcessorArchitecture', wintypes.WORD),
                        ('wReserved', wintypes.WORD),
                    ]
                _anonymous_ = ['_2']
                _fields_ = [
                    ('dwOemId', wintypes.DWORD),
                    ('_2', _2),
                ]
            _anonymous_ = ['_1']
            _fields_ = [
                ('_1', _1),
                ('dwPageSize', wintypes.DWORD),
                ('lpMinimumApplicationAddress', wintypes.LPVOID),
                ('lpMaximumApplicationAddress', wintypes.LPVOID),
                ('dwActiveProcessorMask', wintypes_DWORD_PTR),
                ('dwNumberOfProcessors', wintypes.DWORD),
                ('dwProcessorType', wintypes.DWORD),
                ('dwAllocationGranularity', wintypes.DWORD),
                ('wProcessorLevel', wintypes.WORD),
                ('wProcessorRevision', wintypes.WORD),
            ]

            @property
            def value(self):
                """Return the structure's members as a plain dict."""
                return {
                    'pageSize': self.dwPageSize,
                    'minimumApplicationAddress': ctypes.c_void_p(self.lpMinimumApplicationAddress),
                    'maximumApplicationAddress': ctypes.c_void_p(self.lpMaximumApplicationAddress),
                    'activeProcessorMask': self.dwActiveProcessorMask,
                    '_activeProcessors': frozenset(i for i in range(self.dwNumberOfProcessors) if (self.dwActiveProcessorMask & (1<<i))),
                    'numberOfProcessors': self.dwNumberOfProcessors,
                    'allocationGranularity': self.dwAllocationGranularity,
                    'processorLevel': self.wProcessorLevel,
                    'processorRevision': self.wProcessorRevision,
                    'processorArchitecture': self.wProcessorArchitecture,
                    'oemId': self.dwOemId
                }

        # GetNativeSystemInfo returns void.  Loading it through oledll would
        # make ctypes interpret whatever is left in the return register as an
        # HRESULT and raise OSError when it happens to look like a failure,
        # so use windll and declare that there is no return value.
        # Indexing (rather than attribute access) yields a private function
        # object, so the prototype set here does not leak to other users.
        _GetNativeSystemInfo = windll.kernel32['GetNativeSystemInfo']
        _GetNativeSystemInfo.restype = None
        _GetNativeSystemInfo.argtypes = [
            wintypes.LPCVOID,
        ]

        def GetSystemInfo():
            """Call GetNativeSystemInfo and return the result as a dict."""
            result = wintypes_SYSTEM_INFO()
            _GetNativeSystemInfo(ctypes.byref(result))
            return result.value
else:
    # Linux/Unix (cont'd)
    try:
        from resource import getallocationgranularity
    except ImportError:
        getallocationgranularity = getpagesize
# IP_PKTINFO: take it from the socket module when available, otherwise fall
# back to an environment override or the per-OS numeric value.
try:
    from socket import IP_PKTINFO
except ImportError:
    # https://github.com/python/cpython/issues/75386
    if 'DEFINE_IP_PKTINFO' not in os.environ:
        if _platform_system == 'Linux':
            # https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/in.h?h=v6.10&id=0c3836482481200ead7b416ca80c68a29cfdaabd#n104
            IP_PKTINFO = 8
        elif _platform_system == 'Darwin':
            # https://github.com/apple-oss-distributions/xnu/blob/xnu-7195.81.3/bsd/netinet/in.h#L485
            IP_PKTINFO = 26
        elif _platform_system == 'Windows':
            # https://learn.microsoft.com/en-us/windows/win32/winsock/ip-pktinfo#socket-option-value
            IP_PKTINFO = 19
        # any other system: IP_PKTINFO is deliberately left undefined
        # http://mail-index.netbsd.org/tech-net/2017/12/29/msg006583.html
    else:
        # explicit override supplied by the user
        IP_PKTINFO = int(os.environ['DEFINE_IP_PKTINFO'])

# IP_RECVDSTADDR: only defined where the socket module exposes it
try:
    # https://cgit.freebsd.org/src/tree/sys/netinet/in.h?h=release/14.2.0-p2#n444
    # https://cvsweb.openbsd.org/src/sys/netinet/in.h?rev=1.148&content-type=text/x-cvsweb-markup&only_with_tag=OPENBSD_7_6#:~:text=%23define%09IP_RECVDSTADDR
    # https://cvsweb.netbsd.org/bsdweb.cgi/src/sys/netinet/in.h?rev=1.114;content-type=text%2Fx-cvsweb-markup;only_with_tag=netbsd-10-1-RELEASE#:~:text=%23define%09IP_RECVDSTADDR
    # https://github.com/python/cpython/commit/3fdf58b5cc56cf9837670f31dceb3ea4954bed8f#diff-412f0023cab7bc5669c6c1e710a6f37e7c3c387d299794b5fe82268a10223b29R1392
    from socket import IP_RECVDSTADDR
except ImportError:
    pass
| # simple iterator to listen on a UDP socket | |
def udp_listen_sync(
    address=(_PY_HOST_ALL, 0),
    family=None,
    flags=0, *,
    sockopts=None,
    bufsize=None,
    auto_mcast=True,
    mcast_bind_only_host=None,
    mcast_bind_only_ifindex=None,
    use_wakeup=True,
    wakeup_socket=None,
    wakeup_timeout=None,
    _force_allow_busy_select=False,
    _force_allow_duplicate_mreq=False,
    _force_allow_mreqn=False,
    _force_allow_nondaemon_nonmain_thread=False
):
    """Yield each datagram received on a UDP socket bound to *address*, forever.

    This is a generator: nothing (including argument validation) happens
    until the first item is requested.  The socket and every helper resource
    opened here are released when the generator is closed or exhausted by an
    exception.

    Parameters
    ----------
    address, family, flags, sockopts, auto_mcast, mcast_bind_only_host,
    mcast_bind_only_ifindex, _force_allow_duplicate_mreq, _force_allow_mreqn
        Forwarded to ``_make_udp_socket``; *flags* is additionally passed to
        every ``recv`` call.
    bufsize
        Maximum datagram size passed to ``recv``.  When omitted it is derived
        from the allocation granularity (CPython) or the page size.
    use_wakeup
        When true, the wait is made interruptible via a wakeup socket and/or
        a periodic timeout.
    wakeup_socket
        ``None``: on the main thread, create an internal socket pair and
        install it as the signal wakeup fd.  ``False``: rely on
        *wakeup_timeout* only.  Anything else: a caller-owned readable object
        that is polled alongside the UDP socket (it is not installed as the
        signal wakeup fd and is not closed here).
    wakeup_timeout
        Optional poll timeout in seconds, or ``None`` to wait indefinitely.

    Raises
    ------
    ValueError
        For contradictory wakeup settings, for a zero *wakeup_timeout* without
        ``_force_allow_busy_select``, or when called from a non-daemon,
        non-main thread without ``_force_allow_nondaemon_nonmain_thread``.
    """
    # argument normalization and validation
    if bufsize is None:
        if platform.python_implementation() == 'CPython':
            # https://github.com/python/cpython/blob/v3.13.2/Modules/socketmodule.c#L3718
            bufsize = max(getallocationgranularity() - sys.getsizeof(b''), 512)
        else:
            bufsize = getpagesize()
    if wakeup_timeout == 0.0:
        if not _force_allow_busy_select:
            raise ValueError("Value of 0 seconds provided for wakeup_timeout; this would execute a continual busy poll which would waste a tremendous amount of CPU time for no benefit.\nTo disable the timeout, pass wakeup_timeout=None instead.")
        else:
            warnings.warn("Value of 0 seconds provided for wakeup_timeout; this will execute a continual busy poll which will waste a tremendous amount of CPU time for no benefit.\nTo disable the timeout, pass wakeup_timeout=None instead.", RuntimeWarning)
    if use_wakeup:
        if (wakeup_socket is False) and (wakeup_timeout is None):
            raise ValueError("use_wakeup was enabled but wakeup_socket and wakeup_timeout were both disabled. Either omit wakeup_socket=False, pass a finite wakeup_timeout=..., or pass use_wakeup=False.")
    else:
        if wakeup_socket:
            raise ValueError("use_wakeup was disabled but wakeup_socket was set. Either omit wakeup_socket=..., or omit use_wakeup=False.")
        if wakeup_timeout is not None:
            raise ValueError("use_wakeup was disabled but wakeup_timeout was set. Either omit wakeup_timeout=..., or omit use_wakeup=False.")
    # always bound, so the main loop never depends on which branch ran below
    parent_wakeup_fd = None
    # body
    with ExitStack() as ctx:
        # make UDP socket
        sock = ctx.enter_context(_make_udp_socket(
            address,
            family,
            flags,
            sockopts=sockopts,
            auto_mcast=auto_mcast,
            mcast_bind_only_host=mcast_bind_only_host,
            mcast_bind_only_ifindex=mcast_bind_only_ifindex,
            _force_allow_duplicate_mreq=_force_allow_duplicate_mreq,
            _force_allow_mreqn=_force_allow_mreqn
        ))
        rlist = [sock]
        # make Ctrl-C work without any delay
        if use_wakeup:
            if wakeup_socket is None:
                # default codepath, caller wants regular behavior and did not give us a wakeup socket
                if (threading is None) or (threading.current_thread() is threading.main_thread()):
                    # This signal wakeup FD fix only works if we're "the main thread of the main interpreter"
                    # but how are we supposed to check that???
                    # https://github.com/python/cpython/blob/v3.13.2/Modules/signalmodule.c#L505
                    # https://peps.python.org/pep-0734/
                    # 1. Create internal socketpair
                    wakeup_socket, wakeup_socket_2 = socket.socketpair()
                    # Both ends belong to us, so make sure they get closed.
                    # They are registered BEFORE the wakeup-fd context so that,
                    # on unwind (LIFO), the previous wakeup fd is restored
                    # first and only then are the sockets closed.
                    ctx.enter_context(wakeup_socket)
                    ctx.enter_context(wakeup_socket_2)
                    # 2. Attach one end of the socket to the signal handler
                    parent_wakeup_fd = ctx.enter_context(_wakeup_fd_ctx(wakeup_socket_2.fileno(), 'report_restore'))
                    # 3. Listen on the other end of it, to wake this thread up when a signal is received
                    # even though recv would otherwise block signal handling
                    rlist.insert(0, wakeup_socket)
                else:
                    # When the main script is killed with ctrl-c or whatever,
                    # Python automatically harvests all "daemon" threads.
                    # Since we're running on a non-main thread,
                    # we should just ensure that we're running as a daemon,
                    # and we'll get the expected behavior.
                    if (not threading.current_thread().daemon) and (not _force_allow_nondaemon_nonmain_thread):
                        raise ValueError("thread must be started with daemon=true to listen synchronously on UDP")
            elif wakeup_socket is not False:
                # caller-provided wakeup socket
                # we are going to listen on it, but we are NOT going to
                # register it as a signal handler since that could interfere
                # with the caller's actual usage (e.g. could be a pipe from
                # another thread used in some not-literally-POSIX-signals IPC scheme);
                # if the caller actually wants to attach it as the wakeup handler,
                # they are of course perfectly free to do that themselves
                rlist.insert(0, wakeup_socket)
            else:
                # caller passed False singleton to DISABLE wakeup socket
                # and ONLY use the "dumb" periodic wakeup
                assert wakeup_timeout is not None
        # main loop
        sock_recv = sock.recv
        for ready in _iter_readable_socket_fds_forever(rlist, timeout=wakeup_timeout):
            if ready is sock:
                yield sock_recv(bufsize, flags)
            elif ready is wakeup_socket:
                if parent_wakeup_fd is not None:
                    # NOTE: receiving potentially more than 1 byte
                    # because we aren't guaranteed that the caller's
                    # simply using a signal wakeup socket
                    # and we want to make this passthrough work reasonably well
                    os.write(parent_wakeup_fd, wakeup_socket.recv(bufsize))
                else:
                    # NOTE: We actually don't need to handle SIGINT or CTRL_C_EVENT;
                    # if one of those was the signal in question, then
                    # the Python runtime will have already raised KeyboardInterrupt
                    # out of the select.select() call return boundary
                    # and this block won't even be reached...
                    # TODO: double-check that "ignoring" is reasonable behavior here,
                    # or if our hooking into the wakeup socket has secretly suppressed
                    # some other logic the caller still expects from the Python runtime
                    # NOTE: receiving just 1 byte
                    # because we are guaranteed that this is simply a signal wakeup
                    # and if many signals arrived in quick succession
                    # then we'll just enter this loop multiple times anyway so no problem
                    logging.debug("ignoring: %r", signal.Signals(wakeup_socket.recv(1)[0]))
            elif __debug__:
                raise RuntimeError(f'ready = {ready!r}')
| # simple async iterator to listen on a UDP socket | |
async def udp_listen_async(
    address=(_PY_HOST_ALL, 0),
    family=None,
    flags=0, *,
    sockopts=None,
    auto_mcast=True,
    mcast_bind_only_host=None,
    mcast_bind_only_ifindex=None,
    _force_allow_duplicate_mreq=False,
    _force_allow_mreqn=False
):
    """Asynchronously yield each datagram received on a UDP socket bound to *address*.

    All parameters are forwarded to ``_make_udp_socket``.  The socket is
    handed to the running event loop as a datagram endpoint; its transport is
    closed when this async generator finishes or is closed.
    """
    with ExitStack() as ctx:
        loop = asyncio.get_running_loop()
        sock = _make_udp_socket(
            address,
            family,
            flags,
            sockopts=sockopts,
            auto_mcast=auto_mcast,
            mcast_bind_only_host=mcast_bind_only_host,
            mcast_bind_only_ifindex=mcast_bind_only_ifindex,
            _force_allow_duplicate_mreq=_force_allow_duplicate_mreq,
            _force_allow_mreqn=_force_allow_mreqn
        )
        try:
            t, proto = await loop.create_datagram_endpoint(
                _SimpleDgramListenIterProto,
                sock=sock
            )
        except BaseException:
            # we own the socket
            # (BaseException, not Exception: a task cancellation arriving
            # during the await raises CancelledError, which is not an
            # Exception subclass, and would otherwise leak the socket)
            sock.close()
            raise
        else:
            # transport owns the socket
            ctx.callback(t.close)
        # "async yield from" does not exist yet -- https://discuss.python.org/t/yield-from-in-async-functions/47050
        async for data in proto:
            yield data
| # helper class to glue the asyncio callbacks into a simple (async) iterable | |
class _SimpleDgramListenIterProto(asyncio.DatagramProtocol):
    """Glue between asyncio's datagram callbacks and a simple async iterable.

    Every payload delivered to ``datagram_received`` is queued; iterating the
    instance with ``async for`` hands the payloads back in arrival order.
    """

    __slots__ = ('__queue', '_transport')

    def __init__(self):
        # FIXME: Is there any way to do away with this queue??
        self.__queue = asyncio.Queue()

    def connection_made(self, transport):
        """Remember the transport the event loop attached to this protocol."""
        # FIXME: Is this actually needed for SOCK_DGRAM??
        self._transport = transport

    def datagram_received(self, data, addr):
        """Queue the payload; the sender address is not kept."""
        pending = self.__queue
        pending.put_nowait(data)

    def __aiter__(self):
        """The protocol object is its own async iterator."""
        return self

    async def __anext__(self):
        """Wait for, then return, the next queued payload."""
        # FIXME: what's the right way to handle KeyboardInterrupt?
        # catching CancelledError and transmuting it to StopAsyncIteration
        # seems to suppress the KeyboardInterrupt entirely...
        payload = await self.__queue.get()
        return payload
| # Creates, applies sockopts to, binds, and returns a UDP socket. | |
def _make_udp_socket(
    address,
    family,
    flags=0, *,
    sockopts=frozenset(),
    auto_mcast,
    mcast_bind_only_host,
    mcast_bind_only_ifindex,
    _force_allow_duplicate_mreq,
    _force_allow_mreqn
):
    """Create a UDP socket, apply *sockopts* to it, bind it, and return it.

    Parameters
    ----------
    address
        Either a socket-module style tuple ``(host, port, ...)`` or a string,
        which is parsed with ``_parse_ip_uri``.
    family
        ``AF_INET``, ``AF_INET6``, or ``None`` to guess from ``address[0]``.
    flags
        Accepted for signature parity with the public entry points; not used
        while building the socket.
    sockopts
        Collection of argument tuples, each passed to ``setsockopt``.
    auto_mcast
        When true and ``address[0]`` is a multicast group, a membership
        request is added to *sockopts* and the bind host is replaced with
        *mcast_bind_only_host* (or the wildcard host).
    mcast_bind_only_host, mcast_bind_only_ifindex
        Optional constraints on which interface joins the group.

    Raises
    ------
    ValueError
        If a multicast constraint is given while *auto_mcast* is disabled, or
        if both constraints are given on a platform that only supports the
        plain IPv4 mreq.
    NotImplementedError
        For a multicast address in an unsupported *family*.

    Any exception raised while configuring or binding closes the socket
    before propagating.
    """
    # Parenthesised on purpose: without the inner parentheses, `and` binds
    # tighter than `or`, and a truthy mcast_bind_only_ifindex was rejected
    # even when auto_mcast was enabled.
    if (not auto_mcast) and (mcast_bind_only_host or mcast_bind_only_ifindex):
        raise ValueError("mreq construction was disabled but one of mcast_bind_only_host or mcast_bind_only_ifindex was set. Either omit mcast_bind_only_XXXXX=..., or omit auto_mcast=False.")
    # allow string addresses instead of socket module tuples for convenience
    if isinstance(address, str):
        address = _parse_ip_uri(address, expect_scheme='udp', require_scheme=False)
        if address[1] is None:
            address = (address[0], 0, *address[2:])
    # best practice - new list i/c/o mutation
    if sockopts is None:
        sockopts = []
    # how annoying it is to be required to specify this
    # when it COULD HAVE EASILY BEEN DEDUCED from the address format...
    # save the caller this nuisance, if we can
    if family is None:
        assert isinstance(address, (tuple, Sequence)) and len(address) >= 1 and isinstance(address[0], str)
        if address[0] == _PY_HOST_ALL:
            warnings.warn("Specify family=AF_INET or family=AF_INET6 when binding all hosts with implicit syntax.", DeprecationWarning)
            family = AF_INET  # assume
        else:
            family = {4: AF_INET, 6: AF_INET6}[ipaddress.ip_address(address[0]).version]  # guess
    # simplify interface for listening on Multicast
    if auto_mcast and _is_multicast(address[0]):
        if not _force_allow_duplicate_mreq:
            assert not _has_mreq(sockopts)
        if family == AF_INET:
            if _platform_system == 'Windows':
                if mcast_bind_only_host is None:
                    mcast_bind_only_host = _PY_HOST_ALL
                    if mcast_bind_only_ifindex is None:
                        mcast_bind_only_host_or_ifindex = _PY_HOST_ALL
                    else:
                        mcast_bind_only_host_or_ifindex = mcast_bind_only_ifindex
                else:
                    if mcast_bind_only_ifindex is None:
                        mcast_bind_only_host_or_ifindex = mcast_bind_only_host
                    else:
                        # caller set mcast_bind_only_host *and* mcast_bind_only_ifindex explicitly.
                        # while Windows doesn't support including these both in an IPv4 mreq,
                        # the former constraint is going to be implicitly applied with the bind() call,
                        # so we'll just apply the latter constraint via the mreq
                        mcast_bind_only_host_or_ifindex = mcast_bind_only_ifindex
                address, mcast_group = ((mcast_bind_only_host, *address[1:]), address[0])
                sockopts = _iadd_or_iappend(sockopts, _make_mreq_4_win(mcast_group, mcast_bind_only_host_or_ifindex))
            elif _platform_system in {'Linux', 'FreeBSD'} or _force_allow_mreqn:
                # both Linux and FreeBSD accuse the basic mreq struct of being "legacy",
                # so we'll give them the fancy mreqn struct
                if mcast_bind_only_host is None:
                    mcast_bind_only_host = _PY_HOST_ALL
                if mcast_bind_only_ifindex is None:
                    mcast_bind_only_ifindex = _IFINDEX_ANY[family]
                address, mcast_group = ((mcast_bind_only_host, *address[1:]), address[0])
                sockopts = _iadd_or_iappend(sockopts, _make_mreq_4_unix(mcast_group, mcast_bind_only_host, mcast_bind_only_ifindex))
            else:
                # generic Unix may be safely assumed to support mreq,
                # but may NOT be safely assumed to support mreqn.
                # OpenBSD, NetBSD, and Mac OS all describe mreq as
                # the sole valid argument type for IP_ADD_MEMBERSHIP.
                if mcast_bind_only_host is None:
                    if mcast_bind_only_ifindex is None:
                        # no constraints, just use default value for mcast_bind_only_host
                        mcast_bind_only_host = _PY_HOST_ALL
                    else:
                        # ifindex constraint only: try to support it on a best-effort basis
                        mcast_bind_only_host = _lookup_ip(family, mcast_bind_only_ifindex)
                else:
                    if mcast_bind_only_ifindex is None:
                        # host is set and ifindex constraint is unset.
                        # parameters are already perfect for _make_mreq_4_unix().
                        pass
                    else:
                        # unlike Windows, generic Unix does not allow passing an ifindex in via the imr_interface field
                        # so we have no way to respect both these constraints
                        raise ValueError(f"cannot specify both mcast_bind_only_ifindex and mcast_bind_only_host with {family!r} on {_platform_system}")
                # keep the (host, port, ...) tuple shape, exactly like the
                # other branches; a bare host string cannot be passed to bind()
                address, mcast_group = ((mcast_bind_only_host, *address[1:]), address[0])
                sockopts = _iadd_or_iappend(sockopts, _make_mreq_4_unix(mcast_group, mcast_bind_only_host))
        elif family == AF_INET6:
            # thank god, at least this interface was flattened
            if mcast_bind_only_host is None:
                mcast_bind_only_host = _PY_HOST_ALL
            if mcast_bind_only_ifindex is None:
                mcast_bind_only_ifindex = _IFINDEX_ANY[family]
            address, mcast_group = ((mcast_bind_only_host, *address[1:]), address[0])
            sockopts = _iadd_or_iappend(sockopts, _make_mreq_6(mcast_group, mcast_bind_only_ifindex))
        else:
            raise NotImplementedError(f"family={family!r}")
    # configure and start socket
    sock = socket.socket(family, SOCK_DGRAM)
    try:
        for opt in sockopts:
            sock.setsockopt(*opt)
        sock.bind(address)
        if sock.family in {AF_INET, AF_INET6} and not address[1]:
            # slightly more important debugging message as the port was randomly assigned
            logging.info("laddr=%r, sockopts=%r", sock.getsockname(), sockopts)
        else:
            logging.debug("laddr=%r, sockopts=%r", sock.getsockname(), sockopts)
    except Exception:
        sock.close()
        raise
    else:
        return sock
| # NOTE: check kqueue before epoll | |
| # because some BSDs also have an epoll shim | |
| # we don't want to accidentally use instead | |
| if hasattr(select, 'kqueue'): | |
| # BSDs | |
def _iter_readable_socket_fds_forever(rlist, timeout=None):
    """Yield, forever, the member of *rlist* that has become readable (kqueue).

    *rlist* holds objects with a ``fileno()`` method and/or integer file
    descriptors; each yielded item is the very object the caller passed in.
    *timeout* is the maximum wait per poll in seconds (``None`` = no limit);
    when it elapses without events, polling simply resumes.
    """
    with closing(select.kqueue()) as kq:
        # select.kevent is the event constructor; kqueue objects have no
        # ``event`` attribute.
        kev_list = [select.kevent(obj, select.KQ_FILTER_READ, select.KQ_EV_ADD) for obj in rlist]
        max_ev = len(kev_list)
        if (max_ev == 0) and (timeout is not None):
            # bug in kqueue: timeout is ignored when max_ev = 0
            # workaround h/t: https://github.com/python/cpython/blob/v3.13.2/Lib/selectors.py#L541-L545
            max_ev = 1
        # kqueue yields bare integers but we want to treat the caller to their familiar objects
        # h/t: https://github.com/python/cpython/blob/v3.13.2/Lib/selectors.py#L556
        # https://github.com/python/cpython/blob/v3.13.2/Lib/selectors.py#L248
        # https://github.com/python/cpython/blob/v3.13.2/Lib/selectors.py#L37
        fd_to_key = {
            k: obj
            for obj in rlist
            for k in ([obj, int(obj.fileno())] if hasattr(obj, 'fileno') else [int(obj)])
        }
        fd_to_key_get = fd_to_key.__getitem__
        kq_control = kq.control
        # register every event up front without waiting for any result
        kq_control(kev_list, 0, 0)
        while True:
            yield from (fd_to_key_get(kev.ident) for kev in kq_control(None, max_ev, timeout))
| elif False: | |
| # Windows since 3.5 | |
| # TODO: steal code from https://github.com/python/cpython/blob/v3.13.2/Lib/asyncio/windows_events.py#L488 | |
| # NOTE: adding support for this will require substantially changing the interface of this function. | |
| # "After an instance of an open handle is associated with an I/O completion port, it cannot | |
| # be used in the ReadFileEx or WriteFileEx function because these functions have their own | |
| # asynchronous I/O mechanisms." (!!!) | |
def _iter_readable_socket_fds_forever(rlist, timeout=None):
    """Unfinished I/O-completion-port variant (its enclosing branch is disabled).

    Registers every member of *rlist* under its list position, then yields
    the member whose position comes back as a completion key.  *timeout* is
    accepted but not used here.
    """
    # windll.Kernel32.CreateIoCompletionPort(None, None, 0, 0)
    with select_iocp.iocp() as port:
        for position, member in enumerate(rlist):
            # windll.Kernel32.CreateIoCompletionPort(obj.fileno(), cp._handle, i, 0)
            port.register(member, position, None)
        while True:
            # https://learn.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatusex
            for completion in port.get_ex():
                yield rlist[completion.completionKey]
| elif hasattr(select, 'epoll'): | |
| # Linux since 2.5 | |
def _iter_readable_socket_fds_forever(rlist, timeout=None):
    """Yield, forever, the member of *rlist* that has become readable (epoll).

    *rlist* holds objects with a ``fileno()`` method and/or integer file
    descriptors.  epoll reports bare integer descriptors, so each one is
    mapped back to the object the caller passed in (callers compare the
    yielded item by identity).  *timeout* is the maximum wait per poll in
    seconds (``None`` = no limit); when it elapses without events, polling
    simply resumes.
    """
    # sizehint must be positive or -1, so an empty rlist needs the default
    with select.epoll(sizehint=len(rlist) or -1) as p:
        fd_to_obj = {}
        for obj in rlist:
            p.register(obj, select.EPOLLIN)
            fd = int(obj.fileno()) if hasattr(obj, 'fileno') else int(obj)
            fd_to_obj[fd] = obj
        poll_ = p.poll
        fd_to_obj_get = fd_to_obj.__getitem__
        while True:
            yield from (fd_to_obj_get(ev[0]) for ev in poll_(timeout))
| elif hasattr(select, 'devpoll'): | |
| # completionist | |
def _iter_readable_socket_fds_forever(rlist, timeout=None):
    """Yield, forever, the member of *rlist* that has become readable (/dev/poll).

    *rlist* holds objects with a ``fileno()`` method and/or integer file
    descriptors.  The poller reports bare integer descriptors, so each one
    is mapped back to the object the caller passed in.  *timeout* is given in
    seconds (``None`` = no limit) like the other variants of this function
    and converted to the milliseconds that ``devpoll.poll`` expects.
    """
    with closing(select.devpoll()) as p:
        fd_to_obj = {}
        for obj in rlist:
            p.register(obj, select.POLLIN)
            fd = int(obj.fileno()) if hasattr(obj, 'fileno') else int(obj)
            fd_to_obj[fd] = obj
        poll_ = p.poll
        fd_to_obj_get = fd_to_obj.__getitem__
        # round up so that we never wait *less* than the requested interval
        timeout_ms = None if timeout is None else ceil(timeout * 1000) if timeout >= 0 else floor(timeout * 1000)
        while True:
            yield from (fd_to_obj_get(ev[0]) for ev in poll_(timeout_ms))
| elif hasattr(select, 'poll'): | |
| # reasonable fallback on most Unix | |
def _iter_readable_socket_fds_forever(rlist, timeout=None):
    """Yield, forever, the member of *rlist* that has become readable (poll).

    *rlist* holds objects with a ``fileno()`` method and/or integer file
    descriptors.  ``poll`` reports bare integer descriptors, so each one is
    mapped back to the object the caller passed in.  *timeout* is given in
    seconds (``None`` = no limit) like the other variants of this function
    and converted to the milliseconds that ``poll.poll`` expects.
    """
    with ExitStack() as ctx:
        p = select.poll()
        fd_to_obj = {}
        for obj in rlist:
            p.register(obj, select.POLLIN)
            ctx.callback(p.unregister, obj)  # Python documentation does not **guarantee** us that this is unnecessary
            fd = int(obj.fileno()) if hasattr(obj, 'fileno') else int(obj)
            fd_to_obj[fd] = obj
        poll_ = p.poll
        fd_to_obj_get = fd_to_obj.__getitem__
        # round up so that we never wait *less* than the requested interval
        timeout_ms = None if timeout is None else ceil(timeout * 1000) if timeout >= 0 else floor(timeout * 1000)
        while True:
            yield from (fd_to_obj_get(ev[0]) for ev in poll_(timeout_ms))
| elif _platform_system == 'Windows' and sys.getwindowsversion() >= (6,): | |
| # Windows since Vista | |
| # NOTE: we do NOT need to check sys.getwindowsversion() >= (10, 0, 19041) | |
| # because we are ONLY using this INTERNAL METHOD with incoming udp | |
| # so the infamous Windows 8 bug #309411 that affects newly established outgoing tcp | |
| # doesn't affect THIS codebase anyway | |
| from ctypes import windll, wintypes | |
def _iter_readable_socket_fds_forever(rlist, timeout=None):
    """Yield, forever, the first element of each event reported by a ``_winpoll`` poller.

    Every member of *rlist* is registered for ``select_POLLIN``.  *timeout*
    is in seconds (``None`` = no limit) and is converted to whole
    milliseconds, rounded away from zero.
    """
    # NOTE: for the winpoll api, it is correct to "fail" to unregister file descriptors
    # as the poll object does not refer to any kind of winpoll fd that ought to be closed
    # nor is there any registration/unregistration logic relating to the file descriptors
    # -- i.e. the Python GC, if any, is completely sufficient for correctness here
    # because "unregistering" each file descriptor on unwind
    # when the winpoll object is not going to be reused
    # is **guaranteed** to be a waste of time
    poller = _winpoll(sizehint=len(rlist))
    for member in rlist:
        poller.register(member, select_POLLIN)
    poll_ms = poller._poll
    if timeout is None:
        timeout_ms = -1
    elif timeout >= 0:
        timeout_ms = ceil(timeout * 1000)
    else:
        timeout_ms = floor(timeout * 1000)
    while True:
        for event in poll_ms(timeout_ms):
            yield event[0]
# Winsock types and constants: prefer whatever the standard modules expose,
# otherwise fall back to the documented numeric values.
wintypes_UINT_PTR = getattr(wintypes, 'UINT_PTR', ctypes.c_size_t)
wintypes_SOCKET = getattr(wintypes, 'SOCKET', wintypes_UINT_PTR)
# https://github.com/tpn/winsdk-10/blob/master/Include/10.0.10240.0/um/WinSock2.h#L392
wintypes_SOCKET_ERROR = getattr(wintypes, 'SOCKET_ERROR', -1)
# https://learn.microsoft.com/en-us/windows/win32/winsock/windows-sockets-error-codes-2
wintypes_WSAEINVAL = getattr(wintypes, 'WSAEINVAL', 10022)
wintypes_WSAEINTR = getattr(wintypes, 'WSAEINTR', 10004)
# https://github.com/tpn/winsdk-10/blob/master/Include/10.0.10240.0/um/WinSock2.h#L1589
select_POLLRDNORM = getattr(select, 'POLLRDNORM', 0x0100)
select_POLLRDBAND = getattr(select, 'POLLRDBAND', 0x0200)
select_POLLIN = getattr(select, 'POLLIN', select_POLLRDNORM | select_POLLRDBAND)
select_POLLPRI = getattr(select, 'POLLPRI', 0x0400)
select_POLLWRNORM = getattr(select, 'POLLWRNORM', 0x0010)
select_POLLOUT = getattr(select, 'POLLOUT', select_POLLWRNORM)
select_POLLWRBAND = getattr(select, 'POLLWRBAND', 0x0020)
select_POLLERR = getattr(select, 'POLLERR', 0x0001)
select_POLLHUP = getattr(select, 'POLLHUP', 0x0002)
select_POLLNVAL = getattr(select, 'POLLNVAL', 0x0004)


class WSAPOLLFD(ctypes.Structure):
    """One poll slot: a socket, the requested events, and the returned events."""

    __slots__ = ()
    _fields_ = [
        ('fd', wintypes_SOCKET),        # socket being polled
        ('events', wintypes.SHORT),     # requested event mask
        ('revents', wintypes.SHORT),    # event mask filled in by the poll call
    ]
def _getfd(fileobj):
    """Return the integer descriptor for *fileobj*.

    Objects exposing ``fileno()`` are asked for their descriptor; anything
    else must itself be usable as an integer index.
    """
    if hasattr(fileobj, 'fileno'):
        return int(fileobj.fileno())
    return index(fileobj)
| # workaround -- https://github.com/python/cpython/issues/60711#issuecomment-2698306318 | |
| class _winpoll: | |
| # * "impl" is a ctypes array of WSAPOLLFD objects | |
| # we maintain "impl" as compact and overallocated; if the caller registers any negative or invalid FDs, | |
| # then they will be liable for dealing with any POLLNVAL results | |
| # | |
| # * "fd2obj" is a dict which maps the fd value of each occupied slot in "impl" to the Python object it goes with | |
| __slots__ = ('__impl_buf', '__fd2obj') | |
def __init__(self, sizehint=max(getallocationgranularity() // sizeof(WSAPOLLFD), 1)):
    """Allocate room for *sizehint* poll slots and start with nothing registered."""
    initial_bytes = sizeof(WSAPOLLFD * sizehint)
    self.__impl_buf = ctypes.create_string_buffer(initial_bytes)
    self.__fd2obj = dict()
| # debug function to make sure internal invariants hold | |
def _check(self):
    """Debug helper: raise AssertionError if the internal invariants do not hold."""
    buf = self.__impl_buf
    registry = self.__fd2obj
    # INVARIANT: the buffer must be at least big enough to hold all our allegedly registered fds
    occupied_t = WSAPOLLFD * len(registry)
    if sizeof(buf) < sizeof(occupied_t):
        raise AssertionError("impl_buf too small")
    # INVARIANT: the actual values contained within the buffer must agree with our allegedly registered fds
    stored_fds = {slot.fd for slot in occupied_t.from_buffer(buf)}
    if stored_fds != registry.keys():
        raise AssertionError("mismatch between fdArray and fd2obj registry")
| # accepts optional int or float seconds, returns a list of (fileobj, eventmask) | |
def poll(self, timeout=None):
    """Poll once and return a list of ``(fileobj, eventmask)`` pairs.

    *timeout* is an optional int or float number of seconds; ``None`` is
    passed on as -1 milliseconds.
    """
    if timeout is None:
        timeout_ms = -1
    elif timeout >= 0:
        # we round amounts up if needed because
        # it'd be incorrect to wait any *less* than the requested interval
        timeout_ms = ceil(timeout * 1000)
    else:
        timeout_ms = floor(timeout * 1000)
    return list(self._poll(timeout_ms))
| # accepts integer milliseconds, returns an iterator of (fileobj, eventmask) | |
def _poll(self, timeout_ms=-1):
    """Poll once and yield ``(fileobj, eventmask)`` for every slot with events.

    *timeout_ms* is an integer number of milliseconds and is handed to the
    underlying poll call unchanged.  An interrupted call (WSAEINTR) is
    retried; any other failure is raised via ``ctypes.WinError``.
    """
    impl_buf = self.__impl_buf
    fd2obj = self.__fd2obj
    fds = len(fd2obj)
    # view of the occupied slots; it shares memory with impl_buf, so it
    # reflects whatever the poll call writes into ``revents``
    impl_view = (WSAPOLLFD * fds).from_buffer(impl_buf)
    # call WSAPoll
    while True:
        ret = _WSAPoll(impl_buf, fds, timeout_ms)
        # we must handle errors manually because wintypes doesn't handle them;
        # we are using wintypes because oledll handles this function's errors incorrectly
        if ret == wintypes_SOCKET_ERROR:
            # https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsapoll#return-value
            err = _WSAGetLastError()
            # https://peps.python.org/pep-0475/
            # https://github.com/python/cpython/blob/v3.13.3/Modules/selectmodule.c#L675-L701
            # (compare against the module's own constant; a bare WSAEINTR
            # name is not defined alongside the other Winsock constants)
            if err == wintypes_WSAEINTR:
                continue
            raise ctypes.WinError(err)
        assert fds >= ret >= 0, "WSAPoll return value was not zero, greater than zero but less than fds, or exactly SOCKET_ERROR"
        break
    fd2obj_getitem = fd2obj.__getitem__
    yield from islice(
        (
            (fd2obj_getitem(fd), ev)
            for fd, ev in ((impl.fd, impl.revents) for impl in impl_view)
            if ev  # don't call fd2obj_getitem for objects which had no events
        ),
        ret  # islice returns early when possible
    )
def register(self, fileobj, eventmask=(select_POLLIN|select_POLLPRI|select_POLLOUT)):
    """Register ``fileobj`` for ``eventmask``, or update its existing registration.

    A new entry is appended to the WSAPOLLFD array, growing the backing
    buffer first when it cannot hold one more entry.
    """
    fd = _getfd(fileobj)
    if __debug__: self._check()
    buf = self.__impl_buf
    registry = self.__fd2obj
    count = len(registry)
    slot = None
    for candidate in (WSAPOLLFD * count).from_buffer(buf):
        if candidate.fd == fd:
            # existing slot with this fd: update the existing registration
            slot = candidate
            break
    if slot is None:
        # allocate a new slot for a new registration
        grown = count + 1
        if sizeof(WSAPOLLFD * grown) > sizeof(buf):
            # always at least double the internal allocation to avoid too many
            # re-allocations if this is filled in a loop
            granularity = getallocationgranularity()
            wanted = sizeof(WSAPOLLFD * max(count*2, grown))
            ctypes.resize(buf, _smallest_multiple_atleast(granularity, wanted))
        # select the newly allocated slot
        slot = (WSAPOLLFD * grown).from_buffer(buf)[-1]
        slot.fd = fd
    # set entry registration
    slot.events = eventmask
    registry[fd] = fileobj
    if __debug__: self._check()
def modify(self, fileobj, eventmask):
    """Replace the event mask of an already registered ``fileobj``.

    Raises ``KeyError`` when no entry with the object's fd is registered.
    """
    fd = _getfd(fileobj)
    if __debug__: self._check()
    buf = self.__impl_buf
    registry = self.__fd2obj
    entries = (WSAPOLLFD * len(registry)).from_buffer(buf)
    # find the existing slot holding this fd
    slot = next((entry for entry in entries if entry.fd == fd), None)
    if slot is None:
        raise KeyError(f"{fileobj!r} is not registered")
    # set entry registration
    slot.events = eventmask
    registry[fd] = fileobj
    if __debug__: self._check()
def unregister(self, fileobj):
    """Remove the registration of ``fileobj``.

    Raises ``KeyError`` when no entry with the object's fd is registered.
    """
    fd = _getfd(fileobj)
    if __debug__: self._check()
    buf = self.__impl_buf
    registry = self.__fd2obj
    count = len(registry)
    entries = (WSAPOLLFD * count).from_buffer(buf)
    # locate the slot to delete
    position = next(
        (index for index, entry in enumerate(entries) if entry.fd == fd),
        None,
    )
    if position is None:
        raise KeyError(f"{fileobj!r} is not registered")
    last = count - 1
    if position < last:
        # the entry is in the middle of the buffer: re-compact the array by
        # moving every following entry down one slot
        trailing = last - position
        ctypes.memmove(
            ctypes.addressof(entries[position]),
            ctypes.addressof(entries[position+1]),
            sizeof(entries._type_) * trailing
        )
    # unset entry registration
    del registry[fd]
    if __debug__: self._check()
# Ws2_32 entry points used by the poller above; looked up by name.
_WSAPoll = windll.Ws2_32['WSAPoll']
# arguments as passed by _poll: the entry buffer, the entry count, the timeout in milliseconds
_WSAPoll.argtypes = [wintypes.LPVOID, wintypes.ULONG, wintypes.INT]
_WSAGetLastError = windll.Ws2_32['WSAGetLastError']
| else: | |
| # one of the fallbacks of all time | |
| def _iter_readable_socket_fds_forever(rlist, timeout=None): | |
| _empty = [] | |
| select_ = select.select | |
| while True: | |
| yield from select_(rlist, _empty, _empty, timeout)[0] | |
def _is_multicast(host):
    """Return whether ``host`` is a multicast address.

    The "all hosts" placeholder is never multicast. Strings are parsed with
    ``ipaddress.ip_address``; any other object must provide ``is_multicast``.
    """
    if host == _PY_HOST_ALL:
        return False
    address = ipaddress.ip_address(host) if isinstance(host, str) else host
    return address.is_multicast
| POSSIBLE_IP_ADDRESS_OR_URI = re.compile(r'(?i)^(?:((?!\d)\w+)(?::/?/?))?\[?((?<!\[)[0-9\.]+(?!\])|[0-9A-F:]+(?:%\w+?)?)\]?(?::(\d+))?$') | |
| def _parse_ip_uri(s, *, expect_scheme=False, require_scheme=False): | |
| m = re.match(POSSIBLE_IP_ADDRESS_OR_URI, s) | |
| if m: | |
| if expect_scheme is not False: | |
| # expect_scheme is None -> expect the ABSENCE of a scheme | |
| # isinstance(expect_scheme, str) -> expect some particular scheme, but by default allow the absence of a scheme | |
| # expect_scheme is False -> do not expect any particular scheme | |
| # require_scheme == True -> do NOT allow absence of an expected scheme | |
| scheme = m.group(1).lower() if m.group(1) is not None else None | |
| if ((scheme is not None) or require_scheme) and (scheme != expect_scheme): | |
| raise ValueError(f"expected a URI with {expect_scheme} scheme, got a URI with {scheme} scheme") | |
| host = m.group(2) | |
| port = int(m.group(3)) if m.group(3) is not None else None | |
| if __debug__: ipaddress.ip_address(host) # fail early on bad values | |
| return (host, port) | |
| raise ValueError(f"could not parse {s!r} as an IP or IP URI with optional port and no path") | |
def _lookup_ip(family, obj, *, try_str_as_ip_first=False):
    """Resolve ``obj`` to a host value for ``family``.

    The "all hosts" placeholder maps to the family's wildcard entry. With
    ``try_str_as_ip_first`` a string that parses as an IP address is
    returned unchanged. Everything else goes to the platform lookup.
    """
    if obj == _PY_HOST_ALL:
        return _HOST_ALL[family]
    if try_str_as_ip_first and isinstance(obj, str):
        try:
            ipaddress.ip_address(obj)
        except ValueError:
            # not a literal address; fall through to the platform lookup
            pass
        else:
            return obj
    return _plat_lookup_ip(family, obj)
| def _lookup_ifindex(key): | |
| # if we were given exactly an integer, just return it. | |
| if isinstance(key, int): | |
| return key | |
| else: | |
| try: | |
| return int(key) | |
| except ValueError: | |
| pass | |
| return _plat_lookup_ifindex(key) | |
def _make_mreq_4_unix(mcast_group, interface_address=_PY_HOST_ALL, ifindex=None):
    """Build the ``(level, option, value)`` triple for an IPv4 group join.

    The value is the packed group address followed by the packed interface
    address. When ``ifindex`` is given it is appended as a third member.
    """
    if interface_address == _PY_HOST_ALL:
        interface_address = _HOST_ALL[AF_INET]
    # two-member layout:
    # https://web.git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/in.h?h=v6.10&id=0c3836482481200ead7b416ca80c68a29cfdaabd#n178
    # https://github.com/apple-oss-distributions/xnu/blob/xnu-7195.81.3/bsd/netinet/in.h#L563
    # https://docs.oracle.com/en/operating-systems/solaris/oracle-solaris/11.4/prog-interfaces/receiving-ipv4-multicast-datagrams.html
    request = inet_aton(mcast_group) + inet_aton(interface_address)
    if ifindex is not None:
        # three-member layout:
        # https://web.git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/in.h?h=v6.10&id=0c3836482481200ead7b416ca80c68a29cfdaabd#n183
        # https://cgit.freebsd.org/src/tree/share/man/man4/ip.4?h=release/14.2.0-p2#n566
        request = request + _inet_iton(ifindex)
    return (IPPROTO_IP, IP_ADD_MEMBERSHIP, request)
def _make_mreq_4_win(mcast_group, interface_address_or_ifindex=_PY_HOST_ALL):
    """Build the ``(level, option, value)`` triple for an IPv4 group join on Windows.

    The interface may be given as an address or as an integer interface
    index. An index is encoded as the IPv4 address with that numeric value,
    which is asserted to fall inside 0.0.0.0/8.
    """
    selector = interface_address_or_ifindex
    if selector == _PY_HOST_ALL:
        interface_address = _HOST_ALL[AF_INET]
    elif not isinstance(selector, int):
        interface_address = selector
    else:
        as_address = ipaddress.IPv4Address(selector)
        assert as_address in ipaddress.IPv4Network("0.0.0.0/8"),\
            "https://learn.microsoft.com/en-us/windows/win32/api/ws2ipdef/ns-ws2ipdef-ip_mreq#:~:text=The%20imr_interface%20member%20can%20be%20an%20interface%20index.,.%20The%200.0.0.0/8%20IPv4%20address%20block%20is%20not%20used%20(this%20range%20is%20reserved)."
        interface_address = str(as_address)
    # https://learn.microsoft.com/en-us/windows/win32/api/ws2ipdef/ns-ws2ipdef-ip_mreq#syntax
    request = inet_aton(mcast_group) + inet_aton(interface_address)
    return (IPPROTO_IP, IP_ADD_MEMBERSHIP, request)
def _make_mreq_6(mcast_group, ifindex=_IFINDEX_ANY[AF_INET6]):
    """Build the ``(level, option, value)`` triple for an IPv6 group join.

    The value is the packed IPv6 group address followed by the encoded
    interface index.
    """
    # https://learn.microsoft.com/en-us/windows/win32/api/ws2ipdef/ns-ws2ipdef-ipv6_mreq#syntax
    # https://github.com/apple-oss-distributions/xnu/blob/xnu-7195.81.3/bsd/netinet6/in6.h#L729
    # https://web.git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/in6.h?h=v6.10&id=0c3836482481200ead7b416ca80c68a29cfdaabd#n60
    group_bytes = inet_pton(AF_INET6, mcast_group)
    request = group_bytes + _inet_iton(ifindex)
    return (IPPROTO_IPV6, IPV6_JOIN_GROUP, request)
| def _has_mreq(sockopts): | |
| return any( | |
| ( | |
| (opt[0] == IPPROTO_IP and opt[1] == IP_ADD_MEMBERSHIP) or | |
| (opt[0] == IPPROTO_IPV6 and opt[1] == IPV6_JOIN_GROUP) | |
| ) | |
| for opt in sockopts | |
| if len(opt) >= 2 | |
| ) | |
| def _smallest_multiple_atleast(base, minimum_value): | |
| return ((minimum_value + base - 1) // base) * base | |
| def _ipaddress_interface_get_ip(interface): | |
| if (interface.version == 6) and (interface.scope_id is not None): | |
| return interface.ip.__class__(f"{interface.ip}%{interface.scope_id}") | |
| else: | |
| return interface.ip | |
| if _platform_system == 'Windows': | |
| from ctypes import windll, wintypes | |
def _plat_lookup_ifindex(key):
    """Resolve ``key`` to an interface index using GetAdaptersAddresses.

    ``key`` is tried, in order, as:

    1. the exact name of an interface (``name``, ``_AdapterName`` or
       ``_FriendlyName`` of an adapter record);
    2. a MAC address given as colon- or dash-separated hexadecimal octets,
       case-insensitive;
    3. an IP: a "naked" IP, an "interface" IP-with-netmask, or "just a
       subnet" which MUST EXACTLY MATCH the subnet of some interface.

    Raises ``KeyError(key)`` when nothing matches.
    """
    adaptersaddresses = GetAdaptersAddresses()
    # if we were given exactly the name of some interface, return that interface.
    by_name = {
        record[field]: record
        for record in adaptersaddresses
        for field in ('name', '_AdapterName', '_FriendlyName')
    }
    if key in by_name:
        return by_name[key]['_IfIndex']
    # if we were given exactly a MAC address, return the interface with that MAC address.
    if isinstance(key, str):
        mac = key.upper()
        by_mac = {}
        for record in adaptersaddresses:
            physical = record.get('_PhysicalAddress')
            if not physical:
                # records carry None when the adapter reports no physical
                # address; calling .upper() on it would raise
                continue
            by_mac[physical.upper()] = record
            by_mac[physical.replace('-', ':').upper()] = record
        if mac in by_mac:
            return by_mac[mac]['_IfIndex']
    # if we were given an IP which is exactly the IP of some interface, return that interface.
    try:
        if isinstance(key, ipaddress._BaseAddress):
            ip = key
            if hasattr(key, 'hostmask') and ((int(ip) & int(ip.hostmask)) == 0):
                warnings.warn(f"Automatically converting questionable {ip.__class__.__name__} into {ip.network.__class__.__name__}", DeprecationWarning)
                ip = ip.network
        else:
            try:
                # A naked IP must be tried first: ip_interface() accepts it
                # too, but as a full-length prefix whose host bits are
                # trivially zero, which would turn it into a single-address
                # subnet that matches no interface.
                ip = ipaddress.ip_address(key)
            except ValueError:
                ip = ipaddress.ip_interface(key)
                if (int(ip) & int(ip.hostmask)) == 0:
                    ip = ip.network
    except ValueError:
        pass
    else:
        by_ip = {
            candidate: record
            for record in adaptersaddresses
            for addr in reversed(record['addrs'])  # prioritize first address
            for candidate in [addr, _ipaddress_interface_get_ip(addr), addr.network]
        }
        if ip in by_ip:
            return by_ip[ip][{4: '_IfIndex', 6: '_Ipv6IfIndex'}[ip.version]]
    raise KeyError(key)
def _plat_lookup_ip(family, key):
    """Resolve an interface identifier to one of its addresses of ``family``.

    ``key`` may be an integer interface index (only adapters whose
    operational status is "up" are considered) or a string: an interface
    name, adapter name, friendly name or MAC address. A MAC address that
    does not match exactly is retried after normalising ":" separators to
    "-" and upper-casing, the form used by the adapter records.

    Returns the address as a string. Raises ``KeyError`` when nothing
    matches and ``TypeError`` for any other key type.
    """
    if key == _IFINDEX_ANY[family]:
        return _HOST_ALL[family]
    adaptersaddresses = GetAdaptersAddresses()
    vers = {AF_INET: 4, AF_INET6: 6}[family]
    if isinstance(key, int):
        index_field = {AF_INET: '_IfIndex', AF_INET6: '_Ipv6IfIndex'}[family]
        d = {
            record[index_field]: _ipaddress_interface_get_ip(addr)
            for record in adaptersaddresses
            if (record.get(index_field) is not None) and (record['_OperStatus'] == IP_ADAPTER_ADDRESSES.wintypes_IF_OPER_STATUS.up)
            for addr in reversed(record['addrs'])  # prioritize first address
            if addr.version == vers
        }
    elif isinstance(key, str):
        identifier_fields = ['name', '_AdapterName', '_FriendlyName', '_PhysicalAddress']
        d = {
            record[field]: _ipaddress_interface_get_ip(addr)
            for record in adaptersaddresses
            # NOTE: no need to filter this by OperStatus
            for addr in reversed(record['addrs'])  # prioritize first address
            if addr.version == vers
            for field in identifier_fields
        }
        if key not in d:
            # accept colon-separated and lower-case MAC addresses as well
            mac = key.replace(':', '-').upper()
            by_mac = {
                record['_PhysicalAddress']: _ipaddress_interface_get_ip(addr)
                for record in adaptersaddresses
                if record['_PhysicalAddress']
                for addr in reversed(record['addrs'])  # prioritize first address
                if addr.version == vers
            }
            if mac in by_mac:
                return str(by_mac[mac])
    else:
        raise TypeError(type(key))
    return str(d[key])
# Names for Windows types that ctypes.wintypes may not provide: each alias
# uses the wintypes attribute when it exists and the fallback otherwise.
wintypes_IFTYPE = getattr(wintypes, 'IFTYPE', wintypes.UINT)
wintypes_IF_INDEX = getattr(wintypes, 'IF_INDEX', wintypes.ULONG)
wintypes_NET_IF_COMPARTMENT_ID = getattr(wintypes, 'NET_IF_COMPARTMENT_ID', ctypes.c_uint32)
wintypes_NET_IF_CONNECTION_TYPE = getattr(wintypes, 'NET_IF_CONNECTION_TYPE', wintypes.UINT)
wintypes_NDIS_IF_MAX_STRING_SIZE = getattr(
    wintypes,
    'NDIS_IF_MAX_STRING_SIZE',
    getattr(wintypes, 'IF_MAX_STRING_SIZE', 256),
)
wintypes_ADDRESS_FAMILY = getattr(wintypes, 'ADDRESS_FAMILY', wintypes.USHORT)
wintypes_UINT8 = getattr(wintypes, 'UINT8', ctypes.c_uint8)
wintypes_UCHAR = getattr(
    wintypes,
    'UCHAR',
    getattr(ctypes, 'c_uchar', ctypes.c_ubyte),
)
wintypes_ULONG64 = getattr(wintypes, 'ULONG64', ctypes.c_uint64)
wintypes_ULONGLONG = getattr(wintypes, 'ULONGLONG', ctypes.c_ulonglong)
_StringFromGUID2 = ctypes.oledll.Ole32['StringFromGUID2']
_CLSIDFromString = ctypes.oledll.Ole32['CLSIDFromString']


class wintypes_GUID(ctypes.Structure):
    """GUID structure that converts to and from its string form.

    Besides the usual ctypes field arguments, the constructor accepts a
    single ``str`` or ``uuid.UUID``; ``.value`` returns a ``uuid.UUID``.
    """
    _fields_ = [
        ('Data1', ctypes.c_uint32),
        ('Data2', ctypes.c_uint16),
        ('Data3', ctypes.c_uint16),
        ('Data4', ctypes.c_uint8*8),
    ]

    def __init__(self, *a, **k):
        source = a[0] if len(a) == 1 else None
        if isinstance(source, uuid.UUID):
            # CLSIDFromString is fed the brace-wrapped form
            source = f"{{{source!s}}}"
        if isinstance(source, str):
            super().__init__(**k)
            self.__load_from_str(source)
        else:
            super().__init__(*a, **k)

    def __repr__(self):
        return f"{self.__class__.__name__}({str(self)!r})"

    def __str__(self):
        # +1 wchar is needed because StringFromGUID2 weirdly does both
        # null-termination AND explicit written-length signalling
        text = ctypes.create_unicode_buffer(39)
        written = _StringFromGUID2(ctypes.byref(self), ctypes.byref(text), len(text))
        # also, oledll weirdly neither raises nor sets GetLastError when StringFromGUID2 fails
        if written == 0:
            raise ctypes.WinError(0x00000008)
        rendered = text.value
        assert len(rendered) in (written - 1, written)
        return rendered

    def __load_from_str(self, s):
        # oledll raises when CLSIDFromString fails
        _CLSIDFromString(s, ctypes.byref(self))

    def __bool__(self):
        # false only when every field is zero
        return any((self.Data1, self.Data2, self.Data3, *self.Data4))

    @property
    def value(self):
        return uuid.UUID(str(self))
class IP_ADAPTER_ADDRESSES(ctypes.Union):
    """ctypes description of the adapter list filled in by GetAdaptersAddresses.

    The union overlays the two known layouts (``lh`` and ``xp``) on the
    shared ``Length``/``IfIndex`` header. ``.value`` decodes the whole
    linked list into a list of plain dicts, one per adapter. Only the
    448-byte ``lh`` layout is implemented.

    The structure fields are assigned after the class bodies because several
    of them refer to the classes themselves (``Next`` pointers).
    """
    ERROR_BUFFER_OVERFLOW = 111

    class _field_1(ctypes.Union):
        class _field_2(ctypes.Structure):
            _fields_ = [('Length', wintypes.ULONG), ('IfIndex', wintypes_IF_INDEX)]
        _anonymous_ = ['_2']
        _fields_ = [('Alignment', wintypes.ULONG), ('_2', _field_2)]

    class IP_ADAPTER_ADDRESSES_LH(ctypes.Structure):
        MAX_DHCPV6_DUID_LENGTH = 130

        class _field_13(ctypes.Union):
            class _field_2(ctypes.Structure):
                _fields_ = [
                    ('DdnsEnabled', wintypes.ULONG, 1),
                    ('RegisterAdapterSuffix', wintypes.ULONG, 1),
                    ('Dhcpv4Enabled', wintypes.ULONG, 1),
                    ('ReceiveOnly', wintypes.ULONG, 1),
                    ('NoMulticast', wintypes.ULONG, 1),
                    ('Ipv6OtherStatefulConfig', wintypes.ULONG, 1),
                    ('NetbiosOverTcpipEnabled', wintypes.ULONG, 1),
                    ('Ipv4Enabled', wintypes.ULONG, 1),
                    ('Ipv6Enabled', wintypes.ULONG, 1),
                    ('Ipv6ManagedAddressConfigurationSupported', wintypes.ULONG, 1),
                ]
            _anonymous_ = ['_2']
            _fields_ = [('Flags', wintypes.ULONG), ('_2', _field_2)]

        class NET_LUID_LH(ctypes.Union):
            class _field_2(ctypes.Structure):
                _fields_ = [
                    ('_1', wintypes_ULONG64, 24),
                    ('NetLuidIndex', wintypes_ULONG64, 24),
                    ('IfType', wintypes_ULONG64, 16),
                ]

            class _LUID(ctypes.Structure):
                _fields_ = [
                    ('LowPart', wintypes.ULONG),
                    ('HighPart', wintypes.ULONG),
                ]
            _anonymous_ = ['_2', '_LUID']
            _fields_ = [('Value', wintypes_ULONG64), ('_2', _field_2), ('_LUID', _LUID)]

        @property
        def __value(self):
            # one adapter record as a plain dict
            first_unicast = self.FirstUnicastAddress
            return {
                'name': ConvertInterfaceLuidToNameW(self.Luid),
                # FirstUnicastAddress is NULL for an adapter without unicast
                # addresses; dereferencing a NULL ctypes pointer raises
                # ValueError, so report an empty address list instead
                'addrs': first_unicast.contents.value if first_unicast else [],
                '_AdapterName': ctypes.string_at(self.AdapterName).decode('windows-1252'),
                '_Dhcpv4Server': self.Dhcpv4Server.value.contents.value[0] if self.Dhcpv4Server.value else None,
                '_Dhcpv6Server': self.Dhcpv6Server.value.contents.value[0] if self.Dhcpv6Server.value else None,
                '_FriendlyName': ctypes.wstring_at(self.FriendlyName),
                '_IfIndex': self.IfIndex,
                '_Ipv6IfIndex': self.Ipv6IfIndex,
                '_OperStatus': IP_ADAPTER_ADDRESSES.wintypes_IF_OPER_STATUS(self.OperStatus),
                '_NoMulticast': bool(self.NoMulticast),
                '_PhysicalAddress': bytes(
                    (ctypes.c_uint8*self.PhysicalAddressLength)
                    .from_buffer(self.PhysicalAddress)
                ).hex('-').upper() if self.PhysicalAddressLength > 0 else None
            }

        def __value_iter(self):
            # walk the linked list through the Next pointers
            cur = self
            yield cur.__value
            while (next_ptr := cur.Next):
                cur = next_ptr.contents
                yield cur.__value

        @property
        def value(self):
            return list(self.__value_iter())

    class IP_ADAPTER_ADDRESSES_XP(ctypes.Structure):
        def __init__(self, *a, **k):
            raise NotImplementedError

    MAX_ADAPTER_ADDRESS_LENGTH = 8

    class wintypes_IF_OPER_STATUS(enum.IntEnum):
        IfOperStatusUp = up = 1
        IfOperStatusDown = down = 2
        IfOperStatusTesting = testing = 3
        IfOperStatusUnknown = unknown = 4
        IfOperStatusDormant = dormant = 5
        IfOperStatusNotPresent = notPresent = 6
        IfOperStatusLowerLayerDown = lowerLayerDown = 7

    class SOCKET_ADDRESS(ctypes.Structure):
        class _SOCKADDR(ctypes.Union):
            class SOCKADDR_IN(ctypes.Structure):
                class IN_ADDR(ctypes.Structure):
                    class _field_1(ctypes.Union):
                        class _field_1(ctypes.Structure):
                            _fields_ = [
                                ('s_b1', wintypes_UCHAR),
                                ('s_b2', wintypes_UCHAR),
                                ('s_b3', wintypes_UCHAR),
                                ('s_b4', wintypes_UCHAR),
                            ]

                        class _field_2(ctypes.Structure):
                            _fields_ = [
                                ('s_w1', wintypes.USHORT),
                                ('s_w2', wintypes.USHORT),
                            ]
                        _anonymous_ = ['S_un_b', 'S_un_w']
                        _fields_ = [('S_un_b', _field_1), ('S_un_w', _field_2)]
                    _anonymous_ = ['S_un']
                    _fields_ = [('S_un', _field_1)]

                    @property
                    def value(self):
                        return ipaddress.IPv4Address(bytes(self.S_un.S_un_b))
                _fields_ = [
                    ('sin_family', wintypes.SHORT),
                    ('sin_port', wintypes.USHORT),
                    ('sin_addr', IN_ADDR),
                    ('sin_zero', wintypes.CHAR*8),
                ]

                @property
                def value(self):
                    assert self.sin_family == AF_INET
                    return (
                        str(self.sin_addr.value),
                        self.sin_port
                    )
            LPSOCKADDR_IN = ctypes.POINTER(SOCKADDR_IN)

            class SOCKADDR_IN6(ctypes.Structure):
                class IN6_ADDR(ctypes.Structure):
                    class _field_1(ctypes.Union):
                        _fields_ = [
                            ('Byte', wintypes_UINT8*16),
                            ('Word', wintypes.USHORT*8)
                        ]
                    _anonymous_ = ['u']
                    _fields_ = [('u', _field_1)]

                    @property
                    def value(self):
                        return ipaddress.IPv6Address(bytes(self.Byte))
                _fields_ = [
                    ('sin6_family', wintypes.SHORT),
                    ('sin6_port', wintypes.USHORT),
                    ('sin6_flowinfo', wintypes.ULONG),
                    ('sin6_addr', IN6_ADDR),
                    ('sin6_scope_id', wintypes.ULONG)
                ]

                @property
                def value(self):
                    assert self.sin6_family == AF_INET6
                    return (
                        str(self.sin6_addr.value),
                        self.sin6_port,
                        self.sin6_flowinfo,
                        self.sin6_scope_id,
                    )
            LPSOCKADDR_IN6 = ctypes.POINTER(SOCKADDR_IN6)
            _anonymous_ = ['in_', 'in6']
            _fields_ = [
                ('family', wintypes_ADDRESS_FAMILY),
                ('in_', SOCKADDR_IN),
                ('in6', SOCKADDR_IN6),
            ]
            # union member to decode, keyed by the address family
            _choice = {
                AF_INET: 'in_',
                AF_INET6: 'in6',
            }

            @property
            def value(self):
                return getattr(self, self._choice[self.family]).value
        _fields_ = [
            ('lpSockaddr', ctypes.POINTER(_SOCKADDR)),
            ('iSockaddrLength', wintypes.INT)
        ]
        # pointer type to cast to, keyed by the reported sockaddr length
        _choice = {
            sizeof(_SOCKADDR.SOCKADDR_IN): _SOCKADDR.LPSOCKADDR_IN,
            sizeof(_SOCKADDR.SOCKADDR_IN6): _SOCKADDR.LPSOCKADDR_IN6,
        }

        @property
        def value(self):
            # a pointer, not the decoded address: callers test it for NULL
            # and then read .contents
            return ctypes.cast(
                self.lpSockaddr,
                self._choice.get(
                    self.iSockaddrLength,
                    self.lpSockaddr.__class__
                )
            )

    class IP_ADAPTER_UNICAST_ADDRESS(ctypes.Structure):
        class _field_1(ctypes.Union):
            class _field_2(ctypes.Structure):
                _fields_ = [('Length', wintypes.ULONG), ('Flags', wintypes.DWORD)]
            _anonymous_ = ['_2']
            _fields_ = [('Alignment', wintypes_ULONGLONG), ('_2', _field_2)]

        def __entry(self):
            # decode this list node into an ipaddress interface object
            contents = self.Address.value.contents
            if isinstance(contents, IP_ADAPTER_ADDRESSES.SOCKET_ADDRESS._SOCKADDR.SOCKADDR_IN):
                host, port = contents.value
                assert port == 0
                return ipaddress.IPv4Interface(f"{host!s}/{self.OnLinkPrefixLength}")
            elif isinstance(contents, IP_ADAPTER_ADDRESSES.SOCKET_ADDRESS._SOCKADDR.SOCKADDR_IN6):
                host, port, flowinfo, scope_id = contents.value
                assert port == 0
                assert flowinfo == 0, f"flowinfo = {flowinfo}"  # how should we export this if it's not 0???
                if scope_id != 0 and not host.endswith(f"%{scope_id}"):
                    host = f"{host}%{scope_id}"
                return ipaddress.IPv6Interface(f"{host}/{self.OnLinkPrefixLength}")
            else:
                raise NotImplementedError(type(self.Address.value.contents))

        def __value_iter(self):
            yield self.__entry()
            next_ptr = self.Next
            while next_ptr:
                cur = next_ptr.contents
                yield cur.__entry()
                next_ptr = cur.Next

        @property
        def value(self):
            return list(self.__value_iter())
    IP_ADAPTER_UNICAST_ADDRESS._anonymous_ = ['_1']
    IP_ADAPTER_UNICAST_ADDRESS._fields_ = [
        ('_1', IP_ADAPTER_UNICAST_ADDRESS._field_1),
        ('Next', ctypes.POINTER(IP_ADAPTER_UNICAST_ADDRESS)),
        ('Address', SOCKET_ADDRESS),
        ('PrefixOrigin', wintypes.UINT),
        ('SuffixOrigin', wintypes.UINT),
        ('DadState', wintypes.UINT),
        ('ValidLifetime', wintypes.ULONG),
        ('PreferredLifetime', wintypes.ULONG),
        ('LeaseLifetime', wintypes.ULONG),
        ('OnLinkPrefixLength', wintypes_UINT8),
    ]

    class IP_ADAPTER_DNS_SUFFIX(ctypes.Structure):
        MAX_DNS_SUFFIX_STRING_LENGTH = 256
    IP_ADAPTER_DNS_SUFFIX._fields_ = [
        ('Next', ctypes.POINTER(IP_ADAPTER_DNS_SUFFIX)),
        ('String', wintypes.WCHAR*IP_ADAPTER_DNS_SUFFIX.MAX_DNS_SUFFIX_STRING_LENGTH),
    ]

    @property
    def value(self):
        # "value" of a disambiguated Union type is the value of the underlying active type
        length = self.Length
        if length == 448:
            return self.lh.value
        else:
            raise NotImplementedError(f"Length={length}")


IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH._anonymous_ = ['_1', '_13']
IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH._fields_ = [
    ('_1', IP_ADAPTER_ADDRESSES._field_1),
    ('Next', ctypes.POINTER(IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH)),
    ('AdapterName', wintypes.PCHAR),
    ('FirstUnicastAddress', ctypes.POINTER(IP_ADAPTER_ADDRESSES.IP_ADAPTER_UNICAST_ADDRESS)),
    ('FirstAnycastAddress', wintypes.LPVOID),  # FIXME
    ('FirstMulticastAddress', wintypes.LPVOID),  # FIXME
    ('FirstDnsServerAddress', wintypes.LPVOID),  # FIXME
    ('DnsSuffix', wintypes.PWCHAR),
    ('Description', wintypes.PWCHAR),
    ('FriendlyName', wintypes.PWCHAR),
    ('PhysicalAddress', wintypes.BYTE*IP_ADAPTER_ADDRESSES.MAX_ADAPTER_ADDRESS_LENGTH),
    ('PhysicalAddressLength', wintypes.DWORD),
    ('_13', IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH._field_13),
    ('Mtu', wintypes.DWORD),
    ('IfType', wintypes_IFTYPE),
    ('OperStatus', wintypes.UINT),
    ('Ipv6IfIndex', wintypes_IF_INDEX),
    ('ZoneIndices', wintypes.ULONG*16),
    ('FirstPrefix', wintypes.LPVOID),  # FIXME
    ('TransmitLinkSpeed', wintypes_ULONG64),
    ('ReceiveLinkSpeed', wintypes_ULONG64),
    ('FirstWinsServerAddress', wintypes.LPVOID),  # FIXME
    ('FirstGatewayAddress', wintypes.LPVOID),  # FIXME
    ('Ipv4Metric', wintypes.ULONG),
    ('Ipv6Metric', wintypes.ULONG),
    ('Luid', IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH.NET_LUID_LH),
    ('Dhcpv4Server', IP_ADAPTER_ADDRESSES.SOCKET_ADDRESS),
    ('CompartmentId', wintypes_NET_IF_COMPARTMENT_ID),
    ('NetworkGuid', wintypes_GUID),
    ('ConnectionType', wintypes_NET_IF_CONNECTION_TYPE),
    ('TunnelType', wintypes.UINT),
    ('Dhcpv6Server', IP_ADAPTER_ADDRESSES.SOCKET_ADDRESS),
    ('Dhcpv6ClientDuid', wintypes.BYTE*IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH.MAX_DHCPV6_DUID_LENGTH),
    ('Dhcpv6ClientDuidLength', wintypes.ULONG),
    ('Dhcpv6Iaid', wintypes.ULONG),
    # a pointer to the first suffix node, not an inline node: with the node
    # embedded, the structure would be far larger than the 448 bytes that
    # IP_ADAPTER_ADDRESSES.value checks Length against
    ('FirstDnsSuffix', ctypes.POINTER(IP_ADAPTER_ADDRESSES.IP_ADAPTER_DNS_SUFFIX)),
]
IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_XP._anonymous_ = ['_1']
IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_XP._fields_ = [
    ('_1', IP_ADAPTER_ADDRESSES._field_1),
    ('Next', ctypes.POINTER(IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_XP)),
    ('AdapterName', wintypes.PCHAR),
    ('FirstUnicastAddress', ctypes.POINTER(IP_ADAPTER_ADDRESSES.IP_ADAPTER_UNICAST_ADDRESS)),
    ('FirstAnycastAddress', wintypes.LPVOID),  # FIXME
    ('FirstMulticastAddress', wintypes.LPVOID),  # FIXME
    ('FirstDnsServerAddress', wintypes.LPVOID),  # FIXME
    ('DnsSuffix', wintypes.PWCHAR),
    ('Description', wintypes.PWCHAR),
    ('FriendlyName', wintypes.PWCHAR),
    ('PhysicalAddress', wintypes.BYTE*IP_ADAPTER_ADDRESSES.MAX_ADAPTER_ADDRESS_LENGTH),
    ('PhysicalAddressLength', wintypes.DWORD),
    ('Flags', wintypes.DWORD),
    ('Mtu', wintypes.DWORD),
    ('IfType', wintypes_IFTYPE),
    ('OperStatus', wintypes.UINT),
    ('Ipv6IfIndex', wintypes_IF_INDEX),
    ('ZoneIndices', wintypes.DWORD*16),
    ('FirstPrefix', wintypes.LPVOID),  # FIXME
]
IP_ADAPTER_ADDRESSES._anonymous_ = ['_1']
IP_ADAPTER_ADDRESSES._fields_ = [
    ('_1', IP_ADAPTER_ADDRESSES._field_1),
    ('lh', IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_LH),
    ('xp', IP_ADAPTER_ADDRESSES.IP_ADAPTER_ADDRESSES_XP),
]
_ConvertInterfaceLuidToNameW = ctypes.oledll.Iphlpapi['ConvertInterfaceLuidToNameW']
_ConvertInterfaceLuidToNameW.argtypes = [
    wintypes.LPVOID,
    wintypes.PWCHAR,
    # the Length parameter is a SIZE_T; wintypes.SIZE is the unrelated
    # two-LONG SIZE structure and only happens to work where an 8-byte
    # structure is passed like a 64-bit integer
    ctypes.c_size_t,
]


def ConvertInterfaceLuidToNameW(luid):
    """Return the interface name for ``luid`` as a ``str``.

    Raises ``OSError`` (through ``ctypes.WinError``) when the call reports a
    non-zero status.
    """
    buf = ctypes.create_unicode_buffer(wintypes_NDIS_IF_MAX_STRING_SIZE + 1)
    ret = _ConvertInterfaceLuidToNameW(ctypes.byref(luid), buf, len(buf))
    # explicit check instead of an assert, which disappears under -O;
    # oledll's own result check is not relied on for this status value
    if ret != 0:
        raise ctypes.WinError(ret)
    return buf.value
_GetAdaptersAddresses = ctypes.oledll.Iphlpapi['GetAdaptersAddresses']
_GetAdaptersAddresses.argtypes = [
    wintypes.ULONG,   # family
    wintypes.ULONG,   # flags
    wintypes.LPVOID,  # always passed as None below
    wintypes.LPVOID,  # output buffer
    wintypes.PULONG,  # in/out buffer size
]


def GetAdaptersAddresses(family=socket.AF_UNSPEC, flags=0, *, bufsize=1024*15):
    """Return the decoded adapter list (``IP_ADAPTER_ADDRESSES(...).value``).

    Starts with a ``bufsize``-byte buffer and, while the call reports
    ERROR_BUFFER_OVERFLOW, retries with a buffer of the size it asked for.
    Any other non-zero status is raised through ``ctypes.WinError``.
    """
    buf = ctypes.create_string_buffer(bufsize)
    needed = wintypes.ULONG(sizeof(buf))
    while True:
        status = _GetAdaptersAddresses(family, flags, None, ctypes.byref(buf), ctypes.byref(needed))
        if status != IP_ADAPTER_ADDRESSES.ERROR_BUFFER_OVERFLOW:
            break
        # the call stored the required size in `needed`
        buf = ctypes.create_string_buffer(needed.value)
        assert needed.value == sizeof(buf)
    if status != 0:
        raise ctypes.WinError(status)
    return IP_ADAPTER_ADDRESSES.from_buffer(buf).value
| else: | |
| def _plat_lookup_ifindex(key): | |
| ##ifaddrs = getifaddrs() | |
| raise NotImplementedError('lookup ifindex from ip or device identifier is TODO on Unix-like') | |
| def _plat_lookup_ip(family, key): | |
| ##ifaddrs = getifaddrs() | |
| raise NotImplementedError('lookip up ip address from ifindex or device identifier is TODO on Unix-like') | |
| # https://git.kernel.org/pub/scm/docs/man-pages/man-pages.git/tree/man/man3/getifaddrs.3?h=man-pages-6.9.1&id=efc71944cf19b4b6accc4bb7d0228e0a8c551afa | |
| # https://sourceware.org/git/?p=glibc.git;a=blob;f=sysdeps/unix/sysv/linux/ifaddrs.c;h=10b26d8b3c1c8f98eaad871d67a25510abe22933;hb=refs/heads/release/2.40/master#l453 | |
| # https://github.com/apple-oss-distributions/Libinfo/blob/Libinfo-542.40.3/gen.subproj/getifaddrs.3 | |
# handle on the C library located by ctypes.util.find_library
libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
| def _iadd_or_iappend(collection, x): | |
| if isinstance(collection, Sequence): | |
| if isinstance(collection, tuple): | |
| collection = list(collection) | |
| collection.append(x) | |
| else: | |
| collection |= {x} | |
| return collection | |
| @contextmanager | |
| def _wakeup_fd_ctx(fd, handle_overwrite=None, **k): | |
| prev_fd = signal.set_wakeup_fd(fd, **k) | |
| try: | |
| if prev_fd == -1: | |
| # Nominal case / "happy path": we did not overwrite any existing wakeup handler | |
| logging.debug(f"signal wakeup fd set to %d.", fd) | |
| yield None | |
| else: | |
| # We overwrote the existing handler | |
| if handle_overwrite == 'report_restore': | |
| logging.debug(f"signal wakeup fd already set (%d); overwrote it with %d; will automatically restore it after this context is exited.", prev_fd, fd) | |
| yield prev_fd | |
| elif handle_overwrite == 'report_norestore': | |
| actual_prev_fd, prev_fd = prev_fd, -1 | |
| logging.debug(f"signal wakeup fd already set (%d); overwrote it with %d.", prev_fd, fd) | |
| yield actual_prev_fd | |
| elif handle_overwrite == 'drop': | |
| prev_fd = -1 | |
| logging.debug(f"signal wakeup fd already set (%d); overwrote it with %d.", prev_fd, fd) | |
| yield None | |
| else: | |
| raise RuntimeError(f"signal wakeup fd already set ({prev_fd}); not sure what to do about that. Restoring that wakeup fd and bailing out.") | |
| finally: | |
| if prev_fd is not None: | |
| fd_removed_on_ctx_exit = signal.set_wakeup_fd(prev_fd) | |
| if fd_removed_on_ctx_exit == fd: | |
| if prev_fd != -1: | |
| logging.debug(f"signal wakeup fd %d removed; previous value (%d) restored.", fd, prev_fd) | |
| else: | |
| logging.debug(f"signal wakeup fd %d removed.", fd) | |
| else: | |
| raise RuntimeError(f"entered context by setting signal wakeup fd {fd!r}, but ended up removing signal wakeup fd {fd_removed_on_ctx_exit!r} on context exit") | |
def _main():
    """Command-line entry point: copy received UDP payloads to standard output.

    Positional arguments are the address to listen on and an optional
    interface (an integer ifindex or any other interface specifier).
    ``--use-async`` / ``--no-use-async`` selects the async or the sync
    listener; async is the default.
    """
    import argparse
    import asyncio

    parser = argparse.ArgumentParser()
    parser.add_argument('address')
    parser.add_argument('interface', nargs='?')
    parser.add_argument('--use-async', action=argparse.BooleanOptionalAction, default=True)
    params = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG)
    addr = params.address
    k = {}
    if params.interface is not None:
        try:
            # ifindex
            mcast_bind_interface = int(params.interface)
        except ValueError:
            # other interface specifier
            mcast_bind_interface = params.interface
        k['mcast_bind_interface'] = mcast_bind_interface

    if hasattr(sys.stdout, 'buffer'):
        # regular I/O
        def emit(packet):
            sys.stdout.buffer.write(packet)
            sys.stdout.buffer.flush()
    else:
        # idlelib console
        def emit(packet):
            sys.stdout.write(packet.decode('utf-8', errors='surrogateescape'))

    if params.use_async:
        async def main_async():
            async for packet in udp_listen_async(addr, **k):
                emit(packet)
        asyncio.run(main_async())
    else:
        for packet in udp_listen_sync(addr, **k):
            emit(packet)


if __name__ == '__main__':
    _main()
Author
Author
This signal.set_wakeup_fd solution seems like the second-nastiest kludge ever for getting KeyboardInterrupt to play nicely with socket.recv; is there a better way?
https://stackoverflow.com/q/78124574/1874170
At least it has the upside of instant response; the "nastiest" kludge would of course be setting a timeout on select.select and just periodically waking up, but still... this feels awkward.
EDIT: ouch, maybe it doesn't get much better than this...
https://vorpus.org/blog/control-c-handling-in-python-and-trio/
Author
for some reason, Ctrl-C doesn't work in IDLE on PyPy on Windows (testing on Linux TBD), though it still works in CMD on PyPy on Windows...
EDIT: lmao, I'm not alone here
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
recvmsg Windows?
https://learn.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_wsarecvmsg
https://stackoverflow.com/q/37334871/1874170