Last active
June 2, 2023 22:10
-
-
Save Langerz82/2eae038ba57aecbb8d7b7c989e2d8fd5 to your computer and use it in GitHub Desktop.
emuelec-bluetooth 4.7 - Author: ebeem (https://github.com/ebeem)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from subprocess import Popen, PIPE | |
from dataclasses import dataclass | |
import time | |
import sys | |
DEBUG = False | |
@dataclass
class BluetoothDevice:
    """State of a single Bluetooth device, keyed by its address.

    Only ``mac`` is required. Every other field defaults to an "unknown"
    value (empty string, ``False`` or ``0``) so a device can be created
    from its address alone and filled in afterwards.
    """

    # Device address; passed verbatim to bluetoothctl commands.
    mac: str
    # Device name; empty when not reported.
    name: str = ""
    # Device alias; empty when not reported.
    alias: str = ""
    # Device class string (spelled ``clas`` since ``class`` is a keyword).
    clas: str = ""
    # Icon identifier; the pairing script looks for an "input-" prefix here.
    icon: str = ""
    paired: bool = False
    # NOTE(review): field name kept as ``bounded`` for compatibility;
    # presumably this mirrors the "Bonded" state - confirm.
    bounded: bool = False
    trusted: bool = False
    blocked: bool = False
    connected: bool = False
    wake_allowed: bool = False
    legacy_pairing: bool = False
    # Signal strength; 0 doubles as the "not reported" default.
    rssi: int = 0
class BluetoothCTL:
    """Thin wrapper that drives the ``bluetoothctl`` command-line tool.

    Every operation spawns ``bluetoothctl`` as a subprocess and inspects the
    lines it prints on stdout; nothing is cached between calls.
    """

    # Labels of ``bluetoothctl info`` rows holding free text, mapped to the
    # BluetoothDevice field they fill.
    _TEXT_KEYS = {
        "Name": "name",
        "Alias": "alias",
        "Class": "clas",
        "Icon": "icon",
    }
    # Labels of rows holding a yes/no flag. "Bounded" is kept next to
    # "Bonded" because the original parser looked for that spelling.
    _FLAG_KEYS = {
        "Paired": "paired",
        "Bonded": "bounded",
        "Bounded": "bounded",
        "Trusted": "trusted",
        "Blocked": "blocked",
        "Connected": "connected",
        "WakeAllowed": "wake_allowed",
        "LegacyPairing": "legacy_pairing",
    }

    def _execute(self, cmd: list[str]) -> list[str]:
        """Run *cmd* to completion and return its stdout as a list of lines.

        Lines keep their trailing newline. When the module-level ``DEBUG``
        flag is set, the command and every output line are echoed.
        """
        if DEBUG:
            print("> " + " ".join(cmd))
        out: list[str] = []
        with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
            for line in p.stdout:
                out.append(line)
                if DEBUG:
                    print(line)
        return out

    def _execute_lookup(self, cmd: list[str], match: str) -> str:
        """Run *cmd* and return the first output line containing *match*.

        Returns an empty string when no line matches.
        """
        return next((line for line in self._execute(cmd) if match in line), "")

    def _execute_async(self, cmd: list[str]) -> None:
        """Start *cmd* in the background without waiting for it.

        The output is discarded: nobody reads it, and an unread pipe could
        stall the child once the pipe buffer fills up.
        """
        from subprocess import DEVNULL

        Popen(cmd, stdout=DEVNULL)

    def _command_reports(self, cmd: list[str], marker: str) -> bool:
        """Run *cmd* and tell whether any output line contains *marker*."""
        return bool(self._execute_lookup(cmd, marker))

    def _adapter_flag(self, label: str) -> bool:
        """Return True when the ``bluetoothctl show`` row for *label* says yes."""
        return "yes" in self._execute_lookup(["bluetoothctl", "show"], label)

    @staticmethod
    def _parse_rssi(value: str) -> int:
        """Convert the text after ``RSSI:`` to an int, or 0 if unparsable.

        Accepts a plain decimal ("-60"). NOTE(review): some bluetoothctl
        versions appear to print the raw hex followed by the decimal in
        parentheses ("0xffffffc4 (-60)"); the last token is used so both
        forms work - confirm against the installed BlueZ version.
        """
        tokens = value.split()
        if not tokens:
            return 0
        try:
            return int(tokens[-1].strip("()"))
        except ValueError:
            return 0

    @classmethod
    def _parse_info_fields(cls, lines: list[str]) -> dict:
        """Turn ``bluetoothctl info`` output into BluetoothDevice field values.

        Returns a dict holding every field except ``mac``; fields whose row
        is missing keep their default (empty string, False or 0).
        """
        fields: dict = {name: "" for name in cls._TEXT_KEYS.values()}
        fields.update({name: False for name in cls._FLAG_KEYS.values()})
        fields["rssi"] = 0
        for line in lines:
            # Split on the first colon only, so values that themselves
            # contain a colon are kept whole.
            label, sep, value = line.partition(":")
            if not sep:
                continue
            label = label.strip()
            value = value.strip()
            if label in cls._TEXT_KEYS:
                fields[cls._TEXT_KEYS[label]] = value
            elif label in cls._FLAG_KEYS:
                fields[cls._FLAG_KEYS[label]] = value == "yes"
            elif label == "RSSI":
                fields["rssi"] = cls._parse_rssi(value)
        return fields

    @staticmethod
    def _device_macs(lines: list[str]) -> list[str]:
        """Extract the addresses from ``bluetoothctl devices`` output.

        Only lines of the form ``Device <mac> ...`` are used; blank lines and
        anything else are skipped instead of raising IndexError.
        """
        macs: list[str] = []
        for line in lines:
            parts = line.split()
            if len(parts) >= 2 and parts[0] == "Device":
                macs.append(parts[1])
        return macs

    def _parse_device_info(self, mac: str) -> "BluetoothDevice":
        """Query ``bluetoothctl info <mac>`` and build a BluetoothDevice."""
        info = self._execute(["bluetoothctl", "info", mac])
        if DEBUG:
            print(info)
        return BluetoothDevice(mac=mac, **self._parse_info_fields(info))

    @property
    def power(self) -> bool:
        """Whether ``bluetoothctl show`` reports the adapter as powered."""
        return self._adapter_flag("Powered")

    @power.setter
    def power(self, value) -> None:
        self._execute(["bluetoothctl", "power", "on" if value else "off"])

    @property
    def discoverable(self) -> bool:
        """Whether ``bluetoothctl show`` reports the adapter as discoverable."""
        return self._adapter_flag("Discoverable")

    @discoverable.setter
    def discoverable(self, value) -> None:
        self._execute(["bluetoothctl", "discoverable", "on" if value else "off"])

    @property
    def pairable(self) -> bool:
        """Whether ``bluetoothctl show`` reports the adapter as pairable."""
        return self._adapter_flag("Pairable")

    @pairable.setter
    def pairable(self, value) -> None:
        self._execute(["bluetoothctl", "pairable", "on" if value else "off"])

    def _set_agent(self, value) -> None:
        """Run ``bluetoothctl agent on`` or ``bluetoothctl agent off``."""
        self._execute(["bluetoothctl", "agent", "on" if value else "off"])

    # Write-only: assigning runs the agent command. No getter is defined
    # because this class has no command that reports the agent state.
    agent = property(fset=_set_agent, doc="Switch the bluetoothctl agent on or off.")

    def scan(self, timeout=10) -> None:
        """Run a discovery scan and block until bluetoothctl exits.

        *timeout* is passed to bluetoothctl's ``--timeout`` option.
        """
        self._execute(["bluetoothctl", "--timeout", str(timeout), "scan", "on"])

    def scan_async(self, timeout=10) -> None:
        """Start a discovery scan in the background and return immediately.

        *timeout* is passed to bluetoothctl's ``--timeout`` option.
        """
        self._execute_async(["bluetoothctl", "--timeout", str(timeout), "scan", "on"])

    def devices(self) -> list["BluetoothDevice"]:
        """Return one BluetoothDevice per entry of ``bluetoothctl devices``.

        Each device costs an additional ``bluetoothctl info`` call.
        """
        listing = self._execute(["bluetoothctl", "devices"])
        return [self._parse_device_info(mac) for mac in self._device_macs(listing)]

    def trust(self, dev: "BluetoothDevice") -> bool:
        """Trust *dev*; True when bluetoothctl prints "trust succeeded"."""
        return self._command_reports(
            ["bluetoothctl", "trust", dev.mac], "trust succeeded"
        )

    def untrust(self, dev: "BluetoothDevice") -> bool:
        """Untrust *dev*; True when bluetoothctl prints "untrust succeeded"."""
        return self._command_reports(
            ["bluetoothctl", "untrust", dev.mac], "untrust succeeded"
        )

    def pair(self, dev: "BluetoothDevice") -> bool:
        """Pair with *dev*; True when bluetoothctl prints "Pairing successful"."""
        return self._command_reports(
            ["bluetoothctl", "pair", dev.mac], "Pairing successful"
        )

    def connect(self, dev: "BluetoothDevice") -> bool:
        """Connect to *dev*; True when bluetoothctl prints "Connection successful"."""
        return self._command_reports(
            ["bluetoothctl", "connect", dev.mac], "Connection successful"
        )

    def forget(self, dev: "BluetoothDevice") -> bool:
        """Remove *dev*; True when bluetoothctl prints "Device has been removed"."""
        return self._command_reports(
            ["bluetoothctl", "remove", dev.mac], "Device has been removed"
        )
SCAN_TIME = 60
SCAN_INTERVAL = 15


def parse_scan_args(argv):
    """Return ``(interval, scan_time)`` in seconds from a command line.

    ``argv[1]`` overrides the polling interval and ``argv[2]`` the total scan
    time. With any other argument count the module defaults are used.

    Raises:
        ValueError: if an argument is not an integer or is not positive
            (a zero interval would otherwise divide by zero).
    """
    interval, scan_time = SCAN_INTERVAL, SCAN_TIME
    if len(argv) == 2:
        interval = int(argv[1])
    elif len(argv) == 3:
        interval = int(argv[1])
        scan_time = int(argv[2])
    if interval <= 0 or scan_time <= 0:
        raise ValueError("scan interval and scan time must be positive")
    return interval, scan_time


def _handle_device(bt, dev) -> None:
    """Report *dev* and pair it when it looks like an unpaired input device."""
    summary = "device {}, mac: {}, icon: {}, paired: {}".format(
        dev.name, dev.mac, dev.icon, dev.paired
    )
    print("detected " + summary)
    # Both pairing paths below only apply to input devices.
    if "input-" not in dev.icon:
        return
    if not dev.connected and dev.rssi != 0 and dev.name:
        print("found " + summary)
        # handle if a device is trying to pair while it's already paired
        if dev.paired:
            print("pairing a paired device {}".format(dev.name))
            bt.forget(dev)
        # trust then pair and finally connect
        if bt.trust(dev) and bt.pair(dev) and bt.connect(dev):
            print("successfully paired {}".format(dev.name))
        else:
            # if for any reason one step fails, forget the device so next
            # time it can be paired
            bt.forget(dev)
            print("failed to pair {}, try again".format(dev.name))
    elif dev.connected and not dev.trusted and not dev.paired:
        # Already-connected devices that are neither trusted nor paired.
        # NOTE(review): presumably this covers pads attached by cable that
        # report no RSSI (the author mentions a PS3 gamepad) - confirm.
        print("trusting " + summary)
        if bt.trust(dev) and bt.pair(dev):
            print("successfully paired {} now connect cable.".format(dev.name))


def main() -> None:
    """Power up the adapter, scan in the background and pair input devices."""
    try:
        interval, scan_time = parse_scan_args(sys.argv)
    except ValueError as err:
        sys.exit(
            "usage: {} [SCAN_INTERVAL [SCAN_TIME]] ({})".format(sys.argv[0], err)
        )
    bt = BluetoothCTL()
    bt.power = True
    bt.agent = True
    bt.discoverable = True
    bt.pairable = True
    print(
        "Scanning available devices for {} seconds, with interval {}, please wait...".format(
            scan_time, interval
        )
    )
    bt.scan_async(timeout=scan_time)
    # Always poll at least once, even when the scan time is shorter than the
    # interval; otherwise the script would exit without looking at anything.
    passes = max(1, scan_time // interval)
    for attempt in range(1, passes + 1):
        time.sleep(interval)
        print("Starting scan {}, please wait...".format(attempt))
        for dev in bt.devices():
            _handle_device(bt, dev)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment