Single-file Nano RLM agent: a Recursive Language Model harness using a networked VM for experimenting with recursive and self-optimising local-model agents that can work around context-window limits.
#!/usr/bin/env python3
"""nano_rlm.py - a compact VM-isolated Recursive Language Model demo.

The full prompt is loaded into an isolated persistent REPL, the root model sees
only metadata, generated Python can call `rlm_query(...)` recursively, compact
observations are fed back, and setting JSON-serializable `Final` returns the
answer. The demo summarizes `rlm.pdf`.

Requires:
- qemu-system-x86_64
- pdftotext, for the rlm.pdf demo
- a local /api/chat model server
"""

import argparse
import json
import os
import re
import secrets
import select
import shutil
import subprocess
import sys
import time
import urllib.request
from pathlib import Path

DEFAULT_MODEL = os.environ.get("NANO_RLM_MODEL", "qwen3.6")
LLM_SERVER_URL = os.environ.get("NANO_RLM_LLM_URL", "http://localhost:11434")
ALPINE_RELEASES_URL = "https://dl-cdn.alpinelinux.org/alpine/latest-stable/releases/x86_64"
CACHE_DIR = Path.home() / ".cache" / "nano-rlm"
APK_CACHE_DIR = CACHE_DIR / "apk-cache"
BOOT_KERNEL_PATH = CACHE_DIR / "vmlinuz-virt"
BOOT_INITRAMFS_PATH = CACHE_DIR / "initramfs-virt"
PROMPT_PREFIX_CHARS = 1600
OBS_PREFIX_CHARS = 800
CODE_BLOCK_PATTERN = re.compile(r"```(?P<language>python|repl|bash|sh)?\s*\n(?P<body>.*?)```", re.DOTALL)

GUEST_REPL_SERVER_CODE = r'''
import builtins, io, json, os, queue, subprocess, sys, threading, traceback, types

raw_stdin = os.fdopen(sys.stdin.fileno(), "rb", buffering=0)
raw_stdout = sys.__stdout__.buffer
marker = raw_stdin.readline().rstrip(b"\r\n")[-16:]
sessions = {}
write_lock = threading.Lock()


def read_exact(byte_count):
    byte_chunks = []
    remaining = byte_count
    while remaining:
        chunk = raw_stdin.read(remaining)
        if not chunk:
            raise EOFError("host input closed")
        byte_chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(byte_chunks)


def read_frame():
    size = b""
    while True:
        next_byte = raw_stdin.read(1)
        if not next_byte:
            raise EOFError("host input closed")
        if next_byte in b" \r\n\t" and not size:
            continue
        if next_byte == b":":
            break
        size += next_byte
    return json.loads(read_exact(int(size)).decode("utf-8"))


def emit(frame):
    data = json.dumps(frame, ensure_ascii=True).encode("utf-8")
    with write_lock:
        raw_stdout.write(marker + str(len(data)).encode() + b":" + data + b"\n")
        raw_stdout.flush()
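
# Wire format: every guest->host frame is the random 16-byte marker, the
# ASCII payload length, a colon, the JSON payload, and a newline, so frames
# can be fished out of the otherwise noisy serial console, e.g.
#   <marker>24:{"kind": "server_ready"}\n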


def text_metadata(text, prefix_chars=800):
    line_count = text.count("\n")
    if text:
        line_count += 1
    return {
        "chars": len(text),
        "lines": line_count,
        "prefix": text[:prefix_chars],
        "truncated": len(text) > prefix_chars,
    }


def value_metadata(value):
    metadata = {"type": type(value).__name__}
    try:
        metadata["len"] = len(value)
    except TypeError:
        pass
    if isinstance(value, str):
        metadata.update(text_metadata(value, 200))
        return metadata
    metadata["repr"] = repr(value)[:200]
    return metadata


class RLMQueryModule(types.ModuleType):
    def __call__(self, *positional_arguments, **keyword_arguments):
        return self.rlm_query(*positional_arguments, **keyword_arguments)


class Session:
    def __init__(self, session_id, initial_globals=None, context_summary=""):
        self.session_id = session_id
        self.context_summary = context_summary
        self.reply_queue = queue.Queue()
        self.env = {"__name__": "__rlm_repl__"}
        self.env.update(initial_globals or {})
        self.env["host_call"] = self.host_call
        self.env["rlm_query"] = self.rlm_query
        self.env["SHOW_VARS"] = self.show_vars

    def visible_names(self):
        hidden = {"host_call", "rlm_query", "SHOW_VARS", "print"}
        return [name for name in sorted(self.env) if not name.startswith("_") and name not in hidden]

    def show_vars(self):
        rows = []
        for name in self.visible_names():
            value = self.env[name]
            try:
                rows.append(f"{name}: {type(value).__name__}(len={len(value)})")
            except TypeError:
                rows.append(f"{name}: {type(value).__name__}")
        self.env.get("print", builtins.print)("\n".join(rows))

    def state_summary(self):
        names = self.visible_names()
        return {
            "context_summary": self.context_summary,
            "variables": names[:20],
            "variable_count": len(names),
        }

    def final_payload(self):
        if "Final" not in self.env:
            return {"is_set": False}
        value = self.env["Final"]
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            return {
                "is_set": True,
                "json": False,
                "error": "Final must be JSON-serializable.",
                "metadata": value_metadata(value),
            }
        return {
            "is_set": True,
            "json": True,
            "value": value,
            "metadata": value_metadata(value),
        }

    def host_call(self, method, payload=None):
        emit({"kind": "host_call", "session": self.session_id, "method": method, "payload": payload or {}})
        reply = self.reply_queue.get()
        if "error" in reply:
            raise RuntimeError(reply["error"])
        return reply.get("value")

    def rlm_query(self, prompt, model=None, **extra_prompt_fields):
        if extra_prompt_fields:
            notes = "\n".join(
                f"{field_name}: {field_value}"
                for field_name, field_value in extra_prompt_fields.items()
            )
            prompt = str(prompt) + "\n\n" + notes
        return self.host_call("rlm_query", {"prompt": prompt, "model": model})

    def set_vars(self, values):
        self.env.update(values)
        self.env.pop("Final", None)
        emit({"kind": "set_vars_done", "session": self.session_id, "state": self.state_summary()})

    def make_print(self, stdout):
        def repl_print(*values, sep=" ", end="\n", file=None, flush=False):
            if file is not None and file is not sys.stdout:
                return builtins.print(*values, sep=sep, end=end, file=file, flush=flush)
            stdout.write(sep.join(str(value) for value in values) + end)
        return repl_print

    def run(self, code):
        stdout, stderr = io.StringIO(), io.StringIO()
        old_print = self.env.get("print", builtins.print)
        previous_rlm_query_module = sys.modules.get("rlm_query")
        module = RLMQueryModule("rlm_query")
        module.rlm_query = self.rlm_query
        sys.modules["rlm_query"] = module
        self.env["print"] = self.make_print(stdout)
        try:
            exec(compile(code, "<rlm-repl>", "exec"), self.env)
        except BaseException:
            stderr.write(traceback.format_exc())
        finally:
            self.env["print"] = old_print
            if previous_rlm_query_module is None:
                sys.modules.pop("rlm_query", None)
            if previous_rlm_query_module is not None:
                sys.modules["rlm_query"] = previous_rlm_query_module
        emit({
            "kind": "result",
            "session": self.session_id,
            "out": stdout.getvalue(),
            "err": stderr.getvalue(),
            "metadata": {
                "stdout": text_metadata(stdout.getvalue()),
                "stderr": text_metadata(stderr.getvalue()),
            },
            "state": self.state_summary(),
            "final": self.final_payload(),
        })

    def run_bash(self, script):
        completed = subprocess.run(
            ["/bin/sh", "-lc", script],
            cwd="/tmp",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=300,
        )
        emit({
            "kind": "result",
            "session": self.session_id,
            "out": completed.stdout,
            "err": completed.stderr,
            "metadata": {
                "stdout": text_metadata(completed.stdout),
                "stderr": text_metadata(completed.stderr),
                "returncode": completed.returncode,
            },
            "state": self.state_summary(),
            "final": self.final_payload(),
        })


def run_exec_thread(session, language, code):
    def execute_code():
        if language in {"bash", "sh"}:
            session.run_bash(code)
            return
        session.run(code)
    thread = threading.Thread(target=execute_code, daemon=True)
    thread.start()


emit({"kind": "server_ready"})
while True:
    try:
        frame = read_frame()
        kind = frame.get("kind")
        session_id = frame.get("session")
        if kind == "exit":
            emit({"kind": "bye"})
            raise SystemExit
        if kind == "new_session":
            sessions[session_id] = Session(session_id, frame.get("env") or {}, frame.get("context_summary") or "")
            emit({"kind": "session_ready", "session": session_id, "state": sessions[session_id].state_summary()})
            continue
        if kind == "set_vars":
            sessions[session_id].set_vars(frame.get("values") or {})
            continue
        if kind == "exec":
            run_exec_thread(sessions[session_id], frame.get("language") or "python", frame.get("code") or "")
            continue
        if kind == "host_call_reply":
            sessions[session_id].reply_queue.put(frame)
            continue
        if kind == "close_session":
            sessions.pop(session_id, None)
            emit({"kind": "closed", "session": session_id})
            continue
        emit({"kind": "error", "session": session_id, "error": "unknown frame kind: " + repr(kind)})
    except SystemExit:
        raise
    except BaseException as error:
        emit({"kind": "error", "session": locals().get("session_id"), "error": str(error)})
'''
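
# Control protocol, as implemented above: the host sends marker-less
# `<len>:<json>` frames of kind new_session / set_vars / exec /
# host_call_reply / close_session / exit; the guest answers with
# marker-prefixed frames of kind server_ready / session_ready /
# set_vars_done / result / host_call / closed / bye / error. A blocking
# `rlm_query(...)` inside guest code surfaces as a `host_call` frame that
# the host must answer with a `host_call_reply` before the matching
# `result` frame arrives.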

RLM_SYSTEM_PROMPT = """You are the root LM inside a Recursive Language Model (RLM).
You answer by writing Python or Bash that runs inside an isolated VM.

Important environment:
- `prompt` is the full user prompt as a plain Python `str`, not a dict.
  Inspect it with `len(prompt)`, `prompt[:1000]`, regex, split, and slicing.
- `rlm_query(f"question plus relevant snippet")` recursively asks a child RLM to
  solve a focused subtask. It is already available as a global function.
- `SHOW_VARS()` prints current REPL variables.
- `bash` blocks run as shell scripts in `/tmp`. Heredocs work. Useful installed
  tools include `rg`, `jq`, `fd`, `file`, `tree`, `tar`, `unzip`, `gzip`,
  `sqlite3`, `git`, `diff`, `pdftotext`, GNU `find`, and GNU coreutils.
- Set JSON-serializable `Final` when the answer is ready.

Use Python for persistent state, parsing, aggregation, `rlm_query`, and `Final`.
Use Bash for filesystem inspection, text search, archives, JSON, SQLite, Git,
PDF text extraction, and shell-native commands. Bash does not persist variables
into Python. Do not dump the whole prompt. Each response must be one fenced
`python` or `bash` block."""
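
# Illustrative shape of a root-model reply the harness accepts (not from a
# real run): exactly one fenced block that CODE_BLOCK_PATTERN can extract.
#   ```python
#   print(len(prompt), prompt[:200])
#   Final = {"chars": len(prompt)}
#   ```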


def ensure_cache_dirs() -> None:
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    APK_CACHE_DIR.mkdir(parents=True, exist_ok=True)


def find_qemu() -> str:
    for name in ("qemu-system-x86_64", "qemu-system-x86-64"):
        path = shutil.which(name)
        if path:
            return path
    raise RuntimeError("qemu-system-x86_64 not found on PATH")


def ensure_alpine_iso() -> Path:
    ensure_cache_dirs()
    cached_iso_paths = sorted(CACHE_DIR.glob("alpine-virt-*-x86_64.iso"))
    if cached_iso_paths:
        return cached_iso_paths[-1]
    index = urllib.request.urlopen(f"{ALPINE_RELEASES_URL}/").read().decode("utf-8", "replace")
    names = sorted(set(re.findall(r"alpine-virt-[0-9.]+-x86_64\.iso", index)))
    if not names:
        raise RuntimeError(f"no Alpine virt ISO found at {ALPINE_RELEASES_URL}/")
    target_path = CACHE_DIR / names[-1]
    print(f"Downloading {ALPINE_RELEASES_URL}/{names[-1]} -> {target_path}")
    request = urllib.request.Request(f"{ALPINE_RELEASES_URL}/{names[-1]}", headers={"User-Agent": "nano-rlm"})
    with urllib.request.urlopen(request) as response, open(target_path, "wb") as output_file:
        shutil.copyfileobj(response, output_file)
    return target_path


def extract_iso_member(iso_path: Path, member_path: str, target_path: Path) -> Path:
    if target_path.exists():
        return target_path
    if not shutil.which("isoinfo"):
        raise RuntimeError("isoinfo is required to extract Alpine boot files")
    with open(target_path, "wb") as output_file:
        subprocess.run(["isoinfo", "-R", "-i", str(iso_path), "-x", member_path], check=True, stdout=output_file)
    return target_path


def short_text_metadata(text: str, prefix_chars: int = OBS_PREFIX_CHARS) -> dict:
    line_count = text.count("\n")
    if text:
        line_count += 1
    return {
        "chars": len(text),
        "lines": line_count,
        "prefix": text[:prefix_chars],
        "truncated": len(text) > prefix_chars,
    }


def truncate(text: str, limit: int = 1200) -> str:
    if len(text) <= limit:
        return text
    half = limit // 2
    return text[:half] + f"\n...[{len(text) - limit} chars truncated]...\n" + text[-half:]


class IsolatedReplVm:
    """One Alpine VM running a tiny multi-session Python REPL server."""

    def __init__(self, memory_mb: int = 1536):
        ensure_cache_dirs()
        self.marker = secrets.token_hex(8).encode()
        self.buffer = b""
        self.echo_console = True
        self.qemu_binary = find_qemu()
        self.iso_path = ensure_alpine_iso()
        self.kernel_path = extract_iso_member(self.iso_path, "/boot/vmlinuz-virt", BOOT_KERNEL_PATH)
        self.initramfs_path = extract_iso_member(self.iso_path, "/boot/initramfs-virt", BOOT_INITRAMFS_PATH)
        self.memory_mb = memory_mb
        self.process: subprocess.Popen[bytes] | None = None
        self.start()

    def start(self) -> None:
        print(f"Booting Alpine VM from {self.iso_path}")
        self.process = subprocess.Popen(
            [
                self.qemu_binary,
                "-accel", "tcg,thread=multi",
                "-m", str(self.memory_mb),
                "-smp", "1",
                "-nic", "user",
                "-display", "none",
                "-monitor", "none",
                "-serial", "stdio",
                "-cdrom", str(self.iso_path),
                "-kernel", str(self.kernel_path),
                "-initrd", str(self.initramfs_path),
                "-append", "console=ttyS0 modules=loop,squashfs,sd-mod,usb-storage quiet",
                "-virtfs",
                f"local,path={CACHE_DIR},mount_tag=cache,security_model=mapped-xattr",
            ],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=0,
        )
        self._boot_guest()
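
    # No disk image is involved: the VM boots the Alpine "virt" kernel and
    # initramfs extracted from the cached ISO, the serial console on stdio is
    # both boot console and control channel, and the host cache directory is
    # shared into the guest over 9p under the mount tag `cache`.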

    def close(self) -> None:
        if not self.process or self.process.poll() is not None:
            return
        try:
            self._send_frame({"kind": "exit"})
        except Exception:
            pass
        try:
            self.process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            self.process.kill()
            self.process.wait(timeout=5)

    def _pull(self, timeout: float = 1.0) -> None:
        assert self.process and self.process.stdout
        ready, _, _ = select.select([self.process.stdout], [], [], timeout)
        if not ready:
            return
        chunk = os.read(self.process.stdout.fileno(), 4096)
        if chunk:
            self.buffer += chunk
            if self.echo_console:
                sys.stderr.buffer.write(chunk)
                sys.stderr.buffer.flush()

    def _send(self, data: bytes | str) -> None:
        assert self.process and self.process.stdin
        if isinstance(data, str):
            data = data.encode("utf-8")
        self.process.stdin.write(data)
        self.process.stdin.flush()

    def _wait_for(self, target: bytes | str, timeout: int = 180) -> None:
        target_bytes = target
        if isinstance(target_bytes, str):
            target_bytes = target_bytes.encode("utf-8")
        deadline = time.time() + timeout
        while target_bytes not in self.buffer:
            if time.time() >= deadline:
                raise TimeoutError(f"{target_bytes!r} not seen, tail={self.buffer[-300:]!r}")
            if self.process and self.process.poll() is not None:
                raise RuntimeError("QEMU exited while booting")
            self._pull()
        target_index = self.buffer.index(target_bytes)
        self.buffer = self.buffer[target_index + len(target_bytes):]

    def _read_frame(self, timeout: int = 1800) -> dict:
        deadline = time.time() + timeout
        while self.marker not in self.buffer:
            if time.time() >= deadline:
                raise TimeoutError("timed out waiting for guest frame marker")
            if self.process and self.process.poll() is not None:
                raise RuntimeError("QEMU exited while reading a frame")
            self._pull(timeout=0.2)
        self.buffer = self.buffer[self.buffer.index(self.marker) + len(self.marker):]
        while b":" not in self.buffer:
            if time.time() >= deadline:
                raise TimeoutError("timed out waiting for guest frame length")
            self._pull(timeout=0.2)
        separator_index = self.buffer.index(b":")
        payload_length = int(self.buffer[:separator_index])
        self.buffer = self.buffer[separator_index + 1:]
        while len(self.buffer) < payload_length:
            if time.time() >= deadline:
                raise TimeoutError("timed out waiting for guest frame body")
            self._pull(timeout=0.2)
        payload_bytes, self.buffer = self.buffer[:payload_length], self.buffer[payload_length:]
        return json.loads(payload_bytes.decode("utf-8"))

    def _send_frame(self, frame: dict) -> None:
        data = json.dumps(frame, ensure_ascii=True).encode("utf-8")
        self._send(str(len(data)).encode() + b":" + data + b"\n")
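
    # Host->guest frames are plain `<len>:<json>` netstrings with no marker;
    # only guest->host frames carry the random marker, because the host must
    # pick server replies out of ordinary kernel and shell output that shares
    # the same serial stream.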

    def _boot_guest(self) -> None:
        self._wait_for("login:", timeout=240)
        self._send("root\n")
        self._wait_for("#", timeout=60)
        boot_script = """
stty -echo
export TERM=dumb
ip link set eth0 up
udhcpc -n -q -i eth0 >/dev/null 2>&1 || true
ip route replace default via 10.0.2.2 dev eth0 2>/dev/null || route add default gw 10.0.2.2 eth0 2>/dev/null || true
printf 'nameserver 10.0.2.3\\nnameserver 1.1.1.1\\n' > /etc/resolv.conf
branch=$(cut -d. -f1,2 /etc/alpine-release)
printf '%s\\n%s\\n' "https://dl-cdn.alpinelinux.org/alpine/v$branch/main" "https://dl-cdn.alpinelinux.org/alpine/v$branch/community" > /etc/apk/repositories
mkdir -p /cache/apk-cache /sandbox
mount -t 9p -o trans=virtio,version=9p2000.L cache /cache
echo READY_FOR_BOOTSTRAP
"""
        self._send(boot_script)
        self._wait_for("READY_FOR_BOOTSTRAP", timeout=120)
        self._wait_for("#", timeout=20)
        print("Installing guest tools...")
        self._send(
            "apk --cache-dir /cache/apk-cache add "
            "python3 ripgrep jq fd file tree tar unzip gzip sqlite git "
            "diffutils poppler-utils coreutils findutils "
            "&& echo TOOLS_READY\n"
        )
        self._wait_for("TOOLS_READY", timeout=600)
        self._wait_for("#", timeout=20)
        self._send("cat > /sandbox/guest.py <<'PY'\n")
        self._send(GUEST_REPL_SERVER_CODE)
        if not GUEST_REPL_SERVER_CODE.endswith("\n"):
            self._send("\n")
        self._send("PY\n")
        self._wait_for("#", timeout=30)
        self.echo_console = False
        command = "stty raw -echo\nexec python3 /sandbox/guest.py"
        self._send(command + "\n")
        time.sleep(0.2)
        self._send(self.marker + b"\n")
        ready = self._read_frame(timeout=120)
        if ready.get("kind") != "server_ready":
            raise RuntimeError(f"guest server did not start cleanly: {ready!r}")
        print("Guest REPL server ready.\n")

    def new_session(self, initial_globals: dict | None = None, context_summary: str = "") -> "ReplSession":
        session_id = secrets.token_hex(8)
        self._send_frame({
            "kind": "new_session",
            "session": session_id,
            "env": initial_globals or {},
            "context_summary": context_summary,
        })
        frame = self._read_frame()
        if frame.get("kind") != "session_ready" or frame.get("session") != session_id:
            raise RuntimeError(f"unexpected new_session reply: {frame!r}")
        return ReplSession(self, session_id)


class ReplSession:
    def __init__(self, virtual_machine: IsolatedReplVm, session_id: str):
        self.virtual_machine = virtual_machine
        self.session_id = session_id

    def set_vars(self, values: dict) -> dict:
        self.virtual_machine._send_frame({"kind": "set_vars", "session": self.session_id, "values": values})
        frame = self.virtual_machine._read_frame()
        if frame.get("kind") != "set_vars_done" or frame.get("session") != self.session_id:
            raise RuntimeError(f"unexpected set_vars reply: {frame!r}")
        return frame

    def run(self, code: str, language: str = "python", host_call_handler=None) -> dict:
        self.virtual_machine._send_frame({
            "kind": "exec",
            "session": self.session_id,
            "language": language,
            "code": code,
        })
        while True:
            frame = self.virtual_machine._read_frame()
            kind = frame.get("kind")
            if frame.get("session") not in {None, self.session_id}:
                raise RuntimeError(f"unexpected frame for another session: {frame!r}")
            if kind == "result":
                return frame
            if kind == "host_call":
                reply = {"kind": "host_call_reply", "session": self.session_id, "error": "no host_call handler"}
                if host_call_handler is not None:
                    try:
                        value = host_call_handler(frame.get("method"), frame.get("payload") or {})
                        reply = {"kind": "host_call_reply", "session": self.session_id, "value": value}
                    except BaseException as error:
                        reply = {"kind": "host_call_reply", "session": self.session_id, "error": str(error)}
                self.virtual_machine._send_frame(reply)
                continue
            if kind == "error":
                raise RuntimeError(frame.get("error"))
            raise RuntimeError(f"unexpected frame while executing: {frame!r}")

    def close(self) -> None:
        self.virtual_machine._send_frame({"kind": "close_session", "session": self.session_id})
        frame = self.virtual_machine._read_frame()
        if frame.get("kind") != "closed":
            raise RuntimeError(f"unexpected close reply: {frame!r}")


def llm_chat(messages: list[dict], model: str = DEFAULT_MODEL) -> str:
    body = json.dumps({"model": model, "messages": messages, "stream": False}).encode("utf-8")
    request = urllib.request.Request(
        f"{LLM_SERVER_URL}/api/chat",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    response = urllib.request.urlopen(request, timeout=900).read()
    return json.loads(response)["message"]["content"]
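
# `llm_chat` assumes an Ollama-style non-streaming `/api/chat` endpoint:
# the request body is {"model": ..., "messages": [...], "stream": false}
# and the reply text lives at response["message"]["content"]. Any local
# server that speaks this shape (e.g. Ollama on its default port 11434)
# should work unchanged.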

def nano_rlm(
    task: str,
    virtual_machine: IsolatedReplVm,
    session: ReplSession | None = None,
    *,
    model: str = DEFAULT_MODEL,
    max_steps: int = 18,
    depth: int = 0,
    max_depth: int = 3,
) -> str:
    owns_session = session is None
    session = session or virtual_machine.new_session(context_summary="isolated Python REPL")
    session.set_vars({"prompt": task})

    def handle_host_call(method: str, payload: dict) -> str:
        if method != "rlm_query":
            raise RuntimeError(f"unknown host call: {method!r}")
        child_prompt = payload["prompt"]
        child_model = payload.get("model") or model
        if depth + 1 >= max_depth:
            print(f" [depth {depth}] leaf llm call: {child_prompt[:100]!r}")
            return llm_chat([{"role": "user", "content": child_prompt}], model=child_model)
        print(f" [depth {depth}] child RLM: {child_prompt[:100]!r}")
        child_session = virtual_machine.new_session(context_summary=f"child RLM depth {depth + 1}")
        try:
            return nano_rlm(
                child_prompt,
                virtual_machine,
                child_session,
                model=child_model,
                max_steps=max(6, max_steps // 2),
                depth=depth + 1,
                max_depth=max_depth,
            )
        finally:
            child_session.close()
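
    # Recursion policy: a child RLM gets half the parent's step budget (never
    # fewer than 6 steps), and once depth + 1 reaches max_depth an rlm_query
    # collapses into a single plain chat completion instead of a new session.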

    prompt_meta = short_text_metadata(task, PROMPT_PREFIX_CHARS)
    history = [
        {"role": "system", "content": RLM_SYSTEM_PROMPT},
        {"role": "user", "content": json.dumps({
            "state_metadata": {"prompt_type": "str", "prompt": prompt_meta},
            "instructions": (
                "The full prompt is in the isolated REPL variable `prompt`, as a Python str. "
                "Write Python to inspect it, recurse with rlm_query when useful, "
                "build intermediate variables, and set `Final`."
            ),
        }, indent=2)},
    ]
    try:
        for step in range(1, max_steps + 1):
            print(f"\n--- RLM depth={depth} step={step}/{max_steps} ---")
            model_reply = llm_chat(history, model=model)
            print("assistant:", truncate(model_reply))
            code_block_match = CODE_BLOCK_PATTERN.search(model_reply)
            if not code_block_match:
                history.append({
                    "role": "user",
                    "content": "Reply with exactly one fenced python or bash block that advances the REPL state.",
                })
                continue
            block_language = code_block_match.group("language") or "python"
            if block_language == "repl":
                block_language = "python"
            generated_code = code_block_match.group("body").strip()
            history.append({"role": "assistant", "content": f"```{block_language}\n{generated_code}\n```"})
            result = session.run(generated_code, language=block_language, host_call_handler=handle_host_call)
            final = result.get("final") or {}
            if final.get("is_set"):
                if not final.get("json"):
                    raise RuntimeError(final.get("error", "Final is not JSON-serializable"))
                value = final["value"]
                print("Final set:", final.get("metadata"))
                if isinstance(value, str):
                    return value
                return json.dumps(value, indent=2)
            stdout = result.get("out", "")
            stderr = result.get("err", "")
            if stdout or stderr:
                print("observation:", truncate(stdout + stderr))
            history.append({
                "role": "user",
                "content": "REPL observation metadata:\n" + json.dumps(result.get("metadata"), indent=2),
            })
        return "(max_steps reached before Final was set)"
    finally:
        if owns_session:
            session.close()


def pdf_to_text(path: Path) -> str:
    if not path.exists():
        raise FileNotFoundError(path)
    if not shutil.which("pdftotext"):
        raise RuntimeError("pdftotext is required for this demo")
    result = subprocess.run(
        ["pdftotext", str(path), "-"],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return result.stdout


def build_paper_summary_task(text: str) -> str:
    return f"""Analyze the paper text below and produce a clear structured brief.

Focus on:
- the central claim and motivation
- the RLM algorithm and what makes it different from ordinary agents
- the experiments and strongest evidence
- limitations, costs, and failure modes
- what a minimal implementation must preserve

Use recursive calls over meaningful chunks when helpful. Do not quote long
passages. The final answer should be concise but substantive.

[BEGIN PAPER TEXT]
{text}
[END PAPER TEXT]
"""


def build_smoke_task(text: str) -> str:
    return f"""Smoke test: summarize this excerpt from rlm.pdf in three bullets.
Inspect `prompt` briefly if useful, then set `Final` to the answer as a string.
Do not use recursive calls for this smoke test.

[BEGIN EXCERPT]
{text}
[END EXCERPT]
"""


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Run a VM-isolated nano RLM over rlm.pdf.")
    parser.add_argument("--pdf", type=Path, default=Path("rlm.pdf"))
    parser.add_argument("--model", default=DEFAULT_MODEL)
    parser.add_argument("--max-steps", type=int, default=18)
    parser.add_argument("--max-depth", type=int, default=3)
    parser.add_argument("--memory-mb", type=int, default=1536)
    parser.add_argument("--limit-chars", type=int, default=0)
    parser.add_argument("--smoke", action="store_true")
    return parser.parse_args()


def main() -> int:
    options = parse_args()
    paper_text = pdf_to_text(options.pdf)
    if options.smoke:
        paper_text = paper_text[:options.limit_chars or 4000]
    if not options.smoke and options.limit_chars:
        paper_text = paper_text[:options.limit_chars]
    print(f"Extracted {len(paper_text):,} characters from {options.pdf}")
    rlm_task = build_paper_summary_task(paper_text)
    if options.smoke:
        rlm_task = build_smoke_task(paper_text)
    virtual_machine = IsolatedReplVm(memory_mb=options.memory_mb)
    try:
        answer = nano_rlm(
            rlm_task,
            virtual_machine,
            model=options.model,
            max_steps=options.max_steps,
            max_depth=options.max_depth,
        )
    finally:
        virtual_machine.close()
    print("\n=== FINAL ANSWER ===\n")
    print(answer)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())