Skip to content

Instantly share code, notes, and snippets.

@qdot
Created January 26, 2018 20:00
Show Gist options
  • Save qdot/93c231c16b22d5ced907747b1b81c1b4 to your computer and use it in GitHub Desktop.
Save qdot/93c231c16b22d5ced907747b1b81c1b4 to your computer and use it in GitHub Desktop.
Luxuria Suburbia Interaction Code
from __future__ import division;
from memorpy import *;
# ^ this is from https://github.com/n1nj4sec/memorpy/ and
# it's why I'm stuck in python 2.7... replace asap
import websocket;
import threading;
import select;
import time;
import json;
# bridge between luxuria superbia and buttplugio
# when you score points in luxuria they are instantly added to a "cap",
# which the visible score then counts up to.
#
# starting from the base address of the executable in memory, the
# pointer chain (in cheatengine style) for the cap is 0x008D7D28, 0x19C, 0x1A4, 0x15C, 0x2C4, 0x38.
#
# if you'd rather use the display score directly, which LSSampler
# now is, it lives four bytes further in at 0x008D7D28, 0x19C, 0x1A4, 0x15C, 0x2C4, 0x3C
class CEPointer:
def __init__( self, offsets, modName = "LuxuriaSuperbia.exe" ):
self._offsets = offsets;
self._modName = modName;
def deReference( self, w ):
# get the base address for process
baseModule = ( [ x for x in w.process.list_modules() if x.szModule == self._modName ] + [ None ] )[0];
if( baseModule == None ):
raise "can't find luxuria's module";
currentAddress = baseModule.modBaseAddr;
for o in self._offsets[:-1]:
# increment
currentAddress += o;
# dereference
currentAddress = w.Address( currentAddress ).read( "uint" );
currentAddress += self._offsets[-1];
return w.Address( currentAddress );
# this sampler is designed to target the display score rather than the
# point cap, and just calculates the delta from that... in future I
# might make it so that it migrates between speeds over some length
# of time.
class LSSampler:
def __init__( self, a ):
self._scorePointer = a;
self._previousScore = self._scorePointer.read( "uint" );
self._previousTime = time.time();
self._speed = 0;
def takeSample( self ):
newScore = self._scorePointer.read( "uint" );
newTime = time.time();
self._speed = (newScore - self._previousScore) / (newTime - self._previousTime);
self._previousScore = newScore;
self._previousTime = newTime;
return self.pointVelocity;
@property
def pointVelocity( self ):
return self._speed;
class BPIOClient:
def __init__( self, address = "ws://localhost:12345/buttplug" ):
self._ws = websocket.create_connection( address );
self._nextMessageID = 1;
self._lastPingTime = time.time();
self._pendingReplies = {};
self._stateFlags = {};
self._devices = {};
self._messageReplyEvents = {};
self._motorCap = 80;
self._motorFloor = 0.10;
self._timeOut = 1.0;
self._messagingLock = threading.Lock();
self._onConnect();
def _receiveMessages( self ):
while( self._ws.connected ):
r, w, e = select.select( ( self._ws.sock, ), (), (), 0.5 );
if( r ):
messageString = self._ws.recv();
messageObjects = json.loads( messageString );
for messageObject in messageObjects:
for key, values in messageObject.items():
self._addReply( values[ "Id" ], key, values );
def _addReply( self, id, k, v ):
v[ "MessageType" ] = k;
if( id == 0 ):
# reserved for system messages
if( k == "ScanningFinished" ):
if( "scanningDevices" in self._stateFlags ):
self._stateFlags[ "scanningDevices" ].set();
if( k == "DeviceAdded" ):
self._addDevice( v );
if( k == "DeviceRemoved" ):
self._removeDevice( v );
else:
self._pendingReplies[ id ] = v;
if( id in self._messageReplyEvents ):
self._messageReplyEvents[ id ].set();
def _doPing( self, interval ):
event = threading.Event();
while not event.wait( interval ):
reply = self.sendMessage( "Ping", {} );
self._lastPingTime = time.time();
def sendMessage( self, msg, args = {} ):
with self._messagingLock:
args[ "Id" ] = self._nextMessageID;
self._nextMessageID += 1;
# because we are receiving messages in another
# thread, there is a risk that the reply will
# come back from ws.send before we get the event
# in place - this causes weird problems where
# stuff goes to the messaging queue and is then
# never retrieved
jmsg = json.dumps( [ { msg: args } ] );
self._messageReplyEvents[ args[ "Id" ] ] = threading.Event();
self._ws.send( jmsg );
# wait for the response
response = None;
if( self._messageReplyEvents[ args[ "Id" ] ].wait( self._timeOut ) == True ):
response = self._pendingReplies[ args[ "Id" ] ];
del( self._pendingReplies[ args[ "Id" ] ] );
else:
print( "message %s (#%i) timed out after %f seconds" % ( msg, args[ "Id" ], self._timeOut ) );
print( "pending reply queue is", self._pendingReplies );
del( self._messageReplyEvents[ args[ "Id" ] ] );
return response;
def _setDevices( self, devList ):
newDevs = {};
for device in devList:
newDevs[ device[ "DeviceIndex" ] ] = device;
self._devices = newDevs;
def broadcastMotorSpeed( self, speed ):
motorSpeed = min( 1, max( self._motorFloor, speed / self._motorCap ) );
print( "broadcastMotorSpeed mapped a pV of", speed, "to motorspeed of", motorSpeed, "-> sending" );
for k, v in self._devices.items():
if( "SingleMotorVibrateCmd" in v[ "DeviceMessages" ] ):
self.sendMessage( "SingleMotorVibrateCmd", { "DeviceIndex": k, "Speed": motorSpeed } );
def _addDevice( self, device ):
pass;
def _removeDevice( self, device ):
pass;
def _onConnect( self ):
# setup the message receiving handler
thread = threading.Thread( target = self._receiveMessages, args = () );
thread.setDaemon( True );
thread.start();
self._rmThread = thread;
# first message always seeks server info
reply = self.sendMessage( "RequestServerInfo", {"ClientName": "LuxuriaBridge"} );
pingInterval = reply[ "MaxPingTime" ] / 1000 / 2;
# setup the ping thread
thread = threading.Thread( target = self._doPing, args = ( pingInterval, ) );
thread.setDaemon( True );
thread.start();
self._dpThread = thread;
def scanDevices( self ):
self._stateFlags[ "scanningDevices" ] = threading.Event();
self.sendMessage( "StartScanning" );
result = self._stateFlags[ "scanningDevices" ].wait( 10 );
del( self._stateFlags[ "scanningDevices" ] );
if( result == False ):
return False;
result = self.sendMessage( "RequestDeviceList" );
if( result == None ):
return False;
self._setDevices( result[ "Devices" ] );
return True;
if( __name__ == "__main__" ):
worker = MemWorker( name = "LuxuriaSuperbia.exe" );
# this is the "point cap", that display score counts up to
#scorePointer = CEPointer( [ 0x008D7D28, 0x19C, 0x1A4, 0x15C, 0x2C4, 0x38 ] );
# this is the display score
scorePointer = CEPointer( [ 0x008D7D28, 0x19C, 0x1A4, 0x15C, 0x2C4, 0x3C ] );
scoreAddress = scorePointer.deReference(worker);
sampler = LSSampler( scoreAddress );
bpio = BPIOClient();
r = bpio.scanDevices();
while( True ):
pV = sampler.takeSample();
bpio.broadcastMotorSpeed( pV );
time.sleep( 0.25 );
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment