Skip to content

Instantly share code, notes, and snippets.

@odysseus0
Created June 20, 2025 09:51
Show Gist options
  • Save odysseus0/77baef9986feaebe34580c883a1dc0fd to your computer and use it in GitHub Desktop.
Save odysseus0/77baef9986feaebe34580c883a1dc0fd to your computer and use it in GitHub Desktop.
Network capture analysis - Deep dive into socket operations
#!/usr/bin/env python3
"""
Phase 1: Network-Level Deep Dive - Packet Capture Analysis

Captures and compares network behavior between sync and async httpx clients.

The script temporarily replaces ``socket.socket`` with a logging subclass,
issues the same HTTPS GET through ``httpx.Client`` and ``httpx.AsyncClient``,
prints a side-by-side summary of the recorded socket operations, and saves
the full results to ``network_capture_results.json``.

Note that, despite the title, no packets are captured: only the Python-level
socket method calls that go through the patched class are recorded.
"""
import asyncio
import httpx
import socket
import ssl
import time
import logging
from typing import Dict, Any, List
import json
import sys
# Configure detailed logging: the root logger is set to DEBUG and every
# record is prefixed with a millisecond-resolution time of day, the logger
# name and the level.
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
# Monkey-patch support for capturing socket operations.
# ``original_socket`` keeps the real class so that it can be restored after a
# run; ``socket_operations`` is the shared log every MonitoredSocket appends to.
original_socket = socket.socket
socket_operations: List[Dict[str, Any]] = []


def _enum_label(value):
    """Return ``value.name`` for enum members, otherwise ``str(value)``."""
    return getattr(value, 'name', str(value))


def _hex_preview(data, limit=50):
    """Return the first *limit* items of a bytes-like object as a hex string."""
    return bytes(data[:limit]).hex()


class MonitoredSocket(socket.socket):
    """``socket.socket`` subclass that logs selected calls.

    Each overridden method appends one dict to the module-level
    ``socket_operations`` list (looked up at call time, so the list may be
    rebound between runs) and then delegates to the real implementation.
    Only ``__init__``, ``connect``, ``setsockopt``, ``send`` and ``recv`` are
    instrumented; every other method is inherited unchanged and leaves no
    trace in the log.

    NOTE(review): ``ssl.SSLSocket`` is a separate class, so TLS traffic that
    is sent through a wrapped socket presumably bypasses these hooks --
    confirm before drawing conclusions from missing send/recv entries.
    """

    def __init__(self, *args, **kwargs):
        """Create the socket and record the constructor arguments."""
        super().__init__(*args, **kwargs)
        self._operation_log = []
        socket_operations.append({
            'time': time.time(),
            'operation': 'socket_create',
            'args': args,
            'kwargs': kwargs
        })

    def connect(self, address):
        """Record the target address and socket parameters, then connect.

        The entry is written before the real call, so it is present even
        when the connect raises.
        """
        socket_operations.append({
            'time': time.time(),
            'operation': 'connect',
            'address': address,
            'socket_info': {
                # family/type may be plain ints when the OS value has no
                # matching enum member, hence the tolerant lookup.
                'family': _enum_label(self.family),
                'type': _enum_label(self.type),
                'proto': self.proto
            }
        })
        return super().connect(address)

    def setsockopt(self, level, optname, value, *optlen):
        """Record the option being set, then apply it.

        ``*optlen`` forwards the optional fourth argument of the
        ``setsockopt(level, optname, None, optlen)`` form, which a
        three-parameter override would reject with ``TypeError``.
        """
        entry = {
            'time': time.time(),
            'operation': 'setsockopt',
            'level': level,
            'optname': optname,
            'value': str(value)
        }
        if optlen:
            entry['optlen'] = optlen[0]
        socket_operations.append(entry)
        return super().setsockopt(level, optname, value, *optlen)

    def send(self, data, flags=0):
        """Record the payload size and a hex preview, then send."""
        socket_operations.append({
            'time': time.time(),
            'operation': 'send',
            'data_len': len(data),
            'flags': flags,
            'preview': _hex_preview(data)
        })
        return super().send(data, flags)

    def recv(self, bufsize, flags=0):
        """Receive data, then record the requested and received sizes."""
        data = super().recv(bufsize, flags)
        socket_operations.append({
            'time': time.time(),
            'operation': 'recv',
            'requested': bufsize,
            'received': len(data),
            'flags': flags,
            'preview': _hex_preview(data)
        })
        return data
def capture_ssl_info(ssl_context):
    """Summarise the settings of an SSL context as a plain dict.

    ``protocol`` and ``verify_mode`` are reported by enum name when the value
    has a ``name`` attribute and as ``str(value)`` otherwise; ``options`` is
    rendered as a binary literal; the version bounds fall back to ``'N/A'``
    when the context does not expose them.
    """
    def label(member):
        return member.name if hasattr(member, 'name') else str(member)

    summary = {'check_hostname': ssl_context.check_hostname}
    summary['protocol'] = label(ssl_context.protocol)
    summary['verify_mode'] = label(ssl_context.verify_mode)
    summary['options'] = bin(ssl_context.options)
    for bound in ('minimum_version', 'maximum_version'):
        summary[bound] = getattr(ssl_context, bound, 'N/A')
    return summary
def test_sync_client():
    """Run one monitored GET through ``httpx.Client`` and report the outcome.

    The shared ``socket_operations`` log is reset, ``socket.socket`` is
    swapped for ``MonitoredSocket`` for the duration of the request and
    always restored afterwards. Returns a dict with ``success``,
    ``duration``, the recorded socket operations and their count, plus
    either the response details or an ``error`` string.
    """
    global socket_operations
    socket_operations = []
    print("\n🔄 Testing SYNC httpx.Client with network monitoring...")

    endpoint = "https://api.tikapi.io/public/video"
    query = {"id": "7003402629929913605"}
    request_headers = {
        "X-API-KEY": "YOUR_TIKAPI_KEY_HERE",
        "Accept": "application/json",
        "User-Agent": "httpx-sync-test/1.0",
    }

    # Install the monitoring class; the ``finally`` below undoes this.
    socket.socket = MonitoredSocket
    started = time.time()
    try:
        # An explicit context is used so its settings can be reported.
        tls_context = ssl.create_default_context()
        # HTTP/2 is disabled to keep the analysis simple.
        with httpx.Client(timeout=30.0, verify=tls_context, http2=False) as client:
            response = client.get(endpoint, params=query, headers=request_headers)
            response.raise_for_status()
            elapsed = time.time() - started
            outcome = {
                'success': True,
                'duration': elapsed,
                'status_code': response.status_code,
                'ssl_info': capture_ssl_info(tls_context),
                'socket_operations': socket_operations,
                'operation_count': len(socket_operations),
                'http_version': response.http_version,
            }
            print(f"✅ Sync client succeeded in {outcome['duration']:.3f}s")
            return outcome
    except Exception as exc:
        # Any failure is turned into a result so both runs can be compared.
        elapsed = time.time() - started
        outcome = {
            'success': False,
            'duration': elapsed,
            'error': f"{type(exc).__name__}: {exc}",
            'socket_operations': socket_operations,
            'operation_count': len(socket_operations),
        }
        print(f"❌ Sync client failed: {outcome['error']}")
        return outcome
    finally:
        socket.socket = original_socket
async def test_async_client():
    """Run one monitored GET through ``httpx.AsyncClient`` and report the outcome.

    The shared ``socket_operations`` log is reset, ``socket.socket`` is
    swapped for ``MonitoredSocket`` for the duration of the request and
    always restored afterwards. Returns a dict with ``success``,
    ``duration``, the recorded socket operations and their count, plus
    either the response details or an ``error`` string.
    """
    global socket_operations
    socket_operations = []
    print("\n⚡ Testing ASYNC httpx.AsyncClient with network monitoring...")

    endpoint = "https://api.tikapi.io/public/video"
    query = {"id": "7003402629929913605"}
    request_headers = {
        "X-API-KEY": "YOUR_TIKAPI_KEY_HERE",
        "Accept": "application/json",
        "User-Agent": "httpx-async-test/1.0",
    }

    # Install the monitoring class; the ``finally`` below undoes this.
    socket.socket = MonitoredSocket
    started = time.time()
    try:
        # An explicit context is used so its settings can be reported.
        tls_context = ssl.create_default_context()
        # HTTP/2 is disabled to keep the analysis simple.
        async with httpx.AsyncClient(timeout=30.0, verify=tls_context, http2=False) as client:
            response = await client.get(endpoint, params=query, headers=request_headers)
            response.raise_for_status()
            elapsed = time.time() - started
            outcome = {
                'success': True,
                'duration': elapsed,
                'status_code': response.status_code,
                'ssl_info': capture_ssl_info(tls_context),
                'socket_operations': socket_operations,
                'operation_count': len(socket_operations),
                'http_version': response.http_version,
            }
            print(f"✅ Async client succeeded in {outcome['duration']:.3f}s")
            return outcome
    except Exception as exc:
        # Any failure is turned into a result so both runs can be compared.
        elapsed = time.time() - started
        outcome = {
            'success': False,
            'duration': elapsed,
            'error': f"{type(exc).__name__}: {exc}",
            'socket_operations': socket_operations,
            'operation_count': len(socket_operations),
        }
        print(f"❌ Async client failed: {outcome['error']}")
        return outcome
    finally:
        socket.socket = original_socket
def _print_operations(operations):
    """Print one line per recorded socket operation."""
    for op in operations:
        print(f" - {op['operation']}: {op}")


def _print_socket_options(label, operations):
    """Print the ``setsockopt`` entries found in *operations* under *label*."""
    sockopts = [op for op in operations if op['operation'] == 'setsockopt']
    print(f" {label} socket options ({len(sockopts)}):")
    for opt in sockopts:
        print(f" - Level {opt['level']}, Option {opt['optname']}, Value {opt['value']}")


def analyze_differences(sync_result: Dict, async_result: Dict,
                        output_path: str = 'network_capture_results.json'):
    """Print a comparison of the two runs and save the details as JSON.

    Args:
        sync_result: Result dict of the synchronous run. The keys read here
            are ``success``, ``duration``, ``operation_count`` and
            ``socket_operations``.
        async_result: Result dict of the asynchronous run, same keys.
        output_path: File the combined results are written to.

    When the sync run succeeded and the async run failed, the last (up to
    five) async operations are printed next to the sync operations at the
    same positions in the log.
    """
    print("\n📊 Analysis Results:")
    print("=" * 60)

    # Basic comparison
    print("\n🎯 Success Status:")
    print(f" Sync: {'✅ Success' if sync_result['success'] else '❌ Failed'}")
    print(f" Async: {'✅ Success' if async_result['success'] else '❌ Failed'}")
    print("\n⏱️ Duration:")
    print(f" Sync: {sync_result['duration']:.3f}s")
    print(f" Async: {async_result['duration']:.3f}s")
    print("\n🔌 Socket Operations Count:")
    print(f" Sync: {sync_result['operation_count']} operations")
    print(f" Async: {async_result['operation_count']} operations")

    sync_ops = sync_result['socket_operations']
    async_ops = async_result['socket_operations']

    # Compare socket operations around the point where async gave up
    if sync_result['success'] and not async_result['success']:
        print("\n🔍 Last Socket Operations Before Failure:")
        if async_ops:
            print("\n Async (last 5 operations):")
            _print_operations(async_ops[-5:])
        print("\n Sync (corresponding operations):")
        # Clamp the window so a failure after fewer than five operations
        # (or a shorter sync log) still shows whatever sync entries overlap,
        # instead of silently printing nothing.
        window_end = len(async_ops)
        window_start = max(window_end - 5, 0)
        _print_operations(sync_ops[window_start:window_end])

    # Socket option differences
    print("\n⚙️ Socket Options Used:")
    _print_socket_options("Sync", sync_ops)
    _print_socket_options("Async", async_ops)

    # Save detailed results; default=str stringifies anything json cannot
    # encode natively (tuples of enums, TLS version members, ...).
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump({
            'sync_result': sync_result,
            'async_result': async_result,
            'analysis': {
                'sync_success': sync_result['success'],
                'async_success': async_result['success'],
                'operation_count_diff': sync_result['operation_count'] - async_result['operation_count']
            }
        }, f, indent=2, default=str)
    print(f"\n💾 Detailed results saved to {output_path}")
async def main():
    """Run both monitored requests, then compare and save the results."""
    banner = "🔍 Phase 1: Network-Level Deep Dive - Packet Capture Analysis"
    print(banner)
    print("=" * 60)

    # Sync first, async second: both runs reset the shared operation log and
    # patch socket.socket, so they are executed strictly one after the other.
    results = (test_sync_client(), await test_async_client())

    analyze_differences(*results)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment