Created
January 26, 2018 20:00
-
-
Save qdot/93c231c16b22d5ced907747b1b81c1b4 to your computer and use it in GitHub Desktop.
Luxuria Suburbia Interaction Code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division; | |
from memorpy import *; | |
# ^ this is from https://github.com/n1nj4sec/memorpy/ and | |
# it's why I'm stuck in python 2.7... replace asap | |
import websocket; | |
import threading; | |
import select; | |
import time; | |
import json; | |
# bridge between luxuria superbia and buttplugio | |
# when you score points in luxuria they are instantly added to a "cap", | |
# which the visible score then counts up to. | |
# | |
# starting from the base address of the executable in memory, the | |
# pointer chain (in cheatengine style) for the cap is 0x008D7D28, 0x19C, 0x1A4, 0x15C, 0x2C4, 0x38. | |
# | |
# if you'd rather use the display score directly, which LSSampler | |
# now is, it lives four bytes further in at 0x008D7D28, 0x19C, 0x1A4, 0x15C, 0x2C4, 0x3C | |
class CEPointer: | |
def __init__( self, offsets, modName = "LuxuriaSuperbia.exe" ): | |
self._offsets = offsets; | |
self._modName = modName; | |
def deReference( self, w ): | |
# get the base address for process | |
baseModule = ( [ x for x in w.process.list_modules() if x.szModule == self._modName ] + [ None ] )[0]; | |
if( baseModule == None ): | |
raise "can't find luxuria's module"; | |
currentAddress = baseModule.modBaseAddr; | |
for o in self._offsets[:-1]: | |
# increment | |
currentAddress += o; | |
# dereference | |
currentAddress = w.Address( currentAddress ).read( "uint" ); | |
currentAddress += self._offsets[-1]; | |
return w.Address( currentAddress ); | |
# this sampler is designed to target the display score rather than the | |
# point cap, and just calculates the delta from that... in future I | |
# might make it so that it migrates between speeds over some length | |
# of time. | |
class LSSampler: | |
def __init__( self, a ): | |
self._scorePointer = a; | |
self._previousScore = self._scorePointer.read( "uint" ); | |
self._previousTime = time.time(); | |
self._speed = 0; | |
def takeSample( self ): | |
newScore = self._scorePointer.read( "uint" ); | |
newTime = time.time(); | |
self._speed = (newScore - self._previousScore) / (newTime - self._previousTime); | |
self._previousScore = newScore; | |
self._previousTime = newTime; | |
return self.pointVelocity; | |
@property | |
def pointVelocity( self ): | |
return self._speed; | |
class BPIOClient: | |
def __init__( self, address = "ws://localhost:12345/buttplug" ): | |
self._ws = websocket.create_connection( address ); | |
self._nextMessageID = 1; | |
self._lastPingTime = time.time(); | |
self._pendingReplies = {}; | |
self._stateFlags = {}; | |
self._devices = {}; | |
self._messageReplyEvents = {}; | |
self._motorCap = 80; | |
self._motorFloor = 0.10; | |
self._timeOut = 1.0; | |
self._messagingLock = threading.Lock(); | |
self._onConnect(); | |
def _receiveMessages( self ): | |
while( self._ws.connected ): | |
r, w, e = select.select( ( self._ws.sock, ), (), (), 0.5 ); | |
if( r ): | |
messageString = self._ws.recv(); | |
messageObjects = json.loads( messageString ); | |
for messageObject in messageObjects: | |
for key, values in messageObject.items(): | |
self._addReply( values[ "Id" ], key, values ); | |
def _addReply( self, id, k, v ): | |
v[ "MessageType" ] = k; | |
if( id == 0 ): | |
# reserved for system messages | |
if( k == "ScanningFinished" ): | |
if( "scanningDevices" in self._stateFlags ): | |
self._stateFlags[ "scanningDevices" ].set(); | |
if( k == "DeviceAdded" ): | |
self._addDevice( v ); | |
if( k == "DeviceRemoved" ): | |
self._removeDevice( v ); | |
else: | |
self._pendingReplies[ id ] = v; | |
if( id in self._messageReplyEvents ): | |
self._messageReplyEvents[ id ].set(); | |
def _doPing( self, interval ): | |
event = threading.Event(); | |
while not event.wait( interval ): | |
reply = self.sendMessage( "Ping", {} ); | |
self._lastPingTime = time.time(); | |
def sendMessage( self, msg, args = {} ): | |
with self._messagingLock: | |
args[ "Id" ] = self._nextMessageID; | |
self._nextMessageID += 1; | |
# because we are receiving messages in another | |
# thread, there is a risk that the reply will | |
# come back from ws.send before we get the event | |
# in place - this causes weird problems where | |
# stuff goes to the messaging queue and is then | |
# never retrieved | |
jmsg = json.dumps( [ { msg: args } ] ); | |
self._messageReplyEvents[ args[ "Id" ] ] = threading.Event(); | |
self._ws.send( jmsg ); | |
# wait for the response | |
response = None; | |
if( self._messageReplyEvents[ args[ "Id" ] ].wait( self._timeOut ) == True ): | |
response = self._pendingReplies[ args[ "Id" ] ]; | |
del( self._pendingReplies[ args[ "Id" ] ] ); | |
else: | |
print( "message %s (#%i) timed out after %f seconds" % ( msg, args[ "Id" ], self._timeOut ) ); | |
print( "pending reply queue is", self._pendingReplies ); | |
del( self._messageReplyEvents[ args[ "Id" ] ] ); | |
return response; | |
def _setDevices( self, devList ): | |
newDevs = {}; | |
for device in devList: | |
newDevs[ device[ "DeviceIndex" ] ] = device; | |
self._devices = newDevs; | |
def broadcastMotorSpeed( self, speed ): | |
motorSpeed = min( 1, max( self._motorFloor, speed / self._motorCap ) ); | |
print( "broadcastMotorSpeed mapped a pV of", speed, "to motorspeed of", motorSpeed, "-> sending" ); | |
for k, v in self._devices.items(): | |
if( "SingleMotorVibrateCmd" in v[ "DeviceMessages" ] ): | |
self.sendMessage( "SingleMotorVibrateCmd", { "DeviceIndex": k, "Speed": motorSpeed } ); | |
def _addDevice( self, device ): | |
pass; | |
def _removeDevice( self, device ): | |
pass; | |
def _onConnect( self ): | |
# setup the message receiving handler | |
thread = threading.Thread( target = self._receiveMessages, args = () ); | |
thread.setDaemon( True ); | |
thread.start(); | |
self._rmThread = thread; | |
# first message always seeks server info | |
reply = self.sendMessage( "RequestServerInfo", {"ClientName": "LuxuriaBridge"} ); | |
pingInterval = reply[ "MaxPingTime" ] / 1000 / 2; | |
# setup the ping thread | |
thread = threading.Thread( target = self._doPing, args = ( pingInterval, ) ); | |
thread.setDaemon( True ); | |
thread.start(); | |
self._dpThread = thread; | |
def scanDevices( self ): | |
self._stateFlags[ "scanningDevices" ] = threading.Event(); | |
self.sendMessage( "StartScanning" ); | |
result = self._stateFlags[ "scanningDevices" ].wait( 10 ); | |
del( self._stateFlags[ "scanningDevices" ] ); | |
if( result == False ): | |
return False; | |
result = self.sendMessage( "RequestDeviceList" ); | |
if( result == None ): | |
return False; | |
self._setDevices( result[ "Devices" ] ); | |
return True; | |
if( __name__ == "__main__" ): | |
worker = MemWorker( name = "LuxuriaSuperbia.exe" ); | |
# this is the "point cap", that display score counts up to | |
#scorePointer = CEPointer( [ 0x008D7D28, 0x19C, 0x1A4, 0x15C, 0x2C4, 0x38 ] ); | |
# this is the display score | |
scorePointer = CEPointer( [ 0x008D7D28, 0x19C, 0x1A4, 0x15C, 0x2C4, 0x3C ] ); | |
scoreAddress = scorePointer.deReference(worker); | |
sampler = LSSampler( scoreAddress ); | |
bpio = BPIOClient(); | |
r = bpio.scanDevices(); | |
while( True ): | |
pV = sampler.takeSample(); | |
bpio.broadcastMotorSpeed( pV ); | |
time.sleep( 0.25 ); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment