vLLM Modal Deployment
import json
from typing import Any
import aiohttp
import modal
vllm_image = (
    modal.Image.debian_slim(python_version="3.12")
    .pip_install(
        "vllm",
        "huggingface_hub[hf_transfer]",
        "flashinfer-python",
        extra_index_url="https://download.pytorch.org/whl/cu128",
    )
    .env({"HF_HUB_ENABLE_HF_TRANSFER": "1"})  # faster model transfers
)
MODEL_NAME = "HF Model"  # placeholder: replace with a Hugging Face repo ID such as "org/model-name"
hf_cache_vol = modal.Volume.from_name("huggingface-cache", create_if_missing=True)
vllm_cache_vol = modal.Volume.from_name("vllm-cache", create_if_missing=True)
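# VLLM_USE_V1=1 opts in to vLLM's V1 engine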
vllm_image = vllm_image.env({"VLLM_USE_V1": "1"})
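# FAST_BOOT favors quick cold starts over steady-state throughput (see --enforce-eager below)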
FAST_BOOT = True
app = modal.App("example-vllm-openai-compatible")
N_GPU = 1
MINUTES = 60 # seconds
VLLM_PORT = 8000
@app.function(
    image=vllm_image,
    gpu=f"A100:{N_GPU}",
    scaledown_window=1 * MINUTES,  # how long should we stay up with no requests?
    timeout=5 * MINUTES,  # how long should we wait for container start?
    volumes={
        "/root/.cache/huggingface": hf_cache_vol,
        "/root/.cache/vllm": vllm_cache_vol,
    },
)
@modal.concurrent(  # how many requests can one replica handle? tune carefully!
    max_inputs=4
)
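# requires_proxy_auth=False leaves the endpoint publicly reachable; set it to True
# to require Modal proxy-auth tokens on every request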
@modal.web_server(port=VLLM_PORT, startup_timeout=10 * MINUTES, requires_proxy_auth=False)
def serve():
    import subprocess

    cmd = [
        "vllm",
        "serve",
        "--uvicorn-log-level=info",
        MODEL_NAME,
        "--served-model-name",
        MODEL_NAME,
        "llm",  # --served-model-name accepts several names; also expose the short alias "llm"
        "--host",
        "0.0.0.0",
        "--port",
        str(VLLM_PORT),
    ]

    # enforce-eager disables both Torch compilation and CUDA graph capture
    # default is no-enforce-eager. see the --compilation-config flag for tighter control
    cmd += ["--enforce-eager" if FAST_BOOT else "--no-enforce-eager"]

    # assume multiple GPUs are for splitting up large matrix multiplications
    cmd += ["--tensor-parallel-size", str(N_GPU)]

    print(cmd)

    subprocess.Popen(" ".join(cmd), shell=True)
@app.local_entrypoint()
async def test(test_timeout=10 * MINUTES, content=None, twice=True):
    url = serve.get_web_url()

    system_prompt = {
        "role": "system",
        "content": "You are a pirate who can't help but drop sly reminders that he went to Harvard.",
    }
    if content is None:
        content = "Explain the singular value decomposition."

    messages = [  # OpenAI chat format
        system_prompt,
        {"role": "user", "content": content},
    ]

    async with aiohttp.ClientSession(base_url=url) as session:
        print(f"Running health check for server at {url}")
        async with session.get("/health", timeout=test_timeout - 1 * MINUTES) as resp:
            up = resp.status == 200
        assert up, f"Failed health check for server at {url}"
        print(f"Successful health check for server at {url}")

        print(f"Sending messages to {url}:", *messages, sep="\n\t")
        await _send_request(session, "llm", messages)
        if twice:
            messages[0]["content"] = "You are Jar Jar Binks."
            print(f"Sending messages to {url}:", *messages, sep="\n\t")
            await _send_request(session, "llm", messages)
async def _send_request(
    session: aiohttp.ClientSession, model: str, messages: list
) -> None:
    # `stream=True` tells an OpenAI-compatible backend to stream chunks
    payload: dict[str, Any] = {"messages": messages, "model": model, "stream": True}

    headers = {"Content-Type": "application/json", "Accept": "text/event-stream"}

    async with session.post(
        "/v1/chat/completions", json=payload, headers=headers, timeout=1 * MINUTES
    ) as resp:
        async for raw in resp.content:
            resp.raise_for_status()
            # extract new content and stream it
            line = raw.decode().strip()
            if not line or line == "data: [DONE]":
                continue
            if line.startswith("data: "):  # SSE prefix
                line = line[len("data: "):]

            chunk = json.loads(line)
            assert (
                chunk["object"] == "chat.completion.chunk"
            )  # or something went horribly wrong
            # the delta may omit "content" (e.g. the final chunk), so fall back to ""
            print(chunk["choices"][0]["delta"].get("content") or "", end="")

    print()
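
To try it, `modal run` this file to start the server and stream the two test completions, or `modal deploy` it to keep the endpoint running. Once deployed, any OpenAI-compatible client can talk to it. A minimal sketch using the `openai` Python package follows; the `base_url` is a placeholder for the URL Modal prints on deploy, and the key is arbitrary because the server above is launched without `--api-key`:

from openai import OpenAI

# Placeholder URL: substitute the *.modal.run address printed by `modal deploy`,
# keeping the trailing /v1 so the client hits /v1/chat/completions.
client = OpenAI(
    base_url="https://<your-workspace>--example-vllm-openai-compatible-serve.modal.run/v1",
    api_key="not-used",  # vLLM ignores the key unless the server is started with --api-key
)

response = client.chat.completions.create(
    model="llm",  # the short alias passed to --served-model-name above
    messages=[{"role": "user", "content": "Explain the singular value decomposition."}],
)
print(response.choices[0].message.content)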