@manzt
Created April 20, 2026 17:10
marimo IPC stress test: cross-Python-version + schema-skew (pickle -> msgspec.msgpack)
=== python 3.10 ===
[PASS] warmup
[PASS] execute
[PASS] forward_command
[PASS] broadcast
[INFO] unknown_tag bytes_arrived=True, decode_errors=1
=== python 3.11 ===
[PASS] warmup
[PASS] execute
[PASS] forward_command
[PASS] broadcast
[INFO] unknown_tag bytes_arrived=True, decode_errors=1
=== python 3.12 ===
[PASS] warmup
[PASS] execute
[PASS] forward_command
[PASS] broadcast
[INFO] unknown_tag bytes_arrived=True, decode_errors=1
=== python 3.13 ===
[PASS] warmup
[PASS] execute
[PASS] forward_command
[PASS] broadcast
[INFO] unknown_tag bytes_arrived=True, decode_errors=1
=== python 3.14 ===
[PASS] warmup
[PASS] execute
[PASS] forward_command
[PASS] broadcast
[INFO] unknown_tag bytes_arrived=True, decode_errors=1
=== Summary ===
3.10: PASS (5/5)
note unknown_tag_hazard: unknown_tag bytes_arrived=True, decode_errors=1
3.11: PASS (5/5)
note unknown_tag_hazard: unknown_tag bytes_arrived=True, decode_errors=1
3.12: PASS (5/5)
note unknown_tag_hazard: unknown_tag bytes_arrived=True, decode_errors=1
3.13: PASS (5/5)
note unknown_tag_hazard: unknown_tag bytes_arrived=True, decode_errors=1
3.14: PASS (5/5)
note unknown_tag_hazard: unknown_tag bytes_arrived=True, decode_errors=1

marimo IPC stress test (pickle → msgspec.msgpack)

One-shot driver script that stress-tests marimo's ZeroMQ IPC across five Python versions and exercises schema skew in both directions (host→kernel and kernel→host).

Context

Branch manzt/msgpack-ipc swaps pickle.dumps/loads for msgspec.msgpack.encode / msgspec.msgpack.Decoder[T] in marimo/_ipc/queue_proxy.py and marimo/_ipc/connection.py. Existing unit tests cover encode/decode in-process; this script validates the change end-to-end against a real kernel subprocess running on a different Python interpreter than the driver.

What it does

  1. uv build --wheel once from the current tree.
  2. For each Python version in {3.10, 3.11, 3.12, 3.13, 3.14}:
    uv run --isolated --no-project --python=X.Y --with=<wheel> \
           python -m marimo._ipc.launch_kernel
    
    The driver writes JSON-encoded KernelArgs to the kernel's stdin and waits for a KERNEL_READY line on stdout. marimo._ipc.launch_kernel is a public entry point used by marimo-lsp, so it's the right handshake to lean on.
  3. Drives five event-driven checks per kernel (no fixed sleeps):
    • warmup — first round-tripped ExecuteCellsCommand proves ZMQ sockets + kernel runtime loop are live.
    • execute — real typed command round-trips control → stream.
    • forward_command — driver puts an ExecuteCellsCommand-shaped msgspec.Struct with an EXTRA field onto the typed control queue. The kernel's real decoder must ignore the extra field and still run the cell. (Host→kernel forward-compat.)
    • broadcast — one cell defines two Notification subclasses and broadcasts both up the stream:
      • CellNotificationV2(Notification, tag="cell-op") with an EXTRA field — host decoder must accept it cleanly as CellNotification.
      • StressFoo(Notification, tag="stress-foo-v1") — a brand-new tag not in the host's closed NotificationMessage union. (Kernel→host forward-compat, both happy path and hazard.)
    • unknown_tag_hazard [INFO] — records whether the brand-new-tag bytes reached the host (they do) and whether the host's msgspec.json.decode(..., type=NotificationMessage) raised DecodeError (it does). Reported as a note, not a pass/fail — it's a documented compat limit, not a regression.
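The "no fixed sleeps" rule rests on a two-phase wait, implemented in the script's await_run. Below is a stripped-down, stdlib-only sketch of the same logic. It assumes notifications are plain dicts with hypothetical `name`/`cell_id` keys (standing in for marimo's decoded Notification objects) and uses a queue.Queue instead of the ZeroMQ-backed stream queue.

```python
from __future__ import annotations

import queue
import time
from typing import Any


def await_run(q: queue.Queue, cell_id: str,
              timeout: float = 10.0, grace: float = 0.3) -> list[dict[str, Any]]:
    """Wait for a cell-op for `cell_id`, then the next completed-run."""
    deadline = time.monotonic() + timeout
    grace_until: float | None = None
    saw_cell_op = False
    collected: list[dict[str, Any]] = []
    while True:
        now = time.monotonic()
        if now > deadline or (grace_until is not None and now > grace_until):
            break
        try:
            msg = q.get(timeout=0.05)
        except queue.Empty:
            continue
        collected.append(msg)
        if not saw_cell_op and msg["name"] == "cell-op" and msg.get("cell_id") == cell_id:
            # Phase 1: our run has started. Earlier completed-runs are ignored.
            saw_cell_op = True
        elif saw_cell_op and msg["name"] == "completed-run" and grace_until is None:
            # Phase 2: keep draining briefly to catch late broadcasts.
            grace_until = time.monotonic() + grace
    if grace_until is None:
        raise TimeoutError(f"run for {cell_id!r} did not finish")
    return collected


q: queue.Queue = queue.Queue()
for m in [
    {"name": "completed-run"},             # stale, from an earlier run
    {"name": "cell-op", "cell_id": "c1"},
    {"name": "completed-run"},
    {"name": "stress-foo-v1"},             # late broadcast inside the grace window
]:
    q.put(m)
batch = await_run(q, "c1")
print([m["name"] for m in batch])
```

The stale completed-run at the front cannot end the wait because phase 1 has not fired yet, which is why each scenario uses a unique cell_id.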

Findings

  • All five checks (warmup, execute, forward_command, broadcast, and the informational unknown_tag_hazard) pass on all five Python versions. msgpack IPC round-trips cleanly regardless of interpreter.
  • Host→kernel commands accept extra fields (forward_command scenario). A newer server sending a command with added optional fields won't break an older kernel.
  • Kernel→host same-tag + extra field works (broadcast scenario part a). Adding an optional field to an existing op is safe.
  • Kernel→host brand-new tags are rejected by the host decoder (unknown_tag_hazard). The NotificationMessage union in marimo/_messaging/notification.py is closed — any op the host doesn't know the tag for raises msgspec.DecodeError. This is the real forward-compat hazard for mixed-version deployments (e.g. a newer kernel paired with an older marimo-lsp consumer).

Running it

Requires a marimo checkout on manzt/msgpack-ipc (or any branch with the msgspec IPC change):

# From repo root:
uv run --extra sandbox scripts/stress_test_ipc.py          # full matrix
uv run --extra sandbox scripts/stress_test_ipc.py --python 3.12
uv run --extra sandbox scripts/stress_test_ipc.py --wheel /path/to/marimo.whl

Exits non-zero if any hard assertion fails. Builds take ~15 s; each kernel run is ~8–10 s, so the full 5-version matrix is ~1 min.

Possible extensions (not yet covered)

Things that could still break but this script doesn't exercise:

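  • Large payloads. msgpack caps a single str or bin value at 2^32 - 1 bytes (about 4 GiB), while pickle protocol 4+ has no such cap, so the swap does not remove a size ceiling. More practically, real-world 10 MB code strings or dataframes-as-buffers haven't been round-tripped end-to-end.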
  • Binary buffers. ModelCommand.buffers: list[bytes] — msgpack has native bin; pickle smuggled bytes inside object graphs. Worth asserting binary round-trips byte-for-byte.
  • Burst / ordering. 100+ rapid ExecuteCellsCommands back-to-back on the control channel; verify FIFO ordering of completed-run ops. A --burst N flag existed in an earlier draft; removed for simplicity.
  • Kernel-side missing required field. If the kernel's schema gains a required field that an older host doesn't send, the kernel's decoder should fail cleanly with a useful error rather than silently dropping the command.
  • Nested @dataclass evolution. HTTPRequest is a @dataclass nested inside ExecuteCellsCommand. Adding a field there exercises a different msgspec path than top-level Struct fields.
  • Unicode edge cases (emoji in codes, non-UTF-8 bytes in buffers).
  • NaN / Inf floats in HTTPRequest.meta — msgpack allows them, JSON doesn't, so the kernel→host stream (JSON) could fail where the host→kernel control (msgpack) succeeds.
  • Concurrent senders. Code completion runs on a separate worker thread; verify PushQueue.put under contention on both channels.
  • Kernel crash mid-send. Partial message on the wire — does the host's receiver thread recover or hang?
  • msgspec version skew between driver and kernel. The driver uses the repo's msgspec; the kernel uses the wheel's. If they pin different msgspec majors, wire format might diverge.
"""Stress-test marimo IPC across Python versions and schema skew.

Builds a wheel from the current tree, then for each Python version in the
matrix spawns a kernel via

    uv run --isolated --no-project --python=X.Y --with=<wheel>
        python -m marimo._ipc.launch_kernel

and, after a warmup round-trip, runs three end-to-end checks against it:

1. execute         - plain ExecuteCellsCommand round-trips (control + stream).
2. forward_command - driver sends an ExecuteCellsCommand-shaped msgpack
                     payload with an EXTRA field; the kernel's real decoder
                     must ignore it and still execute the cell.
3. broadcast       - cell code broadcasts two Notification variants:
                     (a) a known tag with an EXTRA field (CellNotification
                         subclass on tag="cell-op"), which must decode
                         cleanly on the host;
                     (b) a brand-new tag, which the host's closed
                         NotificationMessage union rejects.

Case (b) is recorded separately as a known forward-compat hazard, not a
pass/fail.

Completion is event-driven: each exec uses a unique cell_id, and a scenario
is done when we've seen a cell-op for that cell_id followed by completed-run.
That isolates scenarios from each other without any fixed sleeps.

Usage (from repo root):

    uv run --extra sandbox scripts/stress_test_ipc.py
    uv run --extra sandbox scripts/stress_test_ipc.py --python 3.11
    uv run --extra sandbox scripts/stress_test_ipc.py --wheel /path/to/marimo.whl
"""
from __future__ import annotations

import argparse
import pathlib
import queue
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass, field
from typing import Any, Callable

import msgspec

REPO_ROOT = pathlib.Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))

from marimo._ast.app_config import _AppConfig  # noqa: E402
from marimo._ast.cell import CellConfig  # noqa: E402
from marimo._config.config import DEFAULT_CONFIG  # noqa: E402
from marimo._config.settings import GLOBAL_SETTINGS  # noqa: E402
from marimo._ipc.queue_manager import QueueManager  # noqa: E402
from marimo._ipc.types import KernelArgs  # noqa: E402
from marimo._messaging.serde import deserialize_kernel_message  # noqa: E402
from marimo._runtime.commands import (  # noqa: E402
    AppMetadata,
    ExecuteCellsCommand,
)
from marimo._types.ids import CellId_t  # noqa: E402

DEFAULT_PYTHON_VERSIONS = ["3.10", "3.11", "3.12", "3.13", "3.14"]
TIMEOUT = 10.0
UNKNOWN_TAG = "stress-foo-v1"


@dataclass
class Result:
    python_version: str
    scenario: str
    passed: bool
    detail: str = ""


@dataclass
class Session:
    proc: subprocess.Popen[bytes]
    qm: QueueManager
    unknown_tag_bytes_seen: bool = field(default=False)
    host_decode_errors: int = field(default=0)

    def close(self) -> None:
        self.proc.terminate()
        try:
            self.proc.wait(timeout=3.0)
        except subprocess.TimeoutExpired:
            self.proc.kill()
            self.proc.wait(timeout=3.0)
        self.qm.close_queues()


def build_wheel() -> pathlib.Path:
    out_dir = pathlib.Path(tempfile.mkdtemp(prefix="marimo-stress-"))
    print(f"[build] uv build --wheel --out-dir {out_dir}")
    subprocess.run(
        ["uv", "build", "--wheel", "--out-dir", str(out_dir)],
        check=True,
        cwd=REPO_ROOT,
    )
    wheel = next(out_dir.glob("marimo-*.whl"))
    print(f"[build] built {wheel.name}")
    return wheel


def spawn(py: str, wheel: pathlib.Path, cell_ids: list[CellId_t]) -> Session | None:
    qm, connection_info = QueueManager.create()
    args = KernelArgs(
        connection_info=connection_info,
        profile_path=None,
        configs={cid: CellConfig() for cid in cell_ids},
        user_config=DEFAULT_CONFIG,
        log_level=GLOBAL_SETTINGS.LOG_LEVEL,
        app_metadata=AppMetadata(
            query_params={}, cli_args={}, app_config=_AppConfig()
        ),
    )
    proc = subprocess.Popen(
        [
            "uv", "run", "--isolated", "--no-project",
            f"--python={py}", f"--with={wheel}",
            "python", "-m", "marimo._ipc.launch_kernel",
        ],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    assert proc.stdin and proc.stdout and proc.stderr
    # Handshake: KernelArgs go in as JSON on stdin (closing stdin marks the
    # end of input); the kernel answers with a KERNEL_READY line on stdout.
    proc.stdin.write(args.encode_json())
    proc.stdin.flush()
    proc.stdin.close()
    ready = proc.stdout.readline().decode(errors="replace").strip()
    if ready != "KERNEL_READY":
        stderr = proc.stderr.read().decode(errors="replace")
        qm.close_queues()
        # uv reports a missing interpreter on stderr; treat it as a skip.
        if "No interpreter found" in stderr or "No interpreters found" in stderr:
            return None
        raise RuntimeError(
            f"handshake failed (exit={proc.poll()}, py={py}): {stderr[-1500:]}"
        )
    return Session(proc=proc, qm=qm)


GRACE = 0.3  # seconds to keep draining after completed-run


def await_run(session: Session, cell_id: str) -> list[Any]:
    """Block until the run for `cell_id` finishes, plus a short grace period.

    Two-phase: look for a cell-op matching `cell_id`, then the next
    completed-run. Unique cell_ids per scenario mean stale completed-run
    ops from prior scenarios can't satisfy phase 2.

    The grace period catches late messages: in-cell `broadcast_notification`
    calls race against the kernel's natural completed-run, so a broadcast's
    bytes may arrive AFTER completed-run.

    Raises TimeoutError if either phase does not finish within TIMEOUT.

    Side effect: populates session.unknown_tag_bytes_seen and
    session.host_decode_errors, which the unknown-tag check reports on.
    """
    deadline = time.monotonic() + TIMEOUT
    collected: list[Any] = []
    saw_our_cell_op = False
    grace_until: float | None = None
    while True:
        now = time.monotonic()
        if now > deadline:
            break
        if grace_until is not None and now > grace_until:
            break
        try:
            raw = session.qm.stream_queue.get(timeout=0.05)
        except queue.Empty:
            continue
        assert isinstance(raw, bytes)
        # Byte-level check: records whether the unknown tag reached the host
        # at all, independent of whether the typed decode below succeeds.
        if UNKNOWN_TAG.encode() in raw:
            session.unknown_tag_bytes_seen = True
        try:
            note = deserialize_kernel_message(raw)
        except msgspec.DecodeError:
            session.host_decode_errors += 1
            continue
        collected.append(note)
        name = getattr(note, "name", None)
        if (
            not saw_our_cell_op
            and name == "cell-op"
            and getattr(note, "cell_id", None) == cell_id
        ):
            saw_our_cell_op = True
        elif (
            saw_our_cell_op and name == "completed-run" and grace_until is None
        ):
            grace_until = time.monotonic() + GRACE
    names = [getattr(n, "name", "?") for n in collected]
    if not saw_our_cell_op:
        raise TimeoutError(
            f"run for {cell_id!r} did not start within {TIMEOUT}s; "
            f"names: {names}"
        )
    if grace_until is None:
        raise TimeoutError(
            f"run for {cell_id!r} did not complete within {TIMEOUT}s; "
            f"names: {names}"
        )
    return collected


# ---- scenarios ------------------------------------------------------------


def scenario_execute(session: Session) -> None:
    """Happy path: real typed ExecuteCellsCommand round-trips."""
    session.qm.control_queue.put(
        ExecuteCellsCommand(cell_ids=[CellId_t("exec-cell")], codes=["x = 42"])
    )
    batch = await_run(session, "exec-cell")
    assert any(getattr(n, "name", None) == "cell-op" for n in batch)


def scenario_forward_command(session: Session) -> None:
    """Driver sends a command struct with an EXTRA field; kernel ignores it."""

    # Defined locally: msgspec encodes a Struct as its tag plus field values,
    # so (unlike pickle) the kernel never has to import this class. The tag,
    # tag_field and rename settings mirror ExecuteCellsCommand's wire shape.
    class ExecuteCellsCommandV2(
        msgspec.Struct, rename="camel", tag_field="type", tag="execute-cells"
    ):
        cell_ids: list[str]
        codes: list[str]
        new_field_added_later: str = "future"

    cell_id = "forward-cmd-cell"
    session.qm.control_queue.put(
        ExecuteCellsCommandV2(cell_ids=[cell_id], codes=["y = 7"])  # type: ignore[arg-type]
    )
    await_run(session, cell_id)


def scenario_broadcast(session: Session) -> dict[str, Any]:
    """One cell broadcasts two Notification variants up the stream:

    (a) a registered tag + EXTRA field -> host must decode cleanly
        (forward-compat on a known op).
    (b) a brand-new tag -> host decoder rejects it
        (forward-compat HAZARD -- the real finding).

    Combining both into a single cell avoids marimo's MultipleDefinitionError
    from re-importing `Notification` / `ClassVar` across cells.

    The forward-compat case (a) is a hard assertion. The unknown-tag case (b)
    is reported as info so we can see whether the bytes made it to the wire
    and whether the host raised the expected DecodeError.
    """
    cell_id = "broadcast-cell"
    v2_target = "v2-broadcast-target"
    before_bytes = session.unknown_tag_bytes_seen
    before_errs = session.host_decode_errors
    code = f"""\
from typing import ClassVar
from marimo._messaging.notification import Notification
from marimo._messaging.notification_utils import broadcast_notification
class CellNotificationV2(Notification, tag="cell-op"):
    name: ClassVar[str] = "cell-op"
    cell_id: str
    new_field_added_later: str = "future"
class StressFoo(Notification, tag={UNKNOWN_TAG!r}):
    name: ClassVar[str] = {UNKNOWN_TAG!r}
    payload: str = "hi"
broadcast_notification(CellNotificationV2(cell_id={v2_target!r}))
broadcast_notification(StressFoo())
"""
    session.qm.control_queue.put(
        ExecuteCellsCommand(cell_ids=[CellId_t(cell_id)], codes=[code])
    )
    batch = await_run(session, cell_id)
    saw_v2 = any(
        getattr(n, "name", None) == "cell-op"
        and getattr(n, "cell_id", None) == v2_target
        for n in batch
    )
    assert saw_v2, (
        f"V2 cell-op (extra field on known tag) missing from batch: "
        f"names={[getattr(n, 'name', '?') for n in batch]}"
    )
    return {
        "unknown_tag_bytes_arrived": (
            session.unknown_tag_bytes_seen and not before_bytes
        ),
        "unknown_tag_decode_errors": (
            session.host_decode_errors - before_errs
        ),
    }


# ---- driver ---------------------------------------------------------------

CELL_IDS = ["warmup", "exec-cell", "forward-cmd-cell", "broadcast-cell"]


def run_for_python(py: str, wheel: pathlib.Path) -> list[Result]:
    print(f"\n=== python {py} ===")
    cell_ids = [CellId_t(cid) for cid in CELL_IDS]
    try:
        session = spawn(py, wheel, cell_ids)
    except Exception as e:
        print(f" [FAIL] spawn: {e}")
        return [Result(py, "spawn", False, str(e))]
    if session is None:
        print(f" [skip] {py} not available")
        return [Result(py, "spawn", True, "interpreter unavailable")]

    results: list[Result] = []

    def run(name: str, fn: Callable[[Session], Any]) -> Any:
        try:
            out = fn(session)
        except Exception as e:
            print(f" [FAIL] {name}: {e}")
            results.append(Result(py, name, False, str(e)))
            return None
        print(f" [PASS] {name}")
        results.append(Result(py, name, True))
        return out

    def warmup(s: Session) -> None:
        # Event-driven readiness probe: a round-tripped exec proves the
        # runtime loop is live. No fixed sleep needed after KERNEL_READY.
        s.qm.control_queue.put(
            ExecuteCellsCommand(cell_ids=[CellId_t("warmup")], codes=["1"])
        )
        await_run(s, "warmup")

    try:
        # Routed through run() so a warmup timeout is recorded as a FAIL
        # for this version instead of aborting the whole matrix.
        run("warmup", warmup)
        if not results[-1].passed:
            return results
        run("execute", scenario_execute)
        run("forward_command", scenario_forward_command)
        info = run("broadcast", scenario_broadcast)
        if info is not None:
            detail = (
                f"unknown_tag bytes_arrived="
                f"{info['unknown_tag_bytes_arrived']}, "
                f"decode_errors={info['unknown_tag_decode_errors']}"
            )
            print(f" [INFO] {detail}")
            results.append(Result(py, "unknown_tag_hazard", True, detail))
    finally:
        session.close()
    return results


def print_report(results: list[Result]) -> bool:
    print("\n=== Summary ===")
    by_py: dict[str, list[Result]] = {}
    for r in results:
        by_py.setdefault(r.python_version, []).append(r)
    ok = True
    for py, rs in by_py.items():
        status = "PASS" if all(r.passed for r in rs) else "FAIL"
        print(f" {py}: {status} ({sum(r.passed for r in rs)}/{len(rs)})")
        for r in rs:
            if not r.passed:
                ok = False
                print(f" FAIL {r.scenario}: {r.detail}")
            elif r.detail:
                print(f" note {r.scenario}: {r.detail}")
    return ok


def main() -> int:
    # RawDescriptionHelpFormatter keeps the module docstring's layout in --help.
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--python", action="append", default=None,
                        help="Python version (repeatable). Default: 3.10-3.14.")
    parser.add_argument("--wheel", type=pathlib.Path, default=None,
                        help="Use an existing wheel instead of building.")
    args = parser.parse_args()
    wheel = args.wheel or build_wheel()
    pys = args.python or DEFAULT_PYTHON_VERSIONS
    all_results: list[Result] = []
    for py in pys:
        all_results.extend(run_for_python(py, wheel))
    return 0 if print_report(all_results) else 1


if __name__ == "__main__":
    sys.exit(main())
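spawn() relies on a simple line-based handshake: configuration goes in as JSON on stdin, and the child signals readiness with a single KERNEL_READY line on stdout. The stdlib-only sketch below reproduces that pattern with a toy child process, a hypothetical stand-in for marimo's launch_kernel, so the handshake can be exercised without uv or a wheel.

```python
import json
import subprocess
import sys

# Toy child: read all of stdin as JSON, announce readiness, then echo one
# config value so the parent can confirm the payload arrived intact.
CHILD = (
    "import json, sys\n"
    "cfg = json.load(sys.stdin)\n"
    "print('KERNEL_READY', flush=True)\n"
    "print(cfg['log_level'], flush=True)\n"
)


def spawn_toy(config: dict) -> tuple[subprocess.Popen, str]:
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    assert proc.stdin and proc.stdout
    # Closing stdin sends EOF, which is what lets json.load() in the child return.
    proc.stdin.write(json.dumps(config).encode())
    proc.stdin.close()
    # Block until the first line arrives, exactly as spawn() does.
    ready = proc.stdout.readline().decode(errors="replace").strip()
    return proc, ready


proc, ready = spawn_toy({"log_level": "WARN"})
echoed = proc.stdout.readline().decode().strip()
proc.wait(timeout=5)
proc.stdout.close()
proc.stderr.close()
print(ready, echoed)
```

As in the real script, closing stdin is what ends the config payload, so the child needs no length prefix or delimiter.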