Last active
October 21, 2025 19:25
-
-
Save bukowa/cb076f5b0a7ae4aa85868db68170346e to your computer and use it in GitHub Desktop.
python -m uvicorn run:app --host 0.0.0.0 --port 8255 --reload
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Use an official Python runtime as a parent image
FROM python:3.13-slim

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file first so the dependency layer is rebuilt only
# when requirements.txt changes
COPY requirements.txt .

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application's code into the container at /app
COPY . .

# Document the port uvicorn listens on (must match --port in CMD below)
EXPOSE 8225

# Comma-separated Gemini API keys; empty by default, supply the real value
# at run time (e.g. `docker run -e GEMINI_API_KEYS=key1,key2 ...`)
ENV GEMINI_API_KEYS=""

# Run the application
CMD ["uvicorn", "run:app", "--host", "0.0.0.0", "--port", "8225"]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import httpx | |
| import time | |
| import asyncio | |
| import os # <--- 1. IMPORT 'os' MODULE | |
| from fastapi import FastAPI, Request, Response | |
| from fastapi.responses import StreamingResponse | |
| from typing import Dict, Any | |
| import logging | |
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Configuration ---
GEMINI_API_BASE_URL = "https://generativelanguage.googleapis.com"

# Seconds a key is benched after a 429 response vs. any other failure.
COOLDOWN_RATE_LIMIT = 30
COOLDOWN_SERVER_ERROR = 10

# Keys come from the GEMINI_API_KEYS environment variable as a
# comma-separated string; a missing variable is treated as an empty string.
keys_string = os.getenv("GEMINI_API_KEYS", "")

# Strip whitespace around every entry and drop entries that end up empty
# (for example when two commas sit next to each other).
API_KEYS = []
for raw_key in keys_string.split(','):
    cleaned_key = raw_key.strip()
    if cleaned_key:
        API_KEYS.append(cleaned_key)

if len(API_KEYS) == 0:
    print("!!! WARNING: No API keys found in the GEMINI_API_KEYS environment variable.")
    print("!!! Set the variable, e.g.: export GEMINI_API_KEYS='key1,key2'")

# --- State Management ---
# Per-key bookkeeping: "cooldown_until" is a time.time() timestamp before
# which the key is skipped; 0 means the key is usable immediately.
API_KEY_STATES: Dict[str, Dict[str, Any]] = {}
for configured_key in API_KEYS:
    API_KEY_STATES[configured_key] = {"cooldown_until": 0}

# Position in API_KEYS where the next key search starts.
ROUND_ROBIN_INDEX = 0
# Guards ROUND_ROBIN_INDEX and API_KEY_STATES.
STATE_LOCK = asyncio.Lock()

# --- Core Logic ---
app = FastAPI()
client = httpx.AsyncClient(base_url=GEMINI_API_BASE_URL)
async def get_next_available_key() -> str | None:
    """Pick the next API key that is not cooling down, in round-robin order.

    The search starts at ROUND_ROBIN_INDEX and wraps around API_KEYS once.
    When a usable key is found, ROUND_ROBIN_INDEX is moved to the slot after
    it so the following call starts with a different key.

    Returns:
        The chosen key, or None when API_KEYS is empty or every key's
        "cooldown_until" timestamp is still in the future.
    """
    global ROUND_ROBIN_INDEX
    async with STATE_LOCK:
        key_count = len(API_KEYS)
        if key_count == 0:
            # Safeguard if the key list is empty
            return None

        first_slot = ROUND_ROBIN_INDEX
        for offset in range(key_count):
            slot = (first_slot + offset) % key_count
            candidate = API_KEYS[slot]
            if time.time() < API_KEY_STATES[candidate]["cooldown_until"]:
                # Still benched; try the next slot.
                continue
            ROUND_ROBIN_INDEX = (slot + 1) % key_count
            return candidate

        return None
async def mark_key_as_failed(key: str, status_code: int):
    """Put ``key`` on cooldown after a failed upstream attempt.

    Args:
        key: The API key that failed; it must be an entry of API_KEY_STATES.
        status_code: Status describing the failure. 429 selects
            COOLDOWN_RATE_LIMIT; every other value selects
            COOLDOWN_SERVER_ERROR.
    """
    async with STATE_LOCK:
        if status_code == 429:
            cooldown = COOLDOWN_RATE_LIMIT
        else:
            cooldown = COOLDOWN_SERVER_ERROR
        # Only the last four characters of the key are printed.
        print(f"Key ...{key[-4:]} ERROR (Status: {status_code}). Cooldown: {cooldown}s.")
        state = API_KEY_STATES[key]
        state["cooldown_until"] = time.time() + cooldown
@app.on_event("startup")
async def startup_event():
    """Print a critical message at application startup if no API keys are loaded."""
    if API_KEYS:
        return
    print("CRITICAL ERROR: The application is starting without any API keys. The proxy will not work.")
@app.on_event("shutdown")
async def shutdown_event():
    """Close the shared httpx client when the application shuts down."""
    shared_client = client
    await shared_client.aclose()
# Connection-level headers describe a single hop and must not be copied from
# the upstream response onto the response sent to our own client.
_HOP_BY_HOP_HEADERS = frozenset({
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
})


def _service_unavailable(body: str) -> Response:
    """Build a 503 JSON response from an already-serialized JSON body."""
    return Response(content=body, status_code=503, media_type="application/json")


async def _relay_upstream_body(upstream: httpx.Response):
    """Yield the upstream body chunk by chunk, then release the connection.

    The bytes are passed through undecoded (``aiter_raw``) so that the
    upstream ``content-encoding`` / ``content-length`` headers stay accurate
    for the body the client receives.
    """
    try:
        async for chunk in upstream.aiter_raw():
            yield chunk
    finally:
        await upstream.aclose()


@app.post("/{path:path}")
async def proxy_to_gemini(path: str, request: Request):
    """Forward a POST request to the Gemini API, failing over between keys.

    Each attempt takes the next available key from get_next_available_key()
    and sends the original body, headers (minus ``host``) and query string,
    with the ``key`` query parameter set to that key. At most
    ``len(API_KEYS)`` attempts are made.

    A 429 or 5xx upstream status, or an httpx transport error, puts the key
    on cooldown and moves on to the next key. Any other upstream response is
    streamed back to the caller as-is.

    Args:
        path: Request path, forwarded verbatim to the upstream base URL.
        request: The incoming request.

    Returns:
        The streamed upstream response, or a 503 JSON error when no keys are
        configured, all keys are cooling down, or every attempt failed.
    """
    if not API_KEYS:
        return _service_unavailable(
            '{"error": "The proxy server has no API keys configured."}'
        )

    request_body = await request.body()
    headers = {k: v for k, v in request.headers.items() if k.lower() != "host"}
    # Paths containing "stream" get a longer timeout than plain calls.
    timeout = 120 if "stream" in path.lower() else 30

    for _ in range(len(API_KEYS)):
        api_key = await get_next_available_key()
        if not api_key:
            return _service_unavailable(
                '{"error": "All API keys are currently unavailable (cooldown)."}'
            )

        # NOTE(review): the key travels in the query string, so anything that
        # logs request URLs will also log the key - confirm whether sending
        # it in a request header instead is acceptable for this deployment.
        query_params = dict(request.query_params)
        query_params["key"] = api_key

        upstream_request = client.build_request(
            method=request.method,
            url=f"/{path}",
            params=query_params,
            content=request_body,
            headers=headers,
            timeout=timeout,
        )
        try:
            # stream=True returns as soon as the response headers arrive, so
            # the body can be relayed incrementally instead of buffered.
            upstream = await client.send(upstream_request, stream=True)
        except httpx.TransportError as exc:
            # Covers timeouts and connect errors as well as read/protocol
            # failures, all of which should fail over to the next key.
            print(f"Network error for key ...{api_key[-4:]}: {exc}")
            await mark_key_as_failed(api_key, 503)
            continue

        if upstream.status_code == 429 or upstream.status_code >= 500:
            # Retryable: release the connection before trying another key.
            await upstream.aclose()
            await mark_key_as_failed(api_key, upstream.status_code)
            continue

        response_headers = {
            name: value
            for name, value in upstream.headers.items()
            if name.lower() not in _HOP_BY_HOP_HEADERS
        }
        return StreamingResponse(
            content=_relay_upstream_body(upstream),
            status_code=upstream.status_code,
            headers=response_headers,
        )

    return _service_unavailable('{"error": "All API keys failed in a row."}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment