Skip to content

Instantly share code, notes, and snippets.

@xoelop
Last active January 17, 2022 16:53
Show Gist options
  • Save xoelop/8abfa5efef3a502e1bced94dafc28725 to your computer and use it in GitHub Desktop.
Save xoelop/8abfa5efef3a502e1bced94dafc28725 to your computer and use it in GitHub Desktop.
A middleware that transforms the JSON output from a FastAPI endpoint into CSV content
import io
import json

import pandas as pd
from fastapi import FastAPI
from fastapi import Request, Response
from fastapi.responses import StreamingResponse
# Application instance that the `/test/` route and the "http" middleware are registered on.
app = FastAPI()
@app.get('/test/')
def main():
    """Return a small sample payload: a list of three single-key records."""
    sample_records = [
        {'a': 1},
        {'b': 2},
        {'c': 3},
    ]
    return sample_records
class async_iterator_wrapper:
    """Expose a plain synchronous iterable through the async-iterator protocol.

    This lets already-materialised items be consumed with ``async for``.
    """

    # Private marker handed back by ``next`` once the wrapped iterator runs dry.
    _EXHAUSTED = object()

    def __init__(self, obj):
        """Create the underlying synchronous iterator over *obj*."""
        self._it = iter(obj)

    def __aiter__(self):
        """Return the wrapper itself; it is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next item, raising ``StopAsyncIteration`` when exhausted."""
        item = next(self._it, self._EXHAUSTED)
        if item is self._EXHAUSTED:
            raise StopAsyncIteration
        return item
@app.middleware("http")
async def convert_to_csv_middleware(request: Request, call_next) -> Response:
    """Serve the downstream JSON response as CSV when ``?format=csv`` is requested.

    Args:
        request: Incoming request; only its ``format`` query parameter is inspected.
        call_next: Awaitable callable that is given the request and returns the
            downstream response.

    Returns:
        The downstream response unchanged when ``format`` is not ``"csv"`` or when
        its body is not a JSON object/array; otherwise a ``text/csv`` streaming
        response built from the flattened JSON, keeping the original status code.
    """
    # getting the response body: https://stackoverflow.com/a/64357530/5031446
    # creating a stream response from a csv: https://stackoverflow.com/a/61910803/5031446
    response = await call_next(request)
    if request.query_params.get("format") != "csv":
        return response

    # Reading the body iterator exhausts it, so keep every chunk that was produced.
    chunks = [section async for section in response.body_iterator]
    # Re-arm the original response so it can still be sent if conversion is skipped.
    response.body_iterator = async_iterator_wrapper(chunks)

    try:
        # The body may arrive in several chunks; parse them as one document
        # instead of looking only at the first chunk.
        payload = json.loads(b"".join(chunks).decode())
    except (TypeError, ValueError):
        # Empty, non-bytes, non-UTF-8 or non-JSON body: nothing to convert.
        return response

    if not isinstance(payload, (dict, list)):
        # json_normalize only accepts a dict or a list of dicts.
        return response

    df = pd.json_normalize(payload)
    stream = io.StringIO()
    df.to_csv(stream, index=False)
    return StreamingResponse(
        iter([stream.getvalue()]),
        status_code=response.status_code,  # don't turn error payloads into 200s
        media_type="text/csv",
    )
# alternate version:
# str_csv = df.to_csv(index=False)
# return PlainTextResponse(str_csv, media_type="text/csv")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment