Skip to content

Instantly share code, notes, and snippets.

@joshuafried
Created December 7, 2023 15:51
Show Gist options
  • Save joshuafried/81df91f5dc49d922744fff95936a29a6 to your computer and use it in GitHub Desktop.
Save joshuafried/81df91f5dc49d922744fff95936a29a6 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# import requests
import json
import time
from datetime import datetime, timedelta
from urllib.parse import quote
import jwt
import uuid
import logging
APP_API_ENDPOINT = "https://sznacaamsprod.azure-api.net/consumerapp"
API_CRED = "SOME CONSTANT VALUE?"
CLIENT_SECRET = "SOME CONSTANT VALUE?"
SIGNALR_KEY = "SOME CONSTANT VALUE?"
USERNAME = "XX"
PASSWORD = "YY"
import asyncio
import aiohttp
import websockets
import websockets.exceptions
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.DEBUG)
_LOGGER.addHandler(logging.StreamHandler())
class SubZeroFridge:
""" get a non-user-specific token for requests to APP_API_ENDPOINT """
async def get_access_token(self):
if self.access_token:
if self.at_request_time + self.access_token_info['expires_in'] > time.time() + 30:
return self.access_token
payload = {
"audience": "consumerapi",
"client_id": "SubZeroOwnersApp-ios",
"client_secret": CLIENT_SECRET,
"grant_type": "client_credentials",
"scope": "consumerapi"
}
self.at_request_time = time.time()
async with aiohttp.ClientSession() as session:
async with session.post("https://oauth.subzero-wolf.com/connect/token", data=payload) as response:
self.access_token_info = await response.json()
assert response.status == 200
self.access_token = self.access_token_info['access_token']
return self.access_token
""" retrieve unique user id """
async def get_userid(self):
try:
with open(".cachedcreds", "r") as f:
dat = json.loads(f.read())
expr = datetime.strptime(dat['TokenExpiration'], "%Y-%m-%d %H:%M:%S")
assert datetime.now() + timedelta(hours=24) < expr
self.userid_info = dat
self.userid = self.userid_info["UniqueUserId"]
return
except:
_LOGGER.info("Making login request")
url = "https://www.subzero-wolf.com/api/connectedappliance/AuthenticateUser"
data = f'password={quote(PASSWORD)}&username={quote(USERNAME)}'
headers = {
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
'Authorization': 'Basic ' + API_CRED, # some random password?
}
async with aiohttp.ClientSession() as session:
async with session.post(url, data=data, headers=headers) as response:
self.userid_info = await response.json()
assert response.status == 200
assert self.userid_info['Success'], json.dumps(self.userid_info)
with open(".cachedcreds", "w") as f:
f.write(json.dumps(self.userid_info))
self.userid = self.userid_info["UniqueUserId"]
""" get device ID from API endpoint """
async def getdevices(self):
assert self.userid
url = f"{APP_API_ENDPOINT}/user/devices"
token = await self.get_access_token()
headers = {
'Content-Type':'application/json',
'Authorization': f'Bearer {token}',
'Userid': self.userid,
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
self.devices = await response.json()
assert response.status == 200
assert self.devices.get('devices', [])
self.device = self.devices['devices'][0]['id']
async def setup(self):
assert not self.is_setup
await self.get_userid()
await self.getdevices()
assert self.userid
assert self.device
self.is_setup = True
async def issue_cmd(self, cmd, params=None):
assert self.is_setup
reqid = str(uuid.uuid4())
pload = {
"req_id": reqid,
"pload": {
"cmd": cmd,
}
}
if params:
pload['pload']['params'] = params
token = await self.get_access_token()
headers = {
'Content-Type':'application/json',
'Authorization': f'Bearer {token}',
'Userid': self.userid,
}
url = f"{APP_API_ENDPOINT}/device/{self.device}/directmethod/executeAPICmd"
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, data=json.dumps(pload)) as response:
dat = await response.text()
assert response.status == 200, dat
_LOGGER.debug("sent cmd %s", str(cmd))
def handle_updates(self, updated_props):
if not updated_props: return
_LOGGER.debug("Updated props:")
for k in updated_props:
_LOGGER.debug(f"{k}: {self.props[k]}")
def _parse_update_message(self, message):
message = message.strip()
message = json.loads(message)
self.last_message_at = time.time()
if message['type'] == 6:
# heartbeat?
return
assert message['type'] == 1, 'type'
assert message['target'] == "ConnectedApplianceMessage", 'target'
message = message['arguments']
assert len(message) == 2
assert message[0].lower() == self.userid.lower()
message = json.loads(message[1])
assert message['DeviceId'] == self.device
message = json.loads(message['Payload'])
assert set(message.keys()) == set(['api.async_channel', 'timezone_id_name'])
message = json.loads(message['api.async_channel'])
assert message['device_id'] == self.device
pload = message['pload']
# full update...
updated_props = []
if message['type'] == 1:
del pload['notifs']
del pload['time']
del pload['uptime']
for k, v in pload.items():
if k not in self.props or self.props[k] != v:
self.props[k] = v
updated_props.append(k)
elif message['type'] == 2:
assert 'props' in pload
for k,v in pload['props'].items():
assert k in self.props
if self.props[k] != v:
self.props[k] = v
updated_props.append(k)
return updated_props
async def run_websocket(self):
assert self.userid
url = "https://sznacasigprod.service.signalr.net/client/negotiate?hub=connectedappliances"
pload = {
"nameid": self.userid,
"aud": "https://sznacasigprod.service.signalr.net/client/?hub=connectedappliances",
"iss": ""
}
pload['iat'] = time.time()
pload['exp'] = pload['iat'] + 3600
encoded_jwt = jwt.encode(pload, SIGNALR_KEY, algorithm="HS256") #.decode("utf-8")
headers = {
'Authorization': f'Bearer {encoded_jwt}',
}
dat = None
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers) as response:
dat = await response.json()
assert response.status == 200, str(dat)
wsurl = "wss://sznacasigprod.service.signalr.net/client/?hub=connectedappliances&id=" + dat['connectionId']
async with websockets.connect(wsurl, extra_headers=headers) as ws:
await ws.send('{"protocol": "json", "version": 1}\x1e')
for i in range(3):
dat = await ws.recv()
_LOGGER.debug(dat)
await self.issue_cmd("open_cloud_async")
self.dats = []
connection_end_time = time.time() + (30 * 60) # 30 minutes
while time.time() < connection_end_time:
dat = await ws.recv()
self.dats.append(dat)
_LOGGER.debug("recv %s", dat)
try:
updates = self._parse_update_message(dat)
if self.props["is_offline"]:
updates.append("is_offline")
self.props["is_offline"] = False
self.last_update_at = time.time()
self.handle_updates(updates)
except Exception as e:
_LOGGER.error(e)
async def _poll_timeout(self):
while True:
await asyncio.sleep(60)
if self.props["is_offline"]: continue
if time.time() - self.last_update_at < 60: continue
self.props["is_offline"] = True
_LOGGER.info("No update since %f, going offline", self.last_update_at)
self.handle_updates(["is_offline"])
async def _poll_status_updates(self):
while True:
try:
await self.run_websocket()
except asyncio.exceptions.CancelledError as e:
_LOGGER.error(e)
time.sleep(60)
except websockets.exceptions.ConnectionClosedOK as e:
_LOGGER.error(e)
time.sleep(60)
except websockets.exceptions.ConnectionClosedError as e:
_LOGGER.error(e)
time.sleep(60)
def schedule_tasks(self, loop):
loop.create_task(self._poll_status_updates())
loop.create_task(self._poll_timeout())
def __init__(self):
self.access_token = None
self.is_setup = False
self.props = {"is_offline": True}
pass
if __name__ == '__main__':
x = SubZeroFridge()
loop = asyncio.get_event_loop()
loop.run_until_complete(x.setup())
x.schedule_tasks(loop)
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment