Skip to content

Instantly share code, notes, and snippets.

@PsychoSmiley
Last active November 13, 2025 01:10
Show Gist options
  • Select an option

  • Save PsychoSmiley/6b5bd86ef88dd702be8dcc81204496fc to your computer and use it in GitHub Desktop.

Select an option

Save PsychoSmiley/6b5bd86ef88dd702be8dcc81204496fc to your computer and use it in GitHub Desktop.
MCP Buttplug - Convert from LLM_Buttplug
# /// script
# dependencies = ["buttplug", "websockets", "mcp"]
# ///
"""Stroke bridge: combined server/client (requires Intiface Central running) + optional MCP tool.
- Usage Local: pip install buttplug websockets mcp -> `python MCP-buttplug.py serve` + `python MCP-buttplug.py send --duration 1500 --intensity 0.7`
- Usage MCP: claude mcp add --transport stdio buttplug -- uv run https://gist.github.com/PsychoSmiley/6b5bd86ef88dd702be8dcc81204496fc/raw/MCP-buttplug.py serve --mcp
- Optional env vars: --env STROKE_HOST=127.0.0.1 --env STROKE_PORT=8769 --env STROKE_INTIFACE=ws://127.0.0.1:12345
- Note: uv auto-installs deps from PEP 723 metadata block above
"""
import argparse, asyncio, json, logging, os, threading
from pathlib import Path
from typing import Dict, List, Optional
import websockets
from buttplug.client import (
ButtplugClient,
ButtplugClientDevice,
ButtplugClientWebsocketConnector,
)
try:
import mcp.types as types
from mcp.server.lowlevel import Server as LowLevelServer
import mcp.server.stdio
MCP_AVAILABLE = True
except ImportError: # pragma: no cover
MCP_AVAILABLE = False
HOST = os.getenv("STROKE_HOST", "127.0.0.1")
PORT = int(os.getenv("STROKE_PORT", "8769"))
INTIFACE = os.getenv("STROKE_INTIFACE", "ws://127.0.0.1:12345")
LOCK = Path.home() / ".stroke_bridge.lock"
logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(message)s")
logger = logging.getLogger("stroke_bridge")
# Global DeviceController for MCP tools (set when MCP mode starts)
_device_controller: Optional["DeviceController"] = None
_server_loop: Optional[asyncio.AbstractEventLoop] = None # Event loop for server thread
class DeviceController:
def __init__(self, intiface_address: str):
self.intiface_address = intiface_address
self.client: Optional[ButtplugClient] = None
self.devices: Dict[int, ButtplugClientDevice] = {}
self.current_tasks: List[asyncio.Task] = []
async def connect(self) -> bool:
connector = ButtplugClientWebsocketConnector(self.intiface_address)
self.client = ButtplugClient("StrokeBridge")
await self.client.connect(connector)
await self.client.start_scanning()
await asyncio.sleep(10)
await self.client.stop_scanning()
if not self.client.devices:
logger.error("No toys detected; ensure Intiface is running and a device is on.")
return False
for idx, device in self.client.devices.items():
caps = []
if "VibrateCmd" in device.allowed_messages:
caps.append("vibrate")
if "LinearCmd" in device.allowed_messages:
caps.append("linear")
if "RotateCmd" in device.allowed_messages:
caps.append("rotate")
logger.info("Device %s: %s [%s]", idx, device.name, ", ".join(caps))
self.devices[idx] = device
return True
async def disconnect(self):
if self.client:
await self.client.disconnect()
async def refresh_devices(self) -> list[dict]:
"""Scan for new devices and return snapshot. Thread-safe."""
await self.client.start_scanning()
await asyncio.sleep(10)
await self.client.stop_scanning()
# Sync devices from Buttplug client
self.devices.clear()
for idx, device in self.client.devices.items():
self.devices[idx] = device
logger.info("Device refresh completed: %d devices found", len(self.devices))
# Build and return snapshot (no cross-thread dict access)
devices = []
for device_id, device in self.devices.items():
capabilities = []
if "VibrateCmd" in device.allowed_messages:
capabilities.append("vibrate")
if "LinearCmd" in device.allowed_messages:
capabilities.append("linear")
if "RotateCmd" in device.allowed_messages:
capabilities.append("rotate")
devices.append({
"id": device_id,
"name": device.name,
"capabilities": capabilities
})
return devices
async def serve(self, listen: str):
host, port = listen.split(":")
server = await websockets.serve(self.handle_connection, host, int(port))
logger.info("Listening for commands on ws://%s/", listen)
await server.wait_closed()
async def handle_connection(self, websocket):
async for message in websocket:
try:
command = json.loads(message)
except json.JSONDecodeError:
logger.warning("Invalid JSON: %s", message)
continue
await self.apply_command(command)
async def apply_command(self, command: dict):
duration_ms = int(command.get("duration", 1000))
intensity = float(command.get("intensity", 0.6))
intensity = min(max(intensity, 0.0), 1.0)
oscillation = bool(command.get("oscillation", False))
rotation_cw = command.get("rotation_clockwise")
device_id = command.get("device_id")
# Determine which devices to target
if device_id is not None:
if device_id not in self.devices:
logger.error("Device ID %s not found. Available devices: %s", device_id, list(self.devices.keys()))
return
devices_to_drive = {device_id: self.devices[device_id]}
logger.info("Targeting device %s (%s): duration=%sms intensity=%.2f oscillation=%s",
device_id, self.devices[device_id].name, duration_ms, intensity, oscillation)
else:
devices_to_drive = self.devices
logger.info("Broadcasting to all devices: duration=%sms intensity=%.2f oscillation=%s",
duration_ms, intensity, oscillation)
for task in self.current_tasks:
task.cancel()
self.current_tasks.clear()
for device in devices_to_drive.values():
task = asyncio.create_task(self.drive_device(device, duration_ms, intensity, oscillation, rotation_cw))
self.current_tasks.append(task)
async def drive_device(
self,
device: ButtplugClientDevice,
duration_ms: int,
intensity: float,
oscillate: bool,
rotation_cw: Optional[bool] = None
):
step = max(duration_ms / 1000, 0.1)
step_ms = int(step * 1000)
try:
# Auto-detect and use appropriate actuator type
# Priority: Linear > Vibration > Rotation (use only one type)
# 1. Linear actuators (strokers like The Handy, Keon, OSR2) - PRIORITY
if "LinearCmd" in device.allowed_messages:
if oscillate:
position = intensity # Use intensity as max position (0.0-1.0)
while True: # Runs until cancelled by new command
logger.debug("Stroke %s -> %.2f position", device.name, position)
await device.send_linear_cmd((step_ms, position))
await asyncio.sleep(step)
# Oscillate between 0.0 and intensity position
position = intensity if position <= 0.01 else 0.0
else:
logger.debug("Stroke %s -> %.2f position over %dms", device.name, intensity, step_ms)
await device.send_linear_cmd((step_ms, intensity))
await asyncio.sleep(step)
# 2. Vibration (scalar actuators) - if no linear support
elif "VibrateCmd" in device.allowed_messages:
if oscillate:
level = intensity
while True: # Runs until cancelled by new command
logger.debug("Vibrate %s -> %.2f", device.name, level)
await device.send_vibrate_cmd(level)
await asyncio.sleep(step)
level = intensity if level <= 0.01 else 0.0
else:
logger.debug("Vibrate %s -> %.2f for %.2fs", device.name, intensity, step)
await device.send_vibrate_cmd(intensity)
await asyncio.sleep(step)
# 3. Rotatory actuators (rotation toys like Vorze) - if no linear/vibration
elif "RotateCmd" in device.allowed_messages:
if rotation_cw is None:
rotation_cw = True # Default to clockwise
if oscillate:
speed = intensity
while True: # Runs until cancelled by new command
logger.debug("Rotate %s -> %.2f speed %s", device.name, speed, "CW" if rotation_cw else "CCW")
await device.send_rotate_cmd((speed, rotation_cw))
await asyncio.sleep(step)
speed = intensity if speed <= 0.01 else 0.0
else:
logger.debug("Rotate %s -> %.2f speed %s for %.2fs", device.name, intensity, "CW" if rotation_cw else "CCW", step)
await device.send_rotate_cmd((intensity, rotation_cw))
await asyncio.sleep(step)
else:
logger.warning("Device %s has no supported actuators", device.name)
except asyncio.CancelledError:
pass
except Exception as exc:
logger.error("Error driving %s: %s", device.name, exc)
finally:
# Stop all actuator types
try:
await device.send_stop_device_cmd()
except Exception as exc:
logger.warning("Failed to stop %s: %s", device.name, exc)
async def run_local_server(listen: str):
global _device_controller, _server_loop
# Capture event loop for cross-thread communication
_server_loop = asyncio.get_event_loop()
controller = DeviceController(INTIFACE)
if not await controller.connect():
return
# Make controller available to MCP tools
if MCP_AVAILABLE:
_device_controller = controller
try:
await controller.serve(listen)
finally:
for task in controller.current_tasks:
task.cancel()
await controller.disconnect()
if MCP_AVAILABLE:
_device_controller = None
_server_loop = None
async def send_command(duration: int, intensity: float, oscillation: bool, rotation: Optional[str], device_id: Optional[int] = None):
payload = {
"duration": duration,
"intensity": max(0.0, min(intensity, 1.0)),
"oscillation": oscillation,
}
if rotation:
payload["rotation_clockwise"] = rotation.lower() in {"cw", "true", "1"}
if device_id is not None:
payload["device_id"] = device_id
uri = f"ws://{HOST}:{PORT}"
try:
async with websockets.connect(uri) as websocket:
await websocket.send(json.dumps(payload))
logger.info("Sent: %s -> %s", uri, payload)
except OSError as exc:
logger.error("Cannot reach %s: %s", uri, exc)
raise SystemExit(1) from exc
def acquire_lock() -> bool:
if LOCK.exists():
try:
pid = int(LOCK.read_text().strip())
if pid and pid != os.getpid():
# Check if the process is actually running (cross-platform)
import sys
if sys.platform == "win32":
# Windows: use tasklist command
import subprocess
try:
result = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}"],
capture_output=True,
text=True,
timeout=2
)
if str(pid) in result.stdout:
logger.info("Lock file exists (%s); server already running.", pid)
return False
else:
logger.info("Removing stale lock file (PID %s no longer running)", pid)
LOCK.unlink(missing_ok=True)
except Exception:
# If check fails, assume running to be safe
logger.warning("Could not check PID %s; assuming running", pid)
return False
else:
# Unix: use os.kill with signal 0
try:
os.kill(pid, 0)
logger.info("Lock file exists (%s); server already running.", pid)
return False
except (OSError, ProcessLookupError):
logger.info("Removing stale lock file (PID %s no longer running)", pid)
LOCK.unlink(missing_ok=True)
except ValueError:
pass
LOCK.write_text(str(os.getpid()))
return True
def release_lock():
LOCK.unlink(missing_ok=True)
def start_local_server_thread(listen: str) -> threading.Thread:
def runner():
asyncio.run(run_local_server(listen))
thread = threading.Thread(target=runner, daemon=True)
thread.start()
return thread
if MCP_AVAILABLE:
mcp_server = LowLevelServer("stroke-bridge")
@mcp_server.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="devices.list",
description="Scan for connected devices (takes 10 seconds). Returns device IDs, names, and capabilities.",
inputSchema={
"type": "object",
"properties": {}
}
),
types.Tool(
name="stroke.send_ws",
description="Send a vibration/stroke/rotation command to connected device(s)",
inputSchema={
"type": "object",
"required": ["duration", "intensity"],
"properties": {
"duration": {"type": "integer", "minimum": 100, "description": "Duration in milliseconds"},
"intensity": {"type": "number", "minimum": 0.0, "maximum": 1.0, "description": "Vibration intensity OR stroke position (0-1)"},
"oscillation": {"type": "boolean", "default": False, "description": "Enable oscillation"},
"rotation": {"type": "string", "enum": ["cw", "ccw"], "description": "Rotation direction (optional, for rotation toys)"},
"device_id": {"type": "integer", "description": "Target specific device ID from devices.list (omit to broadcast to all devices)"},
},
},
)
]
@mcp_server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
global _device_controller
if name == "devices.list":
if _device_controller is None or _server_loop is None:
raise RuntimeError("Device controller not initialized. Server must be running in MCP mode.")
# Refresh devices on server thread - returns snapshot (10 second scan)
# NOTE: Blocks server loop for 10s - WebSocket commands queue during scan
future = asyncio.run_coroutine_threadsafe(
_device_controller.refresh_devices(),
_server_loop
)
# Await the future properly (non-blocking for MCP loop)
try:
devices = await asyncio.wait_for(asyncio.wrap_future(future), timeout=15.0)
except asyncio.TimeoutError as e:
logger.error("Device scan timed out")
raise TimeoutError("Device scan timed out after 15 seconds") from e
except Exception as e:
logger.error("Device scan failed: %s", e)
raise RuntimeError(f"Device scan failed: {str(e)}") from e
# Return as JSON text with metadata
devices_text = json.dumps({"devices": devices}, indent=2)
return [types.TextContent(
type="text",
text=devices_text,
_meta={"content_type": "application/json"}
)]
elif name == "stroke.send_ws":
if _device_controller is None or _server_loop is None:
raise RuntimeError("Device controller not initialized.")
# Build command payload
command = {
"duration": arguments["duration"],
"intensity": arguments["intensity"],
"oscillation": arguments.get("oscillation", False),
}
if arguments.get("rotation"):
command["rotation_clockwise"] = arguments["rotation"].lower() == "cw"
if arguments.get("device_id") is not None:
command["device_id"] = arguments["device_id"]
# Call apply_command in server thread's event loop (non-blocking)
future = asyncio.run_coroutine_threadsafe(
_device_controller.apply_command(command),
_server_loop
)
# Await properly without blocking MCP loop
try:
await asyncio.wait_for(asyncio.wrap_future(future), timeout=2.0)
return [types.TextContent(type="text", text="Command sent successfully")]
except asyncio.TimeoutError as e:
logger.error("Command timed out")
raise TimeoutError("Command timed out after 2 seconds") from e
except Exception as e:
logger.error("Failed to send command: %s", e)
raise RuntimeError(f"Failed to send command: {str(e)}") from e
# Unknown tool
logger.error("Unknown tool requested: %s", name)
raise ValueError(f"Unknown tool: {name}")
else:
mcp_server = None
def cli():
parser = argparse.ArgumentParser(description="Stroke bridge controller")
sub = parser.add_subparsers(dest="mode", required=True)
serve = sub.add_parser("serve", help="start the bridge server (optionally MCP)")
serve.add_argument("--listen", default=f"{HOST}:{PORT}", help="host:port for incoming commands")
serve.add_argument("--mcp", action="store_true", help="expose MCP stdio tool")
send = sub.add_parser("send", help="send a single command")
send.add_argument("--duration", type=int, default=1500)
send.add_argument("--intensity", type=float, default=0.7)
osc = send.add_mutually_exclusive_group()
osc.add_argument("--oscillation", action="store_true")
osc.add_argument("--no-oscillation", action="store_false", dest="oscillation")
send.set_defaults(oscillation=False)
send.add_argument("--rotation", choices=["cw", "ccw"], default=None)
args = parser.parse_args()
if args.mode == "serve":
if args.mcp and not MCP_AVAILABLE:
logger.error("mcp package not installed. Run `pip install mcp` to enable MCP mode.")
return
if not acquire_lock():
logger.info("Stroke bridge already running; exiting.")
return
try:
if args.mcp:
logger.info("Starting local server + MCP tool")
start_local_server_thread(args.listen)
assert mcp_server is not None
async def main_mcp():
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await mcp_server.run(read_stream, write_stream, mcp_server.create_initialization_options())
asyncio.run(main_mcp())
else:
logger.info("Starting local server on %s", args.listen)
asyncio.run(run_local_server(args.listen))
finally:
release_lock()
else:
asyncio.run(send_command(args.duration, args.intensity, args.oscillation, args.rotation))
if __name__ == "__main__":
cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment