Skip to content

Instantly share code, notes, and snippets.

@NanoPi
Last active February 18, 2023 08:59
Show Gist options
  • Select an option

  • Save NanoPi/9b627df41e1a4cbc8c251fd754bc4031 to your computer and use it in GitHub Desktop.

Select an option

Save NanoPi/9b627df41e1a4cbc8c251fd754bc4031 to your computer and use it in GitHub Desktop.
FreePIE script, experimental DSU Server that always has a single DS4 controller that does nothing except output IMU data. Converts raw mouse movements to the gyroscope's Yaw and Pitch with a little bit of smoothing.
import socket
import struct
import select
import time
import random
from binascii import crc32
DSUC_VersionReq = 0x100000
DSUS_VersionRsp = 0x100000
DSUC_ListPorts = 0x100001
DSUS_PortInfo = 0x100001
DSUC_PadDataReq = 0x100002
DSUS_PadDataRsp = 0x100002
protocolver = 1001
if starting:
system.setThreadTiming(TimingTypes.ThreadYield)
system.threadExecutionInterval = 15
p0enabled = True
p1enabled = False
p2enabled = False
p3enabled = False
global sock
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(0)
sock.bind(('', 26760))
serverid = random.randint(0, 0xFFFFFFFF)
global clients
clients = {}
p0list = {}
p0rem = []
p0count = 0
#counter the game's anti drift
pitchoffset = 0.0
yawoffset = 0.0
rlist,wlist,xlist = select.select([sock],[],[],0.0)
for r in rlist:
try:
data,address = r.recvfrom(1024)
except:
continue
magic,protocolver,msglen,crc,clientid = struct.unpack_from('<4s2HiI', data)
buffer = data[:8] + struct.pack('<I',0) + data[12:]
if msglen+16 != len(data):
continue
vc = crc32(buffer)
if vc != crc:
continue
msgtype, = struct.unpack_from('<1I', data, 16)
if msgtype == DSUC_VersionReq:
msg = struct.pack("<IH",DSUS_VersionRsp,protocolver)
header = struct.pack("<4s2HiI","DSUS",protocolver,len(msg),0,serverid)
rc=crc32(header+msg)
header2 = struct.pack("<4s2HiI","DSUS",protocolver,len(msg),rc,serverid)
sock.sendto(header2+msg, address)
if msgtype == DSUC_ListPorts:
wpl = []
wanted, = struct.unpack_from('<1I', data, 20)
if wanted >0 and len(data) >24:
wpn, = struct.unpack_from('<1B', data, 24)
wpl += [wpn]
if wanted >1 and len(data) >25:
wpn, = struct.unpack_from('<1B', data, 25)
wpl += [wpn]
if wanted >2 and len(data) >26:
wpn, = struct.unpack_from('<1B', data, 26)
wpl += [wpn]
if wanted >3 and len(data) >27:
wpn, = struct.unpack_from('<1B', data, 27)
wpl += [wpn]
msg = struct.pack("<I",DSUS_PortInfo)
enabled = [p0enabled,p1enabled,p2enabled,p3enabled]
for p in wpl:
if enabled[p]:
state = 2 # 0=disconnected, 1=reserved, 2=connected
model = 2 # 0=none, 1=DS3, 2=DS4
connection = 2 # 0=none, 1=usb, 2=bt
battery = 5 # 0=none, 1=dying, 2=low, 3=medium, 4=high, 5=full, 0xEE=charging, 0xEF=charged
active = 1 # 0=no, 1=yes
portinfo = struct.pack("<4B6s2B",p,state,model,connection,"FPIE0{}".format(p),battery,active)
header = struct.pack("<4s2HiI","DSUS",protocolver,len(msg+portinfo),0,serverid)
rc=crc32(header+msg+portinfo)
header2 = struct.pack("<4s2HiI","DSUS",protocolver,len(msg+portinfo),rc,serverid)
sock.sendto(header2+msg+portinfo, address)
if msgtype == DSUC_PadDataReq:
regflags,slotnum,macaddr = struct.unpack_from('<2B6s', data, 20)
timestamp = time.clock()
if p0enabled and (regflags == 0 or (regflags&1 != 0 and slotnum == 0) or (regflags&2 != 0 and macaddr == "FPIE00")):
p0list[address] = timestamp
p0count += 1
if p0count % 100 == 0:
p0clients = len(p0list)
diagnostics.watch(p0clients)
motiontimestamp = time.clock() * 1000000
# acceleration in Earth Gravity
ax = 0.0
ay = 0.0
az = 0.0
# rotation speed in degrees per second
pitch = 0.0-mouse.deltaY
yaw = mouse.deltaX
roll = 0.0
#"""
pitch = filters.simple(pitch,0.3)
yaw = filters.simple(yaw,0.3)
pitch = pitch if abs(pitch)>0.1 else 0.0
yaw = yaw if abs(yaw)>0.1 else 0.0
#"""
msg = struct.pack("<I",DSUS_PadDataRsp)
state = 2 # 0=disconnected, 1=reserved, 2=connected
model = 2 # 0=none, 1=DS3, 2=DS4
connection = 2 # 0=none, 1=usb, 2=bt
battery = 5 # 0=none, 1=dying, 2=low, 3=medium, 4=high, 5=full, 0xEE=charging, 0xEF=charged
active = 1 # 0=no, 1=yes
port = 0
portinfo = struct.pack("<4B6s2BI",port,state,model,connection,"FPIE00",battery,active,p0count)
portdata = struct.pack("<22B2H2B2HQ6f",0,0,0,0,128,128,128,128,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,motiontimestamp,ax,ay,az,pitch,yaw,roll)
header = struct.pack("<4s2HiI","DSUS",protocolver,len(msg+portinfo+portdata),0,serverid)
rc=crc32(header + msg + portinfo + portdata)
header2 = struct.pack("<4s2HiI","DSUS",protocolver,len(msg+portinfo+portdata),rc,serverid)
for ip,port in p0list:
address = (ip,port)
timestamp = p0list[address]
if time.clock()-timestamp > 5.0:
p0rem += [address]
sock.sendto(header2 + msg + portinfo + portdata, address)
# motion notes
# acceleration in Earth Gravity
# which side of DS4 is facing down if motionless:
# X -1.0=left grip, +1.0=right grip
# Y -1.0=bottom, +1.0=top
# Z -1.0=USB port and light, +1.0=EXT/headset port
# rotation speed in degrees per second
# pitch +=up, -=down
# yaw +=right, -=left
# roll +=right, -=left
if len(p0rem) > 0:
for a in p0rem:
p0list.pop(a,None)
p0rem = []
if stopping:
sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment