Skip to content

Instantly share code, notes, and snippets.

@ofekp
Forked from egorf/bluetoothctl.py
Last active January 24, 2023 10:12
Show Gist options
  • Save ofekp/539ce199a96e6a9ace2c1511cc7409ce to your computer and use it in GitHub Desktop.
Save ofekp/539ce199a96e6a9ace2c1511cc7409ce to your computer and use it in GitHub Desktop.
Bluetoothctl wrapper in Python
# ReachView code is placed under the GPL license.
# Written by Egor Fedorov ([email protected])
# Copyright (c) 2015, Emlid Limited
# All rights reserved.
# If you are interested in using ReachView code as a part of a
# closed source project, please contact Emlid Limited ([email protected]).
# This file is part of ReachView.
# ReachView is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# ReachView is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with ReachView. If not, see <http://www.gnu.org/licenses/>.
import time
import pexpect
import subprocess
import sys
import re
class BluetoothctlError(Exception):
"""This exception is raised, when bluetoothctl fails to start."""
pass
class Bluetoothctl:
"""A wrapper for bluetoothctl utility."""
def __init__(self):
out = subprocess.check_output("rfkill unblock bluetooth", shell = True)
self.child = pexpect.spawn("bluetoothctl", echo = False)
def get_output(self, command, pause = 0):
"""Run a command in bluetoothctl prompt, return output as a list of lines."""
self.child.send(command + "\n")
time.sleep(pause)
start_failed = self.child.expect(["bluetooth", pexpect.EOF])
if start_failed:
raise BluetoothctlError("Bluetoothctl failed after running " + command)
return self.child.before.split("\r\n")
def start_scan(self):
"""Start bluetooth scanning process."""
try:
out = self.get_output("scan on")
except BluetoothctlError, e:
print(e)
return None
def make_discoverable(self):
"""Make device discoverable."""
try:
out = self.get_output("discoverable on")
except BluetoothctlError, e:
print(e)
return None
def parse_device_info(self, info_string):
"""Parse a string corresponding to a device."""
device = {}
block_list = ["[\x1b[0;", "removed"]
string_valid = not any(keyword in info_string for keyword in block_list)
if string_valid:
try:
device_position = info_string.index("Device")
except ValueError:
pass
else:
if device_position > -1:
attribute_list = info_string[device_position:].split(" ", 2)
device = {
"mac_address": attribute_list[1],
"name": attribute_list[2]
}
return device
def get_available_devices(self):
"""Return a list of tuples of paired and discoverable devices."""
try:
out = self.get_output("devices")
except BluetoothctlError, e:
print(e)
return None
else:
available_devices = []
for line in out:
device = self.parse_device_info(line)
if device:
available_devices.append(device)
return available_devices
def get_paired_devices(self):
"""Return a list of tuples of paired devices."""
try:
out = self.get_output("paired-devices")
except BluetoothctlError, e:
print(e)
return None
else:
paired_devices = []
for line in out:
device = self.parse_device_info(line)
if device:
paired_devices.append(device)
return paired_devices
def get_discoverable_devices(self):
"""Filter paired devices out of available."""
available = self.get_available_devices()
paired = self.get_paired_devices()
return [d for d in available if d not in paired]
def get_device_info(self, mac_address):
"""Get device info by mac address."""
try:
out = self.get_output("info " + mac_address)
except BluetoothctlError, e:
print(e)
return None
else:
return out
def get_connectable_devices(self):
"""Get a list of connectable devices.
Must install 'sudo apt-get install bluez blueztools' to use this"""
try:
res = []
out = subprocess.check_output(["hcitool", "scan"]) # Requires 'apt-get install bluez'
out = out.split("\n")
device_name_re = re.compile("^\t([0-9,:,A-F]{17})\t(.*)$")
for line in out:
device_name = device_name_re.match(line)
if device_name != None:
res.append({
"mac_address": device_name.group(1),
"name": device_name.group(2)
})
except BluetoothctlError, e:
print(e)
return None
else:
return res
def is_connected(self):
"""Returns True if there is a current connection to any device, otherwise returns False"""
try:
res = False
out = subprocess.check_output(["hcitool", "con"]) # Requires 'apt-get install bluez'
out = out.split("\n")
mac_addr_re = re.compile("^.*([0-9,:,A-F]{17}).*$")
for line in out:
mac_addr = mac_addr_re.match(line)
if mac_addr != None:
res = True
except BluetoothctlError, e:
print(e)
return None
else:
return res
def pair(self, mac_address):
"""Try to pair with a device by mac address."""
try:
out = self.get_output("pair " + mac_address, 4)
except BluetoothctlError, e:
print(e)
return None
else:
res = self.child.expect(["Failed to pair", "Pairing successful", pexpect.EOF])
success = True if res == 1 else False
return success
def remove(self, mac_address):
"""Remove paired device by mac address, return success of the operation."""
try:
out = self.get_output("remove " + mac_address, 3)
except BluetoothctlError, e:
print(e)
return None
else:
res = self.child.expect(["not available", "Device has been removed", pexpect.EOF])
success = True if res == 1 else False
return success
def connect(self, mac_address):
"""Try to connect to a device by mac address."""
try:
out = self.get_output("connect " + mac_address, 2)
except BluetoothctlError, e:
print(e)
return None
else:
res = self.child.expect(["Failed to connect", "Connection successful", pexpect.EOF])
success = True if res == 1 else False
return success
def disconnect(self, mac_address):
"""Try to disconnect to a device by mac address."""
try:
out = self.get_output("disconnect " + mac_address, 2)
except BluetoothctlError, e:
print(e)
return None
else:
res = self.child.expect(["Failed to disconnect", "Successful disconnected", pexpect.EOF])
success = True if res == 1 else False
return success
def trust(self, mac_address):
"""Trust the device with the given MAC address"""
try:
out = self.get_output("trust " + mac_address, 4)
except BluetoothctlError, e:
print(e)
return None
else:
res = self.child.expect(["not available", "trust succeeded", pexpect.EOF])
success = True if res == 1 else False
return success
def start_agent(self):
"""Start agent"""
try:
out = self.get_output("agent on")
except BluetoothctlError, e:
print(e)
return None
def default_agent(self):
"""Start default agent"""
try:
out = self.get_output("default-agent")
except BluetoothctlError, e:
print(e)
return None
if __name__ == "__main__":
print("Init bluetooth...")
bl = Bluetoothctl()
print("Ready!")
bl.start_scan()
print("Scanning for 10 seconds...")
for i in range(0, 10):
print(i)
time.sleep(1)
print(bl.get_discoverable_devices())
@ntanenho
Copy link

ntanenho commented Dec 1, 2017

If I start the agent and default-agent, how can I get the pi to answer the pair request, so that it can pair with the device?

@ntanenho
Copy link

I am getting this error when I try to connect to a device that is connected to another device, it should return "Failed to connect" but it's a TIMEOUT exception.

raise TIMEOUT(msg)
pexpect.exceptions.TIMEOUT: Timeout exceeded.
<pexpect.pty_spawn.spawn object at 0xb62e3ab0>
command: /usr/bin/bluetoothctl
args: ['/usr/bin/bluetoothctl']
buffer (last 100 chars): 'mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD RSSI: -47\r\n[\x 1b[0;93mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD TxPower: 4\r\n'
before (last 100 chars): 'mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD RSSI: -47\r\n[\x 1b[0;93mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD TxPower: 4\r\n'
after: <class 'pexpect.exceptions.TIMEOUT'>
match: None
match_index: None
exitstatus: None
flag_eof: False
pid: 544
child_fd: 5
closed: False
timeout: 30
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_re:
0: re.compile("Failed to connect")
1: re.compile("Connection successful")
2: EOF

@molul
Copy link

molul commented May 3, 2019

EDIT2: changed "start_failed = self.child.expect(["bluetooth", pexpect.EOF])" to "start_failed = self.child.expect( [ "#", pexpect.EOF ] )", and now it works even if you have another bluetooth deviced connected.

EDIT: I think I know where the problem is. the line "start_failed = self.child.expect(["bluetooth", pexpect.EOF])" is waiting for the bluetoothctl prompt to be "[bluetooth]", and when a device is already connected, the prompt is "[NAME OF THE DEVICE]". Now I'm figuring out how to make it ignore that.


Thanks for the code. Not sure if it's a bug, but if there's any other device already connected, the script gets stuck on "Ready!" message, and after a few second it stops and prints this:

Traceback (most recent call last):
File "scripts/bluetoothctl.py", line 270, in
bl.start_scan()
File "scripts/bluetoothctl.py", line 60, in start_scan
out = self.get_output("scan on")
File "scripts/bluetoothctl.py", line 50, in get_output
start_failed = self.child.expect(["bluetooth", pexpect.EOF])
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/spawnbase.py", line 341, in expect
timeout, searchwindowsize, async_)
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/spawnbase.py", line 369, in expect_list
return exp.expect_loop(timeout)
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/expect.py", line 119, in expect_loop
return self.timeout(e)
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/expect.py", line 82, in timeout
raise TIMEOUT(msg)
pexpect.exceptions.TIMEOUT: Timeout exceeded.
<pexpect.pty_spawn.spawn object at 0x769f98f0>
command: /usr/bin/bluetoothctl
args: ['/usr/bin/bluetoothctl']
buffer (last 100 chars): 'M30 Modkit]\x1b[0m# Discovery started\r\n[\x1b[0;93mCHG\x1b[0m] Controller B8:27:EB:94:DE:6D Discovering: yes\r\n'
before (last 100 chars): 'M30 Modkit]\x1b[0m# Discovery started\r\n[\x1b[0;93mCHG\x1b[0m] Controller B8:27:EB:94:DE:6D Discovering: yes\r\n'
after: <class 'pexpect.exceptions.TIMEOUT'>
match: None
match_index: None
exitstatus: None
flag_eof: False
pid: 688
child_fd: 5
closed: False
timeout: 30
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 1
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_re:
0: re.compile('bluetooth')
1: EOF

@ShibaSama
Copy link

If you replace "bluetooth" with "(\x1b\\[0;94m)\\[.+\\](\x1b\\[0m)" it will work when connected to a device. This checks for any text between a set of brackets that's wrapped in the ansi color code bluetoothctl uses for the prompt text.

@loichu
Copy link

loichu commented Mar 5, 2021

Thank you @ShibaSama that works great !

@Aswinifolder
Copy link

If pairing is not working, try calling the "connect" function this will automatically try attempting to pair the device to its client. This worked for me

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment