Created
January 16, 2023 19:26
-
-
Save melianmiko/02f7c6a550808e38d9b6760fb688e125 to your computer and use it in GitHub Desktop.
Freebuds playground
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import code | |
import os | |
import socket | |
import threading | |
import traceback | |
import bluetooth | |
# Bluetooth address of the device to connect to, passed to the service lookup
# in main(). The value below is a placeholder and has to be replaced.
DEVICE_ADDR = "xx:xx:xx:xx:xx:xx"
# UUID of the service that main() searches for on the device
# (the constant name refers to the Bluetooth Serial Port Profile).
SPP_SERVICE_UUID = "00001101-0000-1000-8000-00805f9b34fb"

# Human-readable names of known commands, keyed by the lowercase hex form of
# the 2-byte command id. Only used to annotate pretty-printed packages.
CMD_DESCRIPTORS = {
    "0107": "DEVICE_INFO",
    "0108": "BATTERY_INFO",
    "011f": "SET_DOUBLE_TAP",
    "0120": "GET_DOUBLE_TAP",
    "0127": "BATTERY_STATE_CHANGED",
    "012d": "GET_TOUCH_PAD",
    "2b03": "IN_EAR_CHANGED",
    "2b04": "SET_ANC_MODE",
    "2b10": "SET_AUTO_PAUSE",
    "2b11": "GET_AUTO_PAUSE",
    "2b16": "SET_LONG_TAP",
    "2b17": "GET_LONG_TAP",
    "2b18": "SET_ANC_PREF",
    "2b19": "GET_ANC_PREF",
    "2b2a": "CURRENT_ANC_MODE",
    "0a0d": "LOG_SPAM",
    "0c02": "LIST_LANGUAGES",
}
class State:
    """
    Static app state class.

    Used as a plain namespace (never instantiated): main() stores the
    connected socket here, the receive thread and Package.send() read it.
    """
    # Connected socket, or None until main() has established the connection
    socket = None
class Package:
    """
    One request/response package, with helpers to build and parse its
    byte representation.

    Byte layout, as produced by encode() and consumed by from_bytes():

        b"Z" | length (2 bytes, big-endian) | 0x00 | command id (2 bytes)
             | parameters ... | CRC16 (2 bytes)

    "length" counts the 0x00 byte, the command id and all parameters.
    Every parameter is stored as: type (1 byte) | value length (1 byte) | value.
    """

    def __init__(self, cmd: bytes, parameters):
        """
        Build new package.

        cmd is the 2-byte command id. parameters is an iterable of
        (type, value) pairs, where value is either bytes or an int that
        fits into a single byte (-128..255).

        Raises ValueError if cmd is not exactly two bytes long.
        """
        if len(cmd) != 2:
            raise ValueError(f"Command id must be 2 bytes long, got {len(cmd)}")

        self.command_id = cmd
        self.parameters = parameters
        self.process_params()

    def process_params(self):
        """
        Normalise parameters: every int value is converted to a single byte.

        A new list is built instead of patching the existing one, so the
        list object passed in by the caller is never modified.
        Negative ints are stored as signed bytes, 0..255 as unsigned bytes;
        anything outside -128..255 raises OverflowError.
        """
        self.parameters = [
            (
                p_type,
                p_value.to_bytes(1, byteorder="big", signed=p_value < 0)
                if isinstance(p_value, int) else p_value,
            )
            for p_type, p_value in self.parameters
        ]

    def __str__(self) -> str:
        """
        Pretty-print this package contents as a small table:
        one header row with the command id, a separator line and one row
        per parameter (type, size, hex dump and, when possible, ASCII text).
        """
        lines = [
            build_row(12, "COMMAND_ID")
            + build_row(10, "2 bytes")
            + build_row(40, self.command_id.hex(), CMD_DESCRIPTORS),
            70 * "=",
        ]

        for p_type, p_value in self.parameters:
            line = build_row(12, f"PARAM {p_type}") + \
                build_row(10, f"{len(p_value)} bytes") + \
                build_row(40, p_value.hex())
            if all(c < 128 for c in p_value):
                # Every byte is in the ASCII range, show the value as text too
                line += p_value.decode("ascii")
            lines.append(line)

        return "\n".join(lines) + "\n"

    def send(self):
        """
        Print this package and send it through the socket stored in State.
        """
        bts = self.encode()
        print("Send package", bts.hex())
        print(self)
        # sendall() keeps writing until the whole frame is out, while a plain
        # send() is allowed to transmit only a part of the buffer.
        State.socket.sendall(bts)

    def encode(self) -> bytes:
        """
        Convert this package to bytes.
        Used to send them to device.

        Raises OverflowError if a parameter type or a value length
        does not fit into one byte.
        """
        # Build body (command_id + parameters)
        parts = [self.command_id]
        for p_type, p_value in self.parameters:
            parts.append(p_type.to_bytes(1, byteorder="big"))
            parts.append(len(p_value).to_bytes(1, byteorder="big"))
            parts.append(p_value)
        body = b"".join(parts)

        # Build package; the length field also counts the 0x00 byte before the body
        frame = b"Z" + (len(body) + 1).to_bytes(2, byteorder="big") + b"\x00" + body
        # The checksum covers everything that precedes it
        return frame + crc16(frame)

    @staticmethod
    def from_bytes(data: bytes) -> "Package":
        """
        Create Package from bytes.
        Used to parse incoming data.

        Bytes after the declared length (the CRC) are ignored,
        the checksum is not verified.

        Raises ValueError if data does not start with a valid header or
        is shorter than its length field / parameter headers announce.
        """
        if len(data) < 6:
            raise ValueError(f"Package is too short: {len(data)} bytes")
        if data[0] != 90:  # 90 == ord("Z"), start-of-package marker
            raise ValueError(f"Unexpected start byte: {data[0]:#04x}")
        if data[3] != 0:
            raise ValueError(f"Unexpected byte after length field: {data[3]:#04x}")

        length = int.from_bytes(data[1:3], byteorder="big")
        # Index of the first byte after the parameters (3 header bytes + length)
        end = length + 3
        if length < 3 or end > len(data):
            raise ValueError(
                f"Declared length {length} does not match {len(data)} bytes of data")

        command_id = data[4:6]
        position = 6
        parameters = []

        while position < end:
            if position + 2 > end:
                raise ValueError("Truncated parameter header")
            p_type = data[position]
            p_length = data[position + 1]
            value_end = position + 2 + p_length
            if value_end > end:
                raise ValueError(f"Parameter {p_type} exceeds package length")
            parameters.append((p_type, data[position + 2:value_end]))
            position = value_end

        return Package(command_id, parameters)
def _recv_exact(conn, count):
    """
    Read exactly `count` bytes from `conn`.

    recv() may return fewer bytes than requested, so keep reading until the
    requested amount is complete. Raises ConnectionAbortedError if the peer
    closes the connection before that.
    """
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            raise ConnectionAbortedError("Connection closed by remote device")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_thread():
    """
    Read packages from the socket stored in State until the connection
    fails or is closed, and print them to console / write them to file.

    Packages with command id 0a0d are passed to handle_log_pkg(),
    everything else is pretty-printed. Read timeouts are ignored.
    """
    conn = State.socket

    while True:
        try:
            header = conn.recv(4)
            if not header:
                # An empty result means the peer closed the connection;
                # without this check the loop would spin forever.
                raise ConnectionAbortedError("Connection closed by remote device")
            if header[0:2] != b"Z\x00":
                continue
            if len(header) < 4:
                # Start of a package, but recv() returned only a part of the header
                header += _recv_exact(conn, 4 - len(header))

            length = header[2]
            if length < 4:
                conn.recv(length)  # Filter trash
                continue

            # The length field counts the 0x00 byte (already read as part of
            # the header) plus the body; the body is followed by a 2-byte
            # CRC. So (length - 1) + 2 bytes remain for this package.
            frame = header + _recv_exact(conn, length + 1)
        except (TimeoutError, socket.timeout):
            continue
        except OSError:
            # Covers connection reset/abort too. The socket is unusable after
            # such an error, retrying would only repeat the traceback forever.
            traceback.print_exc()
            break

        try:
            package = Package.from_bytes(frame)
            if package.command_id != b"\x0a\x0d":
                print(TermColors.BLUE +
                      "Got package\n" + str(package) + TermColors.END)
            else:
                handle_log_pkg(package)
                print("Log written to file...")
        except (AssertionError, IndexError, ValueError, OSError):
            # A malformed package or a failed log write must not stop the reader
            traceback.print_exc()
def handle_log_pkg(log_pkg: "Package"):
    """
    Append one row describing `log_pkg` to device_logs.csv
    (in the current working directory).

    The row consists of "type;value;" for every parameter. Values of
    parameters with type 4 are written as ASCII text, all other values as
    uppercase hex. Bytes that are not valid ASCII are written as \\xNN
    escapes instead of raising UnicodeDecodeError.
    """
    cells = []
    for p_type, p_value in log_pkg.parameters:
        if p_type == 4:
            text = p_value.decode("ascii", errors="backslashreplace")
        else:
            text = p_value.hex().upper()
        cells.append(f"{p_type};{text};")

    with open("device_logs.csv", "a") as f:
        f.write("".join(cells) + "\n")
def main():
    """
    Connect to the device, start the receive thread and open an
    interactive Python console that has access to this module's globals.

    Raises RuntimeError if the service is not found on the device.
    Never returns normally: the process is terminated when the console
    is closed.
    """
    # Find service
    services = bluetooth.find_service(address=DEVICE_ADDR, uuid=SPP_SERVICE_UUID)
    if not services:
        # Explicit check instead of assert, which is skipped under "python -O"
        raise RuntimeError(f"Service {SPP_SERVICE_UUID} not found on {DEVICE_ADDR}")

    host = services[0]["host"]
    port = services[0]["port"]
    print(f"Found service at port {port}")

    # Connect socket
    print("Trying to connect...")
    # noinspection PyUnresolvedReferences
    conn = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
    try:
        conn.settimeout(2)
        conn.connect((host, port))
    except OSError:
        # Do not leak the socket when the connection attempt fails
        conn.close()
        raise

    # Run recv thread
    State.socket = conn
    threading.Thread(target=recv_thread).start()
    print("OK")

    code.interact("", local=globals())

    # Force exit: the receive thread is not a daemon thread and would
    # otherwise keep the process alive after the console is closed.
    # noinspection PyUnresolvedReferences,PyProtectedMember
    os._exit(0)
# ------------------------------------------------ | |
# Tools | |
# ------------------------------------------------ | |
class TermColors:
    """
    ANSI escape sequences for colored terminal output.
    Print a color constant before the text and END after it.
    """
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    # Resets the terminal back to default colors
    END = '\033[0m'
def build_row(ln, val, description_table=None):
    """
    Format one table cell: `val` as text, left-aligned and padded with
    spaces to at least `ln` characters, followed by the " | " separator.

    If `description_table` is given and contains `val` as a key, the
    matching description is appended to the value in parentheses.
    """
    known = description_table is not None and val in description_table
    text = f"{val} ({description_table[val]})" if known else val
    return f"{str(text).ljust(ln)} | "
def crc16(data):
    """
    Calculate the CRC16-XModem checksum (polynomial 0x1021, initial
    value 0) of `data` and return it as 2 bytes, big-endian.

    `data` is a bytes-like object or any iterable of ints in 0..255.

    The standard library already implements this checksum as
    binascii.crc_hqx(), so no hand-written lookup table is required.
    """
    import binascii

    if not isinstance(data, (bytes, bytearray, memoryview)):
        # iter() keeps a plain int argument an error instead of letting
        # bytes() interpret it as "that many zero bytes"
        data = bytes(iter(data))
    return binascii.crc_hqx(data, 0).to_bytes(2, "big")
if __name__ == "__main__":
    # Self-test before touching the device: parsing a known package and
    # encoding it again (which recalculates the checksum) must reproduce
    # the original bytes. These names stay available as globals in the
    # interactive console started by main().
    source = bytes.fromhex("5a0007002b1901000200ff0f")
    pkg = Package.from_bytes(source)
    result = pkg.encode()
    assert source == result
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment