-
-
Save ofekp/539ce199a96e6a9ace2c1511cc7409ce to your computer and use it in GitHub Desktop.
# ReachView code is placed under the GPL license. | |
# Written by Egor Fedorov ([email protected]) | |
# Copyright (c) 2015, Emlid Limited | |
# All rights reserved. | |
# If you are interested in using ReachView code as a part of a | |
# closed source project, please contact Emlid Limited ([email protected]). | |
# This file is part of ReachView. | |
# ReachView is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# ReachView is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# You should have received a copy of the GNU General Public License | |
# along with ReachView. If not, see <http://www.gnu.org/licenses/>. | |
import time | |
import pexpect | |
import subprocess | |
import sys | |
import re | |
class BluetoothctlError(Exception): | |
"""This exception is raised, when bluetoothctl fails to start.""" | |
pass | |
class Bluetoothctl: | |
"""A wrapper for bluetoothctl utility.""" | |
def __init__(self): | |
out = subprocess.check_output("rfkill unblock bluetooth", shell = True) | |
self.child = pexpect.spawn("bluetoothctl", echo = False) | |
def get_output(self, command, pause = 0): | |
"""Run a command in bluetoothctl prompt, return output as a list of lines.""" | |
self.child.send(command + "\n") | |
time.sleep(pause) | |
start_failed = self.child.expect(["bluetooth", pexpect.EOF]) | |
if start_failed: | |
raise BluetoothctlError("Bluetoothctl failed after running " + command) | |
return self.child.before.split("\r\n") | |
def start_scan(self): | |
"""Start bluetooth scanning process.""" | |
try: | |
out = self.get_output("scan on") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
def make_discoverable(self): | |
"""Make device discoverable.""" | |
try: | |
out = self.get_output("discoverable on") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
def parse_device_info(self, info_string): | |
"""Parse a string corresponding to a device.""" | |
device = {} | |
block_list = ["[\x1b[0;", "removed"] | |
string_valid = not any(keyword in info_string for keyword in block_list) | |
if string_valid: | |
try: | |
device_position = info_string.index("Device") | |
except ValueError: | |
pass | |
else: | |
if device_position > -1: | |
attribute_list = info_string[device_position:].split(" ", 2) | |
device = { | |
"mac_address": attribute_list[1], | |
"name": attribute_list[2] | |
} | |
return device | |
def get_available_devices(self): | |
"""Return a list of tuples of paired and discoverable devices.""" | |
try: | |
out = self.get_output("devices") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
available_devices = [] | |
for line in out: | |
device = self.parse_device_info(line) | |
if device: | |
available_devices.append(device) | |
return available_devices | |
def get_paired_devices(self): | |
"""Return a list of tuples of paired devices.""" | |
try: | |
out = self.get_output("paired-devices") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
paired_devices = [] | |
for line in out: | |
device = self.parse_device_info(line) | |
if device: | |
paired_devices.append(device) | |
return paired_devices | |
def get_discoverable_devices(self): | |
"""Filter paired devices out of available.""" | |
available = self.get_available_devices() | |
paired = self.get_paired_devices() | |
return [d for d in available if d not in paired] | |
def get_device_info(self, mac_address): | |
"""Get device info by mac address.""" | |
try: | |
out = self.get_output("info " + mac_address) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
return out | |
def get_connectable_devices(self): | |
"""Get a list of connectable devices. | |
Must install 'sudo apt-get install bluez blueztools' to use this""" | |
try: | |
res = [] | |
out = subprocess.check_output(["hcitool", "scan"]) # Requires 'apt-get install bluez' | |
out = out.split("\n") | |
device_name_re = re.compile("^\t([0-9,:,A-F]{17})\t(.*)$") | |
for line in out: | |
device_name = device_name_re.match(line) | |
if device_name != None: | |
res.append({ | |
"mac_address": device_name.group(1), | |
"name": device_name.group(2) | |
}) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
return res | |
def is_connected(self): | |
"""Returns True if there is a current connection to any device, otherwise returns False""" | |
try: | |
res = False | |
out = subprocess.check_output(["hcitool", "con"]) # Requires 'apt-get install bluez' | |
out = out.split("\n") | |
mac_addr_re = re.compile("^.*([0-9,:,A-F]{17}).*$") | |
for line in out: | |
mac_addr = mac_addr_re.match(line) | |
if mac_addr != None: | |
res = True | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
return res | |
def pair(self, mac_address): | |
"""Try to pair with a device by mac address.""" | |
try: | |
out = self.get_output("pair " + mac_address, 4) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
res = self.child.expect(["Failed to pair", "Pairing successful", pexpect.EOF]) | |
success = True if res == 1 else False | |
return success | |
def remove(self, mac_address): | |
"""Remove paired device by mac address, return success of the operation.""" | |
try: | |
out = self.get_output("remove " + mac_address, 3) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
res = self.child.expect(["not available", "Device has been removed", pexpect.EOF]) | |
success = True if res == 1 else False | |
return success | |
def connect(self, mac_address): | |
"""Try to connect to a device by mac address.""" | |
try: | |
out = self.get_output("connect " + mac_address, 2) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
res = self.child.expect(["Failed to connect", "Connection successful", pexpect.EOF]) | |
success = True if res == 1 else False | |
return success | |
def disconnect(self, mac_address): | |
"""Try to disconnect to a device by mac address.""" | |
try: | |
out = self.get_output("disconnect " + mac_address, 2) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
res = self.child.expect(["Failed to disconnect", "Successful disconnected", pexpect.EOF]) | |
success = True if res == 1 else False | |
return success | |
def trust(self, mac_address): | |
"""Trust the device with the given MAC address""" | |
try: | |
out = self.get_output("trust " + mac_address, 4) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
res = self.child.expect(["not available", "trust succeeded", pexpect.EOF]) | |
success = True if res == 1 else False | |
return success | |
def start_agent(self): | |
"""Start agent""" | |
try: | |
out = self.get_output("agent on") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
def default_agent(self): | |
"""Start default agent""" | |
try: | |
out = self.get_output("default-agent") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
if __name__ == "__main__": | |
print("Init bluetooth...") | |
bl = Bluetoothctl() | |
print("Ready!") | |
bl.start_scan() | |
print("Scanning for 10 seconds...") | |
for i in range(0, 10): | |
print(i) | |
time.sleep(1) | |
print(bl.get_discoverable_devices()) |
I am getting this error when I try to connect to a device that is connected to another device, it should return "Failed to connect" but it's a TIMEOUT exception.
raise TIMEOUT(msg)
pexpect.exceptions.TIMEOUT: Timeout exceeded.
<pexpect.pty_spawn.spawn object at 0xb62e3ab0>
command: /usr/bin/bluetoothctl
args: ['/usr/bin/bluetoothctl']
buffer (last 100 chars): 'mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD RSSI: -47\r\n[\x 1b[0;93mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD TxPower: 4\r\n'
before (last 100 chars): 'mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD RSSI: -47\r\n[\x 1b[0;93mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD TxPower: 4\r\n'
after: <class 'pexpect.exceptions.TIMEOUT'>
match: None
match_index: None
exitstatus: None
flag_eof: False
pid: 544
child_fd: 5
closed: False
timeout: 30
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_re:
0: re.compile("Failed to connect")
1: re.compile("Connection successful")
2: EOF
EDIT2: changed "start_failed = self.child.expect(["bluetooth", pexpect.EOF])" to "start_failed = self.child.expect( [ "#", pexpect.EOF ] )", and now it works even if you have another bluetooth deviced connected.
EDIT: I think I know where the problem is. the line "start_failed = self.child.expect(["bluetooth", pexpect.EOF])" is waiting for the bluetoothctl prompt to be "[bluetooth]", and when a device is already connected, the prompt is "[NAME OF THE DEVICE]". Now I'm figuring out how to make it ignore that.
Thanks for the code. Not sure if it's a bug, but if there's any other device already connected, the script gets stuck on "Ready!" message, and after a few second it stops and prints this:
Traceback (most recent call last):
File "scripts/bluetoothctl.py", line 270, in
bl.start_scan()
File "scripts/bluetoothctl.py", line 60, in start_scan
out = self.get_output("scan on")
File "scripts/bluetoothctl.py", line 50, in get_output
start_failed = self.child.expect(["bluetooth", pexpect.EOF])
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/spawnbase.py", line 341, in expect
timeout, searchwindowsize, async_)
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/spawnbase.py", line 369, in expect_list
return exp.expect_loop(timeout)
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/expect.py", line 119, in expect_loop
return self.timeout(e)
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/expect.py", line 82, in timeout
raise TIMEOUT(msg)
pexpect.exceptions.TIMEOUT: Timeout exceeded.
<pexpect.pty_spawn.spawn object at 0x769f98f0>
command: /usr/bin/bluetoothctl
args: ['/usr/bin/bluetoothctl']
buffer (last 100 chars): 'M30 Modkit]\x1b[0m# Discovery started\r\n[\x1b[0;93mCHG\x1b[0m] Controller B8:27:EB:94:DE:6D Discovering: yes\r\n'
before (last 100 chars): 'M30 Modkit]\x1b[0m# Discovery started\r\n[\x1b[0;93mCHG\x1b[0m] Controller B8:27:EB:94:DE:6D Discovering: yes\r\n'
after: <class 'pexpect.exceptions.TIMEOUT'>
match: None
match_index: None
exitstatus: None
flag_eof: False
pid: 688
child_fd: 5
closed: False
timeout: 30
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 1
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_re:
0: re.compile('bluetooth')
1: EOF
If you replace "bluetooth"
with "(\x1b\\[0;94m)\\[.+\\](\x1b\\[0m)"
it will work when connected to a device. This checks for any text between a set of brackets that's wrapped in the ansi color code bluetoothctl uses for the prompt text.
Thank you @ShibaSama that works great !
If pairing is not working, try calling the "connect" function this will automatically try attempting to pair the device to its client. This worked for me
If I start the agent and default-agent, how can I get the pi to answer the pair request, so that it can pair with the device?