Created
January 7, 2026 05:05
-
-
Save PsychoSmiley/a52c24e2e2a44f16fdc4ed23d587b2f6 to your computer and use it in GitHub Desktop.
Local proxy Lovense Remote <-> Intiface Central; WebSocket bridge translates toy commands, runs on port 30010.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Lovense Remote API -> Intiface Central Proxy - IP address in-game: 127.0.0.1 | |
| Translates Lovense HTTP commands to Buttplug WebSocket protocol. | |
| """ | |
| import asyncio, json, ssl, sys, tempfile, threading, os | |
| from http.server import HTTPServer, BaseHTTPRequestHandler | |
| for pkg in ["websockets", "cryptography"]: | |
| try: __import__(pkg) | |
| except ImportError: | |
| import subprocess; subprocess.check_call([sys.executable, "-m", "pip", "install", pkg, "-q"]) | |
| import websockets | |
| from cryptography import x509 | |
| from cryptography.x509.oid import NameOID | |
| from cryptography.hazmat.primitives import hashes, serialization | |
| from cryptography.hazmat.primitives.asymmetric import rsa | |
| from datetime import datetime, timedelta | |
| # === CONFIG (edit these settings) === | |
# --- Listener the game connects to -----------------------------------------
LR_PORT = 30010                     # Port to listen on (Lovense PC default for HTTPS: 30010)
LR_BIND = "0.0.0.0"                 # Interface to bind (0.0.0.0 = all, 127.0.0.1 = localhost only)
# --- Upstream Intiface Central ---------------------------------------------
IC_WS_URL = "ws://127.0.0.1:12345"  # Intiface Central WebSocket URL
# --- Behaviour switches ------------------------------------------------------
VERBOSE = True                      # Log every LinearCmd / ScalarCmd / StopAllDevices sent upstream
INVERT_LINEAR = False               # Set True if the linear device moves opposite to the game
# NOTE: this flag only toggles TLS; it does not change LR_PORT. If the client
# expects the plain-HTTP port (20010), set LR_PORT to 20010 as well.
USE_HTTPS = True

# === STATE ===
ws_conn = None       # Open WebSocket to Intiface Central, or None while disconnected
devices = {}         # DeviceIndex -> device description dict as reported by Intiface
msg_id = 0           # Last Buttplug message Id handed out
stroke_task = None   # Background oscillation task, if one is running

# Full-scale value of each Lovense action (per the Lovense API docs); a raw
# action value is divided by its entry here to normalise it to 0-1.
SCALE = {
    **dict.fromkeys(
        ["Vibrate", "Rotate", "Thrusting", "Fingering", "Suction", "Oscillate", "All"],
        20,
    ),
    "Pump": 100,
    "Depth": 3,
}

# Canned GetToys reply advertising one virtual toy that supports every function.
# The "toys" field is itself a JSON-encoded string.
FAKE_TOY = {
    "code": 200,
    "type": "OK",
    "data": {
        "toys": json.dumps({
            "lr001": {
                "id": "lr001",
                "status": "1",
                "version": "",
                "name": "LR Bridge",
                "battery": 100,
                "nickName": "Intiface",
                "shortFunctionNames": ["v", "r", "p", "t", "f", "s", "d", "o"],
                "fullFunctionNames": [
                    "Vibrate", "Rotate", "Pump", "Thrusting",
                    "Fingering", "Suction", "Depth", "Oscillate",
                ],
            },
        }),
        "platform": "pc",
        "appType": "remote",
    },
}

# Generic success reply used for every command that has no payload to return.
OK = {"code": 200, "type": "ok"}
def log(tag, msg):
    """Print ``msg`` prefixed with ``[tag]`` and flush stdout immediately."""
    line = "[{}] {}".format(tag, msg)
    print(line, flush=True)
def nid():
    """Advance the module-level ``msg_id`` counter by one and return the new value."""
    global msg_id
    msg_id = msg_id + 1
    return msg_id
| # === BUTTPLUG PROTOCOL === | |
async def bp_send(msg):
    """Send one Buttplug message to Intiface Central.

    ``msg`` is a single-key dict (``{"MessageType": {...fields...}}``). It is
    wrapped in a JSON array before sending. Nothing is sent while there is no
    open connection.
    """
    if not ws_conn:
        return
    kind = tuple(msg)[0]
    # Only device-control traffic is echoed; handshake/scan messages stay quiet.
    if VERBOSE and kind in ("LinearCmd", "ScalarCmd", "StopAllDevices"):
        log("->IC", f"{kind}: {msg[kind]}")
    payload = json.dumps([msg])
    await ws_conn.send(payload)
async def bp_linear(idx, pos, ms):
    """Send a ``LinearCmd`` moving device ``idx`` to ``pos`` over ``ms`` milliseconds.

    Only vector index 0 is addressed.
    """
    vector = {"Index": 0, "Duration": ms, "Position": pos}
    command = {"Id": nid(), "DeviceIndex": idx, "Vectors": [vector]}
    await bp_send({"LinearCmd": command})
async def bp_scalar(idx, val, typ="Vibrate"):
    """Send a ``ScalarCmd`` setting actuator 0 of device ``idx`` to ``val``.

    ``typ`` is passed through as the Buttplug ``ActuatorType``.
    """
    scalar = {"Index": 0, "Scalar": val, "ActuatorType": typ}
    command = {"Id": nid(), "DeviceIndex": idx, "Scalars": [scalar]}
    await bp_send({"ScalarCmd": command})
async def bp_stop_all():
    """Send a ``StopAllDevices`` message to Intiface Central."""
    command = {"Id": nid()}
    await bp_send({"StopAllDevices": command})
| # === DEVICE CONTROL === | |
def has_linear(dev):
    """Return True when the device's ``DeviceMessages`` map lists ``LinearCmd``."""
    messages = dev.get("DeviceMessages", {})
    return "LinearCmd" in messages
def has_scalar(dev, typ):
    """Return True when any ``ScalarCmd`` actuator of ``dev`` has ``ActuatorType == typ``."""
    actuators = dev.get("DeviceMessages", {}).get("ScalarCmd", [])
    return any(actuator.get("ActuatorType") == typ for actuator in actuators)
async def do_vibrate(val):
    """Set vibration level ``val`` (0-1) on every device with a Vibrate actuator."""
    # Iterate over a snapshot: each send awaits, and the WebSocket reader task
    # may add or remove entries in ``devices`` in the meantime, which would
    # otherwise raise "dictionary changed size during iteration".
    for i, d in list(devices.items()):
        if has_scalar(d, "Vibrate"):
            await bp_scalar(i, val, "Vibrate")
async def do_rotate(val):
    """Set rotation level ``val`` (0-1) on every device with a Rotate actuator."""
    # Iterate over a snapshot: each send awaits, and the WebSocket reader task
    # may add or remove entries in ``devices`` in the meantime, which would
    # otherwise raise "dictionary changed size during iteration".
    for i, d in list(devices.items()):
        if has_scalar(d, "Rotate"):
            await bp_scalar(i, val, "Rotate")
async def do_position(pos, duration_ms=500):
    """Move every linear device to ``pos`` (0-1) over ``duration_ms`` and hold.

    Any running oscillation task is cancelled and awaited first so it cannot
    send a competing move afterwards.
    """
    global stroke_task
    if stroke_task and not stroke_task.done():
        stroke_task.cancel()
        try:
            await stroke_task
        except asyncio.CancelledError:
            pass
    stroke_task = None
    # Snapshot the device table: the sends below await, and the WebSocket
    # reader task may mutate ``devices`` while this coroutine is suspended.
    for i, d in list(devices.items()):
        if has_linear(d):
            await bp_linear(i, pos, duration_ms)
    log("CMD", f"Position -> {pos:.0%}")
async def do_oscillate(speed, depth=1.0):
    """Start a background task that strokes every linear device back and forth.

    Any stroke task already running is cancelled and awaited first, so a
    ``speed`` of zero or less simply stops the previous oscillation.

    speed: 0-1; one stroke lasts ``800 - speed * 600`` ms, clamped to 150-1000 ms.
    depth: 0-1; stroke range around the centre 0.5 (0 = none, 1 = full 0.0-1.0).
           Values outside 0-1 are clamped so positions stay within 0.0-1.0.
    """
    global stroke_task
    if stroke_task and not stroke_task.done():
        stroke_task.cancel()
        try:
            await stroke_task
        except asyncio.CancelledError:
            pass
    stroke_task = None
    if speed <= 0:
        return

    # Range: depth controls stroke length around the centre (0.5).
    half = max(0.0, min(1.0, depth)) * 0.5
    lo, hi = 0.5 - half, 0.5 + half
    # Duration: speed controls how fast (1.0 -> 200 ms, 0.1 -> 740 ms).
    ms = max(150, min(1000, int(800 - speed * 600)))

    async def move_all(pos):
        # Snapshot the device table: the sends await, and a device being added
        # or removed meanwhile would otherwise raise RuntimeError and silently
        # end the whole stroke loop.
        for i, d in list(devices.items()):
            if has_linear(d):
                await bp_linear(i, pos, ms)

    async def loop():
        log("CMD", f"Oscillate: speed={speed:.0%}, depth={depth:.0%}, range={lo:.2f}-{hi:.2f}, {ms}ms")
        while True:
            await move_all(hi)
            await asyncio.sleep(ms / 1000)
            await move_all(lo)
            await asyncio.sleep(ms / 1000)

    stroke_task = asyncio.create_task(loop())
async def do_stop_linear():
    """Stop any oscillation and move every linear device to neutral (0.5) over 500 ms."""
    global stroke_task
    if stroke_task and not stroke_task.done():
        stroke_task.cancel()
        try:
            await stroke_task
        except asyncio.CancelledError:
            pass
    stroke_task = None
    # Snapshot the device table: the sends below await, and the WebSocket
    # reader task may mutate ``devices`` while this coroutine is suspended.
    for i, d in list(devices.items()):
        if has_linear(d):
            await bp_linear(i, 0.5, 500)
    log("CMD", "Linear stopped")
async def do_stop():
    """Cancel the oscillation task (without waiting for it) and stop all devices."""
    global stroke_task
    running = stroke_task
    if running:
        stroke_task = None
        running.cancel()
    await bp_stop_all()
    log("CMD", "Stop")
| # === LOVENSE COMMAND HANDLING === | |
def parse_actions(s):
    """Parse a Lovense action string into a list of ``(action, value)`` tuples.

    Comma-separated items are handled as follows:
        'Vibrate:10'  -> ('Vibrate', 10)
        'Stop'        -> ('Stop', None)      (case-insensitive)
        'Stroke:0-20' -> ('Stroke', (0, 20))

    Numeric values go through ``float`` first, so '1e1' and '10.0' both give
    10. Items without a colon, or whose value is not numeric, are skipped.
    A non-string ``s`` (e.g. ``None`` or a number from malformed JSON) yields
    an empty list instead of raising.
    """
    if not isinstance(s, str):
        return []
    out = []
    for item in s.split(','):
        item = item.strip()
        if not item:
            continue
        if item.lower() == 'stop':
            out.append(('Stop', None))
            continue
        key, sep, raw = item.partition(':')
        if not sep:
            continue
        key, raw = key.strip(), raw.strip()
        try:
            if key == 'Stroke' and '-' in raw:
                # Stroke:min-max format (0-100 scale)
                bounds = raw.split('-')
                value = (int(bounds[0]), int(bounds[1]))
            else:
                value = int(float(raw))  # float() first to accept scientific notation
        except (ValueError, OverflowError):
            # Non-numeric, NaN or infinite value: ignore just this item.
            continue
        out.append((key, value))
    return out
async def handle_function(data):
    """Translate a Lovense ``Function`` command into device commands.

    ``data["action"]`` is parsed with ``parse_actions``. A ``Stop`` action
    anywhere in the list overrides everything else. Otherwise each action
    value is divided by its ``SCALE`` entry (default 20) and clamped to 0-1:

    * Vibrate / Fingering / Suction -> vibration level
    * Rotate                        -> rotation level
    * Thrusting / Oscillate         -> target position of linear devices
    * Pump                          -> continuous oscillation speed
    * All                           -> vibration, rotation and position at once
    * Stroke:min-max (0-100)        -> range the position is mapped into
    * Depth (and unknown actions)   -> logged only, not sent to any device

    When the same channel is given more than once the highest value wins,
    except that ``All`` overwrites whatever was set before it.
    """
    actions = parse_actions(data.get("action", ""))

    # A Stop action anywhere in the list overrides everything else.
    if any(act == "Stop" for act, _ in actions):
        log("CMD", "Stop action")
        await do_stop()
        return

    vib = rot = osc = pump = None   # None = not specified
    stroke_range = None             # (min, max) normalised to 0-1
    for act, val in actions:
        # Stroke:min-max arrives as a tuple on a 0-100 scale.
        if act == "Stroke" and isinstance(val, tuple):
            # Clamp so an out-of-range request can never produce a target
            # position outside the 0-1 interval used for linear moves.
            stroke_range = tuple(max(0.0, min(1.0, v / 100.0)) for v in val)
            log("CMD", f"Stroke:{val[0]}-{val[1]} -> range {stroke_range[0]:.0%}-{stroke_range[1]:.0%}")
            continue

        # Normal actions carry a single integer value.
        scale = SCALE.get(act, 20)
        norm = max(0.0, min(1.0, val / scale))
        log("CMD", f"{act}:{val} (/{scale}) -> {norm:.0%}")
        if act in ("Vibrate", "Fingering", "Suction"):
            # Fingering/Suction have no counterpart here, so they drive vibration.
            vib = norm if vib is None else max(vib, norm)
        elif act == "Rotate":
            rot = norm if rot is None else max(rot, norm)
        elif act in ("Thrusting", "Oscillate"):
            osc = norm if osc is None else max(osc, norm)
        elif act == "Pump":
            pump = norm if pump is None else max(pump, norm)
        elif act == "All":
            vib = rot = osc = norm

    # Send vibrate/rotate whenever specified, including 0 (which stops them).
    if vib is not None:
        await do_vibrate(vib)
    if rot is not None:
        await do_rotate(rot)

    # Linear devices: Pump selects continuous oscillation and takes priority;
    # otherwise Thrusting/Oscillate is treated as a target position.
    if pump is not None:
        if pump > 0:
            await do_oscillate(pump, 1.0)  # full stroke depth
        else:
            await do_stop_linear()
    elif osc is not None:
        if stroke_range is not None:
            lo, hi = stroke_range
            target = lo + osc * (hi - lo)
        else:
            target = osc  # Thrusting:20 -> 1.0, Thrusting:0 -> 0.0
        if INVERT_LINEAR:
            target = 1.0 - target
        await do_position(target)
async def handle_position(data):
    """Direct position control: move linear devices to ``data["value"]`` (0-100).

    A missing value defaults to 50. Numeric strings (including floats such as
    "50.0") are accepted and truncated to an integer. Values outside 0-100
    are clamped. An unparsable value is logged and the command is ignored
    rather than raising inside a task nobody inspects.
    """
    raw = data.get("value", 50)
    try:
        val = int(float(raw))
    except (TypeError, ValueError, OverflowError):
        log("ERR", f"Position: invalid value {raw!r}")
        return
    await do_position(max(0.0, min(1.0, val / 100.0)))
async def handle_cmd(data):
    """Dispatch one decoded Lovense request by its ``command`` field.

    Handles ``Function``, ``Position``, ``Stop`` and ``Pattern``; any other
    command is ignored. For ``Pattern`` only the first ``;``-separated entry
    of ``strength`` (0-20, default "10") is used, as a constant level for both
    vibration and full-depth oscillation.
    """
    cmd = data.get("command", "")
    if cmd == "Function":
        await handle_function(data)
    elif cmd == "Position":
        await handle_position(data)
    elif cmd == "Stop":
        await do_stop()
    elif cmd == "Pattern":
        strength = data.get("strength", "10")
        # Only the parsing is guarded: a blanket ``except`` around the awaits
        # below would also swallow task cancellation.
        try:
            v = int(str(strength).split(';')[0])
        except ValueError:
            log("ERR", f"Pattern: invalid strength {strength!r}")
            return
        level = max(0.0, min(1.0, v / 20))  # clamp like Function actions
        await do_vibrate(level)
        await do_oscillate(level, 1.0)  # full depth
| # === WEBSOCKET CLIENT === | |
async def ws_handler(msg):
    """Process one WebSocket frame received from Intiface Central.

    The frame is a JSON array of single-key Buttplug messages:

    * ``ServerInfo``    -> request the device list and start scanning
    * ``DeviceList``    -> record every listed device in ``devices``
    * ``DeviceAdded``   -> record the new device
    * ``DeviceRemoved`` -> forget the device
    * ``Error``         -> log the error message

    A malformed frame is logged and dropped. Send failures and task
    cancellation are deliberately not caught here so the caller sees them.
    """
    try:
        for m in json.loads(msg):
            if "ServerInfo" in m:
                log("IC", f"Connected: {m['ServerInfo'].get('ServerName','?')}")
                await bp_send({"RequestDeviceList": {"Id": nid()}})
                await bp_send({"StartScanning": {"Id": nid()}})
            elif "DeviceList" in m:
                for d in m["DeviceList"].get("Devices", []):
                    devices[d["DeviceIndex"]] = d
                    log("IC", f"Device {d['DeviceIndex']}: {d['DeviceName']}")
            elif "DeviceAdded" in m:
                d = m["DeviceAdded"]
                devices[d["DeviceIndex"]] = d
                caps = ["Linear"] if has_linear(d) else []
                log("IC", f"+ {d['DeviceName']} [{','.join(caps)}]")
            elif "DeviceRemoved" in m:
                i = m["DeviceRemoved"]["DeviceIndex"]
                devices.pop(i, None)
                log("IC", f"- Device {i}")
            elif "Error" in m:
                log("ERR", m["Error"].get("ErrorMessage", "?"))
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # ValueError covers json.JSONDecodeError; the rest cover frames whose
        # structure is not what the branches above expect.
        log("ERR", f"Bad message from Intiface: {e!r}")
async def ws_client():
    """Keep a WebSocket connection to Intiface Central alive forever.

    Each round connects to ``IC_WS_URL``, publishes the socket as ``ws_conn``,
    sends the ``RequestServerInfo`` handshake and feeds every incoming frame
    to ``ws_handler``. When the connection ends, the shared connection and
    device state are reset and a new attempt is made after three seconds.
    """
    global ws_conn
    while True:
        try:
            async with websockets.connect(IC_WS_URL) as sock:
                ws_conn = sock
                log("IC", f"Connecting to {IC_WS_URL}")
                handshake = {"Id": nid(), "ClientName": "LR_Bridge", "MessageVersion": 3}
                await bp_send({"RequestServerInfo": handshake})
                async for frame in sock:
                    await ws_handler(frame)
        except Exception as err:
            log("IC", f"Disconnected: {err}")
        # Forget the dead socket and its devices before retrying.
        ws_conn = None
        devices.clear()
        await asyncio.sleep(3)
| # === HTTP SERVER === | |
class Handler(BaseHTTPRequestHandler):
    """HTTP endpoint that imitates the Lovense Remote local API.

    ``GetToys`` and ``GetToyName`` are answered with canned replies. Every
    other POST is handed to ``handle_cmd`` on the asyncio loop and
    acknowledged immediately with ``OK``.
    """

    # asyncio event loop that runs handle_cmd; assigned by run_server.
    loop = None

    def log_message(self, *a):
        """Silence the default per-request access log."""
        pass

    def send_json(self, d):
        """Send ``d`` as a 200 JSON response with a permissive CORS header."""
        body = json.dumps(d).encode()
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Report bridge status and the number of known devices."""
        self.send_json({"status": "ok", "devices": len(devices)})

    def do_POST(self):
        """Handle one Lovense API request; always answers with HTTP 200."""
        # Build the reply first and send it exactly once, so a failure while
        # writing the response is never followed by a second write attempt.
        try:
            reply = self._handle_post()
        except Exception as e:  # request boundary: log and still answer OK
            log("ERR", e)
            reply = OK
        self.send_json(reply)

    def _handle_post(self):
        """Read and decode the request body, then return the reply dict."""
        # Clamp at zero: a negative length would make read() wait for EOF.
        length = max(0, int(self.headers.get('Content-Length', 0)))
        raw = self.rfile.read(length)
        log("RAW", f"{self.path} | {raw}")  # raw bytes before any parsing
        body = raw.decode()
        data = json.loads(body) if body else {}
        cmd = data.get("command", "")
        if cmd == "GetToys":
            return FAKE_TOY
        if cmd == "GetToyName":
            return {"code": 200, "data": ["LR Bridge"], "type": "OK"}
        if self.loop:
            # Fire and forget, but log the outcome if the command fails.
            fut = asyncio.run_coroutine_threadsafe(handle_cmd(data), self.loop)
            fut.add_done_callback(self._log_failure)
        return OK

    @staticmethod
    def _log_failure(fut):
        """Log the exception of a finished command future, if it raised one."""
        if fut.cancelled():
            return
        err = fut.exception()
        if err is not None:
            log("ERR", f"Command failed: {err!r}")

    def do_OPTIONS(self):
        """Answer a CORS preflight request."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET,POST,OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()
def make_ssl_context():
    """Build a server-side ``ssl.SSLContext`` backed by a fresh self-signed cert.

    A 2048-bit RSA key and a certificate for CN=localhost, valid for 365 days
    from now, are generated on every call. ``load_cert_chain`` only accepts
    file paths, so both are written to temporary files. Those files are always
    removed again, even when loading fails, so the private key is not left on
    disk.
    """
    from datetime import timezone  # local import: the file header only imports datetime/timedelta

    key = rsa.generate_private_key(65537, 2048)
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "localhost")])
    now = datetime.now(timezone.utc)  # replaces the deprecated datetime.utcnow()
    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)  # self-signed: issuer == subject
        .public_key(key.public_key())
        .serial_number(1)
        .not_valid_before(now)
        .not_valid_after(now + timedelta(365))
        .sign(key, hashes.SHA256())
    )
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )

    paths = []
    try:
        for pem in (cert_pem, key_pem):
            # delete=False so the file can be closed (flushed) before OpenSSL
            # opens it by name; removal happens in the finally block below.
            with tempfile.NamedTemporaryFile(delete=False) as f:
                paths.append(f.name)
                f.write(pem)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(paths[0], paths[1])
    finally:
        for path in paths:
            try:
                os.unlink(path)
            except OSError:
                pass
    return ctx
def run_server(loop):
    """Serve the Lovense-style HTTP(S) API until the process exits (blocking).

    ``loop`` is the asyncio event loop on which ``Handler`` schedules commands.
    """
    Handler.loop = loop
    httpd = HTTPServer((LR_BIND, LR_PORT), Handler)
    if USE_HTTPS:
        tls = make_ssl_context()
        httpd.socket = tls.wrap_socket(httpd.socket, server_side=True)
    httpd.serve_forever()
| # === MAIN === | |
async def main():
    """Print the banner, start the HTTP(S) server thread and run the WebSocket client.

    The server runs in a daemon thread and is handed the running event loop so
    request handlers can schedule coroutines on it. This coroutine then stays
    in ``ws_client`` for the life of the process.
    """
    endpoint = f"{'HTTPS' if USE_HTTPS else 'HTTP'} on {LR_BIND}:{LR_PORT}"
    print("=" * 40)
    print("Lovense -> Intiface Proxy")
    print(endpoint)
    print("=" * 40)
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine; get_event_loop() is meant for synchronous setup code.
    loop = asyncio.get_running_loop()
    threading.Thread(target=run_server, args=(loop,), daemon=True).start()
    log("LR", endpoint)
    await ws_client()
# Script entry point: run the proxy until interrupted with Ctrl+C.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nBye!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment