Last active
January 8, 2024 06:41
-
-
Save sanghviharshit/913d14b225399e0fa4211b3e785671aa to your computer and use it in GitHub Desktop.
Scratch pad for working with Milacares API for monitoring and controlling Mila air purifier. Read my blog post for more details - https://blog.sanghviharshit.com/reverse-engineering-private-api-ssl-pinning/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Scratch pad for working with the Milacares API for monitoring and controlling their air purifier devices.
# Based on the code from https://www.stefaanlippens.net/oauth-code-flow-pkce.html for the PKCE code verifier and challenge.
import base64 | |
import hashlib | |
import html | |
import json | |
import os | |
import re | |
import urllib.parse | |
import requests | |
import math | |
import time | |
# Endpoints and OAuth client settings used by every request in this script.
LOGIN_PROVIDER = "https://id.milacares.com"
REDIRECT_URI = "milacares://anyurl.com/"
API_BASE = "https://api.milacares.com/mms"
CLIENT_ID = "prod-ui"

# Account credentials. They can be supplied through the MILA_USERNAME /
# MILA_PASSWORD environment variables so real secrets need not be written
# into this file; the original placeholder literals remain the defaults.
USERNAME = os.environ.get("MILA_USERNAME", "[email protected]")
PASSWORD = os.environ.get("MILA_PASSWORD", "super-secret")

# Bearer token shared by the API helpers below; it is assigned at the bottom
# of the script from get_new_access_token().
access_token = None
def _generate_pkce_pair():
    """Return a fresh ``(code_verifier, code_challenge)`` pair for PKCE."""
    # Verifier: 40 random bytes, base64url-encoded, reduced to alphanumerics.
    raw_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
    code_verifier = re.sub(r'[^a-zA-Z0-9]+', '', raw_verifier)
    # Challenge: base64url(SHA-256(verifier)) with the '=' padding removed.
    digest = hashlib.sha256(code_verifier.encode('utf-8')).digest()
    code_challenge = base64.urlsafe_b64encode(digest).decode('utf-8')
    code_challenge = code_challenge.replace('=', '')
    return code_verifier, code_challenge


def get_new_access_token():
    """
    Gets user access token using OAuth Code flow with PKCE.

    The flow is: request the login page, post USERNAME/PASSWORD to the form
    found on that page, read the authorization code from the redirect URL,
    and exchange that code (plus the PKCE code verifier) at the token endpoint.

    Returns:
        The ``access_token`` value from the token endpoint's JSON response.

    Raises:
        RuntimeError: if the login form cannot be found, the login does not
            redirect to REDIRECT_URI, the redirect carries no ``code``, or the
            token response has no ``access_token``.
    """
    # PKCE code verifier and challenge
    code_verifier, code_challenge = _generate_pkce_pair()

    # Create random Nonce and State for login request
    nonce = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
    state = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')

    # Request login page. Sending the code challenge signals to the OAuth
    # provider that we are expecting the PKCE based flow.
    resp = requests.get(
        url=LOGIN_PROVIDER + "/auth/realms/prod/protocol/openid-connect/auth",
        params={
            "response_type": "code",
            "client_id": CLIENT_ID,
            "nonce": nonce,
            "scope": "openid,profile",
            "redirect_uri": REDIRECT_URI,
            "state": state,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        },
        allow_redirects=False
    )
    print("GET login page response status", resp.status_code)

    # Parse login page (response).
    # Build a Cookie request header from the Set-Cookie response header:
    # split it on ', ' and keep only the leading name=value part of each piece.
    cookie = resp.headers['Set-Cookie']
    cookie = '; '.join(c.split(';')[0] for c in cookie.split(', '))

    # Extract the login URL to post to from the page HTML. The login page is
    # plain HTML, so a simple regex on the <form> tag is enough.
    form_match = re.search(r'<form\s+.*?\s+action="(.*?)"', resp.text, re.DOTALL)
    if form_match is None:
        raise RuntimeError(
            "Login form not found in login page response (status {})".format(resp.status_code)
        )
    form_action = html.unescape(form_match.group(1))

    # Do the login (aka authenticate): post the form with the credentials,
    # passing the extracted cookie as well.
    resp = requests.post(
        url=form_action,
        data={
            "username": USERNAME,
            "password": PASSWORD,
        },
        headers={"Cookie": cookie},
        allow_redirects=False
    )
    print("POST login response status", resp.status_code)

    # A successful login answers with a redirect to REDIRECT_URI.
    redirect = resp.headers.get('Location')
    if redirect is None:
        raise RuntimeError(
            "Login did not redirect (status {}); check USERNAME and PASSWORD".format(resp.status_code)
        )
    print("Redirect", redirect)
    if not redirect.startswith(REDIRECT_URI):
        raise RuntimeError("Unexpected redirect target: {}".format(redirect))

    # Extract authorization code from the redirect URL's query string.
    query = urllib.parse.urlparse(redirect).query
    redirect_params = urllib.parse.parse_qs(query)
    if 'code' not in redirect_params:
        raise RuntimeError("Redirect URL carries no authorization code: {}".format(redirect))
    auth_code = redirect_params['code'][0]
    print("Auth Code: ", auth_code)

    # Exchange authorization code for an access token. Instead of a static
    # client secret we send the code verifier, which acts as proof that the
    # initial request was made by us.
    resp = requests.post(
        url=LOGIN_PROVIDER + "/auth/realms/prod/protocol/openid-connect/token",
        data={
            "grant_type": "authorization_code",
            "client_id": CLIENT_ID,
            "redirect_uri": REDIRECT_URI,
            "code": auth_code,
            "code_verifier": code_verifier,
        },
        allow_redirects=False
    )
    print("GET Access Token response status", resp.status_code)

    # The response contains, among others, the access token.
    result = resp.json()
    print(result)
    if 'access_token' not in result:
        raise RuntimeError(
            "Token endpoint returned no access_token (status {})".format(resp.status_code)
        )
    access_token = result['access_token']
    print("Access Token:", access_token)
    # The refresh token is only printed; tolerate its absence.
    refresh_token = result.get('refresh_token')
    print("Refresh Token:", refresh_token)
    return access_token
def get_device_info():
    """
    Query the profile and appliance-list endpoints and report the first appliance.

    Returns:
        A ``(device_id, appliance_code)`` tuple taken from the first entry of
        the ``data`` list in the ``/appliances/meta`` response.
    """
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    # Profile: the response is fetched and decoded, but nothing is read from it.
    profile_resp = requests.get(url=API_BASE + "/profile", headers=auth_headers)
    profile_resp.json()
    # print("Profile Info:", result)

    # Appliances Meta
    meta_resp = requests.get(url=API_BASE + "/appliances/meta", headers=auth_headers)
    appliances = meta_resp.json()
    device_id_1 = appliances["data"][0]["id"]
    appliance_code_1 = appliances["data"][0]["appliance_code"]

    print("Device ID:", device_id_1)
    print("Appliance Code", appliance_code_1)
    return device_id_1, appliance_code_1
def get_modes(device_id):
    """
    Fetch the appliance config and print the state of each ``*_enabled`` mode flag.

    Args:
        device_id: appliance id inserted (via ``str``) into the config URL.
    """
    flag_names = (
        "quiet_enabled",
        "night_enabled",
        "housekeeper_enabled",
        "quarantine_enabled",
        "sleep_enabled",
        "turndown_enabled",
        "whitenoise_enabled",
        "sounds_enabled",
    )
    config_url = "".join((API_BASE, "/appliance/", str(device_id), "/config"))
    config = requests.get(
        url=config_url,
        headers={"Authorization": f"Bearer {access_token}"},
    ).json()

    # Read every flag before printing anything, so a missing key fails
    # before any output is produced.
    flags = [(name, config["data"][name]) for name in flag_names]
    for name, enabled in flags:
        print(f"{name} {enabled}")
def get_sensors_data(appliance_code):
    """
    Query every known sensor metric for one appliance.

    Returns:
        A dict keyed by sensor name; each value is whatever
        ``get_sensor_data`` returned for that sensor.
    """
    sensor_names = ("pm_1.0", "pm_2.5", "pm_10", "tvoc", "coppm", "eco2", "temperature", "humidity")
    return {
        name: get_sensor_data(appliance_code=appliance_code, sensor_name=name)
        for name in sensor_names
    }
def get_sensor_data(appliance_code, sensor_name):
    """
    Get a sensor's latest value.

    Args:
        appliance_code: sent as the ``deviceId`` query parameter.
        sensor_name: sent as the ``metric`` query parameter (e.g. "pm_2.5").

    Returns:
        The ``data.meta.latest_sensor_value.value`` field of the JSON response.
        The value is also printed.
    """
    resp = requests.get(
        url=API_BASE + "/sensor/appliance",
        # Let requests build the query string so both values are URL-encoded.
        params={"deviceId": appliance_code, "metric": sensor_name},
        headers={
            "Authorization": 'Bearer {}'.format(access_token)
        }
    )
    result = resp.json()
    sensor_value = result["data"]["meta"]["latest_sensor_value"]["value"]
    print(f"{sensor_name}: {sensor_value}")
    # Return the reading so callers (e.g. get_sensors_data) can collect it.
    return sensor_value
def set_mode_manual(appliance_code, fan_speed_percentage):
    """
    Set Fan Mode to Manual.

    Posts a manual control-mode command, then polls ``/command/force-data``
    (at most 5 times, 5 seconds apart) until the reported speed is within
    30 RPM of the requested speed.

    Args:
        appliance_code: appliance code string used in the command URLs.
        fan_speed_percentage: requested fan speed, 0-100. It is rounded down
            to a multiple of 10 and mapped to an RPM value.

    Returns:
        None. Returns early without sending anything when the percentage is
        outside 0-100 (or NaN) or when no access token has been set.
    """
    # The chained comparison also rejects NaN, which would otherwise make
    # math.floor() raise below.
    if not (0 <= fan_speed_percentage <= 100):
        return

    # Fan speed percentage -> RPM, indexed by percentage // 10
    # (0% -> 0, 10% -> 600, 20% -> 740, ..., 90% -> 1720, 100% -> 2000).
    SPEED_PERCENT_TO_RPM_MAP = [0, 600, 740, 880, 1020, 1160, 1300, 1440, 1580, 1720, 2000]
    fan_speed = SPEED_PERCENT_TO_RPM_MAP[math.floor(fan_speed_percentage / 10)]

    if access_token is None:
        return

    target_aqi = "10"
    print("Fan Speed %:", fan_speed_percentage, "Fan Speed:", fan_speed, "Target AQI:", target_aqi)

    headers = {
        "Authorization": 'Bearer {}'.format(access_token),
        "Content-Type": 'application/json'
    }
    payload = {
        "target_aqi_float": target_aqi,
        "fan_rpm_int": fan_speed,
        "enable_display_int": -1
    }
    resp = requests.post(
        url=API_BASE + "/appliance/" + appliance_code + "/command/control-mode/manual",
        headers=headers,
        data=json.dumps(payload)
    )
    print("POST command manual response status", resp.status_code)
    # print("POST command manual response", resp.json())

    # It takes a while for the speed to update to the requested fan speed.
    # The app makes multiple requests to /command/force-data until fan speed
    # matches the requested speed.
    cur_fan_speed = None  # unknown until the first successful poll
    for retry_count in range(5):
        if cur_fan_speed is not None and abs(cur_fan_speed - fan_speed) < 30:
            break
        time.sleep(5)
        resp = requests.post(
            url=API_BASE + "/appliance/" + appliance_code + "/command/force-data",
            headers={
                "Authorization": 'Bearer {}'.format(access_token)
            },
        )
        print(retry_count, "POST force data status", resp.status_code)
        if resp.status_code == 200:
            cur_data = resp.json()
            print("POST force data response", cur_data)
            cur_fan_speed = cur_data["data"]["speed"]
            print("Current fan speed", cur_fan_speed)
        else:
            # Do not try to decode an error body as JSON; show it raw instead.
            print("POST force data response", resp.text)
            print("Error fetching latest data")
def set_mode_auto(appliance_code):
    """
    Switch the appliance's fan control mode to Auto.

    Sends one POST to the ``control-mode/auto`` command endpoint and prints
    the response status code.

    Args:
        appliance_code: appliance code string used in the command URL.
    """
    command_url = "".join(
        [API_BASE, "/appliance/", appliance_code, "/command/control-mode/auto"]
    )
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    form_fields = {"enable_display_int": -1}

    auto_resp = requests.post(
        url=command_url,
        headers=auth_headers,
        data=form_fields,
        allow_redirects=False,
    )
    print("POST command auto response status", auto_resp.status_code)
# (Unused code) Decode the JWT tokens
# The snippet below is kept inside a string literal, so it is never executed.
'''
The access and id tokens are JWT tokens apparently. Let's decode the payload.
def _b64_decode(data):
    data += '=' * (4 - len(data) % 4)
    return base64.b64decode(data).decode('utf-8')
def jwt_payload_decode(jwt):
    _, payload, _ = jwt.split('.')
    return json.loads(_b64_decode(payload))
print(jwt_payload_decode(access_token))
'''

# Script body: runs on execution (and on import, since there is no __main__ guard).
# 1. Log in and store the token in the module-level `access_token` that the
#    API helper functions read.
access_token = get_new_access_token()
# 2. Look up the first appliance's id and appliance code.
device_id, appliance_code = get_device_info()
# 3. Print the mode flags and the latest sensor readings.
get_modes(device_id)
get_sensors_data(appliance_code)
# 4. Switch to manual mode at 40% fan speed, then back to auto mode.
set_mode_manual(appliance_code, 40)
set_mode_auto(appliance_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment