Skip to content

Instantly share code, notes, and snippets.

@lukestanley
Created April 26, 2026 01:18
Show Gist options
  • Select an option

  • Save lukestanley/acf50bb00c28f192d1d7ee73f6c170ca to your computer and use it in GitHub Desktop.

Select an option

Save lukestanley/acf50bb00c28f192d1d7ee73f6c170ca to your computer and use it in GitHub Desktop.
Single-file Nano RLM agent: a Recursive Language Model harness using a networked VM for experimenting with recursive and self-optimising local-model agents that can work around context-window limits.
#!/usr/bin/env python3
"""nano_rlm.py - a compact VM-isolated Recursive Language Model demo.
The full prompt is loaded into an isolated persistent REPL, the root model sees
only metadata, generated Python can call `rlm_query(...)` recursively, compact
observations are fed back, and setting JSON-serializable `Final` returns the
answer. The demo summarizes `rlm.pdf`.
Requires:
- qemu-system-x86_64
- pdftotext, for the rlm.pdf demo
- a local /api/chat model server
"""
import argparse
import json
import os
import re
import secrets
import select
import shutil
import subprocess
import sys
import time
import urllib.request
from pathlib import Path
# Model name and local /api/chat endpoint, both overridable via environment.
DEFAULT_MODEL = os.environ.get("NANO_RLM_MODEL", "qwen3.6")
LLM_SERVER_URL = os.environ.get("NANO_RLM_LLM_URL", "http://localhost:11434")
# Mirror used to discover and download the latest Alpine "virt" ISO for the sandbox VM.
ALPINE_RELEASES_URL = "https://dl-cdn.alpinelinux.org/alpine/latest-stable/releases/x86_64"
# Host-side cache for the ISO, the extracted kernel/initramfs, and guest apk packages.
CACHE_DIR = Path.home() / ".cache" / "nano-rlm"
APK_CACHE_DIR = CACHE_DIR / "apk-cache"
BOOT_KERNEL_PATH = CACHE_DIR / "vmlinuz-virt"
BOOT_INITRAMFS_PATH = CACHE_DIR / "initramfs-virt"
# How many characters of the prompt / of REPL observations the root model sees.
PROMPT_PREFIX_CHARS = 1600
OBS_PREFIX_CHARS = 800
# Extracts the single fenced ```python/repl/bash/sh``` block from a model reply.
CODE_BLOCK_PATTERN = re.compile(r"```(?P<language>python|repl|bash|sh)?\s*\n(?P<body>.*?)```", re.DOTALL)
# Source of the frame-based REPL server that runs inside the guest VM.
# It speaks length-prefixed JSON frames over the serial console, tagged with a
# random marker so the host can separate them from console noise.
# NOTE: the scraped copy had lost all indentation; reconstructed here.
GUEST_REPL_SERVER_CODE = r'''
import builtins, io, json, os, queue, subprocess, sys, threading, traceback, types

raw_stdin = os.fdopen(sys.stdin.fileno(), "rb", buffering=0)
raw_stdout = sys.__stdout__.buffer
# First line from the host carries the random frame marker (last 16 bytes).
marker = raw_stdin.readline().rstrip(b"\r\n")[-16:]
sessions = {}
write_lock = threading.Lock()


def read_exact(byte_count):
    # Read exactly byte_count bytes from the host, or fail on EOF.
    byte_chunks = []
    remaining = byte_count
    while remaining:
        chunk = raw_stdin.read(remaining)
        if not chunk:
            raise EOFError("host input closed")
        byte_chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(byte_chunks)


def read_frame():
    # Parse one "<decimal length>:<json>" frame from the host.
    size = b""
    while True:
        next_byte = raw_stdin.read(1)
        if not next_byte:
            raise EOFError("host input closed")
        if next_byte in b" \r\n\t" and not size:
            continue
        if next_byte == b":":
            break
        size += next_byte
    return json.loads(read_exact(int(size)).decode("utf-8"))


def emit(frame):
    # Send one marker-tagged frame to the host; locked so threads do not interleave.
    data = json.dumps(frame, ensure_ascii=True).encode("utf-8")
    with write_lock:
        raw_stdout.write(marker + str(len(data)).encode() + b":" + data + b"\n")
        raw_stdout.flush()


def text_metadata(text, prefix_chars=800):
    # Compact description of a text: size, lines, bounded prefix.
    line_count = text.count("\n")
    if text:
        line_count += 1
    return {
        "chars": len(text),
        "lines": line_count,
        "prefix": text[:prefix_chars],
        "truncated": len(text) > prefix_chars,
    }


def value_metadata(value):
    # Compact description of an arbitrary REPL value.
    metadata = {"type": type(value).__name__}
    try:
        metadata["len"] = len(value)
    except TypeError:
        pass
    if isinstance(value, str):
        metadata.update(text_metadata(value, 200))
        return metadata
    metadata["repr"] = repr(value)[:200]
    return metadata


class RLMQueryModule(types.ModuleType):
    # Callable stand-in module so `import rlm_query; rlm_query(...)` works.
    def __call__(self, *positional_arguments, **keyword_arguments):
        return self.rlm_query(*positional_arguments, **keyword_arguments)


class Session:
    # One isolated REPL environment plus its host-call plumbing.
    def __init__(self, session_id, initial_globals=None, context_summary=""):
        self.session_id = session_id
        self.context_summary = context_summary
        self.reply_queue = queue.Queue()
        self.env = {"__name__": "__rlm_repl__"}
        self.env.update(initial_globals or {})
        self.env["host_call"] = self.host_call
        self.env["rlm_query"] = self.rlm_query
        self.env["SHOW_VARS"] = self.show_vars

    def visible_names(self):
        # User-facing variables only; hide injected helpers and dunders.
        hidden = {"host_call", "rlm_query", "SHOW_VARS", "print"}
        return [name for name in sorted(self.env) if not name.startswith("_") and name not in hidden]

    def show_vars(self):
        rows = []
        for name in self.visible_names():
            value = self.env[name]
            try:
                rows.append(f"{name}: {type(value).__name__}(len={len(value)})")
            except TypeError:
                rows.append(f"{name}: {type(value).__name__}")
        self.env.get("print", builtins.print)("\n".join(rows))

    def state_summary(self):
        names = self.visible_names()
        return {
            "context_summary": self.context_summary,
            "variables": names[:20],
            "variable_count": len(names),
        }

    def final_payload(self):
        # Describe the `Final` variable, flagging non-JSON values as errors.
        if "Final" not in self.env:
            return {"is_set": False}
        value = self.env["Final"]
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            return {
                "is_set": True,
                "json": False,
                "error": "Final must be JSON-serializable.",
                "metadata": value_metadata(value),
            }
        return {
            "is_set": True,
            "json": True,
            "value": value,
            "metadata": value_metadata(value),
        }

    def host_call(self, method, payload=None):
        # Synchronous RPC to the host; blocks until a host_call_reply arrives.
        emit({"kind": "host_call", "session": self.session_id, "method": method, "payload": payload or {}})
        reply = self.reply_queue.get()
        if "error" in reply:
            raise RuntimeError(reply["error"])
        return reply.get("value")

    def rlm_query(self, prompt, model=None, **extra_prompt_fields):
        # Extra keyword arguments are appended to the prompt as "name: value" lines.
        if extra_prompt_fields:
            notes = "\n".join(
                f"{field_name}: {field_value}"
                for field_name, field_value in extra_prompt_fields.items()
            )
            prompt = str(prompt) + "\n\n" + notes
        return self.host_call("rlm_query", {"prompt": prompt, "model": model})

    def set_vars(self, values):
        self.env.update(values)
        # A new task invalidates any previously set Final.
        self.env.pop("Final", None)
        emit({"kind": "set_vars_done", "session": self.session_id, "state": self.state_summary()})

    def make_print(self, stdout):
        # print() replacement that captures default-stream output per run.
        def repl_print(*values, sep=" ", end="\n", file=None, flush=False):
            if file is not None and file is not sys.stdout:
                return builtins.print(*values, sep=sep, end=end, file=file, flush=flush)
            stdout.write(sep.join(str(value) for value in values) + end)
        return repl_print

    def run(self, code):
        # Execute Python in this session's env, capturing output and tracebacks.
        stdout, stderr = io.StringIO(), io.StringIO()
        old_print = self.env.get("print", builtins.print)
        previous_rlm_query_module = sys.modules.get("rlm_query")
        module = RLMQueryModule("rlm_query")
        module.rlm_query = self.rlm_query
        sys.modules["rlm_query"] = module
        self.env["print"] = self.make_print(stdout)
        try:
            exec(compile(code, "<rlm-repl>", "exec"), self.env)
        except BaseException:
            stderr.write(traceback.format_exc())
        finally:
            self.env["print"] = old_print
            if previous_rlm_query_module is None:
                sys.modules.pop("rlm_query", None)
            if previous_rlm_query_module is not None:
                sys.modules["rlm_query"] = previous_rlm_query_module
        emit({
            "kind": "result",
            "session": self.session_id,
            "out": stdout.getvalue(),
            "err": stderr.getvalue(),
            "metadata": {
                "stdout": text_metadata(stdout.getvalue()),
                "stderr": text_metadata(stderr.getvalue()),
            },
            "state": self.state_summary(),
            "final": self.final_payload(),
        })

    def run_bash(self, script):
        # Run a shell script in /tmp; shell state does not persist into Python.
        completed = subprocess.run(
            ["/bin/sh", "-lc", script],
            cwd="/tmp",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=300,
        )
        emit({
            "kind": "result",
            "session": self.session_id,
            "out": completed.stdout,
            "err": completed.stderr,
            "metadata": {
                "stdout": text_metadata(completed.stdout),
                "stderr": text_metadata(completed.stderr),
                "returncode": completed.returncode,
            },
            "state": self.state_summary(),
            "final": self.final_payload(),
        })


def run_exec_thread(session, language, code):
    # Execute in a daemon thread so the main loop can keep serving
    # host_call replies while user code blocks on them.
    def execute_code():
        if language in {"bash", "sh"}:
            session.run_bash(code)
            return
        session.run(code)
    thread = threading.Thread(target=execute_code, daemon=True)
    thread.start()


emit({"kind": "server_ready"})
while True:
    try:
        frame = read_frame()
        kind = frame.get("kind")
        session_id = frame.get("session")
        if kind == "exit":
            emit({"kind": "bye"})
            raise SystemExit
        if kind == "new_session":
            sessions[session_id] = Session(session_id, frame.get("env") or {}, frame.get("context_summary") or "")
            emit({"kind": "session_ready", "session": session_id, "state": sessions[session_id].state_summary()})
            continue
        if kind == "set_vars":
            sessions[session_id].set_vars(frame.get("values") or {})
            continue
        if kind == "exec":
            run_exec_thread(sessions[session_id], frame.get("language") or "python", frame.get("code") or "")
            continue
        if kind == "host_call_reply":
            sessions[session_id].reply_queue.put(frame)
            continue
        if kind == "close_session":
            sessions.pop(session_id, None)
            emit({"kind": "closed", "session": session_id})
            continue
        emit({"kind": "error", "session": session_id, "error": "unknown frame kind: " + repr(kind)})
    except SystemExit:
        raise
    except BaseException as error:
        emit({"kind": "error", "session": locals().get("session_id"), "error": str(error)})
'''
# System prompt for the root model: it must answer only with fenced code
# blocks that run in the guest REPL, and finish by setting `Final`.
RLM_SYSTEM_PROMPT = """You are the root LM inside a Recursive Language Model (RLM).
You answer by writing Python or Bash that runs inside an isolated VM.
Important environment:
- `prompt` is the full user prompt as a plain Python `str`, not a dict.
Inspect it with `len(prompt)`, `prompt[:1000]`, regex, split, and slicing.
- `rlm_query(f"question plus relevant snippet")` recursively asks a child RLM to
solve a focused subtask. It is already available as a global function.
- `SHOW_VARS()` prints current REPL variables.
- `bash` blocks run as shell scripts in `/tmp`. Heredocs work. Useful installed
tools include `rg`, `jq`, `fd`, `file`, `tree`, `tar`, `unzip`, `gzip`,
`sqlite3`, `git`, `diff`, `pdftotext`, GNU `find`, and GNU coreutils.
- Set JSON-serializable `Final` when the answer is ready.
Use Python for persistent state, parsing, aggregation, `rlm_query`, and `Final`.
Use Bash for filesystem inspection, text search, archives, JSON, SQLite, Git,
PDF text extraction, and shell-native commands. Bash does not persist variables
into Python. Do not dump the whole prompt. Each response must be one fenced
`python` or `bash` block."""
def ensure_cache_dirs() -> None:
    """Create the host-side cache directories if they do not exist yet."""
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    APK_CACHE_DIR.mkdir(parents=True, exist_ok=True)
def find_qemu() -> str:
    """Locate the QEMU x86-64 system emulator on PATH.

    Returns:
        Absolute path of the first matching binary.

    Raises:
        RuntimeError: if no known QEMU binary name is found.
    """
    # Some distributions ship the binary under a dashed name variant.
    for name in ("qemu-system-x86_64", "qemu-system-x86-64"):
        path = shutil.which(name)
        if path:
            return path
    raise RuntimeError("qemu-system-x86_64 not found on PATH")
def ensure_alpine_iso() -> Path:
    """Return the cached Alpine "virt" ISO, downloading the newest if absent.

    Raises:
        RuntimeError: if no virt ISO name is found in the release listing.
    """
    ensure_cache_dirs()
    cached_iso_paths = sorted(CACHE_DIR.glob("alpine-virt-*-x86_64.iso"))
    if cached_iso_paths:
        return cached_iso_paths[-1]
    # Scrape the release listing; close the response promptly via `with`.
    with urllib.request.urlopen(f"{ALPINE_RELEASES_URL}/") as listing:
        index = listing.read().decode("utf-8", "replace")
    names = sorted(set(re.findall(r"alpine-virt-[0-9.]+-x86_64\.iso", index)))
    if not names:
        raise RuntimeError(f"no Alpine virt ISO found at {ALPINE_RELEASES_URL}/")
    target_path = CACHE_DIR / names[-1]
    print(f"Downloading {ALPINE_RELEASES_URL}/{names[-1]} -> {target_path}")
    request = urllib.request.Request(f"{ALPINE_RELEASES_URL}/{names[-1]}", headers={"User-Agent": "nano-rlm"})
    # Download to a ".part" name and rename atomically, so an interrupted
    # download never leaves a partial ISO that the cache glob above accepts.
    partial_path = target_path.with_name(target_path.name + ".part")
    with urllib.request.urlopen(request) as response, open(partial_path, "wb") as output_file:
        shutil.copyfileobj(response, output_file)
    os.replace(partial_path, target_path)
    return target_path
def extract_iso_member(iso_path: Path, member_path: str, target_path: Path) -> Path:
    """Extract one file from an ISO into the cache, reusing a previous copy.

    Args:
        iso_path: ISO image to read from.
        member_path: Rock Ridge path of the member inside the image.
        target_path: host path to place the extracted file at.

    Raises:
        RuntimeError: if the `isoinfo` tool is not installed.
        subprocess.CalledProcessError: if extraction fails.
    """
    if target_path.exists():
        return target_path
    if not shutil.which("isoinfo"):
        raise RuntimeError("isoinfo is required to extract Alpine boot files")
    # Extract to a temporary name first: a failed isoinfo run must not leave
    # a truncated file that the existence check above would accept next run.
    temp_path = target_path.with_name(target_path.name + ".part")
    try:
        with open(temp_path, "wb") as output_file:
            subprocess.run(["isoinfo", "-R", "-i", str(iso_path), "-x", member_path], check=True, stdout=output_file)
        os.replace(temp_path, target_path)
    finally:
        if temp_path.exists():
            temp_path.unlink()
    return target_path
def short_text_metadata(text: str, prefix_chars: int = OBS_PREFIX_CHARS) -> dict:
    """Describe `text` compactly: size, line count, and a bounded prefix.

    This is what the root model sees instead of raw text, keeping large
    payloads out of its context window.
    """
    line_count = text.count("\n")
    if text:
        # A non-empty text has one more line than it has newline characters.
        line_count += 1
    return {
        "chars": len(text),
        "lines": line_count,
        "prefix": text[:prefix_chars],
        "truncated": len(text) > prefix_chars,
    }
def truncate(text: str, limit: int = 1200) -> str:
    """Shorten `text` for console logging, keeping its head and tail.

    Texts at or under `limit` characters are returned unchanged; longer
    texts keep `limit // 2` characters from each end around an ellipsis
    noting how many characters were dropped.
    """
    if len(text) <= limit:
        return text
    half = limit // 2
    return text[:half] + f"\n...[{len(text) - limit} chars truncated]...\n" + text[-half:]
class IsolatedReplVm:
"""One Alpine VM running a tiny multi-session Python REPL server."""
def __init__(self, memory_mb: int = 1536):
ensure_cache_dirs()
self.marker = secrets.token_hex(8).encode()
self.buffer = b""
self.echo_console = True
self.qemu_binary = find_qemu()
self.iso_path = ensure_alpine_iso()
self.kernel_path = extract_iso_member(self.iso_path, "/boot/vmlinuz-virt", BOOT_KERNEL_PATH)
self.initramfs_path = extract_iso_member(self.iso_path, "/boot/initramfs-virt", BOOT_INITRAMFS_PATH)
self.memory_mb = memory_mb
self.process: subprocess.Popen[bytes] | None = None
self.start()
def start(self) -> None:
print(f"Booting Alpine VM from {self.iso_path}")
self.process = subprocess.Popen(
[
self.qemu_binary,
"-accel", "tcg,thread=multi",
"-m", str(self.memory_mb),
"-smp", "1",
"-nic", "user",
"-display", "none",
"-monitor", "none",
"-serial", "stdio",
"-cdrom", str(self.iso_path),
"-kernel", str(self.kernel_path),
"-initrd", str(self.initramfs_path),
"-append", "console=ttyS0 modules=loop,squashfs,sd-mod,usb-storage quiet",
"-virtfs",
f"local,path={CACHE_DIR},mount_tag=cache,security_model=mapped-xattr",
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0,
)
self._boot_guest()
def close(self) -> None:
if not self.process or self.process.poll() is not None:
return
try:
self._send_frame({"kind": "exit"})
except Exception:
pass
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
self.process.wait(timeout=5)
def _pull(self, timeout: float = 1.0) -> None:
assert self.process and self.process.stdout
ready, _, _ = select.select([self.process.stdout], [], [], timeout)
if not ready:
return
chunk = os.read(self.process.stdout.fileno(), 4096)
if chunk:
self.buffer += chunk
if self.echo_console:
sys.stderr.buffer.write(chunk)
sys.stderr.buffer.flush()
def _send(self, data: bytes | str) -> None:
assert self.process and self.process.stdin
if isinstance(data, str):
data = data.encode("utf-8")
self.process.stdin.write(data)
self.process.stdin.flush()
def _wait_for(self, target: bytes | str, timeout: int = 180) -> None:
target_bytes = target
if isinstance(target_bytes, str):
target_bytes = target_bytes.encode("utf-8")
deadline = time.time() + timeout
while target_bytes not in self.buffer:
if time.time() >= deadline:
raise TimeoutError(f"{target_bytes!r} not seen, tail={self.buffer[-300:]!r}")
if self.process and self.process.poll() is not None:
raise RuntimeError("QEMU exited while booting")
self._pull()
target_index = self.buffer.index(target_bytes)
self.buffer = self.buffer[target_index + len(target_bytes):]
def _read_frame(self, timeout: int = 1800) -> dict:
deadline = time.time() + timeout
while self.marker not in self.buffer:
if time.time() >= deadline:
raise TimeoutError("timed out waiting for guest frame marker")
if self.process and self.process.poll() is not None:
raise RuntimeError("QEMU exited while reading a frame")
self._pull(timeout=0.2)
self.buffer = self.buffer[self.buffer.index(self.marker) + len(self.marker):]
while b":" not in self.buffer:
if time.time() >= deadline:
raise TimeoutError("timed out waiting for guest frame length")
self._pull(timeout=0.2)
separator_index = self.buffer.index(b":")
payload_length = int(self.buffer[:separator_index])
self.buffer = self.buffer[separator_index + 1:]
while len(self.buffer) < payload_length:
if time.time() >= deadline:
raise TimeoutError("timed out waiting for guest frame body")
self._pull(timeout=0.2)
payload_bytes, self.buffer = self.buffer[:payload_length], self.buffer[payload_length:]
return json.loads(payload_bytes.decode("utf-8"))
def _send_frame(self, frame: dict) -> None:
data = json.dumps(frame, ensure_ascii=True).encode("utf-8")
self._send(str(len(data)).encode() + b":" + data + b"\n")
def _boot_guest(self) -> None:
self._wait_for("login:", timeout=240)
self._send("root\n")
self._wait_for("#", timeout=60)
boot_script = """
stty -echo
export TERM=dumb
ip link set eth0 up
udhcpc -n -q -i eth0 >/dev/null 2>&1 || true
ip route replace default via 10.0.2.2 dev eth0 2>/dev/null || route add default gw 10.0.2.2 eth0 2>/dev/null || true
printf 'nameserver 10.0.2.3\\nnameserver 1.1.1.1\\n' > /etc/resolv.conf
branch=$(cut -d. -f1,2 /etc/alpine-release)
printf '%s\\n%s\\n' "https://dl-cdn.alpinelinux.org/alpine/v$branch/main" "https://dl-cdn.alpinelinux.org/alpine/v$branch/community" > /etc/apk/repositories
mkdir -p /cache/apk-cache /sandbox
mount -t 9p -o trans=virtio,version=9p2000.L cache /cache
echo READY_FOR_BOOTSTRAP
"""
self._send(boot_script)
self._wait_for("READY_FOR_BOOTSTRAP", timeout=120)
self._wait_for("#", timeout=20)
print("Installing guest tools...")
self._send(
"apk --cache-dir /cache/apk-cache add "
"python3 ripgrep jq fd file tree tar unzip gzip sqlite git "
"diffutils poppler-utils coreutils findutils "
"&& echo TOOLS_READY\n"
)
self._wait_for("TOOLS_READY", timeout=600)
self._wait_for("#", timeout=20)
self._send("cat > /sandbox/guest.py <<'PY'\n")
self._send(GUEST_REPL_SERVER_CODE)
if not GUEST_REPL_SERVER_CODE.endswith("\n"):
self._send("\n")
self._send("PY\n")
self._wait_for("#", timeout=30)
self.echo_console = False
command = "stty raw -echo\nexec python3 /sandbox/guest.py"
self._send(command + "\n")
time.sleep(0.2)
self._send(self.marker + b"\n")
ready = self._read_frame(timeout=120)
if ready.get("kind") != "server_ready":
raise RuntimeError(f"guest server did not start cleanly: {ready!r}")
print("Guest REPL server ready.\n")
def new_session(self, initial_globals: dict | None = None, context_summary: str = "") -> "ReplSession":
session_id = secrets.token_hex(8)
self._send_frame({
"kind": "new_session",
"session": session_id,
"env": initial_globals or {},
"context_summary": context_summary,
})
frame = self._read_frame()
if frame.get("kind") != "session_ready" or frame.get("session") != session_id:
raise RuntimeError(f"unexpected new_session reply: {frame!r}")
return ReplSession(self, session_id)
class ReplSession:
    """Host-side handle for one named session inside the guest REPL server."""

    def __init__(self, virtual_machine: IsolatedReplVm, session_id: str):
        self.virtual_machine = virtual_machine
        self.session_id = session_id

    def set_vars(self, values: dict) -> dict:
        """Inject variables into the guest session; returns the ack frame.

        The guest also clears any stale `Final` as part of this operation.
        """
        self.virtual_machine._send_frame({"kind": "set_vars", "session": self.session_id, "values": values})
        frame = self.virtual_machine._read_frame()
        if frame.get("kind") != "set_vars_done" or frame.get("session") != self.session_id:
            raise RuntimeError(f"unexpected set_vars reply: {frame!r}")
        return frame

    def run(self, code: str, language: str = "python", host_call_handler=None) -> dict:
        """Execute `code` in the guest and pump frames until a result arrives.

        While the guest runs, it may emit `host_call` frames (e.g. recursive
        rlm_query); each one is answered via `host_call_handler` (or refused
        when no handler was given), and the loop continues until the guest
        sends its `result` frame, which is returned verbatim.
        """
        self.virtual_machine._send_frame({
            "kind": "exec",
            "session": self.session_id,
            "language": language,
            "code": code,
        })
        while True:
            frame = self.virtual_machine._read_frame()
            kind = frame.get("kind")
            if frame.get("session") not in {None, self.session_id}:
                raise RuntimeError(f"unexpected frame for another session: {frame!r}")
            if kind == "result":
                return frame
            if kind == "host_call":
                reply = {"kind": "host_call_reply", "session": self.session_id, "error": "no host_call handler"}
                if host_call_handler is not None:
                    try:
                        value = host_call_handler(frame.get("method"), frame.get("payload") or {})
                        reply = {"kind": "host_call_reply", "session": self.session_id, "value": value}
                    except BaseException as error:
                        # Errors are relayed into the guest, where they re-raise.
                        reply = {"kind": "host_call_reply", "session": self.session_id, "error": str(error)}
                self.virtual_machine._send_frame(reply)
                continue
            if kind == "error":
                raise RuntimeError(frame.get("error"))
            raise RuntimeError(f"unexpected frame while executing: {frame!r}")

    def close(self) -> None:
        """Tear down the guest-side session."""
        self.virtual_machine._send_frame({"kind": "close_session", "session": self.session_id})
        frame = self.virtual_machine._read_frame()
        if frame.get("kind") != "closed":
            raise RuntimeError(f"unexpected close reply: {frame!r}")
def llm_chat(messages: list[dict], model: str = DEFAULT_MODEL) -> str:
    """POST a non-streaming chat request to the local /api/chat server.

    Args:
        messages: chat history as {"role": ..., "content": ...} dicts.
        model: model name passed through to the server.

    Returns:
        The assistant message content from the JSON response.
    """
    body = json.dumps({"model": model, "messages": messages, "stream": False}).encode("utf-8")
    request = urllib.request.Request(
        f"{LLM_SERVER_URL}/api/chat",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    # Close the HTTP response promptly via `with` rather than relying on GC.
    with urllib.request.urlopen(request, timeout=900) as response:
        payload = response.read()
    return json.loads(payload)["message"]["content"]
def nano_rlm(
    task: str,
    virtual_machine: IsolatedReplVm,
    session: ReplSession | None = None,
    *,
    model: str = DEFAULT_MODEL,
    max_steps: int = 18,
    depth: int = 0,
    max_depth: int = 3,
) -> str:
    """Run the RLM loop over `task` inside a guest REPL session.

    The model only ever sees metadata about the prompt plus compact
    observations of its own REPL steps; the answer is whatever it stores in
    the JSON-serializable `Final` variable inside the guest.

    Args:
        task: full prompt text, injected into the guest as `prompt`.
        virtual_machine: booted VM hosting the REPL server.
        session: existing session to reuse; a fresh one is created (and
            closed on exit) when None.
        model: chat model used at this depth.
        max_steps: maximum REPL iterations before giving up.
        depth: current recursion depth (0 for the root call).
        max_depth: depth at which rlm_query degrades to a plain LLM call.

    Returns:
        The Final value (strings as-is, other values JSON-encoded), or a
        fallback message when max_steps is exhausted.
    """
    owns_session = session is None
    session = session or virtual_machine.new_session(context_summary="isolated Python REPL")
    session.set_vars({"prompt": task})

    def handle_host_call(method: str, payload: dict) -> str:
        # Serve guest-side rlm_query calls: recurse into a child RLM, or at
        # the depth limit fall back to a single plain LLM call.
        if method != "rlm_query":
            raise RuntimeError(f"unknown host call: {method!r}")
        child_prompt = payload["prompt"]
        child_model = payload.get("model") or model
        if depth + 1 >= max_depth:
            print(f" [depth {depth}] leaf llm call: {child_prompt[:100]!r}")
            return llm_chat([{"role": "user", "content": child_prompt}], model=child_model)
        print(f" [depth {depth}] child RLM: {child_prompt[:100]!r}")
        child_session = virtual_machine.new_session(context_summary=f"child RLM depth {depth + 1}")
        try:
            return nano_rlm(
                child_prompt,
                virtual_machine,
                child_session,
                model=child_model,
                max_steps=max(6, max_steps // 2),  # children get smaller budgets
                depth=depth + 1,
                max_depth=max_depth,
            )
        finally:
            child_session.close()

    # The root model sees only prompt metadata, never the full prompt text.
    prompt_meta = short_text_metadata(task, PROMPT_PREFIX_CHARS)
    history = [
        {"role": "system", "content": RLM_SYSTEM_PROMPT},
        {"role": "user", "content": json.dumps({
            "state_metadata": {"prompt_type": "str", "prompt": prompt_meta},
            "instructions": (
                "The full prompt is in the isolated REPL variable `prompt`, as a Python str. "
                "Write Python to inspect it, recurse with rlm_query when useful, "
                "build intermediate variables, and set `Final`."
            ),
        }, indent=2)},
    ]
    try:
        for step in range(1, max_steps + 1):
            print(f"\n--- RLM depth={depth} step={step}/{max_steps} ---")
            model_reply = llm_chat(history, model=model)
            print("assistant:", truncate(model_reply))
            code_block_match = CODE_BLOCK_PATTERN.search(model_reply)
            if not code_block_match:
                # No runnable block: nudge the model and retry without
                # recording the malformed reply.
                history.append({
                    "role": "user",
                    "content": "Reply with exactly one fenced python or bash block that advances the REPL state.",
                })
                continue
            block_language = code_block_match.group("language") or "python"
            if block_language == "repl":
                block_language = "python"
            generated_code = code_block_match.group("body").strip()
            history.append({"role": "assistant", "content": f"```{block_language}\n{generated_code}\n```"})
            result = session.run(generated_code, language=block_language, host_call_handler=handle_host_call)
            final = result.get("final") or {}
            if final.get("is_set"):
                if not final.get("json"):
                    raise RuntimeError(final.get("error", "Final is not JSON-serializable"))
                value = final["value"]
                print("Final set:", final.get("metadata"))
                if isinstance(value, str):
                    return value
                return json.dumps(value, indent=2)
            stdout = result.get("out", "")
            stderr = result.get("err", "")
            if stdout or stderr:
                print("observation:", truncate(stdout + stderr))
            # Only compact metadata goes back into the model's context.
            history.append({
                "role": "user",
                "content": "REPL observation metadata:\n" + json.dumps(result.get("metadata"), indent=2),
            })
        return "(max_steps reached before Final was set)"
    finally:
        if owns_session:
            session.close()
def pdf_to_text(path: Path) -> str:
    """Convert a PDF to plain text with the `pdftotext` CLI.

    Raises:
        FileNotFoundError: if `path` does not exist.
        RuntimeError: if pdftotext is not installed.
        subprocess.CalledProcessError: if extraction fails.
    """
    if not path.exists():
        raise FileNotFoundError(path)
    if not shutil.which("pdftotext"):
        raise RuntimeError("pdftotext is required for this demo")
    result = subprocess.run(
        ["pdftotext", str(path), "-"],  # "-" sends the text to stdout
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return result.stdout
def build_paper_summary_task(text: str) -> str:
    """Wrap the extracted paper text in the full RLM analysis prompt."""
    return f"""Analyze the paper text below and produce a clear structured brief.
Focus on:
- the central claim and motivation
- the RLM algorithm and what makes it different from ordinary agents
- the experiments and strongest evidence
- limitations, costs, and failure modes
- what a minimal implementation must preserve
Use recursive calls over meaningful chunks when helpful. Do not quote long
passages. The final answer should be concise but substantive.
[BEGIN PAPER TEXT]
{text}
[END PAPER TEXT]
"""
def build_smoke_task(text: str) -> str:
    """Wrap a short excerpt in the cheap non-recursive smoke-test prompt."""
    return f"""Smoke test: summarize this excerpt from rlm.pdf in three bullets.
Inspect `prompt` briefly if useful, then set `Final` to the answer as a string.
Do not use recursive calls for this smoke test.
[BEGIN EXCERPT]
{text}
[END EXCERPT]
"""
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the rlm.pdf demo."""
    parser = argparse.ArgumentParser(description="Run a VM-isolated nano RLM over rlm.pdf.")
    parser.add_argument("--pdf", type=Path, default=Path("rlm.pdf"))
    parser.add_argument("--model", default=DEFAULT_MODEL)
    parser.add_argument("--max-steps", type=int, default=18)
    parser.add_argument("--max-depth", type=int, default=3)
    parser.add_argument("--memory-mb", type=int, default=1536)
    # 0 means "no limit" (except in --smoke mode, which then caps at 4000).
    parser.add_argument("--limit-chars", type=int, default=0)
    parser.add_argument("--smoke", action="store_true")
    return parser.parse_args()
def main() -> int:
    """CLI entry point: extract the PDF text, boot the VM, run the RLM loop."""
    options = parse_args()
    paper_text = pdf_to_text(options.pdf)
    if options.smoke:
        # Smoke runs default to a small excerpt so the loop stays fast.
        paper_text = paper_text[:options.limit_chars or 4000]
    if not options.smoke and options.limit_chars:
        paper_text = paper_text[:options.limit_chars]
    print(f"Extracted {len(paper_text):,} characters from {options.pdf}")
    rlm_task = build_paper_summary_task(paper_text)
    if options.smoke:
        rlm_task = build_smoke_task(paper_text)
    virtual_machine = IsolatedReplVm(memory_mb=options.memory_mb)
    try:
        answer = nano_rlm(
            rlm_task,
            virtual_machine,
            model=options.model,
            max_steps=options.max_steps,
            max_depth=options.max_depth,
        )
    finally:
        # Always tear the VM down, even when the RLM loop raised.
        virtual_machine.close()
    print("\n=== FINAL ANSWER ===\n")
    print(answer)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment