Last active
November 13, 2025 01:10
-
-
Save PsychoSmiley/6b5bd86ef88dd702be8dcc81204496fc to your computer and use it in GitHub Desktop.
MCP Buttplug - Convert from LLM_Buttplug
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = ["buttplug", "websockets", "mcp"] | |
| # /// | |
| """Stroke bridge: combined server/client (requires Intiface Central running) + optional MCP tool. | |
| - Usage Local: pip install buttplug websockets mcp -> `python MCP-buttplug.py serve` + `python MCP-buttplug.py send --duration 1500 --intensity 0.7` | |
| - Usage MCP: claude mcp add --transport stdio buttplug -- uv run https://gist.github.com/PsychoSmiley/6b5bd86ef88dd702be8dcc81204496fc/raw/MCP-buttplug.py serve --mcp | |
| - Optional env vars: --env STROKE_HOST=127.0.0.1 --env STROKE_PORT=8769 --env STROKE_INTIFACE=ws://127.0.0.1:12345 | |
| - Note: uv auto-installs deps from PEP 723 metadata block above | |
| """ | |
| import argparse, asyncio, json, logging, os, threading | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| import websockets | |
| from buttplug.client import ( | |
| ButtplugClient, | |
| ButtplugClientDevice, | |
| ButtplugClientWebsocketConnector, | |
| ) | |
| try: | |
| import mcp.types as types | |
| from mcp.server.lowlevel import Server as LowLevelServer | |
| import mcp.server.stdio | |
| MCP_AVAILABLE = True | |
| except ImportError: # pragma: no cover | |
| MCP_AVAILABLE = False | |
| HOST = os.getenv("STROKE_HOST", "127.0.0.1") | |
| PORT = int(os.getenv("STROKE_PORT", "8769")) | |
| INTIFACE = os.getenv("STROKE_INTIFACE", "ws://127.0.0.1:12345") | |
| LOCK = Path.home() / ".stroke_bridge.lock" | |
| logging.basicConfig(level=logging.INFO, format="[%(asctime)s] %(message)s") | |
| logger = logging.getLogger("stroke_bridge") | |
| # Global DeviceController for MCP tools (set when MCP mode starts) | |
| _device_controller: Optional["DeviceController"] = None | |
| _server_loop: Optional[asyncio.AbstractEventLoop] = None # Event loop for server thread | |
| class DeviceController: | |
| def __init__(self, intiface_address: str): | |
| self.intiface_address = intiface_address | |
| self.client: Optional[ButtplugClient] = None | |
| self.devices: Dict[int, ButtplugClientDevice] = {} | |
| self.current_tasks: List[asyncio.Task] = [] | |
| async def connect(self) -> bool: | |
| connector = ButtplugClientWebsocketConnector(self.intiface_address) | |
| self.client = ButtplugClient("StrokeBridge") | |
| await self.client.connect(connector) | |
| await self.client.start_scanning() | |
| await asyncio.sleep(10) | |
| await self.client.stop_scanning() | |
| if not self.client.devices: | |
| logger.error("No toys detected; ensure Intiface is running and a device is on.") | |
| return False | |
| for idx, device in self.client.devices.items(): | |
| caps = [] | |
| if "VibrateCmd" in device.allowed_messages: | |
| caps.append("vibrate") | |
| if "LinearCmd" in device.allowed_messages: | |
| caps.append("linear") | |
| if "RotateCmd" in device.allowed_messages: | |
| caps.append("rotate") | |
| logger.info("Device %s: %s [%s]", idx, device.name, ", ".join(caps)) | |
| self.devices[idx] = device | |
| return True | |
| async def disconnect(self): | |
| if self.client: | |
| await self.client.disconnect() | |
| async def refresh_devices(self) -> list[dict]: | |
| """Scan for new devices and return snapshot. Thread-safe.""" | |
| await self.client.start_scanning() | |
| await asyncio.sleep(10) | |
| await self.client.stop_scanning() | |
| # Sync devices from Buttplug client | |
| self.devices.clear() | |
| for idx, device in self.client.devices.items(): | |
| self.devices[idx] = device | |
| logger.info("Device refresh completed: %d devices found", len(self.devices)) | |
| # Build and return snapshot (no cross-thread dict access) | |
| devices = [] | |
| for device_id, device in self.devices.items(): | |
| capabilities = [] | |
| if "VibrateCmd" in device.allowed_messages: | |
| capabilities.append("vibrate") | |
| if "LinearCmd" in device.allowed_messages: | |
| capabilities.append("linear") | |
| if "RotateCmd" in device.allowed_messages: | |
| capabilities.append("rotate") | |
| devices.append({ | |
| "id": device_id, | |
| "name": device.name, | |
| "capabilities": capabilities | |
| }) | |
| return devices | |
| async def serve(self, listen: str): | |
| host, port = listen.split(":") | |
| server = await websockets.serve(self.handle_connection, host, int(port)) | |
| logger.info("Listening for commands on ws://%s/", listen) | |
| await server.wait_closed() | |
| async def handle_connection(self, websocket): | |
| async for message in websocket: | |
| try: | |
| command = json.loads(message) | |
| except json.JSONDecodeError: | |
| logger.warning("Invalid JSON: %s", message) | |
| continue | |
| await self.apply_command(command) | |
| async def apply_command(self, command: dict): | |
| duration_ms = int(command.get("duration", 1000)) | |
| intensity = float(command.get("intensity", 0.6)) | |
| intensity = min(max(intensity, 0.0), 1.0) | |
| oscillation = bool(command.get("oscillation", False)) | |
| rotation_cw = command.get("rotation_clockwise") | |
| device_id = command.get("device_id") | |
| # Determine which devices to target | |
| if device_id is not None: | |
| if device_id not in self.devices: | |
| logger.error("Device ID %s not found. Available devices: %s", device_id, list(self.devices.keys())) | |
| return | |
| devices_to_drive = {device_id: self.devices[device_id]} | |
| logger.info("Targeting device %s (%s): duration=%sms intensity=%.2f oscillation=%s", | |
| device_id, self.devices[device_id].name, duration_ms, intensity, oscillation) | |
| else: | |
| devices_to_drive = self.devices | |
| logger.info("Broadcasting to all devices: duration=%sms intensity=%.2f oscillation=%s", | |
| duration_ms, intensity, oscillation) | |
| for task in self.current_tasks: | |
| task.cancel() | |
| self.current_tasks.clear() | |
| for device in devices_to_drive.values(): | |
| task = asyncio.create_task(self.drive_device(device, duration_ms, intensity, oscillation, rotation_cw)) | |
| self.current_tasks.append(task) | |
| async def drive_device( | |
| self, | |
| device: ButtplugClientDevice, | |
| duration_ms: int, | |
| intensity: float, | |
| oscillate: bool, | |
| rotation_cw: Optional[bool] = None | |
| ): | |
| step = max(duration_ms / 1000, 0.1) | |
| step_ms = int(step * 1000) | |
| try: | |
| # Auto-detect and use appropriate actuator type | |
| # Priority: Linear > Vibration > Rotation (use only one type) | |
| # 1. Linear actuators (strokers like The Handy, Keon, OSR2) - PRIORITY | |
| if "LinearCmd" in device.allowed_messages: | |
| if oscillate: | |
| position = intensity # Use intensity as max position (0.0-1.0) | |
| while True: # Runs until cancelled by new command | |
| logger.debug("Stroke %s -> %.2f position", device.name, position) | |
| await device.send_linear_cmd((step_ms, position)) | |
| await asyncio.sleep(step) | |
| # Oscillate between 0.0 and intensity position | |
| position = intensity if position <= 0.01 else 0.0 | |
| else: | |
| logger.debug("Stroke %s -> %.2f position over %dms", device.name, intensity, step_ms) | |
| await device.send_linear_cmd((step_ms, intensity)) | |
| await asyncio.sleep(step) | |
| # 2. Vibration (scalar actuators) - if no linear support | |
| elif "VibrateCmd" in device.allowed_messages: | |
| if oscillate: | |
| level = intensity | |
| while True: # Runs until cancelled by new command | |
| logger.debug("Vibrate %s -> %.2f", device.name, level) | |
| await device.send_vibrate_cmd(level) | |
| await asyncio.sleep(step) | |
| level = intensity if level <= 0.01 else 0.0 | |
| else: | |
| logger.debug("Vibrate %s -> %.2f for %.2fs", device.name, intensity, step) | |
| await device.send_vibrate_cmd(intensity) | |
| await asyncio.sleep(step) | |
| # 3. Rotatory actuators (rotation toys like Vorze) - if no linear/vibration | |
| elif "RotateCmd" in device.allowed_messages: | |
| if rotation_cw is None: | |
| rotation_cw = True # Default to clockwise | |
| if oscillate: | |
| speed = intensity | |
| while True: # Runs until cancelled by new command | |
| logger.debug("Rotate %s -> %.2f speed %s", device.name, speed, "CW" if rotation_cw else "CCW") | |
| await device.send_rotate_cmd((speed, rotation_cw)) | |
| await asyncio.sleep(step) | |
| speed = intensity if speed <= 0.01 else 0.0 | |
| else: | |
| logger.debug("Rotate %s -> %.2f speed %s for %.2fs", device.name, intensity, "CW" if rotation_cw else "CCW", step) | |
| await device.send_rotate_cmd((intensity, rotation_cw)) | |
| await asyncio.sleep(step) | |
| else: | |
| logger.warning("Device %s has no supported actuators", device.name) | |
| except asyncio.CancelledError: | |
| pass | |
| except Exception as exc: | |
| logger.error("Error driving %s: %s", device.name, exc) | |
| finally: | |
| # Stop all actuator types | |
| try: | |
| await device.send_stop_device_cmd() | |
| except Exception as exc: | |
| logger.warning("Failed to stop %s: %s", device.name, exc) | |
| async def run_local_server(listen: str): | |
| global _device_controller, _server_loop | |
| # Capture event loop for cross-thread communication | |
| _server_loop = asyncio.get_event_loop() | |
| controller = DeviceController(INTIFACE) | |
| if not await controller.connect(): | |
| return | |
| # Make controller available to MCP tools | |
| if MCP_AVAILABLE: | |
| _device_controller = controller | |
| try: | |
| await controller.serve(listen) | |
| finally: | |
| for task in controller.current_tasks: | |
| task.cancel() | |
| await controller.disconnect() | |
| if MCP_AVAILABLE: | |
| _device_controller = None | |
| _server_loop = None | |
| async def send_command(duration: int, intensity: float, oscillation: bool, rotation: Optional[str], device_id: Optional[int] = None): | |
| payload = { | |
| "duration": duration, | |
| "intensity": max(0.0, min(intensity, 1.0)), | |
| "oscillation": oscillation, | |
| } | |
| if rotation: | |
| payload["rotation_clockwise"] = rotation.lower() in {"cw", "true", "1"} | |
| if device_id is not None: | |
| payload["device_id"] = device_id | |
| uri = f"ws://{HOST}:{PORT}" | |
| try: | |
| async with websockets.connect(uri) as websocket: | |
| await websocket.send(json.dumps(payload)) | |
| logger.info("Sent: %s -> %s", uri, payload) | |
| except OSError as exc: | |
| logger.error("Cannot reach %s: %s", uri, exc) | |
| raise SystemExit(1) from exc | |
| def acquire_lock() -> bool: | |
| if LOCK.exists(): | |
| try: | |
| pid = int(LOCK.read_text().strip()) | |
| if pid and pid != os.getpid(): | |
| # Check if the process is actually running (cross-platform) | |
| import sys | |
| if sys.platform == "win32": | |
| # Windows: use tasklist command | |
| import subprocess | |
| try: | |
| result = subprocess.run( | |
| ["tasklist", "/FI", f"PID eq {pid}"], | |
| capture_output=True, | |
| text=True, | |
| timeout=2 | |
| ) | |
| if str(pid) in result.stdout: | |
| logger.info("Lock file exists (%s); server already running.", pid) | |
| return False | |
| else: | |
| logger.info("Removing stale lock file (PID %s no longer running)", pid) | |
| LOCK.unlink(missing_ok=True) | |
| except Exception: | |
| # If check fails, assume running to be safe | |
| logger.warning("Could not check PID %s; assuming running", pid) | |
| return False | |
| else: | |
| # Unix: use os.kill with signal 0 | |
| try: | |
| os.kill(pid, 0) | |
| logger.info("Lock file exists (%s); server already running.", pid) | |
| return False | |
| except (OSError, ProcessLookupError): | |
| logger.info("Removing stale lock file (PID %s no longer running)", pid) | |
| LOCK.unlink(missing_ok=True) | |
| except ValueError: | |
| pass | |
| LOCK.write_text(str(os.getpid())) | |
| return True | |
| def release_lock(): | |
| LOCK.unlink(missing_ok=True) | |
| def start_local_server_thread(listen: str) -> threading.Thread: | |
| def runner(): | |
| asyncio.run(run_local_server(listen)) | |
| thread = threading.Thread(target=runner, daemon=True) | |
| thread.start() | |
| return thread | |
| if MCP_AVAILABLE: | |
| mcp_server = LowLevelServer("stroke-bridge") | |
| @mcp_server.list_tools() | |
| async def list_tools() -> list[types.Tool]: | |
| return [ | |
| types.Tool( | |
| name="devices.list", | |
| description="Scan for connected devices (takes 10 seconds). Returns device IDs, names, and capabilities.", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": {} | |
| } | |
| ), | |
| types.Tool( | |
| name="stroke.send_ws", | |
| description="Send a vibration/stroke/rotation command to connected device(s)", | |
| inputSchema={ | |
| "type": "object", | |
| "required": ["duration", "intensity"], | |
| "properties": { | |
| "duration": {"type": "integer", "minimum": 100, "description": "Duration in milliseconds"}, | |
| "intensity": {"type": "number", "minimum": 0.0, "maximum": 1.0, "description": "Vibration intensity OR stroke position (0-1)"}, | |
| "oscillation": {"type": "boolean", "default": False, "description": "Enable oscillation"}, | |
| "rotation": {"type": "string", "enum": ["cw", "ccw"], "description": "Rotation direction (optional, for rotation toys)"}, | |
| "device_id": {"type": "integer", "description": "Target specific device ID from devices.list (omit to broadcast to all devices)"}, | |
| }, | |
| }, | |
| ) | |
| ] | |
| @mcp_server.call_tool() | |
| async def call_tool(name: str, arguments: dict) -> list[types.TextContent]: | |
| global _device_controller | |
| if name == "devices.list": | |
| if _device_controller is None or _server_loop is None: | |
| raise RuntimeError("Device controller not initialized. Server must be running in MCP mode.") | |
| # Refresh devices on server thread - returns snapshot (10 second scan) | |
| # NOTE: Blocks server loop for 10s - WebSocket commands queue during scan | |
| future = asyncio.run_coroutine_threadsafe( | |
| _device_controller.refresh_devices(), | |
| _server_loop | |
| ) | |
| # Await the future properly (non-blocking for MCP loop) | |
| try: | |
| devices = await asyncio.wait_for(asyncio.wrap_future(future), timeout=15.0) | |
| except asyncio.TimeoutError as e: | |
| logger.error("Device scan timed out") | |
| raise TimeoutError("Device scan timed out after 15 seconds") from e | |
| except Exception as e: | |
| logger.error("Device scan failed: %s", e) | |
| raise RuntimeError(f"Device scan failed: {str(e)}") from e | |
| # Return as JSON text with metadata | |
| devices_text = json.dumps({"devices": devices}, indent=2) | |
| return [types.TextContent( | |
| type="text", | |
| text=devices_text, | |
| _meta={"content_type": "application/json"} | |
| )] | |
| elif name == "stroke.send_ws": | |
| if _device_controller is None or _server_loop is None: | |
| raise RuntimeError("Device controller not initialized.") | |
| # Build command payload | |
| command = { | |
| "duration": arguments["duration"], | |
| "intensity": arguments["intensity"], | |
| "oscillation": arguments.get("oscillation", False), | |
| } | |
| if arguments.get("rotation"): | |
| command["rotation_clockwise"] = arguments["rotation"].lower() == "cw" | |
| if arguments.get("device_id") is not None: | |
| command["device_id"] = arguments["device_id"] | |
| # Call apply_command in server thread's event loop (non-blocking) | |
| future = asyncio.run_coroutine_threadsafe( | |
| _device_controller.apply_command(command), | |
| _server_loop | |
| ) | |
| # Await properly without blocking MCP loop | |
| try: | |
| await asyncio.wait_for(asyncio.wrap_future(future), timeout=2.0) | |
| return [types.TextContent(type="text", text="Command sent successfully")] | |
| except asyncio.TimeoutError as e: | |
| logger.error("Command timed out") | |
| raise TimeoutError("Command timed out after 2 seconds") from e | |
| except Exception as e: | |
| logger.error("Failed to send command: %s", e) | |
| raise RuntimeError(f"Failed to send command: {str(e)}") from e | |
| # Unknown tool | |
| logger.error("Unknown tool requested: %s", name) | |
| raise ValueError(f"Unknown tool: {name}") | |
| else: | |
| mcp_server = None | |
| def cli(): | |
| parser = argparse.ArgumentParser(description="Stroke bridge controller") | |
| sub = parser.add_subparsers(dest="mode", required=True) | |
| serve = sub.add_parser("serve", help="start the bridge server (optionally MCP)") | |
| serve.add_argument("--listen", default=f"{HOST}:{PORT}", help="host:port for incoming commands") | |
| serve.add_argument("--mcp", action="store_true", help="expose MCP stdio tool") | |
| send = sub.add_parser("send", help="send a single command") | |
| send.add_argument("--duration", type=int, default=1500) | |
| send.add_argument("--intensity", type=float, default=0.7) | |
| osc = send.add_mutually_exclusive_group() | |
| osc.add_argument("--oscillation", action="store_true") | |
| osc.add_argument("--no-oscillation", action="store_false", dest="oscillation") | |
| send.set_defaults(oscillation=False) | |
| send.add_argument("--rotation", choices=["cw", "ccw"], default=None) | |
| args = parser.parse_args() | |
| if args.mode == "serve": | |
| if args.mcp and not MCP_AVAILABLE: | |
| logger.error("mcp package not installed. Run `pip install mcp` to enable MCP mode.") | |
| return | |
| if not acquire_lock(): | |
| logger.info("Stroke bridge already running; exiting.") | |
| return | |
| try: | |
| if args.mcp: | |
| logger.info("Starting local server + MCP tool") | |
| start_local_server_thread(args.listen) | |
| assert mcp_server is not None | |
| async def main_mcp(): | |
| async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): | |
| await mcp_server.run(read_stream, write_stream, mcp_server.create_initialization_options()) | |
| asyncio.run(main_mcp()) | |
| else: | |
| logger.info("Starting local server on %s", args.listen) | |
| asyncio.run(run_local_server(args.listen)) | |
| finally: | |
| release_lock() | |
| else: | |
| asyncio.run(send_command(args.duration, args.intensity, args.oscillation, args.rotation)) | |
| if __name__ == "__main__": | |
| cli() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment