Created
June 20, 2025 09:51
-
-
Save odysseus0/77baef9986feaebe34580c883a1dc0fd to your computer and use it in GitHub Desktop.
Network capture analysis - Deep dive into socket operations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Phase 1: Network-Level Deep Dive - Packet Capture Analysis | |
| Captures and compares network behavior between sync and async httpx clients | |
| """ | |
| import asyncio | |
| import httpx | |
| import socket | |
| import ssl | |
| import time | |
| import logging | |
| from typing import Dict, Any, List | |
| import json | |
| import sys | |
| # Configure detailed logging | |
| logging.basicConfig( | |
| level=logging.DEBUG, | |
| format='%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s', | |
| datefmt='%H:%M:%S' | |
| ) | |
| # Monkey-patch socket to capture socket operations | |
| original_socket = socket.socket | |
| socket_operations: List[Dict[str, Any]] = [] | |
| class MonitoredSocket(socket.socket): | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| self._operation_log = [] | |
| socket_operations.append({ | |
| 'time': time.time(), | |
| 'operation': 'socket_create', | |
| 'args': args, | |
| 'kwargs': kwargs | |
| }) | |
| def connect(self, address): | |
| socket_operations.append({ | |
| 'time': time.time(), | |
| 'operation': 'connect', | |
| 'address': address, | |
| 'socket_info': { | |
| 'family': self.family.name, | |
| 'type': self.type.name, | |
| 'proto': self.proto | |
| } | |
| }) | |
| return super().connect(address) | |
| def setsockopt(self, level, optname, value): | |
| socket_operations.append({ | |
| 'time': time.time(), | |
| 'operation': 'setsockopt', | |
| 'level': level, | |
| 'optname': optname, | |
| 'value': str(value) | |
| }) | |
| return super().setsockopt(level, optname, value) | |
| def send(self, data, flags=0): | |
| socket_operations.append({ | |
| 'time': time.time(), | |
| 'operation': 'send', | |
| 'data_len': len(data), | |
| 'flags': flags, | |
| 'preview': data[:50].hex() if len(data) > 0 else '' | |
| }) | |
| return super().send(data, flags) | |
| def recv(self, bufsize, flags=0): | |
| data = super().recv(bufsize, flags) | |
| socket_operations.append({ | |
| 'time': time.time(), | |
| 'operation': 'recv', | |
| 'requested': bufsize, | |
| 'received': len(data), | |
| 'flags': flags, | |
| 'preview': data[:50].hex() if len(data) > 0 else '' | |
| }) | |
| return data | |
| def capture_ssl_info(ssl_context): | |
| """Capture SSL context information""" | |
| return { | |
| 'check_hostname': ssl_context.check_hostname, | |
| 'protocol': ssl_context.protocol.name if hasattr(ssl_context.protocol, 'name') else str(ssl_context.protocol), | |
| 'verify_mode': ssl_context.verify_mode.name if hasattr(ssl_context.verify_mode, 'name') else str(ssl_context.verify_mode), | |
| 'options': bin(ssl_context.options), | |
| 'minimum_version': getattr(ssl_context, 'minimum_version', 'N/A'), | |
| 'maximum_version': getattr(ssl_context, 'maximum_version', 'N/A'), | |
| } | |
| def test_sync_client(): | |
| """Test synchronous httpx client with monitoring""" | |
| global socket_operations | |
| socket_operations = [] | |
| print("\n🔄 Testing SYNC httpx.Client with network monitoring...") | |
| # Temporarily replace socket.socket | |
| socket.socket = MonitoredSocket | |
| start_time = time.time() | |
| try: | |
| # Create custom SSL context to monitor | |
| ssl_context = ssl.create_default_context() | |
| with httpx.Client( | |
| timeout=30.0, | |
| verify=ssl_context, | |
| http2=False, # Disable HTTP/2 to simplify analysis | |
| ) as client: | |
| response = client.get( | |
| "https://api.tikapi.io/public/video", | |
| params={"id": "7003402629929913605"}, | |
| headers={ | |
| "X-API-KEY": "YOUR_TIKAPI_KEY_HERE", | |
| "Accept": "application/json", | |
| "User-Agent": "httpx-sync-test/1.0" | |
| } | |
| ) | |
| response.raise_for_status() | |
| end_time = time.time() | |
| result = { | |
| 'success': True, | |
| 'duration': end_time - start_time, | |
| 'status_code': response.status_code, | |
| 'ssl_info': capture_ssl_info(ssl_context), | |
| 'socket_operations': socket_operations, | |
| 'operation_count': len(socket_operations), | |
| 'http_version': response.http_version | |
| } | |
| print(f"✅ Sync client succeeded in {result['duration']:.3f}s") | |
| return result | |
| except Exception as e: | |
| end_time = time.time() | |
| result = { | |
| 'success': False, | |
| 'duration': end_time - start_time, | |
| 'error': f"{type(e).__name__}: {str(e)}", | |
| 'socket_operations': socket_operations, | |
| 'operation_count': len(socket_operations) | |
| } | |
| print(f"❌ Sync client failed: {result['error']}") | |
| return result | |
| finally: | |
| # Restore original socket | |
| socket.socket = original_socket | |
| async def test_async_client(): | |
| """Test asynchronous httpx client with monitoring""" | |
| global socket_operations | |
| socket_operations = [] | |
| print("\n⚡ Testing ASYNC httpx.AsyncClient with network monitoring...") | |
| # Temporarily replace socket.socket | |
| socket.socket = MonitoredSocket | |
| start_time = time.time() | |
| try: | |
| # Create custom SSL context to monitor | |
| ssl_context = ssl.create_default_context() | |
| async with httpx.AsyncClient( | |
| timeout=30.0, | |
| verify=ssl_context, | |
| http2=False, # Disable HTTP/2 to simplify analysis | |
| ) as client: | |
| response = await client.get( | |
| "https://api.tikapi.io/public/video", | |
| params={"id": "7003402629929913605"}, | |
| headers={ | |
| "X-API-KEY": "YOUR_TIKAPI_KEY_HERE", | |
| "Accept": "application/json", | |
| "User-Agent": "httpx-async-test/1.0" | |
| } | |
| ) | |
| response.raise_for_status() | |
| end_time = time.time() | |
| result = { | |
| 'success': True, | |
| 'duration': end_time - start_time, | |
| 'status_code': response.status_code, | |
| 'ssl_info': capture_ssl_info(ssl_context), | |
| 'socket_operations': socket_operations, | |
| 'operation_count': len(socket_operations), | |
| 'http_version': response.http_version | |
| } | |
| print(f"✅ Async client succeeded in {result['duration']:.3f}s") | |
| return result | |
| except Exception as e: | |
| end_time = time.time() | |
| result = { | |
| 'success': False, | |
| 'duration': end_time - start_time, | |
| 'error': f"{type(e).__name__}: {str(e)}", | |
| 'socket_operations': socket_operations, | |
| 'operation_count': len(socket_operations) | |
| } | |
| print(f"❌ Async client failed: {result['error']}") | |
| return result | |
| finally: | |
| # Restore original socket | |
| socket.socket = original_socket | |
| def analyze_differences(sync_result: Dict, async_result: Dict): | |
| """Analyze differences between sync and async results""" | |
| print("\n📊 Analysis Results:") | |
| print("=" * 60) | |
| # Basic comparison | |
| print(f"\n🎯 Success Status:") | |
| print(f" Sync: {'✅ Success' if sync_result['success'] else '❌ Failed'}") | |
| print(f" Async: {'✅ Success' if async_result['success'] else '❌ Failed'}") | |
| print(f"\n⏱️ Duration:") | |
| print(f" Sync: {sync_result['duration']:.3f}s") | |
| print(f" Async: {async_result['duration']:.3f}s") | |
| print(f"\n🔌 Socket Operations Count:") | |
| print(f" Sync: {sync_result['operation_count']} operations") | |
| print(f" Async: {async_result['operation_count']} operations") | |
| # Compare socket operations | |
| if sync_result['success'] and not async_result['success']: | |
| print("\n🔍 Last Socket Operations Before Failure:") | |
| if async_result['socket_operations']: | |
| print("\n Async (last 5 operations):") | |
| for op in async_result['socket_operations'][-5:]: | |
| print(f" - {op['operation']}: {op}") | |
| print("\n Sync (corresponding operations):") | |
| sync_ops = sync_result['socket_operations'] | |
| if len(sync_ops) >= len(async_result['socket_operations']): | |
| start_idx = len(async_result['socket_operations']) - 5 | |
| if start_idx >= 0: | |
| for op in sync_ops[start_idx:start_idx+5]: | |
| print(f" - {op['operation']}: {op}") | |
| # Socket option differences | |
| print("\n⚙️ Socket Options Used:") | |
| sync_sockopts = [op for op in sync_result['socket_operations'] if op['operation'] == 'setsockopt'] | |
| async_sockopts = [op for op in async_result['socket_operations'] if op['operation'] == 'setsockopt'] | |
| print(f" Sync socket options ({len(sync_sockopts)}):") | |
| for opt in sync_sockopts: | |
| print(f" - Level {opt['level']}, Option {opt['optname']}, Value {opt['value']}") | |
| print(f" Async socket options ({len(async_sockopts)}):") | |
| for opt in async_sockopts: | |
| print(f" - Level {opt['level']}, Option {opt['optname']}, Value {opt['value']}") | |
| # Save detailed results | |
| with open('network_capture_results.json', 'w') as f: | |
| json.dump({ | |
| 'sync_result': sync_result, | |
| 'async_result': async_result, | |
| 'analysis': { | |
| 'sync_success': sync_result['success'], | |
| 'async_success': async_result['success'], | |
| 'operation_count_diff': sync_result['operation_count'] - async_result['operation_count'] | |
| } | |
| }, f, indent=2, default=str) | |
| print("\n💾 Detailed results saved to network_capture_results.json") | |
| async def main(): | |
| """Run network capture analysis""" | |
| print("🔍 Phase 1: Network-Level Deep Dive - Packet Capture Analysis") | |
| print("=" * 60) | |
| # Test sync client | |
| sync_result = test_sync_client() | |
| # Test async client | |
| async_result = await test_async_client() | |
| # Analyze differences | |
| analyze_differences(sync_result, async_result) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment