Skip to content

Instantly share code, notes, and snippets.

@maurom
Created March 23, 2021 13:27
Show Gist options
  • Save maurom/ab66ec8a408642930913bfbf372a1c80 to your computer and use it in GitHub Desktop.
Save maurom/ab66ec8a408642930913bfbf372a1c80 to your computer and use it in GitHub Desktop.
EC717 Bluetooth Headset Connector for Linux/PulseAudio
#!/usr/bin/env python3
"""EC717 Bluetooth Headset Connector for Linux/PulseAudio
Version 20210228.01
RATIONALE
---------
I got a bluetooth headset as a gift and couldn't get it to work with Linux.
So, given that HFP/HSP profile support for certain Bluetooth headsets in
PulseAudio is a matter of luck (see below), this little script builds
a pair of virtual source/sink and maps them to a plain SCO audio socket.
Parts of this script come from Nikolay Khabarov work at
https://github.com/Nikolay-Kha/BluetoothAudio
Requires:
- python3-bluez
- python3-numpy (optional)
WARNING
-------
This is garbage code written in a rush. Audio quality is bad (per spec).
Don't expect good syntax, thread handling, documentation nor Google style.
It just barely works and you're invited to polish it.
You WILL HAVE better luck using James Bottomley's pulseadio patches from
- https://git.kernel.org/pub/scm/linux/kernel/git/jejb/pulseaudio.git/log/?h=jejb-v13
- https://git.kernel.org/pub/scm/linux/kernel/git/jejb/pulseaudio.git/log/?h=jejb-v12
Just for the record, probably it's me as a bad coder, but I think Bluetooth
and its myriad of protocols are an awful technology.
RELEVANT DOCUMENTATION
----------------------
Bluetooth Specifications:
- https://www.bluetooth.com/specifications/specs/hands-free-profile-1-8/
- https://www.bluetooth.com/specifications/specs/headset-profile-1-2/ (old)
- https://www.bluetooth.com/specifications/specs/core-specification-5-2/
- https://www.bluetooth.com/specifications/assigned-numbers/
HFP/HSP issues:
- https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/issues/84
- https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/issues/122
- https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/issues/884
- https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/288
- https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/491
LICENSE
-------
The MIT License (MIT)
Copyright (c) 2021, Mauro A. Meloni <com.gmail@maumeloni>
Portions Copyright (c) 2017 Nikolay Khabarov
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from argparse import ArgumentParser
import logging
from pathlib import Path
from queue import PriorityQueue
import struct
from subprocess import check_output, CalledProcessError
import sys
import threading
import time
try:
import bluetooth
except ImportError:
logging.error('Please install python3-bluez and python3-numpy packages')
raise
try:
import pumpy as np
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
# Bluetooth Headset MAC Addresses
VENDOR_PREFIX = '41:42:29'
DEFAULT_ADDRESS = '41:42:29:12:9B:CF'
class PulseAudioPipe:
TYPE_SOURCE = 'source'
TYPE_SINK = 'sink'
VALID_TYPES = (TYPE_SOURCE, TYPE_SINK)
def __init__(self, pipe_type, path):
self.path = Path(path)
if pipe_type not in self.VALID_TYPES:
raise ValueError('Pipe type %s is not one of %s' % (pipe_type, self.VALID_TYPES))
self.type = pipe_type
self.module_id = None
self.object_id = None
self.stream = None
def create(self):
if self.module_id is not None:
logging.warning('Unable to create %s. This pipe is already connected to Pulseaudio module %d',
self.type, self.module_id)
return False
if self.path.exists():
logging.warning('Unable to create %s. Path %s already exists', self.type, self.path)
return False
logging.debug('Creating virtual pulseaudio %s', self.type)
if self.type == self.TYPE_SOURCE:
cmd = [
'pactl', 'load-module', 'module-pipe-source', 'source_name=ec717',
'source_properties=device.description="EC717\ Headset\ Mic"',
'file=%s' % self.path, 'format=s16le', 'rate=8000', 'channels=1'
]
cmd2 = ['pactl', 'list', 'sources', 'short']
elif self.type == self.TYPE_SINK:
cmd = [
'pactl', 'load-module', 'module-pipe-sink', 'sink_name=ec717',
'sink_properties=device.description="EC717\ Headset"',
'file=%s' % self.path, 'format=s16le', 'rate=8000', 'channels=2',
]
cmd2 = ['pactl', 'list', 'sinks', 'short']
else:
logging.error('Pipe type %s is not one of %s', self.type, self.VALID_TYPES)
return False
try:
output = check_output(cmd).decode('UTF-8').strip()
except CalledProcessError as e:
logging.error(str(e))
raise
if not output.isdigit():
logging.warning('Unable to create %s: %s', self.type, output)
return False
self.module_id = output
try:
output = check_output(cmd2).decode('UTF-8')
for line in output.split('\n'):
if 'ec717' in line:
self.object_id = line.split('\t')[0]
except CalledProcessError as e:
logging.error(str(e))
if self.stream:
self.stream.close()
if self.type == self.TYPE_SOURCE:
self.stream = open(self.path, 'wb')
elif self.type == self.TYPE_SINK:
self.stream = open(self.path, 'rb')
return True
def connected(self):
return self.module_id is not None and self.path.exists()
def mute(self):
if self.object_id is None:
logging.warning('Unable to mute %s', self.type)
return False
cmd = ['pactl', 'set-%s-mute' % self.type, self.object_id, 'toggle']
try:
output = check_output(cmd).decode('UTF-8').strip()
except CalledProcessError as e:
logging.error(str(e))
raise
if output.strip():
logging.warning('Unable to mute %s: %s', self.type, output)
return False
return True
def destroy(self):
if self.module_id is not None:
logging.debug('Destroying virtual pulseaudio %s', self.type)
cmd = ['pactl', 'unload-module', self.module_id]
try:
output = check_output(cmd).decode('UTF-8').strip()
except CalledProcessError as e:
logging.error(str(e))
return False
if output.strip():
logging.warning('Unable to destroy %s: %s', self.type, output)
return False
if self.stream:
try:
self.stream.close()
except BrokenPipeError:
pass
self.stream = None
if self.path.exists():
self.path.unlink()
return True
def write(self, data):
try:
return self.stream.write(data)
except (AttributeError, BrokenPipeError, ValueError):
return None
def read(self, length=0):
try:
return self.stream.read(length)
except (AttributeError, BrokenPipeError, ValueError):
return b''
class BluetoothHeadset:
""" This object connect to Bluetooth handset/handsfree device
stream audio from microphone and to speaker.
"""
# https://www.bluetooth.com/specifications/assigned-numbers/service-discovery/
L2CAP_UUID = "0100"
HFP_TIMEOUT = 1.0
def __init__(self, address):
""" Create object which connects to bluetooth device in the background.
Class automatically reconnects to the device in case of any errors.
:param address: MAC address of Bluetooth device, string.
"""
self.addr = address
self.channel = None
self.service_conn = BluetoothServiceConnection()
self.service_conn.cmer_callback = self._connect_audio
self.service_conn.bldn_callback = self._mute
self.source = PulseAudioPipe('source', '/tmp/ec717.source')
self.sink = PulseAudioPipe('sink', '/tmp/ec717.sink')
self.audio_conn = BluetoothAudioConnection(self.source, self.sink)
def _cleanup(self):
logging.debug('Cleaning up')
self.channel = None
self.sink.destroy()
self.source.destroy()
if self.service_conn.connected():
self.service_conn.close()
if self.audio_conn.connected():
self.audio_conn.close()
def close(self):
self._cleanup()
def wait_for_device(self):
logging.info('HFPDevice class is initialised, looking for %s', self.addr)
try:
while True:
logging.info('Awaiting bluetooth device...')
self._find_channel()
if not self.channel:
time.sleep(self.HFP_TIMEOUT)
continue
self.service_conn.connect(self.addr, self.channel)
if not self.service_conn.connected():
time.sleep(self.HFP_TIMEOUT)
continue
while self.service_conn.connected():
time.sleep(self.HFP_TIMEOUT)
except KeyboardInterrupt:
pass
self._cleanup()
def _connect_audio(self, hfp):
logging.debug('Called _connect_audio()')
self.source.create()
self.sink.create()
self.audio_conn.connect(self.addr)
def _find_channel(self):
services = bluetooth.find_service(address=self.addr, uuid=self.L2CAP_UUID)
channels = PriorityQueue(5)
for i, svc in enumerate(services):
logging.debug('Service %d %s', i, svc['name'])
for c in svc["service-classes"]:
channel = int(svc["port"])
logging.debug(' Service %s, channel %d', c, channel)
service_class = c.lower()
if bluetooth.HANDSFREE_CLASS.lower() == service_class:
# logging.debug('Handsfree Profile (HFP) found on RFCOMM channel %d', channel)
channels.put((1, channel, 'hfp', 'Handsfree Profile (HFP)'))
elif bluetooth.HEADSET_CLASS.lower() == service_class:
# logging.info('Headset Profile (HSP) found on RFCOMM channel %d', channel)
channels.put((2, channel, 'hsp', 'Headset Profile (HSP)'))
elif bluetooth.GENERIC_AUDIO_CLASS.lower() == service_class:
# logging.info('Generic Audio Profile found on RFCOMM channel %d', channel)
channels.put((3, channel, 'generic', 'Generic Audio Profile'))
for p_uuid, p_extra in svc['profiles']:
logging.debug(' Profile %s %s', p_uuid, str(p_extra))
if not channels:
logging.info('No available service channels found found')
selected = channels.get()
logging.debug('Found %s on channel %d', selected[3], selected[1])
self.channel = selected[1]
def _mute(self, hfp):
logging.debug('Called _mute()')
self.source.mute()
class BluetoothServiceConnection:
HFP_TIMEOUT = 1.0
AG_SUPPORTED_FEATURES_BITMAP = 2**9 + 0
def __init__(self):
self.address = None
self.channel = None
self.socket = None
self.thread = None
self.cmer_callback = None # called when ready to establish audio connection
self.bldn_callback = None # called on call button press
def connect(self, address, channel):
self.close()
self.address = address
self.channel = channel
hfp = bluetooth.BluetoothSocket(bluetooth.RFCOMM) # Radio frequency communication (RFCOMM)
try:
logging.debug('Connecting to %s on channel %d ...', self.address, self.channel)
hfp.connect((self.address, self.channel))
except bluetooth.btcommon.BluetoothError as e:
hfp.close()
logging.warning('Failed to establish service level connection: %s', e)
return False
hfp.settimeout(self.HFP_TIMEOUT)
logging.info('HSP/HFP service level connection is established')
self.socket = hfp
self.thread = threading.Thread(target=self._worker_loop)
self.thread.start()
logging.info('HSP/HFP service level thread started')
return True
def connected(self):
return self.socket is not None
def _worker_loop(self):
while self.thread:
try:
self._read_loop()
except bluetooth.btcommon.BluetoothError as e:
logging.warning('Service level connection disconnected: %s', e)
time.sleep(self.HFP_TIMEOUT)
self.close()
def _read_loop(self):
svc_buffer = bytes()
newline = b'\r\n'
while self.thread:
time.sleep(0.01)
command = None
while self.thread and b'\r' not in svc_buffer:
try:
svc_buffer += self.socket.recv(1024)
except bluetooth.btcommon.BluetoothError as e:
# logging.debug('Timed out waiting for service connection data')
if str(e) != 'timed out':
raise
continue
if newline in svc_buffer:
try:
command, svc_buffer = svc_buffer.split(newline)
except ValueError:
print(newline)
raise
command = command.decode('UTF-8').strip()
logging.debug('>> %s', command)
value = None
if '=' in command:
command, value = command.split('=')
# HFP_v1.8.pdf, pages 25-30
if command == 'AT+BRSF' and value:
# HFP_v1.8.pdf, page 97
# HF_SUPPORTED_FEATURES
bitmap = int(value)
logging.debug('Supported Features: %s = %s', bitmap, bin(bitmap))
self._send_at('+BRSF: %s' % str(self.AG_SUPPORTED_FEATURES_BITMAP))
self._send_ok()
elif command == 'AT+CIND':
self._send_at('+CIND: ("service",(0,1)),("call",(0,1))')
self._send_ok()
elif command == 'AT+CIND?':
self._send_at('+CIND: 1,0')
self._send_ok()
elif command == 'AT+CMER' and value:
if value == '3,0,0,1':
logging.debug('HF activates "indicator events reporting"')
elif value == '3,0,0,0':
logging.debug('HF deactivates "indicator events reporting"')
else:
logging.debug('Unknown value: %s', value)
self._send_ok()
# after this command we can establish audio connection
if self.cmer_callback is not None:
self.cmer_callback(self)
elif command == 'AT+CHLD' and value is not None and value == '?':
self._send_at('+CHLD: 0')
self._send_ok()
elif command == 'AT+CLIP':
self._send_ok()
elif command == 'AT+BIA':
self._send_ok()
elif command == 'AT+VGM':
logging.debug('Hands-free reported Gain of Microphone: %s', value)
self._send_ok()
elif command == 'AT+VGS':
logging.debug('Hands-free reported Gain of Speaker: %s', value)
self._send_ok()
elif command == 'AT+BLDN':
# logging.debug('Hands-free requested Bluetooth Last Dialed Number')
self._send_ok()
if self.bldn_callback is not None:
self.bldn_callback(self)
else:
if value is None:
logging.debug('Unhandled command: %s', command)
else:
logging.debug('Unhandled command: %s=%s', command, value)
self._send_error()
def _send(self, data):
logging.debug('<< %s', repr(data))
self.socket.send(data)
def _send_at(self, data):
logging.debug('<< %s', data)
self.socket.send(('\r\n' + data + '\r\n').encode('UTF-8'))
def _send_ok(self):
self._send_at('OK')
def _send_error(self):
self._send_at('ERROR')
def close(self):
if self.thread is not None:
# thread = self.thread
self.thread = None
# thread.join()
logging.info('HSP/HFP service level thread ended')
if self.socket is not None:
self.socket.close()
self.socket = None
class BluetoothAudioConnection:
# https://github.com/torvalds/linux/blob/v5.4/include/net/bluetooth/bluetooth.h#L114-L120
BT_VOICE = 11
BT_VOICE_CVSD_16BIT = 0x0060
# "Volume 2, Section 6.12, of the Bluetooth 2.0 Spec"
# btv20.pdf, page 553
# Input Coding: Linear, Input Data Format: 2's complement, Input Sample Size: 16-bit, Air Coding Format: CVSD
BT_VOICE_CONFIGURATION = 0b0000000000 + 0b0001000000 + 0b0000100000 + 0b0000000000 + 0b0000000000
# https://github.com/torvalds/linux/blob/v5.4/include/linux/socket.h#L339
SOL_BLUETOOTH = 274
# https://github.com/torvalds/linux/blob/v5.4/include/net/bluetooth/bluetooth.h#L59
SOL_SCO = 17
SCO_OPTIONS = 1
SCO_HEADERS_SIZE = 16 // 8
def __init__(self, source, sink):
self.source = source
self.sink = sink
self.sco_payload = None
self.socket = None
self.rthread = None
self.wthread = None
# self.threadlock = threading.Lock()
def connect(self, address):
audio = bluetooth.BluetoothSocket(bluetooth.SCO) # Synchronous Connection-Oriented (SCO) link
# https://www.sciencedirect.com/topics/engineering/synchronous-connection
logging.info('Estalishing voice configuration 0x%04x', self.BT_VOICE_CONFIGURATION)
opt = struct.pack("H", self.BT_VOICE_CONFIGURATION)
audio.setsockopt(self.SOL_BLUETOOTH, self.BT_VOICE, opt)
try:
audio.connect((address,))
except bluetooth.btcommon.BluetoothError as e:
audio.close()
logging.error('Failed to establish audio connection: %s', e)
return False
opt = audio.getsockopt(self.SOL_SCO, self.SCO_OPTIONS, 2)
mtu = struct.unpack('H', opt)[0]
self.sco_payload = mtu - self.SCO_HEADERS_SIZE
self.socket = audio
self.rthread = threading.Thread(target=self._read_loop)
self.rthread.start()
self.wthread = threading.Thread(target=self._write_loop)
self.wthread.start()
logging.info('Audio connection is established, threads started, mtu = %s', mtu)
return True
def connected(self):
return self.socket is not None
def _read_loop(self):
# https://linuxlists.cc/l/2/bluez-devel/t/2673948/(bluez-devel)_sco_usb_isoc_transfer
# Here the 2C0030 is the correct SCO packet header
# http://hccc.ee.ccu.edu.tw/courses/bt/Bluetooth_Project_OK/hci.c
# Defines for the packet types
logging.info('Audio read loop start')
while self.rthread:
try:
data = self.socket.recv(self.sco_payload)
except bluetooth.btcommon.BluetoothError:
data = None
if not data or len(data) == 0:
self.socket.close()
self.socket = None
logging.warning('Audio capture failed')
break
self.source.write(data)
time.sleep(0.0000625)
logging.info('Audio read loop stop')
def _write_loop(self):
logging.info('Audio write loop start')
while self.wthread:
# read stereo data
data = self.sink.read(self.sco_payload) + self.sink.read(self.sco_payload)
if HAS_NUMPY:
# numpy downsample (add both channels, this will clip!)
stereo = np.frombuffer(data, dtype=np.int16)
mono = (stereo[::2] + stereo[1::2]).tobytes()
else:
# manual downsample (keeping only left channel)
mono = b''
for offset in range(0, len(data), 4):
mono += data[offset:offset+2]
self.socket.send(mono)
time.sleep(0.0000625)
logging.info('Audio write loop stop')
def close(self):
if self.rthread is not None:
# rthread = self.rthread
self.rthread = None
# rthread.join()
logging.info('Audio capture thread ended')
if self.wthread is not None:
# wthread = self.wthread
self.wthread = None
# wthread.join()
logging.info('Audio output thread ended')
if self.socket is not None:
self.socket.close()
self.socket = None
def main():
description = 'EC717 Bluetooth Headset Connector for PulseAudio/Linux'
parser = ArgumentParser(description=description)
parser.add_argument('-d', '--debug', action='store_true', default=False)
parser.add_argument('mac', type=str, metavar='BLUETOOTH_MAC_ADDRESS', nargs='?',
help='MAC address of headset or "scan" to scan for it')
args = parser.parse_args()
if args.debug:
logging.basicConfig(level=logging.DEBUG, format='%(message)s')
else:
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(description)
# ask for the headset mac address
hwaddress = None
parameter = args.mac if args.mac is not None else DEFAULT_ADDRESS
if not parameter:
logging.error("Please specify device MAC address or 'scan' to scan it.")
sys.exit(1)
elif parameter == 'scan':
# scan for devices
logging.info('Scanning for devices ...')
nearby_devices = bluetooth.discover_devices(duration=4, lookup_names=True,
flush_cache=True, lookup_class=False)
print(nearby_devices)
for address, name in nearby_devices:
if address.startswith(VENDOR_PREFIX):
hwaddress = address
logging.info('Found compatible bluetooth headphone %s on %s', name, hwaddress)
if not hwaddress:
return
else:
hwaddress = parameter
if not bluetooth.is_valid_address(hwaddress):
logging.error('Wrong device address: %s', hwaddress)
sys.exit(2)
if not HAS_NUMPY:
logging.warning('Using manual downmixing. Install python3-numpy package for faster audio processing')
hf = BluetoothHeadset(hwaddress)
hf.wait_for_device()
# hf.close()
return
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment