Last active
April 15, 2023 07:34
-
-
Save zry98/7142014f2e098ab44ea7337d8c45dded to your computer and use it in GitHub Desktop.
miio message decryptor, thanks to the research by @dgiese (Dennis Giese): https://recon.cx/2018/brussels/resources/slides/RECON-BRX-2018-Reversing-IoT-Xiaomi-ecosystem.pdf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from string import printable | |
from dataclasses import dataclass | |
from datetime import datetime | |
from hashlib import md5 | |
from Crypto.Cipher import AES | |
from Crypto.Util.Padding import unpad | |
# Header constants. The bracketed range after each value is the byte slice of
# the raw message that the field occupies.
MAGIC_NUMBER = bytes.fromhex('2131')  # [0:2]
LENGTH = bytes.fromhex('0020')  # [2:4]
DELIMITER = bytes(4)  # [4:8], four zero bytes
AES_BLOCK_SIZE = 16

# Field values used to recognise a hello message.
DELIMITER_HELLO = b'\xff' * 4  # [4:8]
DID_HELLO = b'\xff' * 4  # [8:12]
EPOCH_HELLO = bytes(4)  # [12:16], four zero bytes
CHECKSUM_HELLO = bytes(16)  # [16:32], sixteen zero bytes
@dataclass
class Message:
    """Header fields and payload of one parsed raw message."""

    did: int  # device ID read from the header
    timestamp: datetime  # header timestamp converted to a datetime
    checksum: bytes  # 16-byte checksum field from the header
    data: bytes  # everything after the 32-byte header
    is_hello: bool = False  # True when the header matches the hello pattern

    def __str__(self):
        """Return a one-line summary with the byte fields rendered as hex.

        ``is_hello`` is not part of the summary.
        """
        checksum_hex = self.checksum.hex()
        data_hex = self.data.hex()
        return f"Message(did={self.did}, timestamp='{self.timestamp}', checksum='{checksum_hex}', data='{data_hex}')"
def is_printable(s: str) -> bool:
    """Report whether every character of *s* is in ``string.printable``.

    An empty string counts as printable. Any error raised while scanning
    (for example when *s* is not a string) is treated as "not printable".
    """
    try:
        for ch in s:
            if ch not in printable:
                return False
    except Exception:
        return False
    return True
def parse_raw(raw: bytes) -> Message:
    """Split a raw message into its header fields and payload.

    Header layout as read here (big-endian integers):
    [0:2] magic number, [2:4] total length, [4:8] delimiter,
    [8:12] device ID, [12:16] timestamp, [16:32] checksum; the rest is data.

    The timestamp is converted with ``datetime.fromtimestamp`` and is
    therefore a naive local-time value.

    Raises:
        ValueError: if *raw* is shorter than 32 bytes, has the wrong magic
            number or an unknown delimiter, or its length field does not
            equal ``len(raw)``.
    """
    delimiter = raw[4:8]
    header_ok = (
        len(raw) >= 32
        and raw[0:2] == MAGIC_NUMBER
        and delimiter in (DELIMITER, DELIMITER_HELLO)
        and int.from_bytes(raw[2:4], byteorder='big') == len(raw)
    )
    if not header_ok:
        raise ValueError('invalid raw message')

    did_field = raw[8:12]
    epoch = int.from_bytes(raw[12:16], byteorder='big')
    did = int.from_bytes(did_field, byteorder='big')
    timestamp = datetime.fromtimestamp(epoch)
    checksum = raw[16:32]

    # A hello message is recognised by its delimiter, device ID and checksum
    # fields all holding their hello placeholder values.
    hello = (
        delimiter == DELIMITER_HELLO
        and did_field == DID_HELLO
        and checksum == CHECKSUM_HELLO
    )
    return Message(did, timestamp, checksum, raw[32:], is_hello=hello)
def decrypt(raw: bytes, msg: Message, token: bytes) -> bytes:
    """Verify the checksum of *msg* and return its decrypted payload.

    The expected checksum is MD5(first 16 bytes of *raw* + token + payload).
    The payload is decrypted with AES-CBC using key = MD5(token) and
    IV = MD5(key + token), then the padding is removed with ``unpad``.

    Raises:
        ValueError: 'invalid checksum' when the checksum does not match,
            'message data is empty' when there is no payload, or
            'invalid token' when decryption or unpadding fails.
    """
    expected = md5(raw[0:16] + token + msg.data).digest()
    if expected != msg.checksum:
        raise ValueError('invalid checksum')
    if not msg.data:
        raise ValueError('message data is empty')

    key = md5(token).digest()
    iv = md5(key + token).digest()
    cipher = AES.new(key, AES.MODE_CBC, iv)
    try:
        padded = cipher.decrypt(msg.data)
        return unpad(padded, AES_BLOCK_SIZE)
    except ValueError:
        raise ValueError('invalid token')
def _token_to_bytes(token: str) -> bytes:
    """Return *token* as bytes, decoding it as hex when it is a hex string.

    ``int(token, 16)`` accepts some strings that ``bytes.fromhex`` rejects
    (odd length, ``0x`` prefix, underscores, sign). Those used to crash the
    program; they now fall back to the plain-text encoding.
    """
    try:
        int(token, 16)
        return bytes.fromhex(token)
    except ValueError:
        return token.encode()


def _render_payload(dec: bytes) -> str:
    """Turn a decrypted payload into the text shown to the user."""
    # Undecodable bytes become U+FFFD instead of raising, so a binary payload
    # cannot abort the prompt; the hex dump below still shows the exact bytes.
    data = dec.decode(errors='replace')
    if dec.endswith(b'\x00') and len(dec) > 1 and dec[-2:-1] != b'\x00':  # trim trailing \x00
        data = data[:-1]
    if not is_printable(data):
        data = f'{data}\nData hex: {dec.hex()}'
    return data


def main():
    """Interactively decrypt hex-encoded raw messages until 'q' or end of input.

    Each line read from the prompt is parsed, hello messages are reported
    without decryption, and other messages are decrypted with the token and
    printed. Parse and decrypt errors are reported and the loop continues.
    """
    token = '3S4LPlO1dpyIvmcc'  # '3353344c506c4f3164707949766d6363' in hex
    # token = input('Token: ')
    token = _token_to_bytes(token)

    while True:
        try:
            raw = input('Raw: ')
        except EOFError:  # input stream closed: exit like 'q'
            break
        if raw == 'q':  # exit
            break

        try:
            # Decode the hex once and reuse the bytes for parsing and decrypting.
            raw_bytes = bytes.fromhex(raw)
            message = parse_raw(raw_bytes)
        except Exception as e:
            print(f'Error: {e}\n')
            continue

        if message.is_hello:
            print(f'Hello message (timestamp: {message.timestamp})\n')
            continue

        try:
            dec = decrypt(raw_bytes, message, token)
        except Exception as e:
            print(f'Error: {e}\n')
            continue

        data = _render_payload(dec)
        print(f'Device ID:\t{message.did}\n'
              f'Timestamp:\t{message.timestamp}\n'
              f'Data:\t\t{data}\n')
# Start the interactive prompt only when this file is run as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment