Created
December 7, 2023 15:51
-
-
Save joshuafried/81df91f5dc49d922744fff95936a29a6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# import requests | |
import json | |
import time | |
from datetime import datetime, timedelta | |
from urllib.parse import quote | |
import jwt | |
import uuid | |
import logging | |
APP_API_ENDPOINT = "https://sznacaamsprod.azure-api.net/consumerapp" | |
API_CRED = "SOME CONSTANT VALUE?" | |
CLIENT_SECRET = "SOME CONSTANT VALUE?" | |
SIGNALR_KEY = "SOME CONSTANT VALUE?" | |
USERNAME = "XX" | |
PASSWORD = "YY" | |
import asyncio | |
import aiohttp | |
import websockets | |
import websockets.exceptions | |
_LOGGER = logging.getLogger(__name__) | |
_LOGGER.setLevel(logging.DEBUG) | |
_LOGGER.addHandler(logging.StreamHandler()) | |
class SubZeroFridge: | |
""" get a non-user-specific token for requests to APP_API_ENDPOINT """ | |
async def get_access_token(self): | |
if self.access_token: | |
if self.at_request_time + self.access_token_info['expires_in'] > time.time() + 30: | |
return self.access_token | |
payload = { | |
"audience": "consumerapi", | |
"client_id": "SubZeroOwnersApp-ios", | |
"client_secret": CLIENT_SECRET, | |
"grant_type": "client_credentials", | |
"scope": "consumerapi" | |
} | |
self.at_request_time = time.time() | |
async with aiohttp.ClientSession() as session: | |
async with session.post("https://oauth.subzero-wolf.com/connect/token", data=payload) as response: | |
self.access_token_info = await response.json() | |
assert response.status == 200 | |
self.access_token = self.access_token_info['access_token'] | |
return self.access_token | |
""" retrieve unique user id """ | |
async def get_userid(self): | |
try: | |
with open(".cachedcreds", "r") as f: | |
dat = json.loads(f.read()) | |
expr = datetime.strptime(dat['TokenExpiration'], "%Y-%m-%d %H:%M:%S") | |
assert datetime.now() + timedelta(hours=24) < expr | |
self.userid_info = dat | |
self.userid = self.userid_info["UniqueUserId"] | |
return | |
except: | |
_LOGGER.info("Making login request") | |
url = "https://www.subzero-wolf.com/api/connectedappliance/AuthenticateUser" | |
data = f'password={quote(PASSWORD)}&username={quote(USERNAME)}' | |
headers = { | |
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8', | |
'Authorization': 'Basic ' + API_CRED, # some random password? | |
} | |
async with aiohttp.ClientSession() as session: | |
async with session.post(url, data=data, headers=headers) as response: | |
self.userid_info = await response.json() | |
assert response.status == 200 | |
assert self.userid_info['Success'], json.dumps(self.userid_info) | |
with open(".cachedcreds", "w") as f: | |
f.write(json.dumps(self.userid_info)) | |
self.userid = self.userid_info["UniqueUserId"] | |
""" get device ID from API endpoint """ | |
async def getdevices(self): | |
assert self.userid | |
url = f"{APP_API_ENDPOINT}/user/devices" | |
token = await self.get_access_token() | |
headers = { | |
'Content-Type':'application/json', | |
'Authorization': f'Bearer {token}', | |
'Userid': self.userid, | |
} | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, headers=headers) as response: | |
self.devices = await response.json() | |
assert response.status == 200 | |
assert self.devices.get('devices', []) | |
self.device = self.devices['devices'][0]['id'] | |
async def setup(self): | |
assert not self.is_setup | |
await self.get_userid() | |
await self.getdevices() | |
assert self.userid | |
assert self.device | |
self.is_setup = True | |
async def issue_cmd(self, cmd, params=None): | |
assert self.is_setup | |
reqid = str(uuid.uuid4()) | |
pload = { | |
"req_id": reqid, | |
"pload": { | |
"cmd": cmd, | |
} | |
} | |
if params: | |
pload['pload']['params'] = params | |
token = await self.get_access_token() | |
headers = { | |
'Content-Type':'application/json', | |
'Authorization': f'Bearer {token}', | |
'Userid': self.userid, | |
} | |
url = f"{APP_API_ENDPOINT}/device/{self.device}/directmethod/executeAPICmd" | |
async with aiohttp.ClientSession() as session: | |
async with session.post(url, headers=headers, data=json.dumps(pload)) as response: | |
dat = await response.text() | |
assert response.status == 200, dat | |
_LOGGER.debug("sent cmd %s", str(cmd)) | |
def handle_updates(self, updated_props): | |
if not updated_props: return | |
_LOGGER.debug("Updated props:") | |
for k in updated_props: | |
_LOGGER.debug(f"{k}: {self.props[k]}") | |
def _parse_update_message(self, message): | |
message = message.strip() | |
message = json.loads(message) | |
self.last_message_at = time.time() | |
if message['type'] == 6: | |
# heartbeat? | |
return | |
assert message['type'] == 1, 'type' | |
assert message['target'] == "ConnectedApplianceMessage", 'target' | |
message = message['arguments'] | |
assert len(message) == 2 | |
assert message[0].lower() == self.userid.lower() | |
message = json.loads(message[1]) | |
assert message['DeviceId'] == self.device | |
message = json.loads(message['Payload']) | |
assert set(message.keys()) == set(['api.async_channel', 'timezone_id_name']) | |
message = json.loads(message['api.async_channel']) | |
assert message['device_id'] == self.device | |
pload = message['pload'] | |
# full update... | |
updated_props = [] | |
if message['type'] == 1: | |
del pload['notifs'] | |
del pload['time'] | |
del pload['uptime'] | |
for k, v in pload.items(): | |
if k not in self.props or self.props[k] != v: | |
self.props[k] = v | |
updated_props.append(k) | |
elif message['type'] == 2: | |
assert 'props' in pload | |
for k,v in pload['props'].items(): | |
assert k in self.props | |
if self.props[k] != v: | |
self.props[k] = v | |
updated_props.append(k) | |
return updated_props | |
async def run_websocket(self): | |
assert self.userid | |
url = "https://sznacasigprod.service.signalr.net/client/negotiate?hub=connectedappliances" | |
pload = { | |
"nameid": self.userid, | |
"aud": "https://sznacasigprod.service.signalr.net/client/?hub=connectedappliances", | |
"iss": "" | |
} | |
pload['iat'] = time.time() | |
pload['exp'] = pload['iat'] + 3600 | |
encoded_jwt = jwt.encode(pload, SIGNALR_KEY, algorithm="HS256") #.decode("utf-8") | |
headers = { | |
'Authorization': f'Bearer {encoded_jwt}', | |
} | |
dat = None | |
async with aiohttp.ClientSession() as session: | |
async with session.post(url, headers=headers) as response: | |
dat = await response.json() | |
assert response.status == 200, str(dat) | |
wsurl = "wss://sznacasigprod.service.signalr.net/client/?hub=connectedappliances&id=" + dat['connectionId'] | |
async with websockets.connect(wsurl, extra_headers=headers) as ws: | |
await ws.send('{"protocol": "json", "version": 1}\x1e') | |
for i in range(3): | |
dat = await ws.recv() | |
_LOGGER.debug(dat) | |
await self.issue_cmd("open_cloud_async") | |
self.dats = [] | |
connection_end_time = time.time() + (30 * 60) # 30 minutes | |
while time.time() < connection_end_time: | |
dat = await ws.recv() | |
self.dats.append(dat) | |
_LOGGER.debug("recv %s", dat) | |
try: | |
updates = self._parse_update_message(dat) | |
if self.props["is_offline"]: | |
updates.append("is_offline") | |
self.props["is_offline"] = False | |
self.last_update_at = time.time() | |
self.handle_updates(updates) | |
except Exception as e: | |
_LOGGER.error(e) | |
async def _poll_timeout(self): | |
while True: | |
await asyncio.sleep(60) | |
if self.props["is_offline"]: continue | |
if time.time() - self.last_update_at < 60: continue | |
self.props["is_offline"] = True | |
_LOGGER.info("No update since %f, going offline", self.last_update_at) | |
self.handle_updates(["is_offline"]) | |
async def _poll_status_updates(self): | |
while True: | |
try: | |
await self.run_websocket() | |
except asyncio.exceptions.CancelledError as e: | |
_LOGGER.error(e) | |
time.sleep(60) | |
except websockets.exceptions.ConnectionClosedOK as e: | |
_LOGGER.error(e) | |
time.sleep(60) | |
except websockets.exceptions.ConnectionClosedError as e: | |
_LOGGER.error(e) | |
time.sleep(60) | |
def schedule_tasks(self, loop): | |
loop.create_task(self._poll_status_updates()) | |
loop.create_task(self._poll_timeout()) | |
def __init__(self): | |
self.access_token = None | |
self.is_setup = False | |
self.props = {"is_offline": True} | |
pass | |
if __name__ == '__main__': | |
x = SubZeroFridge() | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(x.setup()) | |
x.schedule_tasks(loop) | |
loop.run_forever() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment