Skip to content

Instantly share code, notes, and snippets.

@toolittlecakes
Last active April 25, 2026 19:33
Show Gist options
  • Select an option

  • Save toolittlecakes/d1ebb189c061ab27e3ec59c1c4937137 to your computer and use it in GitHub Desktop.

Select an option

Save toolittlecakes/d1ebb189c061ab27e3ec59c1c4937137 to your computer and use it in GitHub Desktop.
OpenAI-compatible wrapper over `codex exec` with `/v1/chat/completions`, `/v1/responses`, and `/v1/models`
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "fastapi>=0.122.0",
# "uvicorn>=0.30.0",
# ]
# ///
from __future__ import annotations
import argparse
import asyncio
import json
import os
import tempfile
import time
import tomllib
import uuid
from pathlib import Path
from typing import Any
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
# Default bind address and port for the HTTP server (overridable via --host / --port).
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8000
# Default cap on concurrently running `codex exec` subprocesses (--max-concurrency).
DEFAULT_MAX_CONCURRENCY = 100
# Default per-request timeout for `codex exec`, in seconds (--timeout-seconds).
DEFAULT_TIMEOUT_SECONDS = 600
# Values accepted for `reasoning_effort` / `reasoning.effort`; anything else is rejected.
SUPPORTED_REASONING_EFFORTS = {"low", "medium", "high"}
# Content-part / input-item `type` values that are treated as plain text.
TEXT_PART_TYPES = {"text", "input_text", "output_text"}
# Text passed to argparse as the parser description (see parse_args).
HELP_DESCRIPTION = """OpenAI-compatible wrapper over `codex exec`.
Main mapping:
- `model`: passed to `codex exec -m`
- `model@preset`: split into `-m model` and optional `-p preset`
- `reasoning_effort` or `reasoning.effort`: passed as `-c model_reasoning_effort="..."`
- chat `messages` or responses `input`: flattened into a single transcript prompt and sent via stdin
- chat `response_format.type=json_schema` or responses `text.format.type=json_schema`:
mapped to `--output-schema <tmpfile>`
- final answer: read from `codex exec -o <tmpfile>`
Current request shape:
- supported roles: `system`, `developer`, `user`, `assistant`
- supported content parts: plain text only
- stateless mode: prior conversation history must be included in `messages` on every request
- unsupported fields fail fast with HTTP 400 instead of being silently ignored
"""
# Text passed to argparse as the parser epilog (see parse_args); the parser uses
# RawDescriptionHelpFormatter, so the line breaks below are kept as written.
HELP_EPILOG = """Examples:
codex2api --port 8000
codex2api --port 8000 --cwd /path/to/project
curl http://127.0.0.1:8000/v1/chat/completions \\
-H 'content-type: application/json' \\
-d '{
"model": "gpt-5.3-codex-spark",
"reasoning_effort": "low",
"messages": [{"role": "user", "content": "Say hello"}]
}'
curl http://127.0.0.1:8000/v1/chat/completions \\
-H 'content-type: application/json' \\
-d '{
"model": "gpt-5.3-codex-spark@fast",
"messages": [{"role": "user", "content": "Summarize this repo"}]
}'
curl http://127.0.0.1:8000/v1/responses \\
-H 'content-type: application/json' \\
-d '{
"model": "gpt-5.4-mini",
"instructions": "Reply briefly.",
"input": "Say hello"
}'
curl http://127.0.0.1:8000/v1/models
Preset / profile notes:
- `@preset` is optional. `gpt-5.3-codex-spark` works as-is.
- `gpt-5.3-codex-spark@fast` means the wrapper calls Codex with:
`codex exec -m gpt-5.3-codex-spark -p fast ...`
- The profile name is resolved by Codex from `~/.codex/config.toml`.
- This wrapper does not parse profile contents; it only forwards the selected profile to Codex.
- In practice, profiles are useful for presetting Codex-side behavior such as model defaults,
reasoning effort, sandbox/approval settings, shell environment policy, MCP setup, and similar
CLI configuration.
Structured output:
- `response_format.type = "json_schema"` is supported.
- `text.format.type = "json_schema"` is supported for `/v1/responses`.
- The wrapper writes the schema to a temporary file and passes it to `codex exec --output-schema`.
- `/v1/chat/completions` returns the normal chat.completions shape.
- `/v1/responses` returns a stateless subset of the Responses API.
Models endpoint:
- `GET /v1/models` does not come from a Codex CLI introspection command.
- It returns a wrapper-visible model list derived from:
1. `CODEX_CHAT_WRAPPER_MODELS` env var, if set
2. `model` values found in `~/.codex/config.toml`, including profile models when present
Not supported right now:
- `stream`
- `tools`, `tool_choice`, `parallel_tool_calls`
- `n > 1`
- `previous_response_id` / server-side response state
- sampling controls like `temperature` or `top_p`
- non-text multimodal content parts
"""
# Module-level ASGI application. main() attaches the runtime settings
# (cwd, config path, semaphore, timeout) to app.state before serving.
app = FastAPI(title="Codex Chat Wrapper")
class RequestError(Exception):
    """Failure that the HTTP handlers translate into a JSON error body.

    Attributes are plain instance fields:
      * ``message``      - human-readable description (also the exception text)
      * ``status_code``  - HTTP status to answer with (400 unless overridden)
      * ``code``         - machine-readable error code / type
    """

    def __init__(
        self,
        message: str,
        *,
        status_code: int = 400,
        code: str = "invalid_request_error",
    ) -> None:
        super().__init__(message)
        self.code = code
        self.status_code = status_code
        self.message = message
def error_response(message: str, *, status_code: int = 400, code: str = "invalid_request_error") -> JSONResponse:
    """Wrap an error in a JSON response with an ``error`` envelope.

    ``code`` is emitted twice, as both the ``type`` and the ``code`` field of
    the error object; ``status_code`` becomes the HTTP status of the response.
    """
    error_body = {"message": message, "type": code, "code": code}
    return JSONResponse(content={"error": error_body}, status_code=status_code)
def require_string(value: Any, field_name: str) -> str:
    """Return ``value`` unchanged if it is a string with non-whitespace content.

    Raises:
        RequestError: when ``value`` is not a string, or is empty / blank.
            ``field_name`` is only used to build the error message.
    """
    if isinstance(value, str) and value.strip():
        # The caller gets the original string back; no trimming is applied.
        return value
    raise RequestError(f"`{field_name}` must be a non-empty string.")
def parse_model_and_preset(model: Any) -> tuple[str, str | None]:
    """Split a ``model`` or ``model@preset`` identifier into its two parts.

    Only the first ``@`` separates the halves. Returns ``(model, None)`` when
    no ``@`` is present.

    Raises:
        RequestError: if ``model`` is not a non-empty string, or if either
            side of the ``@`` is empty.
    """
    full_name = require_string(model, "model")
    base_model, separator, preset = full_name.partition("@")
    if not separator:
        return full_name, None
    if base_model and preset:
        return base_model, preset
    raise RequestError("`model` must be either `model_name` or `model_name@preset_name`.")
def parse_reasoning_effort(body: dict[str, Any]) -> str | None:
direct_effort = body.get("reasoning_effort")
nested_reasoning = body.get("reasoning")
if direct_effort is not None and nested_reasoning is not None:
raise RequestError("Provide either `reasoning_effort` or `reasoning.effort`, not both.")
if direct_effort is not None:
effort = require_string(direct_effort, "reasoning_effort")
elif isinstance(nested_reasoning, dict) and nested_reasoning.get("effort") is not None:
effort = require_string(nested_reasoning["effort"], "reasoning.effort")
elif nested_reasoning is None:
return None
else:
raise RequestError("`reasoning` must be an object with an `effort` field.")
if effort not in SUPPORTED_REASONING_EFFORTS:
allowed = ", ".join(sorted(SUPPORTED_REASONING_EFFORTS))
raise RequestError(f"`reasoning_effort` must be one of: {allowed}.")
return effort
def ensure_supported_chat_request(body: dict[str, Any]) -> None:
    """Reject chat-completions requests that use fields this wrapper ignores.

    ``stream`` is rejected only when truthy and ``n`` only when it is neither
    absent nor 1; every other listed field is rejected as soon as it is
    present with a non-null value.

    Raises:
        RequestError: naming all offending fields, sorted alphabetically.
    """
    rejected_when_present = (
        "tools",
        "tool_choice",
        "parallel_tool_calls",
        "temperature",
        "top_p",
        "max_tokens",
        "max_completion_tokens",
        "presence_penalty",
        "frequency_penalty",
        "logprobs",
        "top_logprobs",
    )
    offending = {name for name in rejected_when_present if body.get(name) is not None}
    if body.get("stream"):
        offending.add("stream")
    if body.get("n") not in (None, 1):
        offending.add("n")
    if not offending:
        return
    names = ", ".join(sorted(offending))
    raise RequestError(f"Unsupported request fields for this wrapper: {names}.")
def ensure_supported_responses_request(body: dict[str, Any]) -> None:
    """Reject `/v1/responses` requests that use fields this wrapper ignores.

    ``stream`` is rejected only when truthy, matching the chat-completions
    check, so clients that always send an explicit ``"stream": false`` are
    accepted. Every other listed field is rejected as soon as it is present
    with a non-null value.

    Raises:
        RequestError: naming all offending fields, sorted alphabetically.
    """
    unsupported_fields: list[str] = []
    # An explicit `false` asks for exactly the non-streaming behaviour the
    # wrapper provides, so only a truthy value is an error.
    if body.get("stream"):
        unsupported_fields.append("stream")
    unsupported_fields.extend(
        field_name
        for field_name in (
            "tools",
            "tool_choice",
            "parallel_tool_calls",
            "previous_response_id",
            "conversation",
            "truncation",
            "temperature",
            "top_p",
            "max_output_tokens",
            "max_tool_calls",
            "prompt",
            "include",
        )
        if body.get(field_name) is not None
    )
    if unsupported_fields:
        names = ", ".join(sorted(unsupported_fields))
        raise RequestError(f"Unsupported request fields for this wrapper: {names}.")
def normalize_message_text(role: str, content: Any) -> str:
    """Collapse a message's ``content`` into one plain-text string.

    A string is returned as-is. A list is treated as content parts: each part
    must be an object whose ``type`` is in TEXT_PART_TYPES and whose ``text``
    is a string; the texts are joined with newlines.

    Raises:
        RequestError: for any other content shape. ``role`` only appears in
            the error messages.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        raise RequestError(f"`messages[].content` for role `{role}` must be a string or a list of text parts.")

    def text_of(index: int, part: Any) -> str:
        # Validate one content part and hand back its text.
        if not isinstance(part, dict):
            raise RequestError(f"`messages[].content[{index}]` must be an object.")
        part_type = part.get("type")
        if part_type not in TEXT_PART_TYPES:
            raise RequestError(
                f"Unsupported content part type `{part_type}` in role `{role}`. Only text parts are supported."
            )
        text_value = part.get("text")
        if isinstance(text_value, str):
            return text_value
        raise RequestError(f"`messages[].content[{index}].text` must be a string.")

    return "\n".join(text_of(index, part) for index, part in enumerate(content))
def render_message(role: str, content: str, name: str | None) -> str:
header = role.upper()
if name:
header = f"{header} ({name})"
return f"{header}:\n{content}".strip()
def build_codex_prompt(messages: Any, *, structured_output: bool) -> str:
    """Flatten chat messages into the single prompt string sent to Codex.

    The prompt has up to three sections separated by blank lines: fixed
    instructions, a ``<SYSTEM_MESSAGES>`` section holding all ``system`` and
    ``developer`` messages (omitted when there are none), and a
    ``<CONVERSATION>`` section holding the ``user`` / ``assistant`` messages
    in their original order.

    Args:
        messages: The request's message list; validated here.
        structured_output: When true, one extra instruction line about JSON
            Schema output is added.

    Raises:
        RequestError: if ``messages`` is not a non-empty list, an entry is
            malformed, uses an unsupported role or ``tool_calls``, or no
            ``user`` / ``assistant`` message is present.
    """
    if not (isinstance(messages, list) and messages):
        raise RequestError("`messages` must be a non-empty array.")

    preamble_blocks: list[str] = []
    conversation_blocks: list[str] = []
    for position, entry in enumerate(messages):
        if not isinstance(entry, dict):
            raise RequestError(f"`messages[{position}]` must be an object.")
        role = entry.get("role")
        if role not in {"system", "developer", "user", "assistant"}:
            raise RequestError(
                f"Unsupported message role `{role}` at `messages[{position}]`. "
                "Supported roles: system, developer, user, assistant."
            )
        if entry.get("tool_calls") is not None:
            raise RequestError(f"`messages[{position}].tool_calls` is not supported by this wrapper.")
        text = normalize_message_text(role, entry.get("content"))
        speaker = entry.get("name")
        if not (speaker is None or isinstance(speaker, str)):
            raise RequestError(f"`messages[{position}].name` must be a string when provided.")
        # System and developer messages are grouped apart from the dialogue.
        target = preamble_blocks if role in {"system", "developer"} else conversation_blocks
        target.append(render_message(role, text, speaker))

    if not conversation_blocks:
        raise RequestError("At least one `user` or `assistant` message is required.")

    guidance = [
        "You are answering a chat.completions-compatible request routed through Codex exec.",
        "Produce exactly one assistant reply for the current conversation.",
        "Do not include role labels, XML tags, or wrapper commentary in the final answer.",
    ]
    if structured_output:
        guidance.append("Structured output is enforced separately with JSON Schema. Return only the schema-matching final answer.")

    parts = ["\n".join(guidance)]
    if preamble_blocks:
        parts.append("<SYSTEM_MESSAGES>\n" + "\n\n".join(preamble_blocks) + "\n</SYSTEM_MESSAGES>")
    parts.append("<CONVERSATION>\n" + "\n\n".join(conversation_blocks) + "\n</CONVERSATION>")
    return "\n\n".join(parts)
def extract_chat_json_schema(body: dict[str, Any]) -> dict[str, Any] | None:
response_format = body.get("response_format")
if response_format is None:
return None
if not isinstance(response_format, dict):
raise RequestError("`response_format` must be an object.")
response_type = response_format.get("type")
if response_type in (None, "text"):
return None
if response_type != "json_schema":
raise RequestError("Only `response_format.type = \"json_schema\"` is supported.")
json_schema_config = response_format.get("json_schema")
if not isinstance(json_schema_config, dict):
raise RequestError("`response_format.json_schema` must be an object.")
schema = json_schema_config.get("schema")
if not isinstance(schema, dict):
raise RequestError("`response_format.json_schema.schema` must be an object.")
return schema
def extract_responses_json_schema(body: dict[str, Any]) -> dict[str, Any] | None:
text_config = body.get("text")
if text_config is None:
return None
if not isinstance(text_config, dict):
raise RequestError("`text` must be an object.")
format_config = text_config.get("format")
if format_config is None:
return None
if not isinstance(format_config, dict):
raise RequestError("`text.format` must be an object.")
format_type = format_config.get("type")
if format_type in (None, "text"):
return None
if format_type != "json_schema":
raise RequestError("Only `text.format.type = \"json_schema\"` is supported.")
json_schema_config = format_config.get("json_schema")
if not isinstance(json_schema_config, dict):
raise RequestError("`text.format.json_schema` must be an object.")
schema = json_schema_config.get("schema")
if not isinstance(schema, dict):
raise RequestError("`text.format.json_schema.schema` must be an object.")
return schema
def coerce_response_input_item(item: Any, index: int) -> dict[str, Any]:
    """Convert one Responses ``input`` entry into a chat-style message dict.

    Message items (``type`` absent or ``"message"``) keep their ``role`` and
    ``content`` plus any non-null ``name`` / ``tool_calls``. Bare text items
    (``type`` in TEXT_PART_TYPES) become a ``user`` message.

    Raises:
        RequestError: if the item is not an object, a message item lacks
            ``role`` or ``content``, a text item's ``text`` is not a string,
            or the type is unsupported. ``index`` is used in the messages.
    """
    if not isinstance(item, dict):
        raise RequestError(f"`input[{index}]` must be an object.")

    item_type = item.get("type")
    if item_type is None or item_type == "message":
        role, content = item.get("role"), item.get("content")
        if role is None or content is None:
            raise RequestError(f"`input[{index}]` message items must contain `role` and `content`.")
        # Optional keys are copied only when they carry a non-null value.
        extras = {key: item[key] for key in ("name", "tool_calls") if item.get(key) is not None}
        return {"role": role, "content": content, **extras}

    if item_type not in TEXT_PART_TYPES:
        raise RequestError(f"Unsupported `input[{index}].type` value `{item_type}` for this wrapper.")
    text_value = item.get("text")
    if not isinstance(text_value, str):
        raise RequestError(f"`input[{index}].text` must be a string.")
    return {"role": "user", "content": text_value}
def build_responses_messages(body: dict[str, Any]) -> list[dict[str, Any]]:
    """Translate a `/v1/responses` body into a chat-style message list.

    ``instructions`` (when set) becomes a leading ``system`` message. A string
    ``input`` becomes one ``user`` message; a list ``input`` is converted item
    by item with coerce_response_input_item.

    Raises:
        RequestError: if ``instructions`` is set but not a non-empty string,
            ``input`` is missing, or ``input`` is neither a string nor a
            non-empty list (item-level errors propagate as well).
    """
    instructions = body.get("instructions")
    if instructions is None:
        messages: list[dict[str, Any]] = []
    else:
        messages = [{"role": "system", "content": require_string(instructions, "instructions")}]

    raw_input = body.get("input")
    if raw_input is None:
        raise RequestError("`input` is required for `/v1/responses`.")
    if isinstance(raw_input, str):
        return [*messages, {"role": "user", "content": raw_input}]
    if not (isinstance(raw_input, list) and raw_input):
        raise RequestError("`input` must be a non-empty string or array.")

    messages.extend(coerce_response_input_item(item, index) for index, item in enumerate(raw_input))
    return messages
def load_discovered_models(config_path: Path) -> list[str]:
configured_models: list[str] = []
env_models = os.environ.get("CODEX_CHAT_WRAPPER_MODELS")
if env_models:
for raw_model in env_models.split(","):
model_name = raw_model.strip()
if model_name:
configured_models.append(model_name)
try:
config_data = tomllib.loads(config_path.read_text(encoding="utf-8"))
except FileNotFoundError:
config_data = {}
except tomllib.TOMLDecodeError:
config_data = {}
top_level_model = config_data.get("model")
if isinstance(top_level_model, str) and top_level_model.strip():
configured_models.append(top_level_model.strip())
profiles = config_data.get("profiles")
if isinstance(profiles, dict):
for profile_data in profiles.values():
if isinstance(profile_data, dict):
profile_model = profile_data.get("model")
if isinstance(profile_model, str) and profile_model.strip():
configured_models.append(profile_model.strip())
unique_models: list[str] = []
seen: set[str] = set()
for model_name in configured_models:
if model_name not in seen:
seen.add(model_name)
unique_models.append(model_name)
return unique_models
def build_models_response(models: list[str]) -> dict[str, Any]:
    """Build the ``{"object": "list", "data": [...]}`` body for `/v1/models`.

    Every model name becomes one entry with a fixed ``created`` of 0 and
    ``owned_by`` of ``codex2api``; input order is preserved.
    """

    def describe(model_name: str) -> dict[str, Any]:
        return {"id": model_name, "object": "model", "created": 0, "owned_by": "codex2api"}

    entries = list(map(describe, models))
    return {"object": "list", "data": entries}
async def run_codex_exec(
    *,
    prompt: str,
    model: str,
    preset: str | None,
    reasoning_effort: str | None,
    schema: dict[str, Any] | None,
    cwd: Path,
    timeout_seconds: int,
) -> str:
    """Run one `codex exec` subprocess and return its final message.

    The prompt is written to the child's stdin (the trailing ``-`` argument)
    and the answer is read back from the file passed with ``-o``. That file
    and the optional schema file live in a temporary directory that is
    removed when the call ends.

    Args:
        prompt: Full prompt text, UTF-8 encoded onto stdin.
        model: Value for ``-m``.
        preset: Value for ``-p``; omitted when ``None``.
        reasoning_effort: Sent as ``-c model_reasoning_effort="..."`` when set.
        schema: JSON Schema written to disk and passed via ``--output-schema``.
        cwd: Working directory of the subprocess.
        timeout_seconds: Upper bound for the whole subprocess run.

    Returns:
        The contents of the output file written by Codex.

    Raises:
        RequestError: with status 504 on timeout, or 500 when the process
            exits non-zero or does not create the output file.
    """
    with tempfile.TemporaryDirectory(prefix="codex2api-") as tmpdir_name:
        tmpdir = Path(tmpdir_name)
        output_path = tmpdir / "response.txt"

        # Optional flags go between `-m <model>` and `-o <file> -`.
        command = [
            "codex",
            "exec",
            "--ephemeral",
            "--skip-git-repo-check",
            "--color",
            "never",
            "-m",
            model,
        ]
        if preset is not None:
            command += ["-p", preset]
        if reasoning_effort is not None:
            command += ["-c", f'model_reasoning_effort="{reasoning_effort}"']
        if schema is not None:
            schema_path = tmpdir / "schema.json"
            schema_path.write_text(json.dumps(schema), encoding="utf-8")
            command += ["--output-schema", str(schema_path)]
        command += ["-o", str(output_path), "-"]

        process = await asyncio.create_subprocess_exec(
            *command,
            cwd=str(cwd),
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(prompt.encode("utf-8")),
                timeout=timeout_seconds,
            )
        except asyncio.TimeoutError:
            raise RequestError(
                f"codex exec timed out after {timeout_seconds} seconds.",
                status_code=504,
                code="timeout",
            ) from None
        finally:
            # Reap the child on every exit path (timeout, task cancellation,
            # unexpected error) so it cannot outlive the request and keep
            # running against a temporary directory that is about to vanish.
            if process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The process exited between the check and the kill.
                    pass
                await process.wait()

        if process.returncode != 0:
            # Prefer stderr for diagnostics; fall back to stdout if it is empty.
            details = stderr.decode("utf-8", errors="replace").strip() or stdout.decode("utf-8", errors="replace").strip()
            if not details:
                details = "codex exec failed without stderr output."
            raise RequestError(f"codex exec failed: {details}", status_code=500, code="codex_exec_failed")
        if not output_path.exists():
            raise RequestError("codex exec completed without writing the final message file.", status_code=500, code="missing_output")
        return output_path.read_text(encoding="utf-8")
@app.get("/healthz")
async def healthz() -> dict[str, str]:
    """Liveness probe: always answers with a fixed ``ok`` status."""
    status_payload: dict[str, str] = {"status": "ok"}
    return status_payload
@app.get("/v1/models")
async def list_models(request: Request) -> JSONResponse:
    """Serve the model list discovered from the environment and Codex config.

    Discovery runs on every request, using the config path stored on
    ``app.state``.
    """
    config_path = request.app.state.codex_config_path
    payload = build_models_response(load_discovered_models(config_path))
    return JSONResponse(content=payload)
@app.post("/v1/chat/completions")
async def create_chat_completion(request: Request) -> JSONResponse:
    """Handle `/v1/chat/completions` by running a single `codex exec` call.

    The body is validated, flattened into one prompt and executed while
    holding the shared concurrency semaphore. Failures are returned as JSON
    error bodies: RequestError keeps its own status and code, invalid JSON
    becomes a 400, and anything else a 500 ``server_error``.
    """
    try:
        body = await request.json()
        if not isinstance(body, dict):
            raise RequestError("Request body must be a JSON object.")
        ensure_supported_chat_request(body)
        requested_model = body.get("model")
        model, preset = parse_model_and_preset(requested_model)
        reasoning_effort = parse_reasoning_effort(body)
        schema = extract_chat_json_schema(body)
        prompt = build_codex_prompt(body.get("messages"), structured_output=schema is not None)
        # The semaphore caps how many codex subprocesses run at the same time.
        async with request.app.state.codex_semaphore:
            completion_text = await run_codex_exec(
                prompt=prompt,
                model=model,
                preset=preset,
                reasoning_effort=reasoning_effort,
                schema=schema,
                cwd=request.app.state.codex_cwd,
                timeout_seconds=request.app.state.codex_timeout_seconds,
            )
    except RequestError as error:
        return error_response(error.message, status_code=error.status_code, code=error.code)
    except json.JSONDecodeError:
        return error_response("Request body must be valid JSON.")
    except Exception as error:
        return error_response(f"Unexpected server error: {error}", status_code=500, code="server_error")

    assistant_message = {"role": "assistant", "content": completion_text}
    only_choice = {"index": 0, "message": assistant_message, "finish_reason": "stop"}
    return JSONResponse(
        content={
            "id": f"chatcmpl-{uuid.uuid4().hex}",
            "object": "chat.completion",
            "created": int(time.time()),
            # Echo the model string exactly as sent, including any `@preset`.
            "model": requested_model,
            "choices": [only_choice],
        }
    )
@app.post("/v1/responses")
async def create_response(request: Request) -> JSONResponse:
    """Handle `/v1/responses` by running a single `codex exec` call.

    ``instructions`` and ``input`` are converted to chat-style messages,
    flattened into one prompt and executed while holding the shared
    concurrency semaphore. Failures are returned as JSON error bodies:
    RequestError keeps its own status and code, invalid JSON becomes a 400,
    and anything else a 500 ``server_error``.
    """
    try:
        body = await request.json()
        if not isinstance(body, dict):
            raise RequestError("Request body must be a JSON object.")
        ensure_supported_responses_request(body)
        requested_model = body.get("model")
        model, preset = parse_model_and_preset(requested_model)
        reasoning_effort = parse_reasoning_effort(body)
        schema = extract_responses_json_schema(body)
        messages = build_responses_messages(body)
        prompt = build_codex_prompt(messages, structured_output=schema is not None)
        # The semaphore caps how many codex subprocesses run at the same time.
        async with request.app.state.codex_semaphore:
            completion_text = await run_codex_exec(
                prompt=prompt,
                model=model,
                preset=preset,
                reasoning_effort=reasoning_effort,
                schema=schema,
                cwd=request.app.state.codex_cwd,
                timeout_seconds=request.app.state.codex_timeout_seconds,
            )
    except RequestError as error:
        return error_response(error.message, status_code=error.status_code, code=error.code)
    except json.JSONDecodeError:
        return error_response("Request body must be valid JSON.")
    except Exception as error:
        return error_response(f"Unexpected server error: {error}", status_code=500, code="server_error")

    created = int(time.time())
    response_id = f"resp_{uuid.uuid4().hex}"
    text_part = {"type": "output_text", "text": completion_text, "annotations": []}
    output_message = {
        "id": f"msg_{uuid.uuid4().hex}",
        "type": "message",
        "status": "completed",
        "role": "assistant",
        "content": [text_part],
    }
    return JSONResponse(
        content={
            "id": response_id,
            "object": "response",
            "created_at": created,
            "status": "completed",
            "error": None,
            "incomplete_details": None,
            "instructions": body.get("instructions"),
            # Echo the model string exactly as sent, including any `@preset`.
            "model": requested_model,
            "output": [output_message],
        }
    )
def parse_args() -> argparse.Namespace:
    """Parse command-line options, using environment variables as defaults.

    Each option falls back to a ``CODEX_CHAT_WRAPPER_*`` environment variable
    and then to the built-in default. Environment values are handed to
    argparse as strings, so they go through the same ``type`` conversion as
    command-line values: a malformed ``CODEX_CHAT_WRAPPER_PORT`` produces a
    normal argparse usage error instead of a traceback, and paths are
    normalized the same way whether they come from the CLI or the environment.

    Returns:
        Namespace with ``host``, ``port``, ``cwd``, ``codex_config``,
        ``max_concurrency`` and ``timeout_seconds``.
    """

    def resolved_path(raw: str) -> Path:
        # Expand `~` and make the path absolute for every source of the value.
        return Path(raw).expanduser().resolve()

    env = os.environ
    parser = argparse.ArgumentParser(
        description=HELP_DESCRIPTION,
        epilog=HELP_EPILOG,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--host", default=env.get("CODEX_CHAT_WRAPPER_HOST", DEFAULT_HOST))
    parser.add_argument(
        "--port",
        type=int,
        # String defaults are parsed by argparse through `type`.
        default=env.get("CODEX_CHAT_WRAPPER_PORT", str(DEFAULT_PORT)),
    )
    parser.add_argument(
        "--cwd",
        type=resolved_path,
        default=env.get("CODEX_CHAT_WRAPPER_CWD", os.getcwd()),
        help="Working directory passed to codex exec.",
    )
    parser.add_argument(
        "--codex-config",
        type=resolved_path,
        default=env.get("CODEX_CHAT_WRAPPER_CODEX_CONFIG", "~/.codex/config.toml"),
        help="Codex config.toml used for /v1/models discovery.",
    )
    parser.add_argument(
        "--max-concurrency",
        type=int,
        default=env.get("CODEX_CHAT_WRAPPER_MAX_CONCURRENCY", str(DEFAULT_MAX_CONCURRENCY)),
        help=f"Maximum number of concurrent codex exec subprocesses. Default: {DEFAULT_MAX_CONCURRENCY}.",
    )
    parser.add_argument(
        "--timeout-seconds",
        type=int,
        default=env.get("CODEX_CHAT_WRAPPER_TIMEOUT_SECONDS", str(DEFAULT_TIMEOUT_SECONDS)),
        help=f"Timeout for a single codex exec request in seconds. Default: {DEFAULT_TIMEOUT_SECONDS}.",
    )
    return parser.parse_args()
def main() -> None:
    """Validate the parsed options, publish them on ``app.state`` and serve.

    Exits with a message when ``--max-concurrency`` or ``--timeout-seconds``
    is below 1.
    """
    options = parse_args()
    if not options.max_concurrency >= 1:
        raise SystemExit("--max-concurrency must be >= 1")
    if not options.timeout_seconds >= 1:
        raise SystemExit("--timeout-seconds must be >= 1")

    # The request handlers read these settings back from app.state.
    state = app.state
    state.codex_cwd = options.cwd
    state.codex_config_path = options.codex_config
    state.codex_semaphore = asyncio.Semaphore(options.max_concurrency)
    state.codex_timeout_seconds = options.timeout_seconds

    uvicorn.run(app, host=options.host, port=options.port, log_level="info")
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment