Last active
February 14, 2024 18:06
-
-
Save josemarcosrf/e181769c51b99952b77bf0eb8ab9c98e to your computer and use it in GitHub Desktop.
python BLE server & client using pybluez
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import click | |
import json | |
import logging | |
import sys | |
import bluetooth as bt | |
import coloredlogs | |
from typing import * | |
# Module-level logger, configured through coloredlogs at DEBUG level.
logger = logging.getLogger(__name__)
coloredlogs.install(level=logging.DEBUG, logger=logger)
class BLEServer:
    """Bluetooth RFCOMM server that keeps a small mock channel state.

    On construction the server opens a listening RFCOMM socket and advertises
    it with ``bt.advertise_service``. ``run`` then accepts one client at a
    time and answers the JSON requests described in ``process_request``.
    """

    # Number of pending connections the listening socket may queue.
    _BACKLOG = 1
    # errno carried by the BluetoothError raised when the peer resets the link.
    _CONN_RESET = 104

    def __init__(
        self, uuid: str, server_name: str, port: int = 1,
    ):
        """Store the service parameters and start advertising.

        Args:
            uuid: Service UUID used as service id and service class.
            server_name: Human-readable name of the advertised service.
            port: Kept for backward compatibility; it is only logged. The
                socket is bound to ``bt.PORT_ANY`` (as it always was).
        """
        self.uuid = uuid
        self.server_name = server_name
        # Advertise the server
        self.port = port
        self.server_sock = self._advertise()
        # mock state
        self.state = [0, 0, 0, 0]

    def _advertise(self):
        """Create, bind and advertise the listening RFCOMM socket.

        Returns:
            The listening ``bt.BluetoothSocket``.
        """
        logger.debug(
            f"Advertising with: server name: {self.server_name} "
            f"| port: {self.port} "
            f"| uuid: {self.uuid}"
        )
        server_sock = bt.BluetoothSocket(bt.RFCOMM)
        try:
            server_sock.bind(("", bt.PORT_ANY))
            # listen() takes a backlog size, not a port number.
            server_sock.listen(self._BACKLOG)
            bt.advertise_service(
                server_sock,
                self.server_name,
                service_id=self.uuid,
                service_classes=[self.uuid, bt.SERIAL_PORT_CLASS],
                profiles=[bt.SERIAL_PORT_PROFILE],
            )
        except Exception:
            # Do not leak the socket when binding or advertising fails.
            server_sock.close()
            raise
        return server_sock

    def _accept_connection(self) -> "bt.BluetoothSocket":
        """Block until a client connects and return its socket."""
        client_sock, client_info = self.server_sock.accept()
        logger.info(f"Accepted connection from {client_info}")
        return client_sock

    def process_request(self, data: bytes):
        """Decode one JSON request and apply it to the mock state.

        Args:
            data: JSON document received from the client. A document with
                ``"cmd": "/read"`` reads the state; any other document must
                carry ``"channels"`` (list of 1-based channel numbers) and
                ``"mode"`` (value stored for each listed channel).

        Returns:
            The state list for a ``/read`` request; otherwise a JSON string
            with the keys ``ok``, ``state`` and ``scheduled``.

        Raises:
            ValueError: If ``data`` is not valid JSON.
            KeyError: If ``"mode"`` or ``"channels"`` is missing.
            IndexError: If a channel number is outside the state list.
        """
        request = json.loads(data)
        logger.info(f"Should process: '{request}'")
        if request.get("cmd") == "/read":
            return self.state

        mode = request["mode"]
        # Apply the mode to every requested channel, not only the first one.
        for channel in request["channels"]:
            self.state[channel - 1] = mode
        return json.dumps({"ok": True, "state": self.state, "scheduled": []})

    def run(self):
        """Serve client requests until an unrecoverable error or Ctrl-C.

        A connection reset by the peer is recovered by waiting for the next
        client. Any other Bluetooth error, a KeyboardInterrupt or an
        unexpected exception is logged and ends the loop. The client socket
        is closed before returning.
        """
        # getsockname() yields (address, channel); the channel is at index 1.
        channel = self.server_sock.getsockname()[1]
        logger.debug(f"Waiting for a connection on RFCOMM channel {channel}")
        client_sock = self._accept_connection()
        try:
            while True:
                try:
                    data = client_sock.recv(1024)
                    ret = self.process_request(data)
                    logger.debug(f"request process returned: {ret}")
                    # process_request may already return JSON text; encode
                    # only once so the client does not get a quoted string.
                    payload = ret if isinstance(ret, str) else json.dumps(ret)
                    client_sock.send(payload)
                except bt.BluetoothError as be:
                    err_no = getattr(be, "errno", None)
                    if err_no == self._CONN_RESET:
                        logger.warning("Connection reset by peer...")
                        client_sock.close()
                        client_sock = None
                        # Accept a new connection
                        client_sock = self._accept_connection()
                    else:
                        logger.debug(err_no)
                        logger.error(f"Something wrong with bluetooth: {be}")
                        return
                except KeyboardInterrupt:
                    logger.warning("\nDisconnected")
                    return
                except Exception as e:
                    logger.error(f"BT Server Unknown error: {e}")
                    return
        finally:
            if client_sock is not None:
                client_sock.close()
# Command-line entry point of the server: build a BLEServer from the options
# and run it, logging (not re-raising) any error. Documented with a comment
# rather than a docstring so the click help output stays unchanged.
@click.command()
@click.option("--uuid", default="616d3aa1-689e-4e71-8fed-09f3c7c4ad91")
@click.option("--server-name", default="RPI-BT-Server")
@click.option("--port", default=1)
def main(uuid: str, server_name: str, port: int):
    try:
        BLEServer(uuid, server_name, port).run()
    except bt.btcommon.BluetoothError as bt_err:
        logger.error(f"Bluetooth Error: {bt_err}")
        hint = "Try running like: 'sudo hciconfig hci0 piscan && sudo .venv/bin/python ble_server.py'"
        logger.error(hint)
    except Exception as err:
        logger.error(f"Unknown error: {err}")
# For an example RFCOMM server from pybluez, see:
# https://github.com/pybluez/pybluez/blob/master/examples/simple/rfcomm-server.py
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import click | |
import json | |
import logging | |
import sys | |
import bluetooth | |
import coloredlogs | |
# Module-level logger, configured through coloredlogs at DEBUG level.
logger = logging.getLogger(__name__)
coloredlogs.install(level=logging.DEBUG, logger=logger)
# Interactive client: look up the service by UUID (optionally restricted to
# one device address), connect to the first match over RFCOMM and send one
# "/switch" request per prompt round, printing each raw reply.
# The socket is always closed on exit, including Ctrl-C / end of input.
@click.command()
@click.option("--uuid", default="616d3aa1-689e-4e71-8fed-09f3c7c4ad91")
@click.option("--addr")
def main(uuid: str, addr: str = None):
    if addr:
        logger.info(f"Searching for BT Server with address={addr}...")
    else:
        logger.info("Searching all nearby bluetooth devices for the BT Server")

    # search for the BT Server service
    service_matches = bluetooth.find_service(uuid=uuid, address=addr)
    if not service_matches:
        logger.info("Couldn't find the SampleServer service.")
        sys.exit(0)

    first_match = service_matches[0]
    port = first_match["port"]
    name = first_match["name"]
    host = first_match["host"]
    logger.info(f"Connecting to {name} @ {host}:{port}")

    # Create the client socket
    sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
    try:
        sock.connect((host, port))
        logger.info("Connected!")
        while True:
            try:
                # split() without arguments tolerates repeated/leading blanks.
                channels = [int(tok) for tok in input("Input channels\t").split()]
                mode = int(input("Input ON (1) / OFF (0)\t"))
            except ValueError:
                logger.warning("Channels and mode must be integers, try again.")
                continue
            if not channels:
                # Never send an empty channel list; ask again instead.
                logger.warning("No channel given, try again.")
                continue
            sock.send(
                json.dumps({"cmd": "/switch", "channels": channels, "mode": mode})
            )
            res = sock.recv(1024)
            print(res)
            print("-" * 40)
    finally:
        # The loop only ends through an exception, so close the socket here.
        sock.close()
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment