Skip to content

Instantly share code, notes, and snippets.

@odysseus0
Last active June 20, 2025 00:32
Show Gist options
  • Save odysseus0/79f2ff3622c39832a51cdab0a16fdda4 to your computer and use it in GitHub Desktop.
Save odysseus0/79f2ff3622c39832a51cdab0a16fdda4 to your computer and use it in GitHub Desktop.
Socket Spy - Monitor all socket operations in Python to debug networking issues at the lowest level
#!/usr/bin/env python3
# /// script
# dependencies = [
# "httpx", # For the demo mode
# ]
# requires-python = ">=3.7"
# ///
"""
Socket Spy - Monitor all socket operations in Python
https://gist.github.com/odysseus0/79f2ff3622c39832a51cdab0a16fdda4
This tool revealed that httpx AsyncClient was attempting IPv6 connections
while the sync client used IPv4, leading to the discovery of TikAPI's
broken IPv6 support.
Usage:
uv run socket_spy.py --demo
Or import in your code:
from socket_spy import SpySocket
SpySocket.install()
# Your network code here
SpySocket.uninstall()
SpySocket.print_report()
"""
import socket
import time
import json
import threading
from datetime import datetime
from typing import List, Dict, Any, Optional
from contextlib import contextmanager
class SocketOperation:
"""Represents a single socket operation."""
def __init__(self, operation: str, socket_id: int, **kwargs):
self.timestamp = datetime.now()
self.time = time.time()
self.operation = operation
self.socket_id = socket_id
self.thread_id = threading.get_ident()
self.details = kwargs
def to_dict(self) -> Dict[str, Any]:
return {
'timestamp': self.timestamp.isoformat(),
'time': self.time,
'operation': self.operation,
'socket_id': self.socket_id,
'thread_id': self.thread_id,
**self.details
}
class SpySocket(socket.socket):
"""Socket wrapper that monitors all operations."""
operations: List[SocketOperation] = []
_original_socket = socket.socket
_lock = threading.Lock()
_installed = False
def __init__(self, family=-1, type=-1, proto=-1, fileno=None):
super().__init__(family, type, proto, fileno)
self._spy_id = id(self)
with self._lock:
self.operations.append(SocketOperation(
'create',
self._spy_id,
family=self._get_family_name(family),
type=self._get_type_name(type),
proto=proto
))
def connect(self, address):
"""Monitor connection attempts."""
family_name = self._get_family_name(self.family)
# Log the attempt
with self._lock:
self.operations.append(SocketOperation(
'connect_attempt',
self._spy_id,
address=address,
family=family_name,
formatted_address=self._format_address(address)
))
# Print real-time for debugging
print(f"πŸ”Œ [{datetime.now().strftime('%H:%M:%S')}] Connecting to: {address}")
print(f" Family: {family_name}")
try:
result = super().connect(address)
with self._lock:
self.operations.append(SocketOperation(
'connect_success',
self._spy_id,
address=address
))
print(f" βœ… Connected!")
return result
except Exception as e:
with self._lock:
self.operations.append(SocketOperation(
'connect_failed',
self._spy_id,
address=address,
error=str(e),
error_type=type(e).__name__
))
print(f" ❌ Failed: {e}")
raise
def send(self, data, flags=0):
"""Monitor data sending."""
with self._lock:
self.operations.append(SocketOperation(
'send',
self._spy_id,
size=len(data),
flags=flags,
preview=data[:50].hex() if data else ''
))
return super().send(data, flags)
def recv(self, bufsize, flags=0):
"""Monitor data receiving."""
data = super().recv(bufsize, flags)
with self._lock:
self.operations.append(SocketOperation(
'recv',
self._spy_id,
requested=bufsize,
received=len(data),
flags=flags
))
return data
def close(self):
"""Monitor socket closing."""
with self._lock:
self.operations.append(SocketOperation(
'close',
self._spy_id
))
return super().close()
@staticmethod
def _get_family_name(family: int) -> str:
"""Convert socket family to readable name."""
families = {
socket.AF_INET: 'AF_INET (IPv4)',
socket.AF_INET6: 'AF_INET6 (IPv6)',
}
# Handle different platforms
if hasattr(socket, 'AF_UNIX'):
families[socket.AF_UNIX] = 'AF_UNIX'
return families.get(family, f'Unknown ({family})')
@staticmethod
def _get_type_name(sock_type: int) -> str:
"""Convert socket type to readable name."""
types = {
socket.SOCK_STREAM: 'SOCK_STREAM (TCP)',
socket.SOCK_DGRAM: 'SOCK_DGRAM (UDP)',
}
return types.get(sock_type, f'Unknown ({sock_type})')
@staticmethod
def _format_address(address) -> str:
"""Format address for readable output."""
if isinstance(address, tuple):
if len(address) == 2: # IPv4
return f"{address[0]}:{address[1]}"
elif len(address) == 4: # IPv6
return f"[{address[0]}]:{address[1]} (flow={address[2]}, scope={address[3]})"
return str(address)
@classmethod
def install(cls):
"""Install the socket spy."""
if not cls._installed:
socket.socket = cls
cls._installed = True
cls.operations.clear()
print("πŸ•΅οΈ Socket Spy installed!")
@classmethod
def uninstall(cls):
"""Restore original socket."""
if cls._installed:
socket.socket = cls._original_socket
cls._installed = False
print("πŸ•΅οΈ Socket Spy uninstalled!")
@classmethod
def get_operations(cls) -> List[Dict[str, Any]]:
"""Get all recorded operations."""
with cls._lock:
return [op.to_dict() for op in cls.operations]
@classmethod
def print_report(cls):
"""Print a summary report of socket operations."""
operations = cls.get_operations()
print("\n" + "=" * 60)
print("πŸ“Š SOCKET OPERATIONS REPORT")
print("=" * 60)
# Group by socket ID
by_socket = {}
for op in operations:
socket_id = op['socket_id']
if socket_id not in by_socket:
by_socket[socket_id] = []
by_socket[socket_id].append(op)
# Analyze each socket
for socket_id, ops in by_socket.items():
print(f"\nπŸ”Œ Socket {socket_id}:")
# Find key operations
create_op = next((op for op in ops if op['operation'] == 'create'), None)
connect_op = next((op for op in ops if op['operation'] == 'connect_attempt'), None)
success_op = next((op for op in ops if op['operation'] == 'connect_success'), None)
failed_op = next((op for op in ops if op['operation'] == 'connect_failed'), None)
if create_op:
print(f" Created: {create_op['family']} / {create_op['type']}")
if connect_op:
print(f" Target: {connect_op['formatted_address']}")
print(f" Family: {connect_op['family']}")
if success_op:
print(f" Result: βœ… Connected")
elif failed_op:
print(f" Result: ❌ Failed - {failed_op['error']}")
# Data transfer stats
sends = [op for op in ops if op['operation'] == 'send']
recvs = [op for op in ops if op['operation'] == 'recv']
if sends or recvs:
total_sent = sum(op['size'] for op in sends)
total_recv = sum(op['received'] for op in recvs)
print(f" Data: Sent {total_sent} bytes, Received {total_recv} bytes")
# Summary
print("\nπŸ“ˆ SUMMARY:")
print(f" Total sockets: {len(by_socket)}")
# IPv4 vs IPv6
ipv4_sockets = sum(1 for ops in by_socket.values()
if any('IPv4' in op.get('family', '') for op in ops))
ipv6_sockets = sum(1 for ops in by_socket.values()
if any('IPv6' in op.get('family', '') for op in ops))
print(f" IPv4 sockets: {ipv4_sockets}")
print(f" IPv6 sockets: {ipv6_sockets}")
# Success rate
total_connects = sum(1 for ops in by_socket.values()
if any(op['operation'] == 'connect_attempt' for op in ops))
successful = sum(1 for ops in by_socket.values()
if any(op['operation'] == 'connect_success' for op in ops))
if total_connects > 0:
success_rate = (successful / total_connects) * 100
print(f" Connection success rate: {success_rate:.1f}%")
@classmethod
def save_to_file(cls, filename: str = "socket_operations.json"):
"""Save operations to JSON file."""
with open(filename, 'w') as f:
json.dump(cls.get_operations(), f, indent=2)
print(f"πŸ“ Saved {len(cls.operations)} operations to {filename}")
@contextmanager
def spy_sockets():
"""Context manager for socket spying."""
SpySocket.install()
try:
yield SpySocket
finally:
SpySocket.uninstall()
# Example usage
async def demo():
"""Demonstrate socket spying with httpx."""
import httpx
print("🎯 Testing httpx with Socket Spy...\n")
with spy_sockets() as spy:
# Test sync client
print("\n=== Testing httpx.Client (sync) ===")
try:
with httpx.Client() as client:
response = client.get("https://api.tikapi.io/public/check")
print(f"Response: {response.status_code}\n")
except Exception as e:
print(f"Failed: {e}\n")
# Test async client
print("\n=== Testing httpx.AsyncClient ===")
try:
async with httpx.AsyncClient() as client:
response = await client.get("https://api.tikapi.io/public/check")
print(f"Response: {response.status_code}\n")
except Exception as e:
print(f"Failed: {e}\n")
# Print report
SpySocket.print_report()
if __name__ == "__main__":
import sys
# Check if running demo
if len(sys.argv) > 1 and sys.argv[1] == "--demo":
import asyncio
asyncio.run(demo())
else:
print("πŸ•΅οΈ Socket Spy - Monitor all socket operations")
print("\nUsage:")
print(" python socket_spy.py --demo # Run demo with httpx")
print("\nOr import in your code:")
print(" from socket_spy import spy_sockets")
print(" with spy_sockets():")
print(" # Your network code here")
print("\nThe spy will reveal:")
print(" - IPv4 vs IPv6 connection attempts")
print(" - Connection successes and failures")
print(" - Data transfer statistics")
print(" - Which sockets are created by which operations")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment