@lunkwill42
Last active May 18, 2026 13:45
Zino 2 case-fetch perf reproduction harness (May 2026)

Zino 2 case-fetch performance reproduction

This is a small harness used to confirm — and later verify the fix for — a performance problem in Zino 2's legacy API server: existing clients (curitz, anything built on zinolib) were taking many seconds to fetch all open cases from production Zino 2, whereas the same clients are near-instant against the legacy Tcl Zino on the same network.

This is a snapshot of the tools used during one investigation. It is not part of the Zino repo and has no maintainer. If you revive it later, expect it to need touch-ups against the current Zino source.

  • Investigation date: May 2026
  • Validated against: Zino 2 commit 3d19876 (master at the time)
  • Fix PR: (link once filed)

Background: what's going on?

The legacy Zino protocol is "chatty": to fetch one case the client sends three commands sequentially — GETATTRS, GETHIST, GETLOG — each waiting for the server's reply before sending the next. For 67 open cases that's ~200 round trips on a single TCP connection.

The Zino 2 server was answering each command by calling transport.write() once per line of output. A typical GETATTRS response has 10–15 lines, so the server was emitting 10–15 tiny TCP segments per command. That's where the two TCP optimizations get in each other's way:

  • Nagle's algorithm (on by default) tells the kernel to hold back small outgoing segments if there's already an un-ACK'd small segment in flight, hoping to coalesce them. So after the server sends line 1, the kernel waits for the client's ACK before sending line 2.
  • TCP delayed-ACK (on by default on the client side) tells the kernel "don't bother ACK'ing immediately if the application hasn't sent any data back yet — wait up to 40 ms in case there's a piggyback opportunity".

Put those two together on a chatty request/response protocol and every multi-segment server response stalls for ~40 ms waiting for the client's delayed ACK before the next segment can go out. Multiply by 200 round trips and you get the multi-second symptom.

The legacy Tcl server doesn't have this problem. Its accepted channels are configured with fconfigure -buffering line -blocking false, which makes puts park data in a userspace buffer; on a non-blocking channel, the actual write(2) only happens when the Tcl event loop sees the channel writable, which coalesces a whole response's lines into one syscall and therefore one TCP segment. Even if the client delays its ACK, nothing is queued behind that single segment, so Nagle never holds anything back and the delayed-ACK timer costs nothing.

The fix to Zino 2 was to replicate that pattern in asyncio. This harness exists to (a) prove the diagnosis and (b) measure the fix.
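The Zino 2 fix itself is not part of this gist. As a rough illustration of the pattern described above, here is a minimal sketch, assuming only an asyncio-style transport with a write() method: lines are queued in userspace and flushed with a single transport.write() on the next event-loop iteration, loosely mirroring how a non-blocking Tcl channel defers the real write(2). CoalescingWriter, SupportsWrite, and FakeTransport are hypothetical names for this example, not Zino code.

```python
from __future__ import annotations

import asyncio
from typing import Protocol


class SupportsWrite(Protocol):
    def write(self, data: bytes) -> None: ...


class CoalescingWriter:
    """Hypothetical helper: queue response lines, then send them in one write()."""

    def __init__(self, transport: SupportsWrite) -> None:
        # Must be constructed while the event loop is running (e.g. in
        # connection_made()), since it grabs the running loop.
        self._transport = transport
        self._loop = asyncio.get_running_loop()
        self._pending: list[bytes] = []
        self._flush_scheduled = False

    def write_line(self, line: str) -> None:
        # Buffer in userspace instead of calling transport.write() per line.
        self._pending.append(line.encode("utf-8") + b"\r\n")
        if not self._flush_scheduled:
            # Defer the real write to the next loop iteration, so every line
            # produced by the current callback goes out as one chunk.
            self._flush_scheduled = True
            self._loop.call_soon(self._flush)

    def _flush(self) -> None:
        self._flush_scheduled = False
        if self._pending:
            data = b"".join(self._pending)
            self._pending.clear()
            self._transport.write(data)


class FakeTransport:
    """Stand-in transport that records every write() call."""

    def __init__(self) -> None:
        self.writes: list[bytes] = []

    def write(self, data: bytes) -> None:
        self.writes.append(data)


async def demo() -> list[bytes]:
    transport = FakeTransport()
    writer = CoalescingWriter(transport)
    # Illustrative lines only, not real Zino protocol output.
    for line in ["200 illustrative header", "attr1: value", "attr2: value", "."]:
        writer.write_line(line)
    await asyncio.sleep(0)  # yield once so the scheduled flush runs
    return transport.writes


if __name__ == "__main__":
    print(len(asyncio.run(demo())))  # number of transport.write() calls
```

In a real asyncio.Protocol the writer would be created in connection_made() and every response line routed through it. A simpler variant with the same effect on the wire is to build each complete response as one bytes object and call transport.write() once.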

Why we need a TCP proxy

The natural first instinct is to run the client against zino on 127.0.0.1 and time it. This does not reproduce the problem. On localhost, ACKs come back in microseconds, so Nagle has nothing to hold back and delayed-ACK is moot. A direct loopback run finishes in tens of milliseconds even with the pathological write pattern.

To trigger the bug locally without root (i.e., without tcpdump/tc qdisc) we route the client through a small Python TCP forwarder running on the same host. Just having a userspace hop in the path adds enough scheduling jitter between recv-on-one-fd and send-on-the-other that the kernel can no longer ACK back instantly. That tiny delay is enough for Nagle to start gating subsequent small writes and the delayed-ACK timer to start engaging — the exact pathology that real WAN clients see.

The proxy can also optionally inject artificial one-way delay, so we can simulate a 30 ms WAN link as well.

What's in this gist

  • make_fixture.py: generates a zino fixture directory (zino.toml, polldevs.cf, secrets, and a zino-state.json with N open events of mixed types). run_bench.py writes it into a fresh mkdtemp directory.
  • client.py: blocking Zino 1 protocol client that mirrors zinolib's send, then receive-until-"." loop. Times each command. --via-zinolib switches to the real zinolib client.
  • latency_proxy.py: local TCP forwarder from 127.0.0.1:9001 to 127.0.0.1:8001. Optionally --delay-ms N per chunk, optionally --log to record every recv chunk's size and timestamp.
  • run_bench.py: orchestrator. Builds the fixture, spawns uv run zino against it, optionally starts the proxy, runs the client, tears everything down.

The three scenarios

The headline table at the bottom refers to "Scenario B". Here's what all three mean:

Scenario A — direct loopback

client ──── 127.0.0.1:8001 ──── zino

Run run_bench.py --num-events 100 with no proxy flags. This is the "looks fine!" sanity check that misled the initial investigation. Expected to complete in well under 100 ms regardless of whether the fix is in place. Confirms the client and fixture work, but tells you nothing about the bug.

Scenario B — through the proxy, zero added delay

client ──── 127.0.0.1:9001 ──── proxy ──── 127.0.0.1:8001 ──── zino

Run run_bench.py --num-events 100 --proxy-delay-ms 0. Same machine, no artificial latency. The proxy just forwards bytes. This is where the bug shows up, because the userspace hop disrupts the timing enough for the Nagle/delayed-ACK interaction to activate.

This is the core demonstration:

  • Without the fix: ~7 seconds total, GETATTRS mean ~41 ms (every sample pinned to the Linux delayed-ACK timer)
  • With the fix: ~0.1 seconds total, GETATTRS mean ~0.3 ms

Scenario C — through the proxy, 15 ms one-way (~30 ms RTT)

client ──── 127.0.0.1:9001 ──── [+15 ms] ──── proxy ──── [+15 ms] ──── zino

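Run run_bench.py --num-events 100 --proxy-delay-ms 15. The proxy sleeps 15 ms before forwarding each chunk in either direction, so every request/response pair gains roughly 30 ms of round-trip time. This approximates what a real WAN client experiences and is useful for understanding what the fix does and does not address: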

  • Without the fix: ~17 seconds (~30 ms RTT + ~40 ms delayed-ACK stall per command)
  • With the fix: ~9 seconds. The 40 ms stalls are gone, but each round trip still costs one RTT. The remaining time is pure N round trips × RTT and can only be reduced by changing the protocol (pipelining, batch commands) — a separate, larger problem.
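The arithmetic behind scenario C can be captured in a tiny cost model. This is an illustrative simplification, not something the harness computes: every command costs one RTT, plus a fixed delayed-ACK stall if the response is split across several writes. Because it charges the stall to every command, the pre-fix figure is an upper bound. estimate_fetch_seconds is a hypothetical helper.

```python
def estimate_fetch_seconds(
    num_cases: int,
    rtt_ms: float,
    stall_ms: float = 0.0,
    commands_per_case: int = 3,  # GETATTRS, GETHIST, GETLOG
) -> float:
    """Rough fetch time for the strictly sequential per-case protocol.

    Illustrative model: each command is one round trip, optionally plus a
    fixed stall (e.g. the ~40 ms Linux delayed-ACK timer).
    """
    round_trips = num_cases * commands_per_case
    return round_trips * (rtt_ms + stall_ms) / 1000.0


if __name__ == "__main__":
    # Scenario C, post-fix: 300 round trips at ~30 ms RTT.
    print(f"post-fix floor: {estimate_fetch_seconds(100, rtt_ms=30):.1f} s")
    # Scenario C, pre-fix, assuming every command also hits a 40 ms stall.
    print(f"pre-fix bound:  {estimate_fetch_seconds(100, rtt_ms=30, stall_ms=40):.1f} s")
```

The post-fix floor (9.0 s) lines up with the measured ~9 seconds. The measured pre-fix ~17 s falls below the 21.0 s bound, which is consistent with not every command paying the full stall.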

Inspecting the wire pattern

latency_proxy.py --log <path> writes one line per recv() chunk — direction, byte count, time since the previous chunk. This shows directly whether the server is emitting per-line writes or coalesced ones.

uv run python run_bench.py --num-events 100 --proxy-delay-ms 0 \
    --proxy-log /tmp/zino-proxy.log

# Server-to-client chunk size distribution:
awk -F'\t' '$1=="s2c"{print $2}' /tmp/zino-proxy.log | sort -n | uniq -c

Pre-fix you'll see lots of 3-, 4-, 8-byte chunks (one TCP segment per protocol line). Post-fix every chunk is 80+ bytes — one whole response per chunk.
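If awk is not at hand, the same log can be summarized in Python. This sketch assumes the tab-separated format that latency_proxy.py writes (direction, right-aligned byte count, "+<delta> ms"); summarize_proxy_log and print_report are hypothetical helpers, not part of the gist.

```python
from __future__ import annotations

from collections import Counter
from typing import Iterable


def summarize_proxy_log(lines: Iterable[str], direction: str = "s2c") -> Counter[int]:
    """Count chunk sizes for one direction of a latency_proxy.py --log file.

    Each log line holds direction, byte count (right-aligned in a 5-character
    field), and "+<delta> ms", separated by tabs.
    """
    sizes: Counter[int] = Counter()
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) < 3 or fields[0] != direction:
            continue
        sizes[int(fields[1])] += 1  # int() ignores the padding spaces
    return sizes


def print_report(path: str, small_threshold: int = 80) -> None:
    """Print the chunk-size histogram plus a count of 'small' chunks."""
    with open(path, encoding="utf-8") as fh:
        sizes = summarize_proxy_log(fh)
    total = sum(sizes.values())
    small = sum(n for size, n in sizes.items() if size < small_threshold)
    print(f"s2c chunks: {total} ({small} under {small_threshold} bytes)")
    for size, count in sorted(sizes.items()):
        print(f"{count:7d}  {size} bytes")
```

For example, call print_report("/tmp/zino-proxy.log") after the scenario B run above. The 80-byte default threshold is an assumption taken from the observation that post-fix chunks are all 80+ bytes.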

Prerequisites

  • A working Zino 2 dev environment in the same checkout (uv pip install --group dev), with ports 8001/8002 free
  • The fixture sets snmp.backend = "pysnmp" so the netsnmp MIB load doesn't inflate startup; if you only have netsnmp installed, edit ZINO_TOML in make_fixture.py
  • Optional: pip install zinolib if you want the --via-zinolib path

How to run

Place the scripts in any directory inside the Zino repo (so uv run resolves the zino package), then:

# Scenario A — sanity check.
uv run python run_bench.py --num-events 100

# Scenario B — the core demonstration.
uv run python run_bench.py --num-events 100 --proxy-delay-ms 0

# Scenario C — simulated WAN.
uv run python run_bench.py --num-events 100 --proxy-delay-ms 15

To reproduce the pre/post comparison, run scenario B, git stash the fix, re-run, then git stash pop.

Headline numbers (Linux, Zino 2 @ 3d19876, scenario B)

Metric                   | Pre-fix                                     | Post-fix
------------------------ | ------------------------------------------- | ----------------------------
Total fetch (100 cases)  | 7.07 s                                      | 0.091 s
GETATTRS mean            | 41.5 ms                                     | 0.34 ms
GETATTRS distribution    | every sample on the 40 ms delayed-ACK timer | smooth, sub-ms
Server→client TCP chunks | ~1220 (mostly tiny)                         | 304 (one full response each)

Caveats

  • The 40 ms peak is TCP_DELACK_MIN, a Linux kernel constant. On macOS/BSD the absolute number shifts but the shape of the result (pre-fix per-line writes stall, post-fix coalesced writes don't) reproduces anywhere.
  • The proxy is load-bearing, not optional. See "Why we need a TCP proxy" above. If you skip it, the bug doesn't show up and you might wrongly conclude there's nothing wrong.
  • It's not a regression test. No fixture pinning, no CI integration. If a future Zino refactor renames _respond_raw or changes the auth handshake, the harness will silently break.
  • Port conflict. The orchestrator binds 8001/8002. Stop any local zino before running.
client.py

"""Synchronous Zino 1 protocol client used to time case-fetch round trips.

Mirrors the I/O pattern of zinolib.ritz.ritz: one blocking TCP connection,
strictly sequential request/response, no pipelining. Measurements are taken
with ``time.perf_counter_ns()`` around each request.
"""

from __future__ import annotations

import argparse
import socket
import statistics
import sys
import time
from hashlib import sha1
from typing import Optional

DELIMITER = b"\r\n"


class ZinoBenchClient:
    """Minimal blocking Zino 1 client just for benchmarking."""

    def __init__(self, host: str, port: int, timeout: float = 10.0):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.sock: Optional[socket.socket] = None
        self.buf = b""

    def connect(self) -> str:
        """Connects to the server and returns the auth challenge from the greeting."""
        self.sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
        greeting = self._read_line()
        # Format: "200 <challenge> Hello, there"
        parts = greeting.split(" ", 2)
        if len(parts) < 3 or parts[0] != "200":
            raise RuntimeError(f"unexpected greeting: {greeting!r}")
        return parts[1]

    def authenticate(self, user: str, secret: str, challenge: str) -> None:
        # The response is the hex SHA-1 digest of "<challenge> <secret>".
        response = sha1(f"{challenge} {secret}".encode()).hexdigest()
        self.sock.sendall(f"USER {user} {response}\r\n".encode())
        line = self._read_line()
        if not line.startswith("200"):
            raise RuntimeError(f"auth failed: {line!r}")

    def request_single(self, command: str) -> str:
        """Send a command expected to yield a single-line response."""
        self.sock.sendall(f"{command}\r\n".encode())
        return self._read_line()

    def request_multi(self, command: str) -> tuple[str, list[str]]:
        """Send a command expected to yield a multi-line response terminated by '.'."""
        self.sock.sendall(f"{command}\r\n".encode())
        header = self._read_line()
        if header.startswith("5"):
            # Error reply: no body follows.
            return header, []
        lines: list[str] = []
        while True:
            line = self._read_line()
            if line == ".":
                return header, lines
            lines.append(line)

    def close(self) -> None:
        try:
            if self.sock:
                self.sock.sendall(b"QUIT\r\n")
                # Best-effort drain of the bye line
                try:
                    self._read_line()
                except (socket.timeout, OSError, RuntimeError):
                    pass
        finally:
            if self.sock:
                self.sock.close()
            self.sock = None

    def _read_line(self) -> str:
        """Return the next CRLF-terminated line, reading from the socket as needed."""
        while True:
            idx = self.buf.find(DELIMITER)
            if idx >= 0:
                line = self.buf[:idx]
                self.buf = self.buf[idx + len(DELIMITER) :]
                return line.decode("utf-8", errors="replace")
            chunk = self.sock.recv(4096)
            if not chunk:
                raise RuntimeError("server closed connection mid-line")
            self.buf += chunk


def _percentile(values: list[float], pct: float) -> float:
    """Linearly interpolated percentile, with pct in 0..100."""
    if not values:
        return 0.0
    s = sorted(values)
    k = (len(s) - 1) * pct / 100.0
    lo = int(k)
    hi = min(lo + 1, len(s) - 1)
    frac = k - lo
    return s[lo] * (1 - frac) + s[hi] * frac


def _summarize(name: str, samples_ms: list[float]) -> str:
    if not samples_ms:
        return f"{name}: (no samples)"
    return (
        f"{name}: n={len(samples_ms):>4} "
        f"total={sum(samples_ms) / 1000:7.3f} s "
        f"mean={statistics.fmean(samples_ms):7.2f} ms "
        f"p50={_percentile(samples_ms, 50):7.2f} ms "
        f"p95={_percentile(samples_ms, 95):7.2f} ms "
        f"max={max(samples_ms):7.2f} ms"
    )


def _nagle_hint(per_cmd_ms: list[float]) -> str:
    """Flag runs whose per-command latency clusters around the 40 ms delayed-ACK timer."""
    if not per_cmd_ms:
        return ""
    mean = statistics.fmean(per_cmd_ms)
    near_40 = sum(1 for v in per_cmd_ms if 30 <= v <= 60) / len(per_cmd_ms)
    if mean >= 30 and near_40 >= 0.5:
        return (
            f"\nNagle/delayed-ACK suspicion: mean={mean:.1f} ms and {near_40 * 100:.0f}% of "
            "samples land in the 30–60 ms band, consistent with TCP delayed-ACK stalls."
        )
    return f"\n(no strong Nagle signature: mean={mean:.1f} ms)"


def run_fetch_cycle(client: ZinoBenchClient, label: str) -> dict:
    timings = {"caseids": [], "getattrs": [], "gethist": [], "getlog": []}

    t0 = time.perf_counter_ns()
    header, lines = client.request_multi("CASEIDS")
    timings["caseids"].append((time.perf_counter_ns() - t0) / 1e6)
    ids = [int(s) for s in lines if s.isdigit()]
    n_ids = len(ids)

    # Each case costs three strictly sequential round trips, as in zinolib.
    fetch_start = time.perf_counter_ns()
    for case_id in ids:
        t0 = time.perf_counter_ns()
        client.request_multi(f"GETATTRS {case_id}")
        timings["getattrs"].append((time.perf_counter_ns() - t0) / 1e6)
        t0 = time.perf_counter_ns()
        client.request_multi(f"GETHIST {case_id}")
        timings["gethist"].append((time.perf_counter_ns() - t0) / 1e6)
        t0 = time.perf_counter_ns()
        client.request_multi(f"GETLOG {case_id}")
        timings["getlog"].append((time.perf_counter_ns() - t0) / 1e6)
    fetch_total_ms = (time.perf_counter_ns() - fetch_start) / 1e6

    print(f"=== {label}: {n_ids} cases fetched in {fetch_total_ms / 1000:.3f} s ===")
    for k in ("caseids", "getattrs", "gethist", "getlog"):
        print(" " + _summarize(k, timings[k]))
    all_per_cmd = timings["getattrs"] + timings["gethist"] + timings["getlog"]
    print(_nagle_hint(all_per_cmd))
    return {"fetch_total_ms": fetch_total_ms, "n_ids": n_ids, "timings": timings}


def run_with_zinolib(host: str, port: int, user: str, secret: str, label: str) -> dict:
    """Apples-to-apples run through the real zinolib client."""
    from zinolib.ritz import ritz

    timings = {"caseids": [], "getattrs": [], "gethist": [], "getlog": []}
    with ritz(host, port=port, username=user, password=secret) as session:
        t0 = time.perf_counter_ns()
        ids = session.get_caseids()
        timings["caseids"].append((time.perf_counter_ns() - t0) / 1e6)
        fetch_start = time.perf_counter_ns()
        for case_id in ids:
            t0 = time.perf_counter_ns()
            session.get_raw_attributes(case_id)
            timings["getattrs"].append((time.perf_counter_ns() - t0) / 1e6)
            t0 = time.perf_counter_ns()
            session.get_raw_history(case_id)
            timings["gethist"].append((time.perf_counter_ns() - t0) / 1e6)
            t0 = time.perf_counter_ns()
            session.get_raw_log(case_id)
            timings["getlog"].append((time.perf_counter_ns() - t0) / 1e6)
        fetch_total_ms = (time.perf_counter_ns() - fetch_start) / 1e6

    print(f"=== {label}: {len(ids)} cases fetched in {fetch_total_ms / 1000:.3f} s ===")
    for k in ("caseids", "getattrs", "gethist", "getlog"):
        print(" " + _summarize(k, timings[k]))
    all_per_cmd = timings["getattrs"] + timings["gethist"] + timings["getlog"]
    print(_nagle_hint(all_per_cmd))
    return {"fetch_total_ms": fetch_total_ms, "n_ids": len(ids), "timings": timings}


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8001)
    parser.add_argument("--user", default="bench")
    parser.add_argument("--secret", default="benchsecret")
    parser.add_argument("--repeat", type=int, default=1)
    parser.add_argument("--via-zinolib", action="store_true")
    parser.add_argument(
        "--expected-num-events",
        type=int,
        default=None,
        help="If set, fail fast unless CASEIDS returns this many ids",
    )
    args = parser.parse_args()

    for i in range(args.repeat):
        label = f"run {i + 1}/{args.repeat}"
        if args.via_zinolib:
            result = run_with_zinolib(args.host, args.port, args.user, args.secret, label)
        else:
            client = ZinoBenchClient(args.host, args.port)
            try:
                # Connect and authenticate inside the try so a failed
                # handshake still closes the socket.
                challenge = client.connect()
                client.authenticate(args.user, args.secret, challenge)
                result = run_fetch_cycle(client, label)
            finally:
                client.close()

        if args.expected_num_events is not None:
            diff = result["n_ids"] - args.expected_num_events
            # Zino unavoidably polls the first scheduled device at startup,
            # which creates one extra reachability event before the client
            # connects. We tolerate small overshoots rather than failing.
            if diff < 0 or diff > 5:
                print(
                    f"!! expected ~{args.expected_num_events} cases, got {result['n_ids']}",
                    file=sys.stderr,
                )
                return 2
    return 0


if __name__ == "__main__":
    sys.exit(main())
latency_proxy.py

"""TCP proxy that injects per-chunk latency and logs the wire pattern.

Useful for two things:

1. **Logging**: --log writes one line per recv(): direction, byte count, and
   wall-clock delta since the previous packet. Run with --delay-ms 0 and a
   low-volume client and you can see whether the server emits each response
   as one chunk or several. Loopback preserves the per-send() boundaries
   because Nagle has nothing to wait for (ACKs come back in microseconds),
   so server-side per-line writes show up as separate recv()s here.

2. **Latency**: --delay-ms N sleeps N ms before forwarding every chunk in
   each direction. This is a crude WAN simulator: it doesn't reproduce
   real TCP segmentation/Nagle interaction, but it does add round-trip
   time, which is the dominant cost when ~300 sequential request/response
   pairs are issued.
"""

from __future__ import annotations

import argparse
import select
import socket
import sys
import time
from typing import Optional


def _handle_connection(
    client: socket.socket,
    backend_host: str,
    backend_port: int,
    delay_s: float,
    log_fh,
) -> None:
    backend = socket.create_connection((backend_host, backend_port))
    # Map each socket to (direction label, socket to forward its data to).
    sockets = {client: ("c2s", backend), backend: ("s2c", client)}
    last_ts = time.perf_counter_ns()
    try:
        while True:
            readable, _, _ = select.select(list(sockets), [], [], 30.0)
            if not readable:
                return  # idle for 30 s; give up on this connection
            for src in readable:
                direction, dst = sockets[src]
                try:
                    chunk = src.recv(65536)
                except ConnectionResetError:
                    return
                if not chunk:
                    return  # either side closed
                now = time.perf_counter_ns()
                if log_fh:
                    dt_ms = (now - last_ts) / 1e6
                    log_fh.write(f"{direction}\t{len(chunk):5d}\t+{dt_ms:8.3f} ms\n")
                    log_fh.flush()
                last_ts = now
                if delay_s > 0:
                    # Blocks both directions; acceptable because the client
                    # is strictly request/response.
                    time.sleep(delay_s)
                try:
                    dst.sendall(chunk)
                except (BrokenPipeError, ConnectionResetError):
                    return
    finally:
        client.close()
        backend.close()


def _serve(
    listen_port: int,
    backend_host: str,
    backend_port: int,
    delay_s: float,
    log_path: Optional[str],
    max_connections: int,
) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("127.0.0.1", listen_port))
    server.listen(5)
    # Signal readiness to the orchestrator BEFORE blocking in accept(); a
    # TCP-connect readiness check would otherwise consume the accept slot.
    print(f"proxy listening on 127.0.0.1:{listen_port}", flush=True)
    log_fh = open(log_path, "w") if log_path else None
    served = 0
    try:
        while served < max_connections:
            client, _ = server.accept()
            try:
                _handle_connection(client, backend_host, backend_port, delay_s, log_fh)
            except Exception as exc:  # noqa: BLE001
                print(f"proxy: connection error: {exc}", file=sys.stderr)
            served += 1
    finally:
        server.close()
        if log_fh:
            log_fh.close()


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--listen-port", type=int, required=True)
    parser.add_argument("--backend-host", default="127.0.0.1")
    parser.add_argument("--backend-port", type=int, required=True)
    parser.add_argument(
        "--delay-ms",
        type=float,
        default=0.0,
        help="Per-chunk one-way delay in milliseconds (RTT ≈ 2 × this)",
    )
    parser.add_argument(
        "--log",
        type=str,
        default=None,
        help="Optional path to write a per-chunk log",
    )
    parser.add_argument(
        "--accept-loops",
        type=int,
        default=1,
        help="How many client connections to handle before exiting",
    )
    args = parser.parse_args()
    _serve(
        args.listen_port,
        args.backend_host,
        args.backend_port,
        args.delay_ms / 1000.0,
        args.log,
        args.accept_loops,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())
make_fixture.py

"""Generate a self-contained zino fixture directory for benchmarking.

Writes zino.toml, polldevs.cf, secrets, and a zino-state.json that contains a
configurable number of open events of mixed types (reachability / portstate /
bgp / bfd / alarm). The resulting directory is ready to be passed to
``uv run zino --config-file <dir>/zino.toml --trap-port 0``.
"""

from __future__ import annotations

import argparse
import datetime
import ipaddress
import json
import random
from pathlib import Path

from zino.statemodels import (
    AlarmEvent,
    BFDEvent,
    BFDSessState,
    BGPAdminStatus,
    BGPEvent,
    BGPOperState,
    DeviceState,
    EventState,
    InterfaceState,
    LogEntry,
    PortStateEvent,
    ReachabilityEvent,
    ReachabilityState,
)
from zino.state import ZinoState

DEFAULT_USER = "bench"
DEFAULT_SECRET = "benchsecret"


def build_state(num_events: int, seed: int = 42) -> ZinoState:
    rng = random.Random(seed)
    state = ZinoState()
    base_time = datetime.datetime(2026, 5, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)

    # Give every event a unique router so (router, subindex, type) tuples never
    # collide and ``clean_state`` doesn't auto-close duplicates on startup.
    routers = [f"router{i:03d}.example.org" for i in range(num_events)]
    for router in routers:
        state.devices.devices[router] = DeviceState(name=router, enterprise_id=2636)

    for i in range(num_events):
        router = routers[i]
        opened = base_time + datetime.timedelta(minutes=i)
        updated = opened + datetime.timedelta(seconds=rng.randint(1, 300))
        kind = i % 5  # rotate through the five event types
        if kind == 0:
            event = ReachabilityEvent(
                router=router,
                state=EventState.OPEN,
                opened=opened,
                updated=updated,
                lasttrans=updated,
                polladdr=f"10.0.0.{(i % 250) + 1}",
                reachability=ReachabilityState.NORESPONSE,
            )
        elif kind == 1:
            event = PortStateEvent(
                router=router,
                state=EventState.OPEN,
                opened=opened,
                updated=updated,
                lasttrans=updated,
                polladdr=f"10.0.0.{(i % 250) + 1}",
                port=f"ge-0/0/{i % 48}",
                ifindex=500 + i,
                portstate=InterfaceState.DOWN,
                descr=f"Link to neighbor-{i}",
                flaps=rng.randint(0, 10),
                ac_down=datetime.timedelta(seconds=rng.randint(10, 3600)),
            )
        elif kind == 2:
            event = BGPEvent(
                router=router,
                state=EventState.OPEN,
                opened=opened,
                updated=updated,
                lasttrans=updated,
                polladdr=f"10.0.0.{(i % 250) + 1}",
                remote_address=f"192.0.2.{(i % 250) + 1}",
                remote_as=64500 + (i % 100),
                peer_uptime=rng.randint(0, 86400),
                operational_state=BGPOperState.ACTIVE,
                admin_status=BGPAdminStatus.RUNNING,
            )
        elif kind == 3:
            event = BFDEvent(
                router=router,
                state=EventState.OPEN,
                opened=opened,
                updated=updated,
                lasttrans=updated,
                polladdr=f"10.0.0.{(i % 250) + 1}",
                ifindex=600 + i,
                bfdstate=BFDSessState.DOWN,
                bfdix=i,
                bfddiscr=1000 + i,
                bfdaddr=f"198.51.100.{(i % 250) + 1}",
                neigh_rdns=f"neighbor-{i}.example.org",
            )
        else:
            event = AlarmEvent(
                router=router,
                state=EventState.OPEN,
                opened=opened,
                updated=updated,
                lasttrans=updated,
                polladdr=f"10.0.0.{(i % 250) + 1}",
                alarm_type="yellow" if i % 2 else "red",
                alarm_count=rng.randint(1, 4),
            )

        # Fabricate a small but non-trivial log/history so GETHIST/GETLOG do
        # meaningful work too.
        for j in range(rng.randint(3, 8)):
            event.log.append(
                LogEntry(
                    timestamp=opened + datetime.timedelta(seconds=j * 10),
                    message=f"log line {j} for event {i + 1}",
                )
            )
        event.history.append(
            LogEntry(
                timestamp=opened,
                message="state change embryonic -> open (monitor)",
            )
        )
        for j in range(rng.randint(1, 4)):
            event.history.append(
                LogEntry(
                    timestamp=opened + datetime.timedelta(minutes=j),
                    message=f"history entry {j} for event {i + 1}",
                )
            )

        event.id = i + 1
        state.events.events[event.id] = event

    state.events.last_event_id = num_events
    state.events._rebuild_indexes()
    return state


ZINO_TOML = """\
[archiving]
old_events_dir = "old-events"

[authentication]
file = "secrets"

[persistence]
file = "zino-state.json"
period = 60

[polling]
file = "polldevs.cf"
period = 60

[snmp]
backend = "pysnmp"

[snmp.trap]
require_community = []
source = "direct"

[snmp.agent]
enabled = false

[event]
make_events_for_new_interfaces = false

[logging]
version = 1
disable_existing_loggers = false

[logging.loggers.root]
level = "WARNING"
handlers = ["console"]

[logging.loggers.apscheduler]
level = "ERROR"

[logging.formatters.standard]
format = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"

[logging.handlers.console]
class = "logging.StreamHandler"
formatter = "standard"
stream = "ext://sys.stderr"
"""

POLLDEVS_CF_HEADER = """\
default interval: 60
default community: public
default domain: example.org
default snmpversion: v2c
"""


def _polldevs_for_routers(router_names: list[str]) -> str:
    """Render polldevs.cf entries for every router in the fixture.

    All routers listed in zino-state.json must also appear in polldevs.cf,
    otherwise zino's ``load_polldevs`` will treat them as orphans and close
    every associated event on startup.
    """
    blocks = [POLLDEVS_CF_HEADER]
    # Consecutive loopback addresses (127.0.0.1, 127.0.0.2, ...). Address
    # arithmetic keeps them valid past 254 routers (127.0.1.0, ...).
    first = ipaddress.IPv4Address("127.0.0.1")
    for index, name in enumerate(router_names):
        blocks.append(f"\nname: {name}\naddress: {first + index}\n")
    return "".join(blocks)


def write_fixture(target: Path, num_events: int) -> None:
    target.mkdir(parents=True, exist_ok=True)
    (target / "zino.toml").write_text(ZINO_TOML)

    secrets_path = target / "secrets"
    secrets_path.write_text(f"{DEFAULT_USER} {DEFAULT_SECRET}\n")
    secrets_path.chmod(0o600)

    state = build_state(num_events)
    router_names = sorted(state.devices.devices.keys())
    (target / "polldevs.cf").write_text(_polldevs_for_routers(router_names))

    state_json = state.model_dump_json(exclude_none=True, indent=2)
    (target / "zino-state.json").write_text(state_json)

    # Read back to double-check parseability.
    parsed = json.loads(state_json)
    assert len(parsed["events"]["events"]) == num_events, (
        f"expected {num_events} events, got {len(parsed['events']['events'])}"
    )


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("target", type=Path, help="Target directory for the fixture")
    parser.add_argument("-n", "--num-events", type=int, default=100)
    args = parser.parse_args()
    write_fixture(args.target, args.num_events)
    print(f"Wrote fixture with {args.num_events} events to {args.target}")


if __name__ == "__main__":
    main()
run_bench.py

"""Spin up a Zino server against a temporary fixture and run the timing client.

Usage:
    uv run python tmp/bench/run_bench.py --num-events 100
    uv run python tmp/bench/run_bench.py --num-events 100 --via-zinolib
    uv run python tmp/bench/run_bench.py --num-events 100 --tcpdump /tmp/zino-bench.pcap
"""

from __future__ import annotations

import argparse
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import time
from pathlib import Path

HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(HERE))

from make_fixture import DEFAULT_SECRET, DEFAULT_USER, write_fixture  # noqa: E402


def port_is_free(host: str, port: int) -> bool:
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((host, port))
            return True
    except OSError:
        return False


def wait_for_port(host: str, port: int, deadline_s: float) -> None:
    end = time.monotonic() + deadline_s
    while time.monotonic() < end:
        try:
            with socket.create_connection((host, port), timeout=0.5):
                return
        except OSError:
            time.sleep(0.1)
    raise TimeoutError(f"server did not start listening on {host}:{port} within {deadline_s}s")


def start_tcpdump(pcap_path: Path) -> subprocess.Popen:
    cmd = ["sudo", "tcpdump", "-i", "lo", "-ttt", "-n", "port", "8001", "-w", str(pcap_path)]
    proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    time.sleep(0.5)  # give tcpdump a moment to attach (or fail)
    if proc.poll() is not None:
        stderr = proc.stderr.read().decode("utf-8", errors="replace") if proc.stderr else ""
        raise RuntimeError(f"tcpdump failed to start: {stderr}")
    return proc


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-events", type=int, default=100)
    parser.add_argument("--repeat", type=int, default=1)
    parser.add_argument("--via-zinolib", action="store_true")
    parser.add_argument(
        "--keep",
        action="store_true",
        help="Keep the temporary fixture directory after the run",
    )
    parser.add_argument(
        "--tcpdump",
        type=Path,
        default=None,
        help="If set, capture loopback traffic to this pcap file (needs sudo)",
    )
    parser.add_argument(
        "--proxy-delay-ms",
        type=float,
        default=None,
        help=(
            "If set, route the client through a local TCP proxy on port 9001 "
            "that adds this much one-way delay per chunk (RTT ≈ 2× this). "
            "Useful for showing how the per-line write pattern degrades when "
            "the link has non-trivial RTT."
        ),
    )
    parser.add_argument(
        "--proxy-log",
        type=Path,
        default=None,
        help="If --proxy-delay-ms is set, also write the proxy chunk log here",
    )
    parser.add_argument(
        "--server-startup-timeout",
        type=float,
        default=20.0,
    )
    args = parser.parse_args()

    if not port_is_free("127.0.0.1", 8001):
        print("port 8001 is already in use; cannot run benchmark", file=sys.stderr)
        return 1
    if not port_is_free("127.0.0.1", 8002):
        print("port 8002 is already in use; cannot run benchmark", file=sys.stderr)
        return 1

    tmp = Path(tempfile.mkdtemp(prefix="zino-bench-"))
    print(f"fixture: {tmp}")
    write_fixture(tmp, args.num_events)

    tcpdump_proc = None
    server_proc = None
    server_log = None
    proxy_proc = None
    client_port = 8001
    try:
        if args.tcpdump:
            tcpdump_proc = start_tcpdump(args.tcpdump)
            print(f"tcpdump → {args.tcpdump}")

        server_cmd = [
            "uv",
            "run",
            "zino",
            "--config-file",
            str(tmp / "zino.toml"),
            "--trap-port",
            "0",
        ]
        print(f"starting: {' '.join(server_cmd)} (cwd={tmp})")
        server_log = (tmp / "zino-server.log").open("w")
        server_proc = subprocess.Popen(
            server_cmd,
            cwd=tmp,
            stdout=server_log,
            stderr=subprocess.STDOUT,
        )
        wait_for_port("127.0.0.1", 8001, args.server_startup_timeout)
        print("server is up; running client...")

        if args.proxy_delay_ms is not None:
            proxy_cmd = [
                "uv",
                "run",
                "python",
                str(HERE / "latency_proxy.py"),
                "--listen-port",
                "9001",
                "--backend-port",
                "8001",
                "--delay-ms",
                str(args.proxy_delay_ms),
                "--accept-loops",
                str(args.repeat),
            ]
            if args.proxy_log is not None:
                proxy_cmd += ["--log", str(args.proxy_log)]
            print(f"starting proxy with one-way delay {args.proxy_delay_ms} ms")
            proxy_proc = subprocess.Popen(
                proxy_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            # Wait for the proxy's readiness banner rather than TCP-probing,
            # since a probe consumes one of its accept slots.
            ready_line = proxy_proc.stdout.readline()
            if "proxy listening" not in ready_line:
                raise RuntimeError(f"proxy did not become ready: {ready_line!r}")
            client_port = 9001

        client_cmd = [
            "uv",
            "run",
            "python",
            str(HERE / "client.py"),
            "--host",
            "127.0.0.1",
            "--port",
            str(client_port),
            "--user",
            DEFAULT_USER,
            "--secret",
            DEFAULT_SECRET,
            "--repeat",
            str(args.repeat),
            "--expected-num-events",
            str(args.num_events),
        ]
        if args.via_zinolib:
            client_cmd.append("--via-zinolib")
        rc = subprocess.call(client_cmd)
        return rc
    finally:
        if proxy_proc and proxy_proc.poll() is None:
            proxy_proc.terminate()
            try:
                proxy_proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proxy_proc.kill()
                proxy_proc.wait()
        if server_proc and server_proc.poll() is None:
            server_proc.send_signal(signal.SIGTERM)
            try:
                server_proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                server_proc.kill()
                server_proc.wait()
        if server_log is not None:
            server_log.close()
        if tcpdump_proc and tcpdump_proc.poll() is None:
            # tcpdump was launched under sudo; use sudo to kill it.
            subprocess.run(["sudo", "kill", str(tcpdump_proc.pid)], check=False)
            try:
                tcpdump_proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                subprocess.run(["sudo", "kill", "-9", str(tcpdump_proc.pid)], check=False)
        if args.keep:
            print(f"keeping fixture at {tmp}")
        else:
            shutil.rmtree(tmp, ignore_errors=True)


if __name__ == "__main__":
    sys.exit(main())