Created
January 23, 2024 20:17
-
-
Save Staars/32418242b08b9ae32b84488e14761760 to your computer and use it in GitHub Desktop.
bpr2s voice test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Simple Berry driver for the BPR2S Air mouse (a cheap BLE HID controller)
import BLE | |
#-
  Stateful 4-bit ADPCM decoder producing 16-bit signed PCM.

  A frame handed to decodeAdpcm() starts with a 3-byte header:
    bytes 0..1  initial predictor sample (read as big-endian signed 16 bit)
    byte  2     initial index into stepSizeTable
  Every following byte carries two 4-bit codes, high nibble first.
-#
class ADPCM
  # index adjustment per 4-bit code (bit 3 is the sign, so both halves are equal)
  static indexTable = [
    -1, -1, -1, -1, 2, 4, 6, 8,
    -1, -1, -1, -1, 2, 4, 6, 8
  ]
  # quantizer step sizes, 89 entries -> valid indices are 0..88
  static stepSizeTable = [
    7, 8, 9, 10, 11, 12, 13, 14, 16, 17,
    19, 21, 23, 25, 28, 31, 34, 37, 41, 45,
    50, 55, 60, 66, 73, 80, 88, 97, 107, 118,
    130, 143, 157, 173, 190, 209, 230, 253, 279, 307,
    337, 371, 408, 449, 494, 544, 598, 658, 724, 796,
    876, 963, 1060, 1166, 1282, 1411, 1552, 1707, 1878, 2066,
    2272, 2499, 2749, 3024, 3327, 3660, 4026, 4428, 4871, 5358,
    5894, 6484, 7132, 7845, 8630, 9493, 10442, 11487, 12635, 13899,
    15289, 16818, 18500, 20350, 22385, 24623, 27086, 29794, 32767
  ]

  # decoder state carried from one sample to the next
  var statePrevSample, statePrevIndex

  # util to clamp a number within a given range (both bounds inclusive)
  def clamp(num, min, max)
    if num <= min return min end
    if num >= max return max end
    return num
  end

  # Decode one frame (3-byte header + packed 4-bit codes).
  # Returns a bytes buffer holding two little-endian 16-bit samples per
  # payload byte; returns an empty buffer when the frame is shorter than
  # its header.
  def decodeAdpcm(inputBuffer)
    var inputSize = size(inputBuffer)
    if inputSize < 3
      return bytes()                    # no complete header -> nothing to decode
    end
    # negative size: fixed-size buffer, 4 output bytes per input byte
    var outputBuffer = bytes(-((inputSize - 3) * 4))
    var outputBufferOffset = 0
    self.statePrevSample = inputBuffer.geti(0, -2)
    # the header byte is untrusted, keep it inside stepSizeTable
    self.statePrevIndex = self.clamp(inputBuffer[2], 0, 88)
    for inputBufferOffset : range(3, inputSize - 1)
      var b = inputBuffer[inputBufferOffset]
      outputBuffer.seti(outputBufferOffset, self.decodeSample((b >> 4) & 0xF), 2)
      outputBuffer.seti(outputBufferOffset + 2, self.decodeSample(b & 0xF), 2)
      outputBufferOffset += 4
    end
    return outputBuffer
  end

  # Decode a single 4-bit code, update the predictor state and return the
  # amplified sample, saturated to the signed 16-bit range.
  def decodeSample(sample)
    var predSample = self.statePrevSample
    var index = self.statePrevIndex
    var step = self.stepSizeTable[index]
    var difference = step >> 3
    # compute difference and new predicted value
    if (sample & 0x4) difference += step end
    if (sample & 0x2) difference += (step >> 1) end
    if (sample & 0x1) difference += (step >> 2) end
    # handle sign bit
    predSample += (sample & 0x8) ? -difference : difference
    # find new index value
    index += self.indexTable[sample]
    index = self.clamp(index, 0, 88)
    # clamp predictor value
    predSample = self.clamp(predSample, -32768, 32767)
    self.statePrevSample = predSample
    self.statePrevIndex = index
    # amplify by 4, then saturate: the caller stores only 16 bits, so an
    # unclamped value would wrap around instead of clipping
    return self.clamp(predSample << 2, -32768, 32767)
  end
end
# Shared decoder instance; BLE_BPR2S.recAudioFrame() calls adpcm.decodeAdpcm() on it
var adpcm = ADPCM()
#-
  Tasmota driver for the BPR2S air mouse.

  It reads the HID characteristics, subscribes to the HID report and to the
  vendor microphone service, publishes key presses and mouse movement via
  MQTT on "tele/BPR2S", and collects decoded microphone audio in self.adpcm.
  Relies on the global `adpcm` decoder instance defined in this file.
-#
class BLE_BPR2S : Driver
  var buf                         # shared BLE buffer, buf[0] holds the payload length
  var connecting, connected
  var step                        # progress of the subscribe/init sequence (0, 3, 4, 5)
  var adpcm, recording, rec_counter, start_rec, save_rec

  # MAC: address of the controller as hex string, addr_type: BLE address type
  def init(MAC,addr_type)
    # set up all state first, so callbacks never see uninitialised members
    self.step = 0
    self.rec_counter = 0
    self.recording = false
    self.start_rec = false
    self.adpcm = bytes(30000)     # empty buffer with 30000 bytes pre-allocated
    self.buf = bytes(-256)
    var cbp = tasmota.gen_cb(/e,o,u,h->self.cb(e,o,u,h))
    BLE.conn_cb(cbp,self.buf)
    BLE.set_MAC(bytes(MAC),addr_type)
    print("BLE: will try to connect to BPR2S with MAC:",MAC)
    self.connect()
    tasmota.add_fast_loop(/-> BLE.loop())
  end

  # start the init sequence by reading the first HID characteristic
  def connect()
    self.connecting = true
    self.connected = false
    BLE.set_svc("1812",true)
    BLE.set_chr("2a4a") # the first characteristic we have to read
    BLE.run(1) # read
  end

  # reconnect when idle and detect the end of a recording
  def every_second()
    if (self.connecting == false && self.connected == false)
      print("BLE: try to reconnect BPR2S")
      self.subMicRX()
      self.connecting = true
      self.step = 3
    end
    if self.recording == true
      if self.rec_counter == size(self.adpcm)
        # no new audio arrived since the last check -> recording is finished
        print("ready")
        self.recording = false
        self.rec_counter = 0
        BLE.run(5)
      else
        self.rec_counter = size(self.adpcm)
      end
      print(self.rec_counter)
    end
  end

  # deferred "mic open" request, flagged by the circle key
  def every_50ms()
    if self.start_rec == true
      self.writeCharTX(bytes("0c00")) # mic open
      self.start_rec = false
    end
  end

  def handle_read_CB(uuid) # uuid is the callback characteristic
    self.connected = true
    # we just have to read these characteristics before we can finally subscribe
    if uuid == 0x2a4a # did receive HID info
      BLE.set_chr("2a4b")
      BLE.run(1) # read next characteristic
    elif uuid == 0x2a4b # did receive HID report map
      BLE.set_chr("2a4d")
      BLE.run(1) # read to trigger notifications of the HID device
    elif uuid == 0x2a4d # did receive HID report
      # print(self.buf[1..self.buf[0]])
      BLE.set_chr("2a4d")
      BLE.run(3) # subscribe
    end
  end

  # write the collected audio buffer to the file "audio.raw"
  def saveADPCM()
    var f = open("audio.raw", "w")
    f.write(self.adpcm)
    f.close()
  end

  #-
  Service: UUID: AB5E0001-5A21-4F05-BC7D-AF01F617B664
  AB5E0002-5A21-4F05-BC7D-AF01F617B664 , handle: 0x0048 , ['Read', 'WriteNoResp', 'Write']
  AB5E0003-5A21-4F05-BC7D-AF01F617B664 , handle: 0x004a , ['Read', 'Notify']
  AB5E0004-5A21-4F05-BC7D-AF01F617B664 , handle: 0x004d , ['Read', 'Notify']
  -#

  # translate a HID notification (h = handle) into an MQTT message
  def handle_HID_notification(h)
    import mqtt
    var t = "key"
    var v = ""
    var quoted = true   # key names are JSON strings, the mouse delta is a JSON object
    if h == 42
      var k = self.buf[3]
      if k == 0x65
        v = "square"
      elif k == 0x4f
        v = "right"
      elif k == 0x50
        v = "left"
      elif k == 0x51
        v = "down"
      elif k == 0x52
        v = "up"
      elif k == 0x2a
        v = "back"
      end
    elif h == 38
      var k = self.buf[1]
      if k == 0x30
        v = "on"
      elif k == 0xe2
        v = "mute"
      elif k == 0x23
        v = "triangle"
        #self.save_rec = true
        self.saveADPCM()
      elif k == 0x21
        v = "circle"
        self.adpcm.clear()
        self.start_rec = true
      elif k == 0x41
        v = "set"
      elif k == 0x24
        v = "return"
      elif k == 0xea
        v = "minus"
      elif k == 0xe9
        v = "plus"
      end
    elif h == 34
      t = "mouse"
      quoted = false
      # x and y are 12-bit two's complement values: 0x800..0xFFF are negative
      var x = self.buf.getbits(12,12)
      if x >= 2048
        x -= 4096
      end
      var y = self.buf.getbits(24,12)
      if y >= 2048
        y -= 4096
      end
      v = format('{"x":%i,"y":%i}',x,y) # stupid idea to publish from fast_loop, but for demonstration ...
    else
      print("notified by:",h)
    end
    if v != ''
      # embed the mouse object unquoted, otherwise the payload is not valid JSON
      mqtt.publish("tele/BPR2S",format(quoted ? '{"%s":"%s"}' : '{"%s":%s}',t,v))
    # else # will be triggered on button release too
    # print(self.buf[1..self.buf[0]],h) # show the packet as byte buffer
    end
  end

  # write payload (bytes) to the TX characteristic of the mic service
  def writeCharTX(payload)
    self.buf[0] = size(payload)
    self.buf.setbytes(1,payload)
    BLE.set_svc("AB5E0001-5A21-4F05-BC7D-AF01F617B664")
    BLE.set_chr("AB5E0002-5A21-4F05-BC7D-AF01F617B664")
    BLE.run(2)
  end

  # (un)subscribe the mic control channel; subscribing addresses it by handle 0x4d
  def subMicCRTL(unsub)
    # BLE.set_svc("AB5E0001-5A21-4F05-BC7D-AF01F617B664")
    # BLE.set_chr("AB5E0004-5A21-4F05-BC7D-AF01F617B664")
    if unsub == true
      print("Unsubscribe from Mic CTRL")
      BLE.run(4)
    else
      print("Subscribe to Mic CTRL")
      BLE.run(3,false,0x4d)
    end
  end

  # (un)subscribe the mic audio channel
  def subMicRX(unsub)
    BLE.set_svc("AB5E0001-5A21-4F05-BC7D-AF01F617B664")
    BLE.set_chr("AB5E0003-5A21-4F05-BC7D-AF01F617B664")
    if unsub == true
      print("Unsubscribe from Mic RX")
      BLE.run(4)
    else
      print("Subscribe to Mic RX")
      BLE.run(3)
    end
  end

  # react to a message on the mic control channel, selected by its first byte
  def handleCTRL()
    var code = self.buf[1]
    if code == 0x8
      self.writeCharTX(bytes("0c00")) # mic open
      print("Open mic")
    elif code == 0x0
      print("Audio stop")
      self.recording = false
    elif code == 0x4
      self.subMicCRTL(true) # maybe unneeded
      print("Audio start")
    elif code == 0xa
      print("Audio sync")
    elif code == 0xb
      var v = self.buf.geti(2,-2)
      print("Version:",f"{v:04i}","Framesize:",self.buf.geti(8,-2))
      print("Got audio caps -> Subscribe to Mic RX")
      #print(self.buf[1..self.buf[0]])
      self.subMicRX(false)
    end
  end

  # decode the received audio frame and append the PCM data to self.adpcm
  def recAudioFrame()
    self.adpcm..adpcm.decodeAdpcm(self.buf[4..133])
    self.recording = true
    #print(self.buf[0],size(self.adpcm))
  end

  # dispatch notifications of the mic service by handle
  def handleMic(h)
    if h == 0x4d
      self.handleCTRL()
    elif h == 0x4a
      self.recAudioFrame()
    else
      print(f"{h:x}")
      print(self.buf[1..self.buf[0]])
    end
  end

  # handles below 0x37 are treated as HID reports, everything else as mic service
  def handleNotif(u,h)
    if self.connected == false return end
    if h < 0x37
      self.handle_HID_notification(h)
    else
      self.handleMic(h)
    end
  end

  # BLE callback: drives the init sequence and forwards notifications
  def cb(error,op,uuid,handle)
    if error == 0
      if op == 1 # read OP
        # print(op,uuid)
        self.handle_read_CB(uuid)
      elif op == 2
        if self.step == 4
          self.connecting = false
          self.connected = true
          print("BLE: init completed for BPR2S")
          self.step = 5
        end
      elif op == 3
        print(self.buf[1..self.buf[0]])
        if self.step == 0
          self.subMicCRTL()
          self.step = 3
        elif self.step == 3
          print("Request caps")
          self.writeCharTX(bytes("0A0100000301")) # get caps
          self.step = 4
        end
      elif op == 4
        # nothing to do
        #self.subMicCRTL()
      elif op == 5
        self.connected = false
        self.connecting = false
        print("BLE: did disconnect BPR2S ... will try to reconnect")
        tasmota.gc()
      elif op == 103 # notification OP
        self.handleNotif(uuid,handle)
      else
        print(f"unhandled op:{op:x}")
      end
    else
      print("BLE: error:",error)
      if self.connecting == true
        print("BLE: init sequence failed ... try to repeat")
        self.connecting = false
      end
    end
  end
end
# Create the driver for one specific controller and register it with Tasmota,
# so that its every_second()/every_50ms() hooks get called.
var ble_hid = BLE_BPR2S("E007020103C1",1) # HID controller MAC and address type
tasmota.add_driver(ble_hid)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment