Last active
June 20, 2025 00:32
-
-
Save odysseus0/79f2ff3622c39832a51cdab0a16fdda4 to your computer and use it in GitHub Desktop.
Socket Spy - Monitor all socket operations in Python to debug networking issues at the lowest level
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # /// script | |
| # dependencies = [ | |
| # "httpx", # For the demo mode | |
| # ] | |
| # requires-python = ">=3.7" | |
| # /// | |
| """ | |
| Socket Spy - Monitor all socket operations in Python | |
| https://gist.github.com/odysseus0/79f2ff3622c39832a51cdab0a16fdda4 | |
| This tool revealed that httpx AsyncClient was attempting IPv6 connections | |
| while the sync client used IPv4, leading to the discovery of TikAPI's | |
| broken IPv6 support. | |
| Usage: | |
| uv run socket_spy.py --demo | |
| Or import in your code: | |
| from socket_spy import SpySocket | |
| SpySocket.install() | |
| # Your network code here | |
| SpySocket.uninstall() | |
| SpySocket.print_report() | |
| """ | |
| import socket | |
| import time | |
| import json | |
| import threading | |
| from datetime import datetime | |
| from typing import List, Dict, Any, Optional | |
| from contextlib import contextmanager | |
class SocketOperation:
    """Record of a single monitored socket call.

    Attributes:
        time: Seconds since the epoch (``time.time()``) when the record
            was created.
        timestamp: The same instant as ``time``, as a naive local
            ``datetime``.
        operation: Name of the operation (e.g. ``'create'``, ``'send'``).
        socket_id: Identifier of the socket the operation belongs to.
        thread_id: ``threading.get_ident()`` of the recording thread.
        details: Extra keyword arguments supplied by the caller.
    """

    def __init__(self, operation: str, socket_id: int, **kwargs):
        # Read the clock once so ``time`` and ``timestamp`` always describe
        # the exact same instant (two separate reads can drift apart).
        self.time = time.time()
        self.timestamp = datetime.fromtimestamp(self.time)
        self.operation = operation
        self.socket_id = socket_id
        self.thread_id = threading.get_ident()
        self.details = kwargs

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a flat dict.

        The core fields come first, followed by the extra details. A detail
        whose name collides with a core field is ignored so it can never
        overwrite ``socket_id``, ``operation`` and friends.
        """
        record: Dict[str, Any] = {
            'timestamp': self.timestamp.isoformat(),
            'time': self.time,
            'operation': self.operation,
            'socket_id': self.socket_id,
            'thread_id': self.thread_id,
        }
        for key, value in self.details.items():
            record.setdefault(key, value)
        return record
class SpySocket(socket.socket):
    """``socket.socket`` subclass that records the operations performed on it.

    Intercepted calls are ``__init__`` (logged as ``'create'``), ``connect``,
    ``send``, ``recv`` and ``close``. Every record is appended to the
    class-level ``operations`` list, which is shared by all instances and
    guarded by ``_lock``.

    Use ``install()`` / ``uninstall()`` to swap this class in and out as
    ``socket.socket``.
    """

    # Shared log of every recorded operation (all instances, all threads).
    operations: List["SocketOperation"] = []
    # The real socket class, captured when this class is defined.
    _original_socket = socket.socket
    _lock = threading.Lock()
    _installed = False
    # Last socket id handed out. A counter is used instead of ``id(self)``
    # because object ids can be reused once a socket is garbage-collected,
    # which would merge unrelated sockets in the report.
    _next_id = 0

    def __init__(self, family=-1, type=-1, proto=-1, fileno=None):
        """Create the socket and log a ``'create'`` record."""
        super().__init__(family, type, proto, fileno)
        with self._lock:
            SpySocket._next_id += 1
            self._spy_id = SpySocket._next_id
            # Log the values the socket actually resolved to, not the raw
            # arguments: the ``-1`` defaults would show up as "Unknown (-1)".
            self.operations.append(SocketOperation(
                'create',
                self._spy_id,
                family=self._get_family_name(self.family),
                type=self._get_type_name(self.type),
                proto=self.proto,
            ))

    def _spy_record(self, operation, **details):
        """Append one record for this socket to the shared log."""
        with self._lock:
            self.operations.append(
                SocketOperation(operation, self._spy_id, **details)
            )

    def connect(self, address):
        """Connect to *address*, logging the attempt and its outcome.

        Logs ``'connect_attempt'`` first, then one of ``'connect_success'``,
        ``'connect_in_progress'`` or ``'connect_failed'``. Any exception
        raised by the underlying ``connect`` is re-raised unchanged.
        """
        family_name = self._get_family_name(self.family)
        self._spy_record(
            'connect_attempt',
            address=address,
            family=family_name,
            formatted_address=self._format_address(address),
        )
        # Print real-time for debugging
        print(f"π [{datetime.now().strftime('%H:%M:%S')}] Connecting to: {address}")
        print(f" Family: {family_name}")
        try:
            result = super().connect(address)
        except (BlockingIOError, InterruptedError) as e:
            # A non-blocking socket reports "operation in progress" this way
            # while the connection continues in the background; that is not
            # a failure, so record it separately.
            self._spy_record(
                'connect_in_progress',
                address=address,
                error=str(e),
                error_type=type(e).__name__,
            )
            print(f" In progress (non-blocking): {e}")
            raise
        except Exception as e:
            self._spy_record(
                'connect_failed',
                address=address,
                error=str(e),
                error_type=type(e).__name__,
            )
            print(f" β Failed: {e}")
            raise
        self._spy_record('connect_success', address=address)
        print(" β Connected!")
        return result

    def send(self, data, flags=0):
        """Send *data* and log how many bytes were actually written.

        The record's ``size`` is the byte count returned by the underlying
        ``send`` (which may be less than requested); ``requested`` is the
        size of the buffer passed in. Nothing is logged if ``send`` raises.
        """
        sent = super().send(data, flags)
        # memoryview works for any bytes-like object send() accepts, unlike
        # ``data[:50].hex()`` which needs a ``hex`` method on the object.
        view = memoryview(data)
        self._spy_record(
            'send',
            size=sent,
            requested=view.nbytes,
            flags=flags,
            preview=bytes(view[:50]).hex(),
        )
        return sent

    def recv(self, bufsize, flags=0):
        """Receive up to *bufsize* bytes and log the amount received."""
        data = super().recv(bufsize, flags)
        self._spy_record(
            'recv',
            requested=bufsize,
            received=len(data),
            flags=flags,
        )
        return data

    def close(self):
        """Log a ``'close'`` record, then close the socket."""
        self._spy_record('close')
        return super().close()

    @staticmethod
    def _get_family_name(family: int) -> str:
        """Convert socket family to readable name."""
        families = {
            socket.AF_INET: 'AF_INET (IPv4)',
            socket.AF_INET6: 'AF_INET6 (IPv6)',
        }
        # AF_UNIX does not exist on every platform.
        if hasattr(socket, 'AF_UNIX'):
            families[socket.AF_UNIX] = 'AF_UNIX'
        return families.get(family, f'Unknown ({family})')

    @staticmethod
    def _get_type_name(sock_type: int) -> str:
        """Convert socket type to readable name.

        ``SOCK_NONBLOCK`` / ``SOCK_CLOEXEC`` flags OR-ed into the type (where
        the platform defines them) are ignored for the lookup.
        """
        names = {
            socket.SOCK_STREAM: 'SOCK_STREAM (TCP)',
            socket.SOCK_DGRAM: 'SOCK_DGRAM (UDP)',
        }
        flag_mask = (getattr(socket, 'SOCK_NONBLOCK', 0)
                     | getattr(socket, 'SOCK_CLOEXEC', 0))
        base_type = sock_type & ~flag_mask
        return names.get(base_type, f'Unknown ({sock_type})')

    @staticmethod
    def _format_address(address) -> str:
        """Format address for readable output."""
        if isinstance(address, tuple):
            if len(address) == 2:  # IPv4
                return f"{address[0]}:{address[1]}"
            elif len(address) == 4:  # IPv6
                return f"[{address[0]}]:{address[1]} (flow={address[2]}, scope={address[3]})"
        return str(address)

    @classmethod
    def install(cls):
        """Replace ``socket.socket`` with this class and reset the log.

        Does nothing if the spy is already installed.
        """
        if not cls._installed:
            socket.socket = cls
            cls._installed = True
            # Clear under the lock: spy sockets created earlier may still be
            # appending from other threads.
            with cls._lock:
                cls.operations.clear()
            print("π΅οΈ Socket Spy installed!")

    @classmethod
    def uninstall(cls):
        """Restore the original ``socket.socket`` if the spy is installed."""
        if cls._installed:
            socket.socket = cls._original_socket
            cls._installed = False
            print("π΅οΈ Socket Spy uninstalled!")

    @classmethod
    def get_operations(cls) -> List[Dict[str, Any]]:
        """Return a snapshot of all recorded operations as dicts."""
        with cls._lock:
            return [op.to_dict() for op in cls.operations]

    @classmethod
    def print_report(cls):
        """Print a per-socket breakdown followed by an overall summary."""
        operations = cls.get_operations()
        print("\n" + "=" * 60)
        print("π SOCKET OPERATIONS REPORT")
        print("=" * 60)

        # Group by socket ID
        by_socket: Dict[int, List[Dict[str, Any]]] = {}
        for op in operations:
            by_socket.setdefault(op['socket_id'], []).append(op)

        def first(ops, name):
            return next((op for op in ops if op['operation'] == name), None)

        def has(ops, name):
            return any(op['operation'] == name for op in ops)

        # Analyze each socket
        for socket_id, ops in by_socket.items():
            print(f"\nπ Socket {socket_id}:")
            create_op = first(ops, 'create')
            connect_op = first(ops, 'connect_attempt')
            success_op = first(ops, 'connect_success')
            failed_op = first(ops, 'connect_failed')
            pending_op = first(ops, 'connect_in_progress')
            if create_op:
                print(f" Created: {create_op['family']} / {create_op['type']}")
            if connect_op:
                print(f" Target: {connect_op['formatted_address']}")
                print(f" Family: {connect_op['family']}")
            if success_op:
                print(" Result: β Connected")
            elif failed_op:
                print(f" Result: β Failed - {failed_op['error']}")
            elif pending_op:
                # The final outcome of a non-blocking connect is never seen
                # by connect() itself, so it cannot be reported here.
                print(" Result: In progress (non-blocking connect)")

            # Data transfer stats
            sends = [op for op in ops if op['operation'] == 'send']
            recvs = [op for op in ops if op['operation'] == 'recv']
            if sends or recvs:
                total_sent = sum(op['size'] for op in sends)
                total_recv = sum(op['received'] for op in recvs)
                print(f" Data: Sent {total_sent} bytes, Received {total_recv} bytes")

        # Summary
        print("\nπ SUMMARY:")
        print(f" Total sockets: {len(by_socket)}")

        # IPv4 vs IPv6
        ipv4_sockets = sum(1 for ops in by_socket.values()
                           if any('IPv4' in op.get('family', '') for op in ops))
        ipv6_sockets = sum(1 for ops in by_socket.values()
                           if any('IPv6' in op.get('family', '') for op in ops))
        print(f" IPv4 sockets: {ipv4_sockets}")
        print(f" IPv6 sockets: {ipv6_sockets}")

        # Success rate, computed only over sockets whose outcome is known.
        successful = 0
        failed = 0
        pending = 0
        for ops in by_socket.values():
            if has(ops, 'connect_success'):
                successful += 1
            elif has(ops, 'connect_failed'):
                failed += 1
            elif has(ops, 'connect_in_progress'):
                pending += 1
        if pending:
            print(f" Pending (non-blocking) connects: {pending}")
        resolved = successful + failed
        if resolved > 0:
            success_rate = (successful / resolved) * 100
            print(f" Connection success rate: {success_rate:.1f}%")

    @classmethod
    def save_to_file(cls, filename: str = "socket_operations.json"):
        """Save a snapshot of the recorded operations to a JSON file.

        Values JSON cannot encode natively (for example a ``bytes`` address)
        are written as their ``repr`` instead of aborting the dump.
        """
        # Take one snapshot so the count printed matches what was written.
        snapshot = cls.get_operations()
        with open(filename, 'w') as f:
            json.dump(snapshot, f, indent=2, default=repr)
        print(f"π Saved {len(snapshot)} operations to {filename}")
@contextmanager
def spy_sockets():
    """Context manager that installs the socket spy for the enclosed block.

    Yields:
        The ``SpySocket`` class, so callers can query or print the log.

    The spy is uninstalled on exit only if this context manager installed
    it. When it was already active on entry (e.g. a nested ``with`` or an
    earlier ``SpySocket.install()``), it is left installed so the outer
    scope keeps being monitored.
    """
    already_installed = SpySocket._installed
    SpySocket.install()
    try:
        yield SpySocket
    finally:
        if not already_installed:
            SpySocket.uninstall()
| # Example usage | |
async def demo():
    """Run one request with httpx's sync client and one with its async client.

    Both requests are made while the socket spy is installed; the collected
    report is printed after the spy has been uninstalled. A failing request
    is reported on stdout and does not stop the demo.
    """
    import httpx

    check_url = "https://api.tikapi.io/public/check"

    print("π― Testing httpx with Socket Spy...\n")
    with spy_sockets():
        # Synchronous client first.
        print("\n=== Testing httpx.Client (sync) ===")
        try:
            with httpx.Client() as sync_client:
                sync_reply = sync_client.get(check_url)
                print(f"Response: {sync_reply.status_code}\n")
        except Exception as sync_error:
            print(f"Failed: {sync_error}\n")

        # Then the asynchronous client against the same endpoint.
        print("\n=== Testing httpx.AsyncClient ===")
        try:
            async with httpx.AsyncClient() as async_client:
                async_reply = await async_client.get(check_url)
                print(f"Response: {async_reply.status_code}\n")
        except Exception as async_error:
            print(f"Failed: {async_error}\n")

    # Summarise everything that was recorded.
    SpySocket.print_report()
if __name__ == "__main__":
    import sys

    # The only recognised argument is "--demo" in first position; anything
    # else prints the usage text.
    cli_args = sys.argv[1:]
    if cli_args and cli_args[0] == "--demo":
        import asyncio
        asyncio.run(demo())
    else:
        usage_lines = (
            "π΅οΈ Socket Spy - Monitor all socket operations",
            "\nUsage:",
            " python socket_spy.py --demo # Run demo with httpx",
            "\nOr import in your code:",
            " from socket_spy import spy_sockets",
            " with spy_sockets():",
            " # Your network code here",
            "\nThe spy will reveal:",
            " - IPv4 vs IPv6 connection attempts",
            " - Connection successes and failures",
            " - Data transfer statistics",
            " - Which sockets are created by which operations",
        )
        for usage_line in usage_lines:
            print(usage_line)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment