Skip to content

Instantly share code, notes, and snippets.

@bukowa
Last active October 21, 2025 19:25
Show Gist options
  • Select an option

  • Save bukowa/cb076f5b0a7ae4aa85868db68170346e to your computer and use it in GitHub Desktop.

Select an option

Save bukowa/cb076f5b0a7ae4aa85868db68170346e to your computer and use it in GitHub Desktop.
# Start the `app` object from module `run` under uvicorn, listening on all
# interfaces at port 8255, with auto-reload on source changes.
# NOTE(review): the Dockerfile in this gist exposes and serves port 8225 —
# confirm which of the two port numbers is the intended one.
python -m uvicorn run:app --host 0.0.0.0 --port 8255 --reload
# Use an official Python runtime as a parent image
FROM python:3.13-slim
# Set the working directory in the container
WORKDIR /app
# Copy the requirements file first so the dependency layer is cached
# independently of application code changes
COPY requirements.txt .
# Install the packages listed in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the application's code into the container at /app
COPY . .
# Document the port the application listens on (must match --port in CMD below)
EXPOSE 8225
# Declare the API-key variable with an empty default; supply the real
# comma-separated keys at run time (e.g. `docker run -e GEMINI_API_KEYS=key1,key2 ...`)
ENV GEMINI_API_KEYS=""
# Run the application
CMD ["uvicorn", "run:app", "--host", "0.0.0.0", "--port", "8225"]
import httpx
import time
import asyncio
import os # <--- 1. IMPORT 'os' MODULE
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
from typing import Dict, Any
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# --- Configuration ---
GEMINI_API_BASE_URL = "https://generativelanguage.googleapis.com"

# --- Load keys from the environment ---
# GEMINI_API_KEYS holds a comma-separated list; it defaults to an empty string
# when the variable is not set.
keys_string = os.getenv("GEMINI_API_KEYS", "")
# Strip surrounding whitespace, drop empty entries (e.g. from ",,"), and remove
# duplicates while keeping first-seen order, so len(API_KEYS) equals the number
# of distinct keys that are tracked and retried.
API_KEYS = list(
    dict.fromkeys(key.strip() for key in keys_string.split(",") if key.strip())
)
if not API_KEYS:
    # Routed through the logger configured above instead of bare print().
    logger.warning("!!! WARNING: No API keys found in the GEMINI_API_KEYS environment variable.")
    logger.warning("!!! Set the variable, e.g.: export GEMINI_API_KEYS='key1,key2'")

# Seconds a key is sidelined after an HTTP 429 response.
COOLDOWN_RATE_LIMIT = 30
# Seconds a key is sidelined after any other retryable failure.
COOLDOWN_SERVER_ERROR = 10
# --- State Management ---
# One record per configured key; "cooldown_until" is a time.time() timestamp
# before which the key must not be handed out (0 means usable immediately).
API_KEY_STATES: Dict[str, Dict[str, Any]] = {
    key: {"cooldown_until": 0}
    for key in API_KEYS
}
# Guards API_KEY_STATES and ROUND_ROBIN_INDEX against interleaved coroutines.
STATE_LOCK = asyncio.Lock()
# Position in API_KEYS where the next key search starts.
ROUND_ROBIN_INDEX = 0

# --- Core Logic ---
# Single shared HTTP client; every upstream request is resolved relative to
# the Gemini base URL.
client = httpx.AsyncClient(base_url=GEMINI_API_BASE_URL)
app = FastAPI()
async def get_next_available_key() -> str | None:
    """Pick the next API key that is not cooling down, in round-robin order.

    The search starts at ROUND_ROBIN_INDEX and wraps around the key list once.
    On success the index is advanced past the chosen key so the following call
    starts with its successor.

    Returns:
        The selected key, or None when no keys are configured or every key is
        still inside its cooldown window.
    """
    global ROUND_ROBIN_INDEX
    async with STATE_LOCK:
        total = len(API_KEYS)
        if total == 0:
            # Safeguard if the key list is empty
            return None

        first_slot = ROUND_ROBIN_INDEX
        for offset in range(total):
            slot = (first_slot + offset) % total
            candidate = API_KEYS[slot]
            if time.time() < API_KEY_STATES[candidate]["cooldown_until"]:
                # Still cooling down; try the next key in the ring.
                continue
            ROUND_ROBIN_INDEX = (slot + 1) % total
            return candidate
        return None
async def mark_key_as_failed(key: str, status_code: int):
    """Put *key* on cooldown after a failed upstream call.

    Args:
        key: The API key that produced the failure; must be a key of
            API_KEY_STATES.
        status_code: HTTP status of the failure. 429 applies
            COOLDOWN_RATE_LIMIT; anything else applies COOLDOWN_SERVER_ERROR.

    An existing cooldown is only ever extended, never shortened: several
    in-flight requests can fail on the same key, and a later 5xx must not cut
    short a longer rate-limit cooldown that is already in effect.
    """
    cooldown = COOLDOWN_RATE_LIMIT if status_code == 429 else COOLDOWN_SERVER_ERROR
    async with STATE_LOCK:
        state = API_KEY_STATES[key]
        state["cooldown_until"] = max(
            state["cooldown_until"], time.time() + cooldown
        )
    # Only the last four characters are logged so the full key never leaks.
    logger.warning(
        "Key ...%s ERROR (Status: %s). Cooldown: %ss.",
        key[-4:], status_code, cooldown,
    )
@app.on_event("startup")
async def startup_event():
    """Report at application startup when no API keys were configured."""
    if API_KEYS:
        return
    print("CRITICAL ERROR: The application is starting without any API keys. The proxy will not work.")
@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Close the shared httpx client when the application shuts down."""
    await client.aclose()
# Headers that describe one connection hop and must not be relayed by a proxy.
_HOP_BY_HOP_HEADERS = frozenset({
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
})


def _relayable_headers(upstream_headers) -> dict:
    """Return the upstream headers without hop-by-hop entries."""
    return {
        name: value
        for name, value in upstream_headers.items()
        if name.lower() not in _HOP_BY_HOP_HEADERS
    }


async def _relay_raw_body(upstream):
    """Yield the undecoded upstream body chunk by chunk, then release the connection."""
    try:
        async for chunk in upstream.aiter_raw():
            yield chunk
    finally:
        await upstream.aclose()


@app.post("/{path:path}")
async def proxy_to_gemini(path: str, request: Request):
    """Forward a POST request to the Gemini API, rotating through the API keys.

    Each attempt takes the next key that is not cooling down and sends the
    original body, query string, and headers (minus ``host``) upstream with
    ``key=<api key>`` added to the query. A 429, a 5xx, or a transport error
    puts that key on cooldown and moves on to the next one; at most
    ``len(API_KEYS)`` attempts are made.

    The upstream body is relayed as raw bytes, so the upstream
    ``content-encoding`` and ``content-length`` headers stay accurate. Paths
    containing "stream" are relayed incrementally as chunks arrive; other
    responses are read completely first so that a failure while reading can
    still fail over to another key.

    Args:
        path: Request path, forwarded unchanged to the upstream API.
        request: The incoming request.

    Returns:
        The upstream response for any status other than 429/5xx, or a 503 JSON
        error when no keys are configured, all keys are cooling down, or every
        attempt failed.
    """
    if not API_KEYS:
        return Response(
            content='{"error": "The proxy server has no API keys configured."}',
            status_code=503, media_type="application/json"
        )

    request_body = await request.body()
    headers = {k: v for k, v in request.headers.items() if k.lower() != "host"}
    query_params = dict(request.query_params)
    is_streaming = "stream" in path.lower()
    timeout = 120 if is_streaming else 30

    for _ in range(len(API_KEYS)):
        api_key = await get_next_available_key()
        if not api_key:
            return Response(
                content='{"error": "All API keys are currently unavailable (cooldown)."}',
                status_code=503, media_type="application/json"
            )
        query_params["key"] = api_key

        upstream_request = client.build_request(
            method=request.method,
            url=f"/{path}",
            params=query_params,
            content=request_body,
            headers=headers,
            timeout=timeout,
        )
        try:
            # stream=True returns as soon as the response headers arrive; the
            # body is pulled later, either incrementally or in one go below.
            upstream = await client.send(upstream_request, stream=True)
        except httpx.TransportError as exc:
            # Covers timeouts and connect errors as well as read/protocol errors.
            logger.warning("Network error for key ...%s: %s", api_key[-4:], exc)
            await mark_key_as_failed(api_key, 503)
            continue

        status = upstream.status_code
        if status == 429 or status >= 500:
            await upstream.aclose()
            await mark_key_as_failed(api_key, status)
            continue

        response_headers = _relayable_headers(upstream.headers)

        if is_streaming and status < 400:
            # The generator closes the upstream response once it is exhausted.
            return StreamingResponse(
                content=_relay_raw_body(upstream),
                status_code=status,
                headers=response_headers,
            )

        try:
            raw_body = b"".join([chunk async for chunk in upstream.aiter_raw()])
        except httpx.TransportError as exc:
            logger.warning("Network error for key ...%s: %s", api_key[-4:], exc)
            await mark_key_as_failed(api_key, 503)
            continue
        finally:
            await upstream.aclose()

        return Response(
            content=raw_body,
            status_code=status,
            headers=response_headers,
        )

    return Response(
        content='{"error": "All API keys failed in a row."}',
        status_code=503,
        media_type="application/json"
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment