Created
May 9, 2018 08:38
-
-
Save xiopt/a2bd9a0dd24191310a614ae748064262 to your computer and use it in GitHub Desktop.
Python A2DP Bluetooth Connection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
import os | |
import re | |
from time import sleep | |
import sys | |
from Bluetoothctl import Bluetoothctl | |
DEVICE_MAC = '04:5D:4B:3A:29:FB' | |
def getDeviceConnectionStatus(): | |
output = os.popen('pacmd list-cards').read() | |
indexRegex = 'index\:\s(.*)\s+name\:\s<bluez_card.' + DEVICE_MAC.replace(':', '_') | |
index = re.search(indexRegex, output).group(1) | |
useless,main = output.split('device.string = "' + DEVICE_MAC + '"') | |
profileRegex = 'active\sprofile:\s\<(.*)\>\s+' | |
profile = re.search(profileRegex, main).group(1) | |
return [index, profile] | |
data = [8, 'off'] | |
while(data[1] == 'off'): | |
# print os.system('sudo systemctl restart bluetooth') | |
print 'Restarting bluetooth...' | |
subprocess.call(['sudo', 'systemctl', 'restart', 'bluetooth']) | |
sleep(5) | |
try: | |
print 'Connecting to device...' | |
bl = Bluetoothctl() | |
bl.connect(DEVICE_MAC) | |
except: | |
print 'Error connecting to device...Retrying' | |
continue | |
sleep(7) | |
try: | |
print 'Parsing status...' | |
data = getDeviceConnectionStatus() | |
except: | |
print 'Error parsing status...Retrying' | |
continue | |
print 'Offline mode..Retrying' if data[1] == 'off' else 'A2DP Mode!! Finished!' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ReachView code is placed under the GPL license. | |
# Written by Egor Fedorov ([email protected]) | |
# Copyright (c) 2015, Emlid Limited | |
# All rights reserved. | |
# If you are interested in using ReachView code as a part of a | |
# closed source project, please contact Emlid Limited ([email protected]). | |
# This file is part of ReachView. | |
# ReachView is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# ReachView is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# You should have received a copy of the GNU General Public License | |
# along with ReachView. If not, see <http://www.gnu.org/licenses/>. | |
import time | |
import pexpect | |
import subprocess | |
import sys | |
class BluetoothctlError(Exception): | |
"""This exception is raised, when bluetoothctl fails to start.""" | |
pass | |
class Bluetoothctl: | |
"""A wrapper for bluetoothctl utility.""" | |
def __init__(self): | |
out = subprocess.check_output("rfkill unblock bluetooth", shell = True) | |
self.child = pexpect.spawn("bluetoothctl", echo = False) | |
def get_output(self, command, pause = 0): | |
"""Run a command in bluetoothctl prompt, return output as a list of lines.""" | |
self.child.send(command + "\n") | |
time.sleep(pause) | |
start_failed = self.child.expect(["bluetooth", pexpect.EOF]) | |
if start_failed: | |
raise BluetoothctlError("Bluetoothctl failed after running " + command) | |
return self.child.before.split("\r\n") | |
def start_scan(self): | |
"""Start bluetooth scanning process.""" | |
try: | |
out = self.get_output("scan on") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
def make_discoverable(self): | |
"""Make device discoverable.""" | |
try: | |
out = self.get_output("discoverable on") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
def parse_device_info(self, info_string): | |
"""Parse a string corresponding to a device.""" | |
device = {} | |
block_list = ["[\x1b[0;", "removed"] | |
string_valid = not any(keyword in info_string for keyword in block_list) | |
if string_valid: | |
try: | |
device_position = info_string.index("Device") | |
except ValueError: | |
pass | |
else: | |
if device_position > -1: | |
attribute_list = info_string[device_position:].split(" ", 2) | |
device = { | |
"mac_address": attribute_list[1], | |
"name": attribute_list[2] | |
} | |
return device | |
def get_available_devices(self): | |
"""Return a list of tuples of paired and discoverable devices.""" | |
try: | |
out = self.get_output("devices") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
available_devices = [] | |
for line in out: | |
device = self.parse_device_info(line) | |
if device: | |
available_devices.append(device) | |
return available_devices | |
def get_paired_devices(self): | |
"""Return a list of tuples of paired devices.""" | |
try: | |
out = self.get_output("paired-devices") | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
paired_devices = [] | |
for line in out: | |
device = self.parse_device_info(line) | |
if device: | |
paired_devices.append(device) | |
return paired_devices | |
def get_discoverable_devices(self): | |
"""Filter paired devices out of available.""" | |
available = self.get_available_devices() | |
paired = self.get_paired_devices() | |
return [d for d in available if d not in paired] | |
def get_device_info(self, mac_address): | |
"""Get device info by mac address.""" | |
try: | |
out = self.get_output("info " + mac_address) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
return out | |
def pair(self, mac_address): | |
"""Try to pair with a device by mac address.""" | |
try: | |
out = self.get_output("pair " + mac_address, 4) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
res = self.child.expect(["Failed to pair", "Pairing successful", pexpect.EOF]) | |
success = True if res == 1 else False | |
return success | |
def remove(self, mac_address): | |
"""Remove paired device by mac address, return success of the operation.""" | |
try: | |
out = self.get_output("remove " + mac_address, 3) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
res = self.child.expect(["not available", "Device has been removed", pexpect.EOF]) | |
success = True if res == 1 else False | |
return success | |
def connect(self, mac_address): | |
"""Try to connect to a device by mac address.""" | |
try: | |
out = self.get_output("connect " + mac_address, 2) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
res = self.child.expect(["Failed to connect", "Connection successful", pexpect.EOF]) | |
success = True if res == 1 else False | |
return success | |
def disconnect(self, mac_address): | |
"""Try to disconnect to a device by mac address.""" | |
try: | |
out = self.get_output("disconnect " + mac_address, 2) | |
except BluetoothctlError, e: | |
print(e) | |
return None | |
else: | |
res = self.child.expect(["Failed to disconnect", "Successful disconnected", pexpect.EOF]) | |
success = True if res == 1 else False | |
return success |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment