|
#!/usr/bin/env python3 |
|
# Unofficial CLI command for mornin'+ (mornin' plus) smart curtain. |
|
# https://gist.github.com/mugifly/dc371aec69c41623d31e8157cefc31a2 |
|
|
|
import sys |
|
from bluepy.btle import (Peripheral, ADDR_TYPE_RANDOM) |
|
from Crypto.Cipher import AES |
|
from time import sleep |
|
|
|
|
|
# Every GATT UUID used by this script shares the same tail and differs only
# in its first field.
_UUID_TAIL = '-9dfa-4ae2-bd46-ac69d9fdd743'

# App service and its status characteristic (read to obtain the status,
# written to authenticate).
APP_SERVICE_UUID = '79ab0000' + _UUID_TAIL
APP_SERVICE_STATUS_UUID = '79ab0001' + _UUID_TAIL

# Control service and its control characteristic (written to move the curtain).
CONTROL_SERVICE_UUID = '79ab1000' + _UUID_TAIL
CONTROL_SERVICE_CONTROL_UUID = '79ab1001' + _UUID_TAIL

# Two-byte payloads written to the control characteristic.
CONTROL_SERVICE_CONTROL_OPEN_VALUE = bytes((0x00, 0x00))
CONTROL_SERVICE_CONTROL_CLOSE_VALUE = bytes((0x00, 0x01))
CONTROL_SERVICE_CONTROL_STOP_VALUE = bytes((0x00, 0x02))

# Seconds to sleep after a control payload has been written.
CONTROL_SERVICE_CONTROL_TIMEOUT_SEC = 5
|
|
|
|
|
def auth_device(p: Peripheral, encrypt_key: str) -> None:
    """Authenticate to the device via its app service status characteristic.

    The status characteristic is read, seeds are extracted from that status,
    a main token is built from the seeds and encrypted with ``encrypt_key``,
    and the encrypted token (prefixed with ``02``) is written back to the
    same characteristic. Each step, including the token values, is printed
    to standard output.

    Args:
        p: Peripheral that exposes the app service.
        encrypt_key: Encryption key as a hex string.

    Returns:
        None.
    """
    app_service = p.getServiceByUUID(APP_SERVICE_UUID)

    # Read the current status from the device
    print('Receiving status... ', end='')
    status_characteristic = app_service.getCharacteristics(APP_SERVICE_STATUS_UUID)[0]
    app_service_status = status_characteristic.read()
    print(app_service_status.hex())

    # Extract the seeds from the status
    print('Generating seed... ')
    seeds = get_seeds_by_app_service_status(app_service_status)

    # Generate the main token from the seeds
    print('Generating main token... ', end='')
    main_token = get_main_token_by_seeds(seeds)
    print(main_token)

    # Generate the encrypted main token
    print('Generating encrypted main token... ', end='')
    enc_main_token = get_encrypted_main_token_by_main_token_and_key(main_token, encrypt_key)
    print(enc_main_token)

    # Try to authenticate
    print('Trying to authenticate... ')
    auth_value = '02' + enc_main_token
    # Second positional argument is bluepy's withResponse flag.
    status_characteristic.write(bytes.fromhex(auth_value), True)
    print('')
|
|
|
|
|
def control_device(p: Peripheral, mode: str) -> None:
    """Send an open, close or stop command to the curtain and wait.

    Args:
        p: Peripheral that exposes the control service.
        mode: ``'open'`` or ``'close'``; any other value sends the stop
            command.
    """
    service = p.getServiceByUUID(CONTROL_SERVICE_UUID)
    characteristic = service.getCharacteristics(CONTROL_SERVICE_CONTROL_UUID)[0]

    # Pick the announcement and payload first, then perform a single write.
    if mode == 'open':
        announcement = 'Moving to open the curtain...'
        payload = CONTROL_SERVICE_CONTROL_OPEN_VALUE
    elif mode == 'close':
        announcement = 'Moving to close the curtain...'
        payload = CONTROL_SERVICE_CONTROL_CLOSE_VALUE
    else:
        announcement = 'Stopping the curtain...'
        payload = CONTROL_SERVICE_CONTROL_STOP_VALUE

    print(announcement)
    # Second positional argument is bluepy's withResponse flag.
    characteristic.write(payload, True)

    # Pause before returning to the caller.
    sleep(CONTROL_SERVICE_CONTROL_TIMEOUT_SEC)
    print('')
|
|
|
|
|
def get_seeds_by_app_service_status(app_service_status: list) -> list:
    """Return the four seed values stored at indices 11-14 of the status.

    Args:
        app_service_status: Indexable status payload; a ``bytes`` object
            yields integers.

    Returns:
        A list with the values at indices 11, 12, 13 and 14, in that order.

    Raises:
        IndexError: If the status holds fewer than 15 items.
    """
    seed_start, seed_end = 11, 15

    # Fail with a descriptive message instead of a bare "index out of range"
    # when the device returns a truncated status.
    status_length = len(app_service_status)
    if status_length < seed_end:
        raise IndexError(
            'app service status is too short to contain the seeds: '
            'expected at least {} items, got {}'.format(seed_end, status_length)
        )

    return list(app_service_status[seed_start:seed_end])
|
|
|
|
|
def get_main_token_by_seeds(seeds: list) -> str:
    """Build the main token as a hex string from the given seeds.

    The token is twelve fixed byte values followed by the seeds, each value
    encoded as a single byte.

    Args:
        seeds: Integers in the range 0-255.

    Returns:
        Lowercase hex string of the fixed prefix plus the seeds.
    """
    fixed_prefix = (92, 101, 44, 182, 81, 212, 239, 235, 137, 90, 188, 111)

    # Encode value by value so an out-of-range seed fails in to_bytes.
    token = b''.join(
        value.to_bytes(1, 'little') for value in (*fixed_prefix, *seeds)
    )
    return token.hex()
|
|
|
|
|
def get_encrypted_main_token_by_main_token_and_key(main_token_hex: str, encrypt_key_hex: str) -> str:
    """Encrypt the main token with AES in ECB mode and return it as hex.

    Args:
        main_token_hex: Main token as a hex string.
        encrypt_key_hex: AES key as a hex string.

    Returns:
        The encrypted token as a 32-character hex string.

    Raises:
        Exception: If the encrypted token is not 32 hex characters long.
    """
    plaintext = bytes.fromhex(main_token_hex)
    key = bytes.fromhex(encrypt_key_hex)

    cipher = AES.new(key, AES.MODE_ECB)
    ciphertext_hex = cipher.encrypt(plaintext).hex()

    # Anything other than 32 hex characters is rejected.
    hex_length = len(ciphertext_hex)
    if hex_length == 32:
        return ciphertext_hex

    raise Exception(
        'encrypted main token has generated but seems invalid... value = '
        + ciphertext_hex
        + ', length = '
        + str(hex_length)
    )
|
|
|
|
|
def print_help() -> None:
    """Print the command-line usage text to standard output.

    The program name shown is taken from ``sys.argv[0]``.
    """
    usage_lines = [
        'Usage: ' + sys.argv[0] + ' MAC_ADDRESS ENCRYPT_KEY MODE',
        '',
        'Parameters:',
        '',
        'MAC_ADDRESS MAC address of device.',
        '',
        'ENCRYPT_KEY Encrypt Key string of device.',
        '',
        "MODE 'open', 'close', 'stop'",
        '',
        '',  # keeps the blank line that ends the usage text
    ]
    print('\n'.join(usage_lines))
|
|
|
|
|
def main() -> None:
    """Validate the command line, then authenticate and control the device.

    Expects ``sys.argv`` to hold MAC_ADDRESS, ENCRYPT_KEY and MODE after the
    program name. On invalid arguments an error and the usage text are
    printed and the process exits with status 1. Otherwise the device is
    connected, authenticated, sent the requested command, and disconnected.
    """
    # Check the arguments
    if len(sys.argv) != 4:
        print_help()
        # sys.exit is always available; quit() is only injected by the site module.
        sys.exit(1)

    address, encrypt_key, mode = sys.argv[1:4]

    if len(address) != 17:
        print('Error: The specified MAC_ADDRESS length is incorrect.\n')
        print_help()
        sys.exit(1)

    if len(encrypt_key) != 32:
        print('Error: The specified ENCRYPT_KEY length is incorrect.\n')
        print_help()
        sys.exit(1)

    # Reject a non-hex key here rather than failing with a traceback after
    # the connection has already been opened.
    try:
        key_is_hex = len(bytes.fromhex(encrypt_key)) == 16
    except ValueError:
        key_is_hex = False
    if not key_is_hex:
        print('Error: The specified ENCRYPT_KEY is not a valid hexadecimal string.\n')
        print_help()
        sys.exit(1)

    if mode not in ('open', 'close', 'stop'):
        print('Error: The specified MODE is incorrect.\n')
        print_help()
        sys.exit(1)

    # Connect to the device
    print('Connecting... ' + address)
    peripheral = Peripheral(address, ADDR_TYPE_RANDOM)
    try:
        print('')

        # Try to authenticate to the device
        auth_device(peripheral, encrypt_key)

        # Control the device
        control_device(peripheral, mode)
    finally:
        # Always release the connection, even if a step above raised.
        peripheral.disconnect()
|
|
|
|
|
# Run the CLI only when executed as a script, so importing this module does
# not connect to a device.
if __name__ == '__main__':
    main()
If you want to open or close the curtain by voice via Google Home (Google Assistant) or Alexa, I recommend using Home Assistant (Hass.io) together with homeassistant-mornin.