Created
June 20, 2025 09:51
-
-
Save odysseus0/f9eed309983a691e732ad5a49f2f0273 to your computer and use it in GitHub Desktop.
Compare sync vs async httpx - Phase 1 investigation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import asyncio | |
| import httpx | |
| import threading | |
| import concurrent.futures | |
| def sync_test(): | |
| """Test with synchronous httpx client.""" | |
| print("π Testing synchronous httpx.Client...") | |
| try: | |
| with httpx.Client(timeout=30.0) as client: | |
| response = client.get( | |
| "https://api.tikapi.io/public/video", | |
| params={"id": "7003402629929913605"}, | |
| headers={ | |
| "X-API-KEY": "YOUR_TIKAPI_KEY_HERE", | |
| "Accept": "application/json" | |
| } | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| print(f" β Sync client success! Status: {data.get('status')}") | |
| return True | |
| except Exception as e: | |
| print(f" β Sync client failed: {type(e).__name__}: {e}") | |
| return False | |
| async def async_test(): | |
| """Test with asynchronous httpx client.""" | |
| print("β‘ Testing asynchronous httpx.AsyncClient...") | |
| try: | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| response = await client.get( | |
| "https://api.tikapi.io/public/video", | |
| params={"id": "7003402629929913605"}, | |
| headers={ | |
| "X-API-KEY": "YOUR_TIKAPI_KEY_HERE", | |
| "Accept": "application/json" | |
| } | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| print(f" β Async client success! Status: {data.get('status')}") | |
| return True | |
| except Exception as e: | |
| print(f" β Async client failed: {type(e).__name__}: {e}") | |
| return False | |
| async def async_wrapped_sync(): | |
| """Run sync httpx client in a thread from async context.""" | |
| print("π Testing sync httpx.Client wrapped in asyncio.to_thread...") | |
| try: | |
| result = await asyncio.to_thread(sync_test) | |
| return result | |
| except Exception as e: | |
| print(f" β Wrapped sync failed: {type(e).__name__}: {e}") | |
| return False | |
| async def main(): | |
| """Compare sync vs async httpx behavior.""" | |
| print("π Investigating httpx async vs sync behavior with TikAPI") | |
| print("=" * 60) | |
| # Test 1: Pure sync | |
| print("\n1. Pure synchronous test:") | |
| sync_result = sync_test() | |
| # Test 2: Pure async | |
| print("\n2. Pure asynchronous test:") | |
| async_result = await async_test() | |
| # Test 3: Sync wrapped in async | |
| print("\n3. Sync client in async context:") | |
| wrapped_result = await async_wrapped_sync() | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment