Skip to content

Instantly share code, notes, and snippets.

@Staars
Created January 20, 2024 08:26
Show Gist options
  • Save Staars/9fb3c2a442fc4dab72ee1b0baeae86f2 to your computer and use it in GitHub Desktop.
Save Staars/9fb3c2a442fc4dab72ee1b0baeae86f2 to your computer and use it in GitHub Desktop.
BPR2S Air mouse with mic input
# Simple Berry driver for the BPR2S Air mouse (a cheap BLE HID controller)
import BLE
# Decoder for 4-bit ADPCM audio frames (the tables are the standard IMA ADPCM ones).
#
# Frame layout as read by decodeAdpcm():
#   bytes 0..1 : initial predictor, signed 16 bit, big-endian
#   byte  2    : initial step index
#   bytes 3..  : two 4-bit codes per byte, high nibble first
# Output is signed 16-bit little-endian PCM, amplified by 4 and saturated.
class ADPCM
  # step-index adjustment for each 4-bit code
  static indexTable = [
    -1, -1, -1, -1, 2, 4, 6, 8,
    -1, -1, -1, -1, 2, 4, 6, 8
  ]
  # quantizer step sizes, indexed by the current step index (0..88)
  static stepSizeTable = [
    7, 8, 9, 10, 11, 12, 13, 14, 16, 17,
    19, 21, 23, 25, 28, 31, 34, 37, 41, 45,
    50, 55, 60, 66, 73, 80, 88, 97, 107, 118,
    130, 143, 157, 173, 190, 209, 230, 253, 279, 307,
    337, 371, 408, 449, 494, 544, 598, 658, 724, 796,
    876, 963, 1060, 1166, 1282, 1411, 1552, 1707, 1878, 2066,
    2272, 2499, 2749, 3024, 3327, 3660, 4026, 4428, 4871, 5358,
    5894, 6484, 7132, 7845, 8630, 9493, 10442, 11487, 12635, 13899,
    15289, 16818, 18500, 20350, 22385, 24623, 27086, 29794, 32767
  ]
  # decoder state carried from one sample to the next
  var statePrevSample, statePrevIndex

  # util to clamp a number within a given range
  def clamp(num, min, max)
    if num <= min return min end
    if num >= max return max end
    return num
  end

  # Decode one frame.
  # inputBuffer: bytes object laid out as described on the class.
  # Returns a bytes object holding 4 output bytes (two 16-bit samples) per
  # payload byte; an empty bytes object if the frame has no complete header.
  def decodeAdpcm(inputBuffer)
    if inputBuffer == nil || size(inputBuffer) < 3
      return bytes()
    end
    var inputSize = size(inputBuffer)
    var outputBuffer = bytes(-((inputSize - 3) * 4))
    var outputBufferOffset = 0
    self.statePrevSample = inputBuffer.geti(0, -2)
    # the header byte is untrusted: keep it inside the step table
    self.statePrevIndex = self.clamp(inputBuffer[2], 0, 88)
    for inputBufferOffset: range(3, inputSize - 1)
      var b = inputBuffer[inputBufferOffset]
      outputBuffer.seti(outputBufferOffset, self.decodeSample((b >> 4) & 0xF), 2)
      outputBuffer.seti(outputBufferOffset + 2, self.decodeSample(b & 0xF), 2)
      outputBufferOffset += 4
    end
    return outputBuffer
  end

  # Decode a single 4-bit code and advance the decoder state.
  # sample: integer 0..15 (bit 3 = sign, bits 0..2 = magnitude).
  # Returns the amplified sample, saturated to the signed 16-bit range.
  def decodeSample(sample)
    var predSample = self.statePrevSample
    var index = self.statePrevIndex
    var step = self.stepSizeTable[index]
    var difference = step >> 3
    # compute difference and new predicted value
    if (sample & 0x4) != 0 difference += step end
    if (sample & 0x2) != 0 difference += (step >> 1) end
    if (sample & 0x1) != 0 difference += (step >> 2) end
    # handle sign bit
    if (sample & 0x8) != 0
      predSample -= difference
    else
      predSample += difference
    end
    # find new index value
    index = self.clamp(index + self.indexTable[sample], 0, 88)
    # clamp predictor before storing it as state
    predSample = self.clamp(predSample, -32768, 32767)
    self.statePrevSample = predSample
    self.statePrevIndex = index
    # amplify by 4, then saturate: the caller stores only 16 bits, so an
    # unsaturated value would wrap around instead of clipping
    return self.clamp(predSample << 2, -32768, 32767)
  end
end
# Global decoder instance; BLE_BPR2S.recAudioFrame() calls decodeAdpcm() on it by this name.
var adpcm = ADPCM()
class BLE_BPR2S : Driver
var buf
var connecting, connected
var step
var adpcm, recording, rec_counter, start_rec, save_rec
# Set up the BLE callback and shared buffer, start the connection sequence
# and hook BLE.loop() into the fast loop.
# MAC: hex string of the device address; addr_type: passed through to BLE.set_MAC.
def init(MAC,addr_type)
  # every BLE event is forwarded to self.cb
  var on_ble_event = tasmota.gen_cb(/e,o,u,h->self.cb(e,o,u,h))
  self.buf = bytes(-256)
  BLE.conn_cb(on_ble_event, self.buf)
  var mac_bytes = bytes(MAC)
  BLE.set_MAC(mac_bytes, addr_type)
  print("BLE: will try to connect to BPR2S with MAC:",MAC)
  self.connect()
  # init-sequence position and recording bookkeeping
  self.step = 0
  self.rec_counter = 0
  self.adpcm = bytes(30000)  # buffer that collects the decoded audio
  tasmota.add_fast_loop(/-> BLE.loop())
end
# Start the init sequence by reading the first characteristic (0x2a4a) of service 0x1812.
def connect()
  self.connected = false
  self.connecting = true
  BLE.set_svc("1812",true)
  BLE.set_chr("2a4a")  # the first characteristic we have to read
  BLE.run(1)           # read
end
# Once-per-second housekeeping: restart the init sequence when neither
# connecting nor connected, and detect the end of a recording.
def every_second()
  if (self.connecting == false && self.connected == false)
    print("BLE: try to reconnect BPR2S")
    self.subMicRX()
    self.connecting = true
    self.step = 3
  end
  if self.recording != true
    return
  end
  # A recording counts as finished when the audio buffer did not grow
  # since the previous call.
  var captured = size(self.adpcm)
  if self.rec_counter != captured
    self.rec_counter = captured
  else
    print("ready")
    self.recording = false
    self.rec_counter = 0
    BLE.run(5)
  end
  print(self.rec_counter)
end
# Deferred start of a recording: when start_rec was set (by the "circle" key),
# send the two-byte "mic open" command 0x0C 0x00 and clear the flag.
def every_50ms()
  if self.start_rec == true
    # the hex literal used to contain a stray ')' that is not a hex digit
    self.writeCharTX(bytes("0c00")) # mic open
    self.start_rec = false
  end
end
# Advance the read chain 0x2a4a -> 0x2a4b -> 0x2a4d and finally subscribe to 0x2a4d.
# uuid is the characteristic that was just read.
def handle_read_CB(uuid)
  self.connected = true
  # we just have to read these characteristics before we can finally subscribe
  var next_chr
  var next_op
  if uuid == 0x2a4a      # did receive HID info
    next_chr = "2a4b"
    next_op = 1          # read next characteristic
  elif uuid == 0x2a4b    # did receive HID report map
    next_chr = "2a4d"
    next_op = 1          # read to trigger notifications of the HID device
  elif uuid == 0x2a4d    # did receive HID report
    next_chr = "2a4d"
    next_op = 3          # subscribe
  else
    return
  end
  BLE.set_chr(next_chr)
  BLE.run(next_op)
end
# Write the collected audio buffer (self.adpcm) to the file "audio.raw".
# The file is closed even when the write fails; the error is then re-raised.
def saveADPCM()
  var f = open("audio.raw", "w")
  try
    f.write(self.adpcm)
  except .. as e, m
    f.close()
    raise e, m
  end
  f.close()
end
#-
Service: UUID: AB5E0001-5A21-4F05-BC7D-AF01F617B664
AB5E0002-5A21-4F05-BC7D-AF01F617B664 , handle: 0x0048 , ['Read', 'WriteNoResp', 'Write']
AB5E0003-5A21-4F05-BC7D-AF01F617B664 , handle: 0x004a , ['Read', 'Notify']
AB5E0004-5A21-4F05-BC7D-AF01F617B664 , handle: 0x004d , ['Read', 'Notify']
-#
# Translate a HID notification in self.buf into an MQTT message on "tele/BPR2S".
# h is the handle of the notifying characteristic:
#   42 -> key code in buf[3], 38 -> key code in buf[1], 34 -> mouse movement.
# Keys are published as {"key":"<name>"}, movement as {"mouse":{"x":..,"y":..}}.
# Unknown key codes (this includes button release) publish nothing.
# Side effects: "triangle" saves the audio buffer, "circle" clears it and
# requests a new recording.
def handle_HID_notification(h)
  import mqtt
  var key = ""   # decoded key name, empty if none
  var msg = ""   # complete JSON payload, empty if nothing to publish
  if h == 42
    var k = self.buf[3]
    if k == 0x65
      key = "square"
    elif k == 0x4f
      key = "right"
    elif k == 0x50
      key = "left"
    elif k == 0x51
      key = "down"
    elif k == 0x52
      key = "up"
    elif k == 0x2a
      key = "back"
    end
  elif h == 38
    var k = self.buf[1]
    if k == 0x30
      key = "on"
    elif k == 0xe2
      key = "mute"
    elif k == 0x23
      key = "triangle"
      #self.save_rec = true
      self.saveADPCM()
    elif k == 0x21
      key = "circle"
      self.adpcm.clear()
      self.start_rec = true
    elif k == 0x41
      key = "set"
    elif k == 0x24
      key = "return"
    elif k == 0xea
      key = "minus"
    elif k == 0xe9
      key = "plus"
    end
  elif h == 34
    # x and y are 12-bit two's complement fields: 0x800 (2048) is already
    # negative, so the sign test has to be >=, not >
    var x = self.buf.getbits(12,12)
    if x >= 2048
      x -= 4096
    end
    var y = self.buf.getbits(24,12)
    if y >= 2048
      y -= 4096
    end
    # nested object, not a quoted string, so the payload is valid JSON
    # stupid idea to publish from fast_loop, but for demonstration ...
    msg = format('{"mouse":{"x":%i,"y":%i}}',x,y)
  else
    print("notified by:",h)
  end
  if key != ''
    msg = format('{"key":"%s"}',key)
  end
  if msg != ''
    mqtt.publish("tele/BPR2S",msg)
  # else # will be triggered on button release too
  # print(self.buf[1..self.buf[0]],h) # show the packet as byte buffer
  end
end
# Send payload (a bytes object) to characteristic AB5E0002 of service AB5E0001.
# The shared buffer carries the length in byte 0 followed by the data.
def writeCharTX(payload)
  var svc_uuid = "AB5E0001-5A21-4F05-BC7D-AF01F617B664"
  var tx_uuid = "AB5E0002-5A21-4F05-BC7D-AF01F617B664"
  var payload_len = size(payload)
  self.buf[0] = payload_len
  self.buf.setbytes(1,payload)
  BLE.set_svc(svc_uuid)
  BLE.set_chr(tx_uuid)
  BLE.run(2)
end
# Subscribe to the mic control notifications (handle 0x4d),
# or unsubscribe when unsub is true.
def subMicCRTL(unsub)
  # service/characteristic are deliberately not selected here:
  # BLE.set_svc("AB5E0001-5A21-4F05-BC7D-AF01F617B664")
  # BLE.set_chr("AB5E0004-5A21-4F05-BC7D-AF01F617B664")
  if unsub != true
    print("Subscribe to Mic CTRL")
    BLE.run(3,false,0x4d)
    return
  end
  print("Unsubscribe from Mic CTRL")
  BLE.run(4)
end
# Select characteristic AB5E0003 of service AB5E0001 and subscribe to it,
# or unsubscribe when unsub is true.
def subMicRX(unsub)
  BLE.set_svc("AB5E0001-5A21-4F05-BC7D-AF01F617B664")
  BLE.set_chr("AB5E0003-5A21-4F05-BC7D-AF01F617B664")
  if unsub != true
    print("Subscribe to Mic RX")
    BLE.run(3)
    return
  end
  print("Unsubscribe from Mic RX")
  BLE.run(4)
end
# React to a mic control notification; the command code is in buf[1].
#   0x08 -> answer with "mic open" (0x0C 0x00)
#   0x00 -> audio stop, recording ends
#   0x04 -> audio start
#   0x0a -> audio sync
#   0x0b -> capabilities received, then subscribe to the audio stream
def handleCTRL()
  # read the code once: writeCharTX() below overwrites the shared buffer
  var code = self.buf[1]
  if code == 0x8
    # the hex literal used to contain a stray ')' that is not a hex digit
    self.writeCharTX(bytes("0c00")) # mic open
    print("Open mic")
  elif code == 0x0
    print("Audio stop")
    self.recording = false
  elif code == 0x4
    self.subMicCRTL(true) # maybe unneeded
    print("Audio start")
  elif code == 0xa
    print("Audio sync")
  elif code == 0xb
    var v = self.buf.geti(2,-2)
    print("Version:",f"{v:04i}","Framesize:",self.buf.geti(8,-2))
    print("Got audio caps -> Subscribe to Mic RX")
    #print(self.buf[1..self.buf[0]])
    self.subMicRX(false)
  end
end
# Decode the audio frame found at buf[4..133] with the global ADPCM decoder,
# append the result to self.adpcm and mark a recording as running.
def recAudioFrame()
  var frame = self.buf[4..133]
  var pcm = adpcm.decodeAdpcm(frame)
  self.adpcm .. pcm
  self.recording = true
  #print(self.buf[0],size(self.adpcm))
end
# Dispatch a notification of the mic service by handle:
# 0x4a carries audio frames, 0x4d control messages; anything else is dumped.
def handleMic(h)
  if h == 0x4a
    self.recAudioFrame()
  elif h == 0x4d
    self.handleCTRL()
  else
    print(f"{h:x}")
    print(self.buf[1..self.buf[0]])
  end
end
# Route a notification: handles below 0x37 go to the HID handler,
# all others to the mic handler. Ignored while not connected.
# u (uuid) is unused.
def handleNotif(u,h)
  if self.connected == false
    return
  end
  if h >= 0x37
    self.handleMic(h)
  else
    self.handle_HID_notification(h)
  end
end
# Central BLE callback, invoked with the result of every BLE operation.
# error:  0 on success
# op:     1 read, 3 subscribe, 5 disconnect, 103 notification; op 2 completes
#         the init sequence when step is 4, op 4 is accepted silently
# uuid/handle: identify the characteristic the event belongs to
#
# Init sequence driven from here via self.step:
#   0 -> subscribe mic control -> 3 -> request caps -> 4 -> op 2 -> 5 (done)
def cb(error,op,uuid,handle)
  if error == 0
    if op == 1 # read OP
      # print(op,uuid)
      self.handle_read_CB(uuid)
    elif op == 2
      if self.step == 4
        self.connecting = false
        self.connected = true
        print("BLE: init completed for BPR2S")
        self.step = 5
      end
    elif op == 3
      print(self.buf[1..self.buf[0]])
      if self.step == 0
        self.subMicCRTL()
        self.step = 3
      elif self.step == 3
        print("Request caps")
        # the hex literal used to contain a stray ')' that is not a hex digit
        self.writeCharTX(bytes("0A0100000301")) # get caps
        self.step = 4
      end
    elif op == 4
      #self.subMicCRTL()
    elif op == 5
      self.connected = false
      self.connecting = false
      print("BLE: did disconnect BPR2S ... will try to reconnect")
      tasmota.gc()
    elif op == 103 # notification OP
      self.handleNotif(uuid,handle)
    else
      print(f"unhandled op:{op:x}")
    end
  else
    print("BLE: error:",error)
    if self.connecting == true
      # clearing the flag lets every_second() start a new attempt
      print("BLE: init sequence failed ... try to repeat")
      self.connecting = false
    end
  end
end
end
# Create the driver for one specific remote; the MAC string and address type are hard-coded here.
ble_hid = BLE_BPR2S("E007020103C1",1) # HID controller MAC and address type
# Register the instance as a Tasmota driver (presumably what makes every_second / every_50ms get called).
tasmota.add_driver(ble_hid)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment