@BennyThink
Created March 16, 2025 10:39
Convert Azure OpenAI to OpenAI in Python: a small Sanic proxy that exposes an OpenAI-compatible /v1/chat/completions endpoint and forwards both streaming and non-streaming requests to an Azure OpenAI deployment, using the api-key header that Azure expects.
#!/usr/bin/env python3
# coding: utf-8
import os

import httpx
from sanic import Sanic, json as json_response, raw
from sanic.request import Request

# Shared HTTP/2 client with generous read/write timeouts for long completions.
client = httpx.AsyncClient(
    http2=True,
    timeout=httpx.Timeout(
        connect=15.0,
        read=300.0,
        write=300.0,
        pool=10.0,
    ),
)

app = Sanic(__name__)
# Azure OpenAI endpoint and key, supplied via environment variables.
url = os.getenv("URL")
api_key = os.getenv("API_KEY")


@app.route("/v1/chat/completions", methods=["POST"])
async def chat_completions(request: Request):
    body = request.json
    if body.get("stream"):
        return await stream(request, body)
    else:
        return await non_stream(body)


async def non_stream(body):
    # Forward the request once and relay the upstream JSON response as-is.
    response = await client.post(url, json=body, headers={"api-key": api_key})
    return json_response(response.json(), status=response.status_code)


async def stream(request, body):
    async with client.stream("POST", url, json=body, headers={"api-key": api_key}) as response:
        if response.status_code != 200:
            error = await response.aread()
            return raw(error, content_type="application/json", status=response.status_code)
        # Successful upstream response: relay the SSE chunks as they arrive.
        server = await request.respond(content_type="text/event-stream")
        async for chunk in response.aiter_text():
            await server.send(chunk)


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=8000, debug=True, dev=True, auto_reload=True)
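
A minimal usage sketch (not part of the gist), assuming the proxy above is started with URL and API_KEY pointing at an Azure OpenAI chat-completions deployment, and that the openai Python package is installed on the client side; the model name below is a placeholder, since Azure routes by the deployment baked into URL:

# Hypothetical client-side example. The proxy injects the real Azure key from
# its API_KEY environment variable, so the api_key passed here is a dummy value.
from openai import OpenAI

client = OpenAI(base_url="http://127.0.0.1:8000/v1", api_key="unused")

# Non-streaming request, handled by non_stream() in the proxy.
resp = client.chat.completions.create(
    model="gpt-4o",  # placeholder; the Azure deployment is fixed by URL
    messages=[{"role": "user", "content": "Hello"}],
)
print(resp.choices[0].message.content)

# Streaming request, relayed chunk by chunk by stream() in the proxy.
for chunk in client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
):
    print(chunk.choices[0].delta.content or "", end="")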
ben1one commented Jul 5, 2025

thx!
