-
-
Save pylover/d68be364adac5f946887b85e6ed6e7ae to your computer and use it in GitHub Desktop.
#! /usr/bin/env python3 | |
"""Fixing bluetooth stereo headphone/headset problem in debian distros. | |
Workaround for bug: https://bugs.launchpad.net/ubuntu/+source/indicator-sound/+bug/1577197 | |
Run it with python3.5 or higher after pairing/connecting the bluetooth stereo headphone. | |
This will be only fixes the bluez5 problem mentioned above . | |
Licence: Freeware | |
Install with: | |
curl "https://gist.githubusercontent.com/pylover/d68be364adac5f946887b85e6ed6e7ae/raw/install.sh" | sh | |
Shorthands: | |
$ alias speakers="a2dp.py 10:08:C1:44:AE:BC" | |
$ alias headphones="a2dp.py 00:22:37:3D:DA:50" | |
$ alias headset="a2dp.py 00:22:37:F8:A0:77 -p hsp" | |
$ speakers | |
See ``a2dp.py -h`` for help. | |
Check here for updates: https://gist.github.com/pylover/d68be364adac5f946887b85e6ed6e7ae | |
Thanks to: | |
* https://github.com/DominicWatson, for adding the ``-p/--profile`` argument. | |
* https://github.com/IzzySoft, for mentioning wait before connecting again. | |
* https://github.com/AmploDev, for v0.4.0 | |
* https://github.com/Mihara, for autodetect & autorun service | |
* https://github.com/dabrovnijk, for systemd service | |
Change Log | |
---------- | |
- 0.6.2 | |
* Fix program name inside the help message. | |
- 0.6.1 | |
* Fix Py warning | |
- 0.6.0 | |
* Install script | |
- 0.5.2 | |
* Increasing the number of tries to 15. | |
- 0.5.2 | |
* Optimizing waits. | |
- 0.5.1 | |
* Increasing WAIT_TIME and TRIES | |
- 0.5.0 | |
* Autodetect & autorun service | |
- 0.4.1 | |
* Sorting device list | |
- 0.4.0 | |
* Adding ignore_fail argument by @AmploDev. | |
* Sending all available streams into selected sink, after successfull connection by @AmploDev. | |
- 0.3.3 | |
* Updating default sink before turning to ``off`` profile. | |
- 0.3.2 | |
* Waiting a bit: ``-w/--wait`` before connecting again. | |
- 0.3.0 | |
* Adding -p / --profile option for using the same script to switch between headset and A2DP audio profiles | |
- 0.2.5 | |
* Mentioning [mac] argument. | |
- 0.2.4 | |
* Removing duplicated devices in select device list. | |
- 0.2.3 | |
* Matching ANSI escape characters. Tested on 16.10 & 16.04 | |
- 0.2.2 | |
* Some sort of code enhancements. | |
- 0.2.0 | |
* Adding `-V/--version`, `-w/--wait` and `-t/--tries` CLI arguments. | |
- 0.1.1 | |
* Supporting the `[NEW]` prefix for devices & controllers as advised by @wdullaer | |
* Drying the code. | |
""" | |
import sys | |
import re | |
import asyncio | |
import subprocess as sb | |
import argparse | |
__version__ = '0.6.2' | |
HEX_DIGIT_PATTERN = '[0-9A-F]' | |
HEX_BYTE_PATTERN = '%s{2}' % HEX_DIGIT_PATTERN | |
MAC_ADDRESS_PATTERN = ':'.join((HEX_BYTE_PATTERN, ) * 6) | |
DEVICE_PATTERN = re.compile('^(?:.*\s)?Device\s(?P<mac>%s)\s(?P<name>.*)' % MAC_ADDRESS_PATTERN) | |
CONTROLLER_PATTERN = re.compile('^(?:.*\s)?Controller\s(?P<mac>%s)\s(?P<name>.*)' % MAC_ADDRESS_PATTERN) | |
WAIT_TIME = 2.25 | |
TRIES = 15 | |
PROFILE = 'a2dp' | |
_profiles = { | |
'a2dp': 'a2dp_sink', | |
'hsp': 'headset_head_unit', | |
'off': 'off' | |
} | |
# CLI Arguments | |
parser = argparse.ArgumentParser() | |
parser.add_argument('-e', '--echo', action='store_true', default=False, | |
help='If given, the subprocess stdout will be also printed on stdout.') | |
parser.add_argument('-w', '--wait', default=WAIT_TIME, type=float, | |
help='The seconds to wait for subprocess output, default is: %s' % WAIT_TIME) | |
parser.add_argument('-t', '--tries', default=TRIES, type=int, | |
help='The number of tries if subprocess is failed. default is: %s' % TRIES) | |
parser.add_argument('-p', '--profile', default=PROFILE, | |
help='The profile to switch to. available options are: hsp, a2dp. default is: %s' % PROFILE) | |
parser.add_argument('-V', '--version', action='store_true', help='Show the version.') | |
parser.add_argument('mac', nargs='?', default=None) | |
# Exceptions | |
class SubprocessError(Exception): | |
pass | |
class RetryExceededError(Exception): | |
pass | |
class BluetoothctlProtocol(asyncio.SubprocessProtocol): | |
def __init__(self, exit_future, echo=True): | |
self.exit_future = exit_future | |
self.transport = None | |
self.output = None | |
self.echo = echo | |
def listen_output(self): | |
self.output = '' | |
def not_listen_output(self): | |
self.output = None | |
def pipe_data_received(self, fd, raw): | |
d = raw.decode() | |
if self.echo: | |
print(d, end='') | |
if self.output is not None: | |
self.output += d | |
def process_exited(self): | |
self.exit_future.set_result(True) | |
def connection_made(self, transport): | |
self.transport = transport | |
print('Connection MADE') | |
async def send_command(self, c): | |
stdin_transport = self.transport.get_pipe_transport(0) | |
# noinspection PyProtectedMember | |
stdin_transport._pipe.write(('%s\n' % c).encode()) | |
async def search_in_output(self, expression, fail_expression=None): | |
if self.output is None: | |
return None | |
for l in self.output.splitlines(): | |
if fail_expression and re.search(fail_expression, l, re.IGNORECASE): | |
raise SubprocessError('Expression "%s" failed with fail pattern: "%s"' % (l, fail_expression)) | |
if re.search(expression, l, re.IGNORECASE): | |
return True | |
async def send_and_wait(self, cmd, wait_expression, fail_expression='fail'): | |
try: | |
self.listen_output() | |
await self.send_command(cmd) | |
while not await self.search_in_output(wait_expression.lower(), fail_expression=fail_expression): | |
await wait() | |
finally: | |
self.not_listen_output() | |
async def disconnect(self, mac): | |
print('Disconnecting the device.') | |
await self.send_and_wait('disconnect %s' % ':'.join(mac), 'Successful disconnected') | |
async def connect(self, mac): | |
print('Connecting again.') | |
await self.send_and_wait('connect %s' % ':'.join(mac), 'Connection successful') | |
async def trust(self, mac): | |
await self.send_and_wait('trust %s' % ':'.join(mac), 'trust succeeded') | |
async def quit(self): | |
await self.send_command('quit') | |
async def get_list(self, command, pattern): | |
result = set() | |
try: | |
self.listen_output() | |
await self.send_command(command) | |
await wait() | |
for l in self.output.splitlines(): | |
m = pattern.match(l) | |
if m: | |
result.add(m.groups()) | |
return sorted(list(result), key=lambda i: i[1]) | |
finally: | |
self.not_listen_output() | |
async def list_devices(self): | |
return await self.get_list('devices', DEVICE_PATTERN) | |
async def list_paired_devices(self): | |
return await self.get_list('paired-devices', DEVICE_PATTERN) | |
async def list_controllers(self): | |
return await self.get_list('list', CONTROLLER_PATTERN) | |
async def select_paired_device(self): | |
print('Selecting device:') | |
devices = await self.list_paired_devices() | |
count = len(devices) | |
if count < 1: | |
raise SubprocessError('There is no connected device.') | |
elif count == 1: | |
return devices[0] | |
for i, d in enumerate(devices): | |
print('%d. %s %s' % (i+1, d[0], d[1])) | |
print('Select device[1]:') | |
selected = input() | |
return devices[0 if not selected.strip() else (int(selected) - 1)] | |
async def wait(delay=None): | |
return await asyncio.sleep(WAIT_TIME or delay) | |
async def execute_command(cmd, ignore_fail=False): | |
p = await asyncio.create_subprocess_shell(cmd, stdout=sb.PIPE, stderr=sb.PIPE) | |
stdout, stderr = await p.communicate() | |
stdout, stderr = \ | |
stdout.decode() if stdout is not None else '', \ | |
stderr.decode() if stderr is not None else '' | |
if p.returncode != 0 or stderr.strip() != '': | |
message = 'Command: %s failed with status: %s\nstderr: %s' % (cmd, p.returncode, stderr) | |
if ignore_fail: | |
print('Ignoring: %s' % message) | |
else: | |
raise SubprocessError(message) | |
return stdout | |
async def execute_find(cmd, pattern, tries=0, fail_safe=False): | |
tries = tries or TRIES | |
message = 'Cannot find `%s` using `%s`.' % (pattern, cmd) | |
retry_message = message + ' Retrying %d more times' | |
while True: | |
stdout = await execute_command(cmd) | |
match = re.search(pattern, stdout) | |
if match: | |
return match.group() | |
elif tries > 0: | |
await wait() | |
print(retry_message % tries) | |
tries -= 1 | |
continue | |
if fail_safe: | |
return None | |
raise RetryExceededError('Retry times exceeded: %s' % message) | |
async def find_dev_id(mac, **kw): | |
return await execute_find('pactl list cards short', 'bluez_card.%s' % '_'.join(mac), **kw) | |
async def find_sink(mac, **kw): | |
return await execute_find('pacmd list-sinks', 'bluez_sink.%s' % '_'.join(mac), **kw) | |
async def set_profile(device_id, profile): | |
print('Setting the %s profile' % profile) | |
try: | |
return await execute_command('pactl set-card-profile %s %s' % (device_id, _profiles[profile])) | |
except KeyError: | |
print('Invalid profile: %s, please select one one of a2dp or hsp.' % profile, file=sys.stderr) | |
raise SystemExit(1) | |
async def set_default_sink(sink): | |
print('Updating default sink to %s' % sink) | |
return await execute_command('pacmd set-default-sink %s' % sink) | |
async def move_streams_to_sink(sink): | |
streams = await execute_command('pacmd list-sink-inputs | grep "index:"', True) | |
for i in streams.split(): | |
i = ''.join(n for n in i if n.isdigit()) | |
if i != '': | |
print('Moving stream %s to sink' % i) | |
await execute_command('pacmd move-sink-input %s %s' % (i, sink)) | |
return sink | |
async def main(args): | |
global WAIT_TIME, TRIES | |
if args.version: | |
print(__version__) | |
return 0 | |
mac = args.mac | |
# Hacking, Changing the constants! | |
WAIT_TIME = args.wait | |
TRIES = args.tries | |
exit_future = asyncio.Future() | |
transport, protocol = await asyncio.get_event_loop().subprocess_exec( | |
lambda: BluetoothctlProtocol(exit_future, echo=args.echo), 'bluetoothctl' | |
) | |
try: | |
if mac is None: | |
mac, _ = await protocol.select_paired_device() | |
mac = mac.split(':' if ':' in mac else '_') | |
print('Device MAC: %s' % ':'.join(mac)) | |
device_id = await find_dev_id(mac, fail_safe=True) | |
if device_id is None: | |
print('It seems device: %s is not connected yet, trying to connect.' % ':'.join(mac)) | |
await protocol.trust(mac) | |
await protocol.connect(mac) | |
device_id = await find_dev_id(mac) | |
sink = await find_sink(mac, fail_safe=True) | |
if sink is None: | |
await set_profile(device_id, args.profile) | |
sink = await find_sink(mac) | |
print('Device ID: %s' % device_id) | |
print('Sink: %s' % sink) | |
await set_default_sink(sink) | |
await wait() | |
await set_profile(device_id, 'off') | |
if args.profile == 'a2dp': | |
await protocol.disconnect(mac) | |
await wait() | |
await protocol.connect(mac) | |
device_id = await find_dev_id(mac) | |
print('Device ID: %s' % device_id) | |
await wait(2) | |
await set_profile(device_id, args.profile) | |
await set_default_sink(sink) | |
await move_streams_to_sink(sink) | |
except (SubprocessError, RetryExceededError) as ex: | |
print(str(ex), file=sys.stderr) | |
return 1 | |
finally: | |
print('Exiting bluetoothctl') | |
await protocol.quit() | |
await exit_future | |
# Close the stdout pipe | |
transport.close() | |
if args.profile == 'a2dp': | |
print('"Enjoy" the HiFi stereo music :)') | |
else: | |
print('"Enjoy" your headset audio :)') | |
if __name__ == '__main__': | |
sys.exit(asyncio.get_event_loop().run_until_complete(main(parser.parse_args()))) |
#!/usr/bin/python | |
import dbus | |
from dbus.mainloop.glib import DBusGMainLoop | |
import gobject | |
import subprocess | |
import time | |
# Where you keep the above script. Must be executable, obviously. | |
A2DP = '/home/mihara/.local/bin/a2dp' | |
# A list of device IDs that you wish to run it on. | |
DEV_IDS = ['00:18:09:30:FC:D8','FC:58:FA:B1:2D:25'] | |
device_paths = [] | |
devices = [] | |
dbus_loop = DBusGMainLoop() | |
bus = dbus.SystemBus(mainloop=dbus_loop) | |
def generate_handler(device_id): | |
last_ran = [0] # WHOA, this is crazy closure behavior: A simple variable does NOT work. | |
def cb(*args, **kwargs): | |
if args[0] == 'org.bluez.MediaControl1': | |
if args[1].get('Connected'): | |
print("Connected {}".format(device_id)) | |
if last_ran[0] < time.time() - 120: | |
print("Fixing...") | |
subprocess.call([A2DP,device_id]) | |
last_ran[0] = time.time() | |
else: | |
print("Disconnected {}".format(device_id)) | |
return cb | |
# Figure out the path to the headset | |
man = bus.get_object('org.bluez', '/') | |
iface = dbus.Interface(man, 'org.freedesktop.DBus.ObjectManager') | |
for device in iface.GetManagedObjects().keys(): | |
for id in DEV_IDS: | |
if device.endswith(id.replace(':','_')): | |
device_paths.append((id, device)) | |
for id, device in device_paths: | |
headset = bus.get_object('org.bluez', device) | |
headset.connect_to_signal("PropertiesChanged", generate_handler(id), dbus_interface='org.freedesktop.DBus.Properties') | |
devices.append(headset) | |
loop = gobject.MainLoop() | |
loop.run() | |
[Unit] | |
Description=Fix BT Speaker | |
[Service] | |
Type=simple | |
Restart=always | |
ExecStart=/usr/bin/fix-bt.py | |
[Install] | |
WantedBy=multi-user.target | |
#! /usr/bin/env bash | |
URL="https://gist.githubusercontent.com/pylover/d68be364adac5f946887b85e6ed6e7ae/raw/a2dp.py" | |
TARGET=/usr/local/bin/a2dp.py | |
curl $URL | sudo tee ${TARGET} > /dev/null | |
sudo chmod +x $TARGET | |
echo "a2dp installation was successfull." | |
echo "Run a2dp.py [-h/--help] and enjoy the HiFi music!" | |
i just need to start pipewire service locally, not glboally, so no pulseaudio, right, systemctl --user status pipewire
should fix pactl lists not being available, right?
so everytime I restarted pactl list was unavailable because pipewire pulseaudio was masked,
sudo apt update
sudo apt install --reinstall pipewire pipewire-pulse pipewire-audio-client-libraries
after this noo other tricks ever worked and audio never worked again and I gave up, I could uninstall and reinstall pulseaudio but I cant bother.
I kinda solved it, these were the steps...
upgraded to latest version pipewire ppa
remove blueman
edit /etc/bluetooth/audio.conf
~$ cat /etc/bluetooth/audio.conf
[General]
Enable=Control,Gateway,Headset,Media,Sink,Socket,Source
mv .config/systemd to .config/systemd.backup
uninstall https://old.reddit.com/r/pop_os/comments/ol9nxx/installing_latest_pipewire_with_working_bluetooth/
install wireplumber
sudo rm /etc/systemd/user/pipewire-media-session.service
systemctl --user --now enable wireplumber.service
systemctl --user start wireplumber.service
sudo rm -rf /var/lib/bluetooth/*
DEVICE_MAC=$(bluetoothctl devices | awk '/Device/{print $2}')
# Step 5: Pair, trust, and connect to the Bluetooth device
echo "Pairing, trusting, and connecting to the Bluetooth device..."
echo -e "pair $DEVICE_MAC\ntrust $DEVICE_MAC\nconnect $DEVICE_MAC\nquit" | bluetoothctl
New problems: Now it is either a2p or hfp , cant switch, but now I get 3 different profiles..
Profile:
off: Off (Ziele: 0, Quellen: 0, Priorität: 0, verfügbar: ja)
a2dp-sink-sbc: High Fidelity Playback (A2DP Sink, codec SBC) (Ziele: 1, Quellen: 0, Priorität: 18, verfügbar: ja)
a2dp-sink-sbc_xq: High Fidelity Playback (A2DP Sink, codec SBC-XQ) (Ziele: 1, Quellen: 0, Priorität: 17, verfügbar: ja)
a2dp-sink: High Fidelity Playback (A2DP Sink, codec AAC) (Ziele: 1, Quellen: 0, Priorität: 19, verfügbar: ja)
Aktives Profil: a2dp-sink
```
remove bluetooth reconenct and pray you get the hfp profile
Settings
└─ Default Configured Node Names:
0. Audio/Sink bluez_output.E8_AB_FA_40_25_3B.headset-head-unit
kreijstal@kreijstalnuc:~$ pactl list sinks short
165 bluez_output.E8_AB_FA_40_25_3B.headset-head-unit PipeWire s16le 1ch 16000Hz RUNNING
It's either stuck on headset-head-unit or a2dp-sink, only with wireplumber, things worked before, but oh well. omg
so I rebooted and sound was gone again, and I perfomed my daily ritual of trying random commands to see what sticks to the wall, I dont like casting
sudo dpkg-reconfigure bluez
sudo dpkg-reconfigure linux-sound-base
but it seems in this case that did the trick along with other things.. let's see if it survives this reboot,
everything was fixed with Mint 22. Finally.
sledge hammer solution, experts please help
I have to do this party trick almost every day lol
Ok, but we know it can also do microphone just like before, let us try it.
huh, so now I can not do audio calls? but before I could :(