FastAPI streaming proxy example: a generator app, a proxy app that relays the stream, and a test client.
"""Execute as a normal script, or subdivided using uvicorn:
Together:
python streams.py
GENERATOR:
uvicorn idk:app_gen --host localhost --port 8000
PROXY:
uvicorn idk:app_proxy --host localhost --port 7000
CLIENT:
CLIENT_ONLY=1 python streams.py
"""

import asyncio
import logging
import os
from typing import Any, AsyncGenerator

import uvicorn
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import StreamingResponse
from httpx import AsyncClient
from pydantic import BaseModel

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)7s %(name)10s: %(message)s",
)

CLIENT_ONLY: int = int(os.environ.get("CLIENT_ONLY", "0"))

class InData(BaseModel):
    """Parameters for the stream: emit range(start, end, step), pausing
    `sleep` seconds between chunks."""

    start: int
    end: int
    step: int = 1
    sleep: float = 1.0


class RangeChunk(BaseModel):
    """A single streamed chunk: the range value `i` and its sequence number `n`."""

    i: int
    n: int


PORT_GENERATOR: int = 8000
PORT_PROXY: int = 7000

app_gen = FastAPI()


@app_gen.post("/stream")
async def gen_stream(in_data: InData, req: Request) -> StreamingResponse:
    """Stream one JSON-encoded RangeChunk per step of the requested range."""
    LOGGER = logging.getLogger("generator")
    LOGGER.info(f"<- {dict(req.headers)}")
    LOGGER.info(f"<- {in_data}")

    async def range_stream(data: InData) -> AsyncGenerator[bytes, Any]:
        n = 0
        for i in range(data.start, data.end, data.step):
            obj = RangeChunk(i=i, n=n)
            n += 1
            LOGGER.info(f"-> {obj}")
            yield obj.model_dump_json().encode()
            await asyncio.sleep(data.sleep)

    return StreamingResponse(
        range_stream(in_data),
        media_type="application/json",
    )
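# Note: the chunks above are back-to-back JSON objects, so the full response
# body is not one valid JSON document. A common alternative (not used here)
# is newline-delimited JSON: yield obj.model_dump_json().encode() + b"\n"
# with media_type="application/x-ndjson".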

app_proxy = FastAPI()


@app_proxy.post("/stream")
async def proxy_stream(req: Request) -> StreamingResponse:
    """Forward the request body to the generator and relay its stream chunk by
    chunk, stopping early if the client disconnects."""
    LOGGER = logging.getLogger("proxy")
    body = await req.body()
    LOGGER.info(f"<- {body}")
    # Drop the Host header so httpx sets the correct one for the upstream call.
    headers = {k: v for k, v in req.headers.items() if k.lower() != "host"}
    LOGGER.info(f"<- {headers}")

    async def response_stream() -> AsyncGenerator[bytes, None]:
        async with AsyncClient() as client:
            async with client.stream(
                "POST",
                f"http://localhost:{PORT_GENERATOR}/stream",
                content=body,
                headers=headers,
                timeout=None,
            ) as r:
                LOGGER.info(f"-> {r.headers}")
                async for chunk in r.aiter_raw():
                    if await req.is_disconnected():
                        LOGGER.warning("xx response stream closed")
                        break
                    LOGGER.info(f"-> {chunk}")
                    yield chunk

    return StreamingResponse(
        response_stream(),
    )

async def run(app: FastAPI, port: int) -> None:
    """Serve `app` with uvicorn on localhost:`port` until cancelled."""
    config = uvicorn.Config(
        app,
        host="localhost",
        port=port,
        log_level="error",
    )
    server = uvicorn.Server(config)
    await server.serve()

async def client() -> None:
    """Send one request through the proxy and log each chunk as it arrives."""
    LOGGER = logging.getLogger("client")
    in_data = InData(
        start=1,
        end=10,
        step=2,
        sleep=0.2,
    )
    LOGGER.info(f"-> {in_data}")
    # Renamed from `client` to avoid shadowing this function's own name.
    async with AsyncClient() as http_client:
        async with http_client.stream(
            "POST",
            f"http://localhost:{PORT_PROXY}/stream",
            content=in_data.model_dump_json(),
            timeout=None,
        ) as r:
            r.raise_for_status()
            LOGGER.info(f"<- {r.status_code}")
            LOGGER.info(f"<- {r.headers}")
            async for chunk in r.aiter_bytes():
                LOGGER.info(f"<- {chunk}")

async def main() -> None:
    """Start both servers, run the client, then keep serving until Ctrl+C."""
    task_gen = asyncio.create_task(run(app_gen, PORT_GENERATOR))
    task_proxy = asyncio.create_task(run(app_proxy, PORT_PROXY))
    # Give uvicorn a moment to bind before issuing the request.
    await asyncio.sleep(2.0)
    task_client = asyncio.create_task(client())
    await task_client
    await task_gen
    await task_proxy

if __name__ == "__main__":
    try:
        if CLIENT_ONLY == 1:
            asyncio.run(client())
        else:
            asyncio.run(main())
    except KeyboardInterrupt:
        # Exit quietly on Ctrl+C instead of swallowing every exception.
        pass