Skip to content

Instantly share code, notes, and snippets.

@PsychoSmiley
Created January 7, 2026 05:05
Show Gist options
  • Select an option

  • Save PsychoSmiley/a52c24e2e2a44f16fdc4ed23d587b2f6 to your computer and use it in GitHub Desktop.

Select an option

Save PsychoSmiley/a52c24e2e2a44f16fdc4ed23d587b2f6 to your computer and use it in GitHub Desktop.
Local proxy Lovense Remote <-> Intiface Central; WebSocket bridge translates toy commands, runs on port 30010.
"""
Lovense Remote API -> Intiface Central Proxy - IP address in-game: 127.0.0.1
Translates Lovense HTTP commands to Buttplug WebSocket protocol.
"""
import asyncio, json, ssl, sys, tempfile, threading, os
from http.server import HTTPServer, BaseHTTPRequestHandler
for pkg in ["websockets", "cryptography"]:
try: __import__(pkg)
except ImportError:
import subprocess; subprocess.check_call([sys.executable, "-m", "pip", "install", pkg, "-q"])
import websockets
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from datetime import datetime, timedelta
# === CONFIG (edit these settings) ===
LR_PORT = 30010 # HTTPS port (Lovense PC default: 30010)
LR_BIND = "0.0.0.0" # Bind address (0.0.0.0=all, 127.0.0.1=localhost only)
IC_WS_URL = "ws://127.0.0.1:12345" # Intiface Central WebSocket URL
VERBOSE = True # Show detailed command logs
INVERT_LINEAR = False # Set True if linear device moves opposite to game
USE_HTTPS = True # Set False for HTTP-only (port 20010)
# === STATE ===
ws_conn = None
devices = {}
msg_id = 0
stroke_task = None
# Lovense scales per API docs
SCALE = {"Vibrate": 20, "Rotate": 20, "Thrusting": 20, "Fingering": 20,
"Suction": 20, "Oscillate": 20, "All": 20, "Pump": 100, "Depth": 3}
FAKE_TOY = {"code": 200, "type": "OK", "data": {
"toys": json.dumps({"lr001": {"id": "lr001", "status": "1", "version": "",
"name": "LR Bridge", "battery": 100, "nickName": "Intiface",
"shortFunctionNames": ["v","r","p","t","f","s","d","o"],
"fullFunctionNames": ["Vibrate","Rotate","Pump","Thrusting","Fingering","Suction","Depth","Oscillate"]
}}), "platform": "pc", "appType": "remote"}}
OK = {"code": 200, "type": "ok"}
def log(tag, msg): print(f"[{tag}] {msg}", flush=True)
def nid(): global msg_id; msg_id += 1; return msg_id
# === BUTTPLUG PROTOCOL ===
async def bp_send(msg):
if ws_conn:
t = list(msg.keys())[0]
if VERBOSE and t in ("LinearCmd", "ScalarCmd", "StopAllDevices"):
log("->IC", f"{t}: {msg[t]}")
await ws_conn.send(json.dumps([msg]))
async def bp_linear(idx, pos, ms):
await bp_send({"LinearCmd": {"Id": nid(), "DeviceIndex": idx,
"Vectors": [{"Index": 0, "Duration": ms, "Position": pos}]}})
async def bp_scalar(idx, val, typ="Vibrate"):
await bp_send({"ScalarCmd": {"Id": nid(), "DeviceIndex": idx,
"Scalars": [{"Index": 0, "Scalar": val, "ActuatorType": typ}]}})
async def bp_stop_all():
await bp_send({"StopAllDevices": {"Id": nid()}})
# === DEVICE CONTROL ===
def has_linear(dev): return "LinearCmd" in dev.get("DeviceMessages", {})
def has_scalar(dev, typ):
for a in dev.get("DeviceMessages", {}).get("ScalarCmd", []):
if a.get("ActuatorType") == typ: return True
return False
async def do_vibrate(val):
for i, d in devices.items():
if has_scalar(d, "Vibrate"): await bp_scalar(i, val, "Vibrate")
async def do_rotate(val):
for i, d in devices.items():
if has_scalar(d, "Rotate"): await bp_scalar(i, val, "Rotate")
async def do_position(pos, duration_ms=500):
"""Move to position and hold."""
global stroke_task
if stroke_task and not stroke_task.done():
stroke_task.cancel()
try: await stroke_task
except asyncio.CancelledError: pass
stroke_task = None
for i, d in devices.items():
if has_linear(d): await bp_linear(i, pos, duration_ms)
log("CMD", f"Position -> {pos:.0%}")
async def do_oscillate(speed, depth=1.0):
"""Continuous oscillation.
speed: 0-1, controls stroke duration (faster = shorter ms)
depth: 0-1, controls stroke range (0=none, 1=full 0.0-1.0)
"""
global stroke_task
if stroke_task and not stroke_task.done():
stroke_task.cancel()
try: await stroke_task
except asyncio.CancelledError: pass
stroke_task = None
if speed <= 0:
return
async def loop():
# Range: depth controls stroke length around center (0.5)
half = depth * 0.5
lo, hi = 0.5 - half, 0.5 + half
# Duration: speed controls how fast (1.0=200ms, 0.1=800ms)
ms = int(800 - speed * 600)
ms = max(150, min(1000, ms))
log("CMD", f"Oscillate: speed={speed:.0%}, depth={depth:.0%}, range={lo:.2f}-{hi:.2f}, {ms}ms")
while True:
for i, d in devices.items():
if has_linear(d): await bp_linear(i, hi, ms)
await asyncio.sleep(ms/1000)
for i, d in devices.items():
if has_linear(d): await bp_linear(i, lo, ms)
await asyncio.sleep(ms/1000)
stroke_task = asyncio.create_task(loop())
async def do_stop_linear():
"""Stop linear motion, move to neutral."""
global stroke_task
if stroke_task and not stroke_task.done():
stroke_task.cancel()
try: await stroke_task
except asyncio.CancelledError: pass
stroke_task = None
for i, d in devices.items():
if has_linear(d): await bp_linear(i, 0.5, 500)
log("CMD", "Linear stopped")
async def do_stop():
global stroke_task
if stroke_task: stroke_task.cancel(); stroke_task = None
await bp_stop_all()
log("CMD", "Stop")
# === LOVENSE COMMAND HANDLING ===
def parse_actions(s):
"""Parse action string into list of (action, value) tuples.
Handles: 'Vibrate:10' -> ('Vibrate', 10)
'Stop' -> ('Stop', None)
'Stroke:0-20' -> ('Stroke', (0, 20))
"""
out = []
for p in s.split(','):
p = p.strip()
if not p:
continue
if p.lower() == 'stop':
out.append(('Stop', None))
elif ':' in p:
k, v = p.split(':', 1)
k, v = k.strip(), v.strip()
if k == 'Stroke' and '-' in v:
# Stroke:min-max format (0-100 scale)
try:
parts = v.split('-')
out.append(('Stroke', (int(parts[0]), int(parts[1]))))
except: pass
else:
try: out.append((k, int(float(v)))) # Handle scientific notation
except: pass
return out
async def handle_function(data):
actions = parse_actions(data.get("action", ""))
# Check for Stop action first (overrides everything)
for act, val in actions:
if act == "Stop":
log("CMD", "Stop action")
await do_stop()
return
vib = rot = osc = depth = pump = None # None = not specified
stroke_range = None # (min, max) in 0-100 scale
for act, val in actions:
# Handle Stroke:min-max specially (val is tuple)
if act == "Stroke" and isinstance(val, tuple):
stroke_range = (val[0] / 100.0, val[1] / 100.0) # Normalize to 0-1
log("CMD", f"Stroke:{val[0]}-{val[1]} -> range {stroke_range[0]:.0%}-{stroke_range[1]:.0%}")
continue
# Normal actions with integer value
scale = SCALE.get(act, 20)
norm = max(0.0, min(1.0, val / scale))
log("CMD", f"{act}:{val} (/{scale}) -> {norm:.0%}")
if act == "Vibrate":
vib = norm if vib is None else max(vib, norm)
elif act == "Rotate":
rot = norm if rot is None else max(rot, norm)
elif act in ("Thrusting", "Oscillate"):
osc = norm if osc is None else max(osc, norm)
elif act == "Pump":
pump = norm if pump is None else max(pump, norm) # Pump → oscillation speed
elif act == "Depth":
depth = norm if depth is None else max(depth, norm)
elif act in ("Fingering", "Suction"):
# Flexer/Tenera toys - map to vibrate for other devices
vib = norm if vib is None else max(vib, norm)
elif act == "All":
vib = rot = norm
osc = norm
# Send vibrate/rotate if specified (including 0 to stop!)
if vib is not None: await do_vibrate(vib)
if rot is not None: await do_rotate(rot)
# Linear device control:
# Thrusting = target position (games send alternating values)
# Pump = oscillation speed (games send constant intensity)
if pump is not None:
# Pump controls continuous oscillation speed
if pump > 0:
await do_oscillate(pump, 1.0) # Full stroke depth
else:
await do_stop_linear()
elif osc is not None:
if stroke_range:
lo, hi = stroke_range
target = lo + osc * (hi - lo)
else:
target = osc # Thrusting:20 -> 1.0, Thrusting:0 -> 0.0
if INVERT_LINEAR:
target = 1.0 - target
await do_position(target)
async def handle_position(data):
"""Direct position control (0-100)."""
val = int(data.get("value", 50))
await do_position(val / 100.0)
async def handle_cmd(data):
cmd = data.get("command", "")
if cmd == "Function": await handle_function(data)
elif cmd == "Position": await handle_position(data)
elif cmd == "Stop": await do_stop()
elif cmd == "Pattern":
s = data.get("strength", "10")
try:
v = int(s.split(';')[0])
await do_vibrate(v/20)
await do_oscillate(v/20, 1.0) # Full depth
except: pass
# === WEBSOCKET CLIENT ===
async def ws_handler(msg):
global devices
try:
for m in json.loads(msg):
if "ServerInfo" in m:
log("IC", f"Connected: {m['ServerInfo'].get('ServerName','?')}")
await bp_send({"RequestDeviceList": {"Id": nid()}})
await bp_send({"StartScanning": {"Id": nid()}})
elif "DeviceList" in m:
for d in m["DeviceList"].get("Devices", []):
devices[d["DeviceIndex"]] = d
log("IC", f"Device {d['DeviceIndex']}: {d['DeviceName']}")
elif "DeviceAdded" in m:
d = m["DeviceAdded"]
devices[d["DeviceIndex"]] = d
caps = ["Linear"] if has_linear(d) else []
log("IC", f"+ {d['DeviceName']} [{','.join(caps)}]")
elif "DeviceRemoved" in m:
i = m["DeviceRemoved"]["DeviceIndex"]
if i in devices: del devices[i]
log("IC", f"- Device {i}")
elif "Error" in m:
log("ERR", m["Error"].get("ErrorMessage", "?"))
except: pass
async def ws_client():
global ws_conn
while True:
try:
async with websockets.connect(IC_WS_URL) as ws:
ws_conn = ws
log("IC", f"Connecting to {IC_WS_URL}")
await bp_send({"RequestServerInfo": {"Id": nid(), "ClientName": "LR_Bridge", "MessageVersion": 3}})
async for msg in ws: await ws_handler(msg)
except Exception as e:
log("IC", f"Disconnected: {e}")
ws_conn = None; devices.clear()
await asyncio.sleep(3)
# === HTTP SERVER ===
class Handler(BaseHTTPRequestHandler):
loop = None
def log_message(self, *a): pass
def send_json(self, d):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(d).encode())
def do_GET(self):
self.send_json({"status": "ok", "devices": len(devices)})
def do_POST(self):
try:
raw = self.rfile.read(int(self.headers.get('Content-Length', 0)))
log("RAW", f"{self.path} | {raw}") # Raw bytes before ANY parsing
body = raw.decode()
data = json.loads(body) if body else {}
cmd = data.get("command", "")
if cmd == "GetToys":
self.send_json(FAKE_TOY)
elif cmd == "GetToyName":
self.send_json({"code": 200, "data": ["LR Bridge"], "type": "OK"})
else:
if self.loop: asyncio.run_coroutine_threadsafe(handle_cmd(data), self.loop)
self.send_json(OK)
except Exception as e:
log("ERR", e)
self.send_json(OK)
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET,POST,OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def make_ssl_context():
key = rsa.generate_private_key(65537, 2048)
name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "localhost")])
cert = x509.CertificateBuilder().subject_name(name).issuer_name(name).public_key(key.public_key()).serial_number(1).not_valid_before(datetime.utcnow()).not_valid_after(datetime.utcnow() + timedelta(365)).sign(key, hashes.SHA256())
with tempfile.NamedTemporaryFile(delete=False) as cf, tempfile.NamedTemporaryFile(delete=False) as kf:
cf.write(cert.public_bytes(serialization.Encoding.PEM))
kf.write(key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption()))
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER); ctx.load_cert_chain(cf.name, kf.name)
os.unlink(cf.name); os.unlink(kf.name)
return ctx
def run_server(loop):
Handler.loop = loop
server = HTTPServer((LR_BIND, LR_PORT), Handler)
if USE_HTTPS:
server.socket = make_ssl_context().wrap_socket(server.socket, server_side=True)
server.serve_forever()
# === MAIN ===
async def main():
print("=" * 40)
print("Lovense -> Intiface Proxy")
print(f"{'HTTPS' if USE_HTTPS else 'HTTP'} on {LR_BIND}:{LR_PORT}")
print("=" * 40)
threading.Thread(target=run_server, args=(asyncio.get_event_loop(),), daemon=True).start()
log("LR", f"{'HTTPS' if USE_HTTPS else 'HTTP'} on {LR_BIND}:{LR_PORT}")
await ws_client()
if __name__ == "__main__":
try: asyncio.run(main())
except KeyboardInterrupt: print("\nBye!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment