Skip to content

Instantly share code, notes, and snippets.

@ericboehs
Last active May 27, 2026 20:42
Show Gist options
  • Select an option

  • Save ericboehs/979e5050ab05da97e15b7ab2e1995df9 to your computer and use it in GitHub Desktop.

Select an option

Save ericboehs/979e5050ab05da97e15b7ab2e1995df9 to your computer and use it in GitHub Desktop.
kokorod: single-file persistent daemon + auto-launching client for Kokoro-82M TTS (mlx-audio). Cuts time-to-first-audio from ~5s to ~0.2s; self-shuts down after 30 min idle.
#!/usr/bin/env python3
# /// script
# requires-python = "==3.12.*"
# dependencies = [
# "mlx-audio>=0.3.0",
# "mlx",
# "misaki[en]<0.9",
# "num2words",
# "spacy",
# "espeakng-loader",
# "numpy",
# "sounddevice",
# ]
# ///
"""kokorod — Kokoro-82M TTS daemon + `kk` client in one standalone file.
Install: drop this file at `~/bin/kk` (chmod +x) and put `~/bin` on PATH.
The same file is both the client and the daemon; `--daemon` (or `kk daemon`)
runs it as the long-lived daemon, anything else runs as a client. The client
auto-launches the daemon on first use and waits for the socket to come up.
Heavy deps are declared in the PEP 723 block above; the daemon re-execs
under `uv run --script` when first launched so the env is auto-managed.
Requires: `brew install uv espeak` on Apple Silicon macOS.
Why a daemon: the upstream `kokoro` CLI pays ~5s per invocation for model
load + KokoroPipeline init + macOS audio device wake-up. Keeping the model
resident and one callback-driven sd.OutputStream open cuts time-to-first-
audio to ~0.2s.
Usage:
kk "Hello, world." # implicit `say`
kk say "Hello, world."
echo "text" | kk # implicit `say` from stdin
kk -v bm_fable say "British male."
kk pause
kk play # alias: kk resume
kk stop # stop current playback, drop buffered audio
kk status # idle | playing | paused
kk kill # stop the daemon (alias: stop-daemon)
kokorod # run daemon in foreground (e.g. in tmux)
kk daemon # same
Env:
KOKOROD_TIMEOUT_MINUTES idle minutes before daemon self-exits (default 30)
KK_LAUNCH_TIMEOUT seconds client waits for socket after launch (default 60)
"""
import os
import sys
# Symlink-resolved path of this file; the client launches it with --daemon and
# the daemon re-execs it under `uv run --script`.
SCRIPT_PATH = os.path.realpath(__file__)
# Unix domain socket over which client and daemon exchange JSON lines.
SOCKET_PATH = "/tmp/kokoro.sock"
# Where the auto-launched daemon's stdout/stderr are appended.
LOG_DIR = os.path.expanduser("~/.cache/kokorod")
LOG_FILE = os.path.join(LOG_DIR, "daemon.log")
# Tokens the client recognises as a subcommand (aliases included).
SUBCOMMANDS = {"say", "pause", "play", "resume", "stop", "status", "daemon", "kill", "stop-daemon"}
def _is_daemon_mode() -> bool:
    """Decide whether this invocation should run the daemon.

    Returns True when the program name is `kokorod` / `kokorod.py`, when
    `--daemon` appears anywhere among the arguments, or when the program is
    invoked as `kk daemon`. Any other invocation is a client call.
    """
    cli_args = sys.argv[1:]
    invoked_as = os.path.basename(sys.argv[0])

    named_like_daemon = invoked_as in ("kokorod", "kokorod.py")
    has_daemon_flag = "--daemon" in cli_args
    # Only the very first argument counts as the `daemon` subcommand here.
    is_kk_daemon = invoked_as == "kk" and cli_args[:1] == ["daemon"]

    return named_like_daemon or has_daemon_flag or is_kk_daemon
# ─────────────────────────────────────────────────────────────────────────
# CLIENT MODE — pure stdlib, ~50ms startup. No heavy imports.
# ─────────────────────────────────────────────────────────────────────────
def _client_main() -> None:
    """Run one `kk` client invocation.

    Works out the subcommand from the command line, forwards the request to
    the daemon over the Unix socket (launching the daemon first for `say`),
    and reports the outcome. Exits with status 1 on empty text, a missing
    daemon, or an error reply, and with status 2 on an unknown subcommand.
    """
    import argparse

    cli = sys.argv[1:]

    # The subcommand is whatever sits at the first positional (non-flag)
    # token, so `kk -v X say "hi"` and `kk say -v X "hi"` are equivalent.
    # Flags that take a value are skipped together with that value. When the
    # first positional token is not a known subcommand the whole command line
    # is treated as an implicit `say` (`kk "hi"`, `echo hi | kk`).
    value_flags = {"-v", "--voice", "-s", "--speed"}
    subcommand = "say"
    rest: list[str] = list(cli)
    pos = 0
    while pos < len(cli):
        token = cli[pos]
        if token in value_flags:
            pos += 2
        elif token.startswith("-"):
            pos += 1
        else:
            if token in SUBCOMMANDS:
                subcommand = token
                rest = cli[:pos] + cli[pos + 1:]
            break

    # Collapse aliases onto their canonical names.
    subcommand = {"resume": "play", "stop-daemon": "kill"}.get(subcommand, subcommand)

    if subcommand == "kill":
        _kill_daemon()
        return

    if subcommand == "say":
        parser = argparse.ArgumentParser(prog="kk say")
        parser.add_argument("text", nargs="*")
        parser.add_argument("-v", "--voice", default="af_heart")
        parser.add_argument("-s", "--speed", type=float, default=1.0)
        args = parser.parse_args(rest)

        # Positional words win; with none given, speak whatever is on stdin.
        if args.text:
            text = " ".join(args.text)
        else:
            text = sys.stdin.read()
        text = text.strip()
        if not text:
            print("kk: empty text", file=sys.stderr)
            sys.exit(1)

        _ensure_daemon()
        request = {"cmd": "say", "text": text, "voice": args.voice, "speed": args.speed}
        body = _send(request)
    elif subcommand in ("pause", "play", "stop", "status"):
        # Control commands never launch the daemon on their own.
        if not os.path.exists(SOCKET_PATH):
            if subcommand != "status":
                print("kk: daemon is not running", file=sys.stderr)
                sys.exit(1)
            print("idle (daemon not running)")
            return
        body = _send({"cmd": subcommand})
    else:
        print(f"kk: unknown subcommand: {subcommand}", file=sys.stderr)
        sys.exit(2)

    if not body.get("ok"):
        print(f"kk: {body.get('error', 'unknown error')}", file=sys.stderr)
        sys.exit(1)

    if subcommand == "status":
        print(body.get("state", "unknown"))
    elif body.get("msg") and subcommand != "say":
        # `say` stays quiet on success; other commands echo the daemon's
        # acknowledgement on stderr.
        print(body["msg"], file=sys.stderr)
def _send(req: dict, timeout: "float | None" = 10.0) -> dict:
    """Send one JSON request to the daemon and return its JSON reply.

    Args:
        req: Request object; serialised as a single newline-terminated JSON
            line and written to the Unix socket at SOCKET_PATH.
        timeout: Seconds to wait on connect/send/receive before giving up.
            ``None`` waits indefinitely.

    Returns:
        The decoded reply object. If the daemon cannot be reached, does not
        answer in time, or answers with something that is not a JSON object,
        a synthetic ``{"ok": False, "error": ...}`` is returned instead of
        raising, so callers only need to check ``ok``.
    """
    import json
    import socket

    payload = (json.dumps(req) + "\n").encode("utf-8")
    try:
        # Context managers close both the socket and its file wrapper even
        # when the exchange fails part-way.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect(SOCKET_PATH)
            sock.sendall(payload)
            # The daemon always encodes replies as UTF-8; do not depend on
            # the client's locale.
            with sock.makefile("r", encoding="utf-8") as reader:
                resp = reader.readline().strip()
    except (OSError, UnicodeDecodeError) as exc:
        # Covers a stale socket file, a refused connection and timeouts.
        return {"ok": False, "error": f"cannot reach daemon at {SOCKET_PATH}: {exc}"}

    try:
        body = json.loads(resp)
    except json.JSONDecodeError:
        return {"ok": False, "error": f"invalid response: {resp!r}"}
    if not isinstance(body, dict):
        return {"ok": False, "error": f"invalid response: {resp!r}"}
    return body
def _kill_daemon() -> None:
    """Stop the daemon, first via RPC and then, if that fails, via pkill.

    Prints the outcome on stderr. When the socket file does not exist the
    daemon is reported as not running and nothing else is attempted. After
    the pkill fallback the socket file is removed if it is still present.
    """
    import re
    import subprocess

    if not os.path.exists(SOCKET_PATH):
        print("kk: daemon is not running", file=sys.stderr)
        return

    # Ask the daemon to exit cleanly via RPC; the daemon will unlink the
    # socket on its way out.
    try:
        body = _send({"cmd": "shutdown"})
        if body.get("ok"):
            print("kk: stopped daemon", file=sys.stderr)
            return
    except Exception:
        # Deliberate best-effort: any RPC failure (orphaned socket, hung
        # daemon, ...) falls through to the pkill path below.
        pass

    # The daemon's command line contains the path it was launched from
    # (SCRIPT_PATH), which only includes "kokorod" if the file was installed
    # under that name. Match the real script path as well so the fallback
    # also works for an install at e.g. ~/bin/kk. pkill -f takes an extended
    # regular expression, so escape the path's metacharacters.
    escaped_path = re.sub(r"([.\[\](){}*+?|^$\\])", r"\\\1", SCRIPT_PATH)
    pattern = f"(kokorod|{escaped_path}).*--daemon"
    rc = subprocess.run(["pkill", "-f", "--", pattern]).returncode
    if rc == 0:
        print("kk: stopped daemon", file=sys.stderr)
    else:
        print("kk: no kokorod process found", file=sys.stderr)

    # A killed daemon cannot clean up after itself.
    try:
        os.unlink(SOCKET_PATH)
    except FileNotFoundError:
        pass
def _daemon_is_listening() -> bool:
    """Return True if something accepts connections on SOCKET_PATH."""
    import socket

    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as probe:
            probe.settimeout(1.0)
            probe.connect(SOCKET_PATH)
    except OSError:
        # Missing file, stale file with no listener, permission problem, ...
        return False
    return True


def _ensure_daemon() -> None:
    """Make sure a daemon is accepting connections, launching one if needed.

    A daemon counts as running only if a connection to SOCKET_PATH succeeds;
    a socket file left behind by a crashed daemon does not. When a launch is
    needed, this file is started with ``--daemon`` in a new session with its
    output appended to LOG_FILE, and the call polls once per second for up to
    ``KK_LAUNCH_TIMEOUT`` seconds (default 60). Exits with status 1 if the
    script is not executable or the daemon does not come up in time.
    """
    import subprocess
    import time

    if _daemon_is_listening():
        return
    if not os.access(SCRIPT_PATH, os.X_OK):
        print(f"kk: {SCRIPT_PATH} is not executable", file=sys.stderr)
        sys.exit(1)

    os.makedirs(LOG_DIR, exist_ok=True)
    print(f"kk: launching kokorod (log: {LOG_FILE})...", file=sys.stderr)
    # Default to offline Hugging Face access but let an explicit setting in
    # the caller's environment win, as the daemon's own setdefault does.
    env = {"HF_HUB_OFFLINE": "1", **os.environ}
    # The child inherits its own copies of the descriptor, so the client's
    # handle can be closed as soon as the process has been spawned.
    with open(LOG_FILE, "ab") as log_fh:
        subprocess.Popen(
            [SCRIPT_PATH, "--daemon"],
            stdin=subprocess.DEVNULL,
            stdout=log_fh,
            stderr=log_fh,
            start_new_session=True,
            env=env,
        )

    try:
        launch_timeout = int(os.environ.get("KK_LAUNCH_TIMEOUT", "60"))
    except ValueError:
        print("kk: ignoring non-integer KK_LAUNCH_TIMEOUT; using 60", file=sys.stderr)
        launch_timeout = 60

    for waited in range(launch_timeout):
        if _daemon_is_listening():
            print(f"kk: daemon ready after {waited}s", file=sys.stderr)
            return
        time.sleep(1)
    print(
        f"kk: daemon did not start within {launch_timeout}s — see {LOG_FILE}",
        file=sys.stderr,
    )
    sys.exit(1)
# ─────────────────────────────────────────────────────────────────────────
# DAEMON MODE — re-execs under `uv run --script`, imports heavy deps.
# ─────────────────────────────────────────────────────────────────────────
def _exec_under_uv() -> None:
    """Replace this process with `uv run --script <this file> ... --daemon`.

    Called when the heavy dependencies are not importable. The current
    arguments are forwarded minus the `daemon` subcommand token, and
    `--daemon` is appended if absent so the re-exec'd process runs as the
    daemon. Does not return: it either execs or exits with an error message.
    """
    import shutil

    # This function is only reached when the dependency import failed. If we
    # are already running under uv and it failed again, another re-exec would
    # loop forever, so stop with a message instead.
    if os.environ.get("KOKOROD_UNDER_UV") == "1":
        sys.exit(
            "kokorod: dependencies are still not importable after re-exec under uv; "
            "run `uv run --script` on this file manually to see the underlying error"
        )

    uv = shutil.which("uv")
    if not uv:
        sys.exit("kokorod: uv not found on PATH. Install with: brew install uv")

    args = [a for a in sys.argv[1:] if a != "daemon"]
    if "--daemon" not in args:
        args.append("--daemon")

    # Inherited across execv; marks the next process as the uv-managed one.
    os.environ["KOKOROD_UNDER_UV"] = "1"
    os.execv(uv, [uv, "run", "--script", SCRIPT_PATH, *args])
def _ensure_spacy_model() -> None:
    """Download spaCy's `en_core_web_sm` model if it cannot be imported.

    Raises:
        subprocess.CalledProcessError: if the `spacy download` command fails.
    """
    model_present = True
    try:
        import en_core_web_sm  # noqa: F401
    except ImportError:
        model_present = False
    if model_present:
        return

    import subprocess

    print("kokorod: downloading spacy en_core_web_sm (one-time)...", flush=True)
    # Use the interpreter we are running under so the model is installed
    # into the same environment.
    download_cmd = [sys.executable, "-m", "spacy", "download", "en_core_web_sm"]
    subprocess.check_call(download_cmd)
def _daemon_main() -> None:
    """Run the long-lived TTS daemon.

    Startup makes sure the heavy dependencies are importable (re-exec'ing
    under `uv run --script` if not), loads and pre-warms the Kokoro model,
    opens the persistent audio output stream, and then serves
    newline-terminated JSON requests on SOCKET_PATH from a background thread.
    The main thread loops taking `say` jobs off the play queue and feeding the
    generated audio to the buffer drained by the stream callback. The process
    exits on a `shutdown` request, on Ctrl-C, or after the configured idle
    timeout (`KOKOROD_TIMEOUT_MINUTES`, default 30).
    """
    # If heavy deps aren't importable, re-exec under uv-managed env (PEP 723).
    try:
        import mlx_audio  # noqa: F401
    except ImportError:
        _exec_under_uv()
    _ensure_spacy_model()
    os.environ.setdefault("HF_HUB_OFFLINE", "1")

    import collections
    import contextlib
    import json
    import queue
    import re
    import socketserver
    import threading
    import time

    import numpy as np
    import sounddevice as sd

    # ── Inlined helpers (was from mcp_server) ────────────────────────────
    MODEL_ID = "mlx-community/Kokoro-82M-bf16"
    DEFAULT_VOICE = "af_heart"
    DEFAULT_SPEED = 1.0
    SAMPLE_RATE = 24000
    # Texts shorter than this many characters get SHORT_TEXT_PAD appended.
    SHORT_TEXT_THRESHOLD = 25
    SHORT_TEXT_PAD = " ... ..."
    BLOCK_SIZE = 2048
    IDLE_TIMEOUT_SECONDS = int(os.environ.get("KOKOROD_TIMEOUT_MINUTES", "30")) * 60

    # One-element lists are used as mutable cells shared with nested functions.
    _model = [None]
    _model_lock = threading.Lock()

    def get_model():
        """Return the loaded model, loading it on first use."""
        if _model[0] is not None:
            return _model[0]
        with _model_lock:
            # Re-check under the lock: another thread may have loaded it.
            if _model[0] is not None:
                return _model[0]
            from mlx_audio.tts.utils import load_model

            # Anything the loader prints goes to stderr instead of stdout.
            with contextlib.redirect_stdout(sys.stderr):
                _model[0] = load_model(model_path=MODEL_ID)
            return _model[0]

    def lang_code(voice: str) -> str:
        """Derive the language code from the voice name's first letter."""
        if voice and voice[0] in ("a", "b", "j", "z"):
            return voice[0]
        return "a"

    def preprocess(text: str) -> str:
        # Negative numbers: -3 → minus 3 (prevents silent drop in phonemizer).
        return re.sub(r"(?<!\w)-(\d)", r"minus \1", text)

    # ── Playback state ───────────────────────────────────────────────────
    # Queue items are (text, voice, speed, t_enqueued).
    play_queue: "queue.Queue" = queue.Queue()
    last_activity = [time.time()]
    last_activity_lock = threading.Lock()
    persistent_stream: "sd.OutputStream | None" = None
    pause_event = threading.Event()
    stop_event = threading.Event()
    playing_flag = [False]
    # Wall-clock time at which the callback first wrote real audio for the
    # current utterance; None until that happens.
    first_audio_at: "list[float | None]" = [None]
    first_audio_lock = threading.Lock()

    def touch_activity() -> None:
        """Record "now" as the last moment the daemon was in use."""
        with last_activity_lock:
            last_activity[0] = time.time()

    class AudioBuffer:
        """FIFO of float32 numpy arrays drained sample-by-sample by the
        sd.OutputStream callback. clear() drops anything not yet played,
        which is how preempt and stop work without restarting the stream.
        """

        def __init__(self) -> None:
            self._queue: "collections.deque[np.ndarray]" = collections.deque()
            # Chunk currently being played and the read offset into it.
            self._current: "np.ndarray | None" = None
            self._pos = 0
            self._lock = threading.Lock()

        def append(self, arr: np.ndarray) -> None:
            with self._lock:
                self._queue.append(arr)

        def clear(self) -> None:
            with self._lock:
                self._queue.clear()
                self._current = None
                self._pos = 0

        def is_empty(self) -> bool:
            with self._lock:
                current_done = self._current is None or self._pos >= len(self._current)
                return current_done and not self._queue

        def fill(self, outdata: np.ndarray) -> bool:
            """Copy up to len(outdata) samples into outdata's first channel.
            Pads remainder with silence. Returns True if any real audio was
            written (used to record time-to-first-audio)."""
            needed = len(outdata)
            out_idx = 0
            wrote_audio = False
            with self._lock:
                while needed > 0:
                    if self._current is None or self._pos >= len(self._current):
                        if not self._queue:
                            break
                        self._current = self._queue.popleft()
                        self._pos = 0
                    take = min(needed, len(self._current) - self._pos)
                    outdata[out_idx : out_idx + take, 0] = self._current[
                        self._pos : self._pos + take
                    ]
                    out_idx += take
                    self._pos += take
                    needed -= take
                    wrote_audio = True
            if needed > 0:
                outdata[out_idx:, :].fill(0)
            return wrote_audio

    # Always-on keep-alive: 10Hz sine wave at -40 dBFS. Fills every underrun
    # sample so the USB DAC's auto-mute silence detector always sees periodic
    # non-zero signal. 10Hz is below human hearing AND below any speaker's
    # frequency response (inaudible), but unambiguous to the DAC. Random
    # dither doesn't work because DACs use RMS-windowed detection, not
    # "is it zero".
    _KEEPALIVE_FREQ = 10.0
    _KEEPALIVE_AMP = 0.01
    _keepalive_phase = [0.0]
    audio_buf = AudioBuffer()

    def audio_callback(outdata, frames, time_info, status):
        """Stream callback: emit buffered audio, or the keep-alive tone."""
        if pause_event.is_set():
            outdata.fill(0)
            return
        wrote = audio_buf.fill(outdata)
        if not wrote:
            # Nothing to play: write the keep-alive sine, carrying the phase
            # across callbacks so the waveform stays continuous.
            n = outdata.shape[0]
            phase = _keepalive_phase[0]
            step = 2 * np.pi * _KEEPALIVE_FREQ / SAMPLE_RATE
            t = phase + step * np.arange(n, dtype=np.float64)
            outdata[:, 0] = (_KEEPALIVE_AMP * np.sin(t)).astype(np.float32)
            _keepalive_phase[0] = (phase + step * n) % (2 * np.pi)
        if wrote and first_audio_at[0] is None:
            with first_audio_lock:
                if first_audio_at[0] is None:
                    first_audio_at[0] = time.time()

    def ensure_stream() -> "sd.OutputStream":
        """Open and start the shared output stream on first call."""
        nonlocal persistent_stream
        if persistent_stream is None:
            persistent_stream = sd.OutputStream(
                samplerate=SAMPLE_RATE,
                channels=1,
                blocksize=BLOCK_SIZE,
                dtype="float32",
                latency="high",
                callback=audio_callback,
            )
            persistent_stream.start()
            print(
                f"kokorod: persistent OutputStream opened "
                f"(sr={SAMPLE_RATE}, blocksize={BLOCK_SIZE}, latency=high, callback-driven)",
                flush=True,
            )
        return persistent_stream

    def compute_state() -> str:
        """Return "paused", "playing" or "idle" from the playback state."""
        if pause_event.is_set():
            return "paused"
        if playing_flag[0] or not audio_buf.is_empty() or not play_queue.empty():
            return "playing"
        return "idle"

    def drain_queue() -> int:
        """Discard every pending job; return how many were dropped."""
        n = 0
        while True:
            try:
                play_queue.get_nowait()
                n += 1
            except queue.Empty:
                return n

    def _shutdown_soon() -> None:
        time.sleep(0.05)  # let the RPC response flush
        print("kokorod: shutdown requested via RPC", flush=True)
        try:
            os.unlink(SOCKET_PATH)
        except FileNotFoundError:
            pass
        os._exit(0)

    def play_one(text: str, voice: str, speed: float, t_enqueued: float) -> None:
        """Generate audio and feed it to the playback buffer. Returns once
        all chunks are queued (or generation is preempted / stopped).
        """
        text = preprocess(text)
        model = get_model()
        ensure_stream()
        # A new utterance replaces whatever was buffered and resets any
        # earlier pause/stop request.
        audio_buf.clear()
        pause_event.clear()
        stop_event.clear()
        first_audio_at[0] = None
        first_queue = True
        with contextlib.redirect_stdout(sys.stderr):
            for result in model.generate(
                text=text, voice=voice, speed=speed, lang_code=lang_code(voice),
            ):
                # Abandon generation if a new request arrived or stop was
                # requested. The next play_one (if any) will clear the buf.
                if not play_queue.empty() or stop_event.is_set():
                    break
                audio = np.array(result.audio, dtype=np.float32)
                if len(audio) == 0:
                    continue
                if first_queue:
                    first_queue = False
                    print(
                        f"kokorod: first chunk ready "
                        f"({time.time() - t_enqueued:.2f}s from enqueue)",
                        flush=True,
                    )
                audio_buf.append(audio)

    # ── RPC handlers ─────────────────────────────────────────────────────
    class KokoroHandler(socketserver.StreamRequestHandler):
        """Handle one connection: read one JSON line, write one JSON line."""

        def handle(self):
            try:
                line = self.rfile.readline().decode("utf-8").strip()
            except UnicodeDecodeError as e:
                self._respond(False, error=f"invalid UTF-8: {e}")
                return
            if not line:
                return
            try:
                req = json.loads(line)
            except json.JSONDecodeError as e:
                self._respond(False, error=f"invalid JSON: {e}")
                return
            # Valid JSON need not be an object; answer with an error rather
            # than letting the handler thread die without a reply.
            if not isinstance(req, dict):
                self._respond(False, error="request must be a JSON object")
                return
            cmd = req.get("cmd", "say")
            if cmd == "resume":
                cmd = "play"
            touch_activity()
            if cmd == "say":
                self._handle_say(req)
            elif cmd == "pause":
                pause_event.set()
                self._respond(True, msg="paused", state="paused")
            elif cmd == "play":
                pause_event.clear()
                self._respond(True, msg="resumed", state=compute_state())
            elif cmd == "stop":
                drained = drain_queue()
                audio_buf.clear()
                pause_event.clear()
                stop_event.set()
                self._respond(True, msg=f"stopped (dropped {drained} queued)", state="idle")
            elif cmd == "status":
                self._respond(True, state=compute_state())
            elif cmd == "shutdown":
                self._respond(True, msg="shutting down")
                # Defer the exit slightly so the response actually flushes.
                threading.Thread(target=_shutdown_soon, daemon=True).start()
            else:
                self._respond(False, error=f"unknown cmd: {cmd}")

        def _handle_say(self, req):
            """Validate a `say` request and put it on the play queue."""
            text = req.get("text") or ""
            if not isinstance(text, str):
                self._respond(False, error="text must be a string")
                return
            text = text.strip()
            voice = req.get("voice", DEFAULT_VOICE)
            if not isinstance(voice, str):
                self._respond(False, error="voice must be a string")
                return
            try:
                speed = float(req.get("speed", DEFAULT_SPEED))
            except (TypeError, ValueError):
                self._respond(False, error=f"invalid speed: {req.get('speed')!r}")
                return
            # Also rejects NaN, for which every comparison is False.
            if not (0 < speed < float("inf")):
                self._respond(False, error=f"invalid speed: {speed}")
                return
            if not text:
                self._respond(False, error="empty text")
                return
            # Count before padding so the pad tokens are not reported as words.
            word_count = len(text.split())
            if len(text) < SHORT_TEXT_THRESHOLD:
                text = text + SHORT_TEXT_PAD
            play_queue.put((text, voice, speed, time.time()))
            print(
                f"kokorod: queued {word_count} words ({voice} @ {speed}x)",
                flush=True,
            )
            self._respond(True, msg=f"Queued {word_count} words ({voice} @ {speed}x)")

        def _respond(self, ok, msg=None, error=None, state=None):
            """Write one JSON reply line; falsy optional fields are omitted."""
            body = {"ok": ok}
            if msg:
                body["msg"] = msg
            if error:
                body["error"] = error
            if state:
                body["state"] = state
            self.wfile.write((json.dumps(body) + "\n").encode("utf-8"))

    class ThreadedUnixServer(socketserver.ThreadingMixIn, socketserver.UnixStreamServer):
        daemon_threads = True
        allow_reuse_address = True

    def prewarm() -> None:
        """Run one throwaway generation to force pipeline init."""
        t0 = time.time()
        model = get_model()
        with contextlib.redirect_stdout(sys.stderr):
            for _ in model.generate(
                text="warm up.", voice=DEFAULT_VOICE, speed=DEFAULT_SPEED,
                lang_code=lang_code(DEFAULT_VOICE),
            ):
                pass
        print(f"kokorod: pipeline pre-warmed in {time.time() - t0:.2f}s", flush=True)

    def prewarm_audio() -> None:
        # Opening the callback stream + start() begins driving the audio
        # device (callback returns silence until audio_buf has data).
        # Keeps AirPods / CoreAudio engaged so the first real speak
        # doesn't pay the 2-3s device wake cost.
        ensure_stream()
        print("kokorod: audio device engaged (callback stream running)", flush=True)

    def idle_watcher() -> None:
        """Exit the process once the daemon has been idle long enough."""
        while True:
            time.sleep(60)
            # play_one returns as soon as the audio is buffered, so a long
            # utterance can still be playing well after the last RPC. Treat
            # active playback as activity; a paused daemon still times out.
            if compute_state() == "playing":
                touch_activity()
                continue
            with last_activity_lock:
                idle = time.time() - last_activity[0]
            if idle >= IDLE_TIMEOUT_SECONDS:
                print(
                    f"kokorod: idle for {idle / 60:.1f} min "
                    f"(limit {IDLE_TIMEOUT_SECONDS / 60:.0f}); shutting down",
                    flush=True,
                )
                try:
                    os.unlink(SOCKET_PATH)
                except FileNotFoundError:
                    pass
                os._exit(0)

    t_start = time.time()
    print("kokorod: loading Kokoro-82M model...", flush=True)
    get_model()
    print(f"kokorod: model loaded in {time.time() - t_start:.2f}s", flush=True)
    prewarm()
    prewarm_audio()

    # Remove any socket file left over from a previous run before binding.
    try:
        os.unlink(SOCKET_PATH)
    except FileNotFoundError:
        pass
    # Bind under a restrictive umask so the socket is owner-only from the
    # moment it exists, instead of being briefly open until the chmod below.
    previous_umask = os.umask(0o177)
    try:
        server = ThreadedUnixServer(SOCKET_PATH, KokoroHandler)
    finally:
        os.umask(previous_umask)
    os.chmod(SOCKET_PATH, 0o600)
    print(
        f"kokorod: listening on {SOCKET_PATH} "
        f"(ready in {time.time() - t_start:.2f}s, "
        f"idle timeout {IDLE_TIMEOUT_SECONDS // 60} min)",
        flush=True,
    )
    threading.Thread(target=server.serve_forever, daemon=True, name="kokorod-socket").start()
    threading.Thread(target=idle_watcher, daemon=True, name="kokorod-idle").start()
    touch_activity()

    # Main thread owns audio playback. macOS CoreAudio silently no-ops
    # sd.OutputStream operations started from transient worker threads in
    # a long-running socket-server process.
    try:
        while True:
            text, voice, speed, t_enqueued = play_queue.get()
            playing_flag[0] = True
            try:
                play_one(text, voice, speed, t_enqueued)
                if first_audio_at[0] is not None:
                    ttfa = first_audio_at[0] - t_enqueued
                    print(
                        f"kokorod: done; total {time.time() - t_enqueued:.2f}s, "
                        f"TTFA {ttfa:.2f}s",
                        flush=True,
                    )
                else:
                    print(
                        f"kokorod: done (preempted or empty); total {time.time() - t_enqueued:.2f}s",
                        flush=True,
                    )
            except Exception as e:
                # One failed utterance must not take the daemon down.
                print(f"kokorod: playback error: {e}", flush=True)
            finally:
                playing_flag[0] = False
                touch_activity()
    except KeyboardInterrupt:
        print("\nkokorod: shutting down", flush=True)
    finally:
        try:
            os.unlink(SOCKET_PATH)
        except FileNotFoundError:
            pass
        if persistent_stream is not None:
            try:
                persistent_stream.stop()
                persistent_stream.close()
            except Exception:
                # Best-effort teardown on the way out.
                pass
if __name__ == "__main__":
    # One file, two roles: choose daemon or client from how we were invoked.
    entry_point = _daemon_main if _is_daemon_mode() else _client_main
    entry_point()
@ericboehs

ericboehs commented May 27, 2026

Copy link
Copy Markdown
Author

kokorod — persistent Kokoro-82M TTS daemon (single file)

A standalone TTS daemon that keeps the Kokoro-82M model resident in memory and feeds a callback-driven audio stream. Time-to-first-audio drops from ~5s (cold kokoro CLI) to ~0.2s.

Single Python file, PEP 723 inline metadata. No requirements.txt, no manual venv, no repo to clone — uv reads the dependency block at the top of the script and manages a cached environment for you.

Built on mlx-audio — Apple Silicon only.

Why

The kokoro CLI pays the full cost on every invocation: Python startup, ~600 MB model load, KokoroPipeline initialization, and macOS CoreAudio / Bluetooth device wake-up. That's ~5 seconds before any sound. kokorod does it once at startup and stays resident. Each subsequent speak is sub-second.

Features

  • Subcommand CLI: kk say "hi", kk pause, kk play, kk stop, kk status, kk kill
  • Implicit say: kk "hi" and echo hi | kk still work
  • Auto-launch: client launches the daemon on first use; daemon self-exits after 30 minutes idle (configurable)
  • Real pause / preempt / stop: callback-driven sd.OutputStream lets us drop buffered audio mid-playback instead of waiting for PortAudio to drain
  • Persistent audio device: stream stays open across speaks so AirPods / Bluetooth devices don't re-engage on every call
  • Single file, zero-touch env: uv run --script handles deps via PEP 723 metadata; spacy's en_core_web_sm is downloaded on first daemon launch

Install

Apple Silicon Mac. Make sure ~/bin is on your PATH (add export PATH="$HOME/bin:$PATH" to your shell rc if not), then:

brew install uv espeak
mkdir -p ~/bin
curl -L https://gist.githubusercontent.com/ericboehs/979e5050ab05da97e15b7ab2e1995df9/raw/kokorod.py -o ~/bin/kk
chmod +x ~/bin/kk

That's it. First kk call:

  1. Launches the daemon
  2. Daemon detects missing deps and re-execs under uv run --script
  3. uv reads the PEP 723 block at the top of the file, resolves dependencies into a cached env (~30–60s, one-time)
  4. Daemon downloads spacy's en_core_web_sm (~12 MB, one-time)
  5. Daemon loads the Kokoro-82M model (~600 MB, one-time HuggingFace download)

Total first-launch ~45–60 seconds. All subsequent daemon launches are ~5 seconds (cached). Speaks within a running daemon are ~0.2s.

Usage

kk "Hello, world."                    # implicit `say`
kk say "Explicit say subcommand."
echo "from stdin" | kk                # pipe input
kk -v bm_fable say "British male."
kk -s 1.2 say "Faster speech."
kk -v bm_fable "British male implicit say."

kk pause                              # pause current playback
kk play                               # resume (alias: kk resume)
kk stop                               # stop, drop buffered audio
kk status                             # idle | playing | paused

kk daemon                             # run daemon in foreground (debugging)
kk kill                               # stop the daemon (alias: stop-daemon)

Configuration

| Env var | Default | Meaning |
| --- | --- | --- |
| KOKOROD_TIMEOUT_MINUTES | 30 | Idle minutes before the daemon self-exits |
| KK_LAUNCH_TIMEOUT | 60 | Seconds the client waits for the socket after launching the daemon |

Architecture

  • Client (kk): pure stdlib, ~50 ms startup. Sends newline-terminated JSON over /tmp/kokoro.sock. Doesn't touch uv — runs under the system python3 shebang.

  • Daemon (kokorod): first launch re-execs into uv run --script so the PEP 723 dependency block is honored. Loads Kokoro-82M, opens a persistent callback-driven sd.OutputStream, then runs a ThreadedUnixServer on a background thread.

  • Playback runs on the main thread. macOS CoreAudio silently no-ops sd.OutputStream operations started from transient worker threads in a long-running socket-server process. The socket server is the background thread; main thread drains play_queue and feeds AudioBuffer.

  • AudioBuffer is a thread-safe FIFO of float32 numpy arrays drained sample-by-sample by the audio callback. clear() drops anything not yet played — that's how stop and preempt work.

  • RPC protocol (over /tmp/kokoro.sock, newline-terminated JSON):

    {"cmd": "say", "text": "...", "voice": "af_heart", "speed": 1.0}
    {"cmd": "pause"}
    {"cmd": "play"}
    {"cmd": "stop"}
    {"cmd": "status"}
    {"cmd": "shutdown"}

    Response: {"ok": true, "msg": "...", "state": "idle|playing|paused"} or {"ok": false, "error": "..."}.

Memory budget

  • ~30–40 MB before first speak
  • ~600 MB resident after model load
  • ~800 MB peak during generation

Gotchas

  • Python 3.12 only — spacy / pydantic don't yet support 3.13. The PEP 723 block pins this; uv will fetch 3.12 automatically.
  • misaki[en]<0.9 — 0.9+ breaks EspeakWrapper.set_data_path. Pinned in the PEP 723 block.
  • Apple Silicon only — mlx requirement.
  • Short text (<25 chars) is padded with ... ... to avoid an mlx-audio hang bug. Minor audible artifact on very short phrases, accepted as a tradeoff.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment