Last active
April 8, 2024 18:51
-
-
Save v6ak/b7445f9a7fe8f92c623de616ec34021c to your computer and use it in GitHub Desktop.
Quick hack for QuietDrift in Home Assistant. Alternatively, you can use the add-on, which also supports BLE proxies: https://github.com/v6ak/v6-quietdrift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Define the command: a REST command that POSTs the requested position and
# speed to the local curtain HTTP service on port 85.
rest_command:
  set_switchbot_curtain_position:
    url: '{{ ("http://localhost:85/curtains/" + mac) + "/set-position" }}'
    content_type: application/x-www-form-urlencoded
    method: POST
    payload: '{{ (("position=" + (position | string)) + "&speed=") + (speed | string) }}'

# Run the command
# NOTE(review): this list item is a separate snippet (presumably an action in an
# automation or script); it is not part of the same YAML document as the
# mapping above.
- service: rest_command.set_switchbot_curtain_position
  data:
    mac: "YO:UR:MA:CA:DD:RE"
    speed: 1
    position: '{{ states("sensor.curtain_cobain_expected_position") | int }}'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Compose service definition for the curtain HTTP server; the image is built
# from the "psb" build context.
psb:
  build: "psb"
  ports:
    # Bound to the loopback interface only: host port 85 -> container port 8080.
    - "127.0.0.1:85:8080"
  volumes:
    # I use BlueSand (https://github.com/v6ak/bluesand/); adjust the D-Bus
    # socket path to your needs. Mounted read-only.
    - "/opt/bluesand/dbus-socket/:/run/dbus:ro"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FROM python:3
# NOTE(review): hypercorn is not installed explicitly; presumably it arrives as
# a dependency of quart - confirm.
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir PySwitchbot quart
RUN mkdir -p /opt/psbs
COPY server.py /opt/psbs/
WORKDIR /opt/psbs
# Exec form (JSON array): hypercorn runs as PID 1 and receives SIGTERM from
# `docker stop` directly, instead of being hidden behind `/bin/sh -c`.
ENTRYPOINT ["hypercorn", "-b", "0.0.0.0:8080", "--log-level", "INFO", "--access-logfile", "-", "--error-logfile", "-", "server:app"]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from logging.config import dictConfig | |
import asyncio | |
from switchbot import get_device | |
from switchbot.devices import curtain | |
from quart import Quart | |
from quart import request | |
from bleak_retry_connector import BleakNotFoundError, BleakConnectionError | |
from bleak.exc import BleakDBusError | |
# Logging setup: the two Quart loggers and the root logger (the '' entry) all
# accept INFO and above and share one stream handler that writes to stderr.
dictConfig(dict(
    version=1,
    disable_existing_loggers=False,
    loggers={
        logger_name: {'level': 'INFO', 'handlers': ['console']}
        for logger_name in ('quart.app', 'quart.serving', '')
    },
    handlers={
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stderr',
        },
    },
))
# Cache of SwitchbotCurtain objects keyed by MAC address; filled lazily by
# get_curtain() and invalidated by work_with_curtain() before a retry.
cached_curtains = {}

# The Quart application object; hypercorn is pointed at it as "server:app".
app = Quart(__name__)
async def get_curtain(mac):
    """Return the SwitchbotCurtain for *mac*, creating and caching it on first use.

    A cached entry is returned as-is. Otherwise the device is looked up with
    ``get_device(mac)``; when that yields ``None`` nothing is cached and
    ``None`` is returned.
    """
    try:
        return cached_curtains[mac]
    except KeyError:
        pass  # Not cached yet: fall through to the lookup.

    ble_device = await get_device(mac)
    if ble_device is None:
        return None

    wrapper = curtain.SwitchbotCurtain(ble_device)
    cached_curtains[mac] = wrapper
    return wrapper
# Strict Bluetooth MAC address: six upper-case hex octets separated by colons,
# e.g. "AA:BB:CC:DD:EE:FF". The pattern ends with \Z rather than $ because $
# also matches just before a trailing newline, which would let
# "AA:BB:CC:DD:EE:FF\n" pass validation.
bt_mac_re = re.compile(r"^([0-9A-F]{2}:){5}([0-9A-F]{2})\Z")

# Held (via "async with") around each curtain lookup + callback in
# work_with_curtain(), so those operations run one at a time.
bt_lock = asyncio.Lock()
async def work_with_curtain(mac, callback, expected):
    """Run ``callback(curtain)`` for the curtain at *mac*, retrying on failure.

    The first attempt is retried (after dropping the cached curtain object)
    when it raises one of the Bleak connection errors caught below. After
    that, if the result differs from *expected*, the cache entry is dropped
    and one more attempt is made. Exceptions raised by a retry propagate to
    the caller.

    Returns the result of the last attempt; ``None`` means ``get_curtain()``
    found no device on that attempt.
    """
    async def run_once():
        # Lookup and callback happen under the shared lock.
        # This should hopefully ensure fairness.
        async with bt_lock:
            device = await get_curtain(mac)
            return None if device is None else await callback(device)

    async def retry_with_fresh_device():
        # Forget the cached object (if any) so get_curtain() looks it up again.
        cached_curtains.pop(mac, None)
        return await run_once()

    try:
        outcome = await run_once()
    except (BleakNotFoundError, BleakDBusError, BleakConnectionError) as err:
        # Catching BleakDBusError is probably useless.
        app.logger.info("curtain seems to have disconnected (Exception {%s}): {%s}", err, mac)
        outcome = await retry_with_fresh_device()

    if outcome != expected:
        app.logger.info("curtain seems to have disconnected (success=False): %s", mac)
        outcome = await retry_with_fresh_device()
    return outcome
@app.route('/curtains/<mac>/set-position', methods=['POST'])
async def curtains_set_position(mac):
    """Handle ``POST /curtains/<mac>/set-position``.

    Form fields:
        position: required integer target position.
        speed: optional integer; defaults to 255 when absent or empty.

    Responses (JSON body, HTTP status):
        400 - *mac* does not match ``bt_mac_re``, or ``position``/``speed``
              is missing or not an integer.
        404 - no device was found (``work_with_curtain`` returned ``None``).
        200 - the curtain reported success.
        500 - the curtain reported failure.
    """
    if not bt_mac_re.match(mac):
        return {'success': False, 'error': 'Bad MAC'}, 400

    form = await request.form
    raw_position = form.get('position')
    if raw_position is None:
        # Previously int(None) raised TypeError and surfaced as a 500.
        return {'success': False, 'error': 'Missing position'}, 400
    try:
        speed = int(form.get('speed') or 255)
        position = int(raw_position)
    except ValueError:
        # Non-numeric input is a client error, not a server error.
        return {'success': False, 'error': 'Bad speed or position'}, 400

    app.logger.info("set position: %s %s %s", mac, speed, position)
    # TODO: continue when request is cancelled
    res = await work_with_curtain(
        mac,
        lambda device: device.set_position(position=position, speed=speed),
        expected=True,
    )
    app.logger.info("set position result (%s %s %s): %s", mac, speed, position, res)
    if res is None:
        return {"success": False, "error": "Not found"}, 404
    return {"success": res}, (200 if res else 500)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment