Created
May 7, 2022 16:13
-
-
Save danisla/237bc96411289948228ffe57440a183c to your computer and use it in GitHub Desktop.
Python Library for interacting with the Nest camera API including authorization flow.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import json
import os
import shutil
import time
from datetime import datetime
from urllib import request, parse
from urllib.parse import quote

import pytz
# OAuth client id sent as `client_id` in the Google auth and token requests.
NEST_IOS_CLIENT_ID = '733249279899-1gpkq9duqmdp55a7e5lft1pr2smumdla.apps.googleusercontent.com'

# Timeout for API calls, expressed in seconds.
NEST_API_TIMEOUT_SECONDS = 40

# Browser-like User-Agent header value sent with every request.
NEST_USER_AGENT_STRING = (
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_0) '
    'AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/77.0.3865.120 Safari/537.36'
)

# Google OAuth2 token endpoint.
NEST_TOKEN_URL = 'https://oauth2.googleapis.com/token'

# Nest API Endpoints
NEST_CAMERA_API_HOSTNAME = 'webapi.camera.home.nest.com'
NEST_TIMELAPSE_ENDPOINT = 'https://{host}/api/clips.request_time_lapse'.format(
    host=NEST_CAMERA_API_HOSTNAME)
NEST_LIST_CLIPS_ENDPOINT = 'https://{host}/api/clips.get_visible_with_quota'.format(
    host=NEST_CAMERA_API_HOSTNAME)
NEST_DELETE_CLIP_ENDPOINT = 'https://{host}/api/clips.delete'.format(
    host=NEST_CAMERA_API_HOSTNAME)
def stringify_querystring(data):
    """Render a mapping as a ``key=value&key=value`` query string.

    Values are percent-encoded with ``urllib.parse.quote`` (which leaves
    ``/`` unescaped by default); keys are emitted as-is. Pairs appear in the
    mapping's iteration order.
    """
    pairs = []
    for key, value in data.items():
        pairs.append("%s=%s" % (key, quote(value)))
    return '&'.join(pairs)
class NestAuth():
    """Google OAuth login plus Nest JWT exchange for calling Nest web APIs.

    The Google token response obtained from an authorization code is cached
    as JSON in ``cache_file`` so that its ``refresh_token`` can be reused by
    later ``login()`` calls. Subclasses must set ``api_hostname`` before
    ``login()`` / ``get_nest_jwt()`` is called.
    """

    def __init__(self, cache_file=None):
        """Set up the instance and load any previously cached token data.

        Args:
            cache_file: Path of the JSON token cache. Defaults to
                ``.nest_token_cache`` in the user's home directory.
        """
        if cache_file is None:
            # expanduser() honours $HOME when set but does not raise KeyError
            # when it is missing, unlike os.environ['HOME'].
            cache_file = os.path.join(os.path.expanduser("~"), ".nest_token_cache")
        self.cache_file = cache_file
        self.cache_data = {}
        if os.path.exists(self.cache_file):
            with open(self.cache_file, 'rb') as f:
                try:
                    loaded = json.load(f)
                except Exception as e:
                    print("WARN: error reading cache file: %s" % str(e))
                    loaded = {}
            # login() calls .get() on the cache, so only a JSON object is usable.
            self.cache_data = loaded if isinstance(loaded, dict) else {}
        self.google_access_token = None
        self.access_token = None
        # Subclass overrides
        self.api_hostname = None

    def login(self):
        """Obtain a Nest access token, prompting for an auth code if needed.

        Uses the cached ``refresh_token`` when one is present; otherwise
        prints the Google authorization URL and reads the authorization code
        from stdin.

        Returns:
            The Nest JWT, also stored in ``self.access_token``.
        """
        refresh_token = self.cache_data.get('refresh_token', None)
        if refresh_token:
            # Use existing token
            self.get_google_access_token(refresh_token=refresh_token)
        else:
            # Get new token
            print("1. Open the url below in a browser to continue:\n\n%s\n" % self.get_nest_auth_code_uri())
            code = ""
            while not code:
                print("2. Copy the authorization code from the browser, and paste it here: ", end='')
                # Strip so stray whitespace from the paste is not sent to Google.
                code = input().strip()
            self.get_google_access_token(code=code)
        self.get_nest_jwt()
        return self.access_token

    def get_nest_auth_code_uri(self):
        """Return the Google OAuth URL the user opens to get an authorization code."""
        data = {
            "access_type": 'offline',
            "response_type": 'code',
            "scope": 'openid profile email https://www.googleapis.com/auth/nest-account',
            "redirect_uri": 'urn:ietf:wg:oauth:2.0:oob',
            "client_id": NEST_IOS_CLIENT_ID,
        }
        return "https://accounts.google.com/o/oauth2/auth/oauthchooseaccount?%s" % stringify_querystring(data)

    def _fetch_json(self, url, method, headers, data=None):
        """Send one HTTP request and return the JSON-decoded response body.

        ``data`` (a mapping) is form-encoded into the request body when it is
        non-empty; otherwise no body is sent.
        """
        body = parse.urlencode(data).encode() if data else None
        r = request.Request(url, method=method, headers=headers, data=body)
        # urlopen() takes its timeout in seconds; the context manager closes
        # the response once the body has been parsed.
        with request.urlopen(r, timeout=NEST_API_TIMEOUT_SECONDS) as resp:
            return json.load(resp)

    def get_google_access_token(self, code=None, refresh_token=None):
        """Exchange an authorization code or refresh token for a Google access token.

        Args:
            code: Authorization code from the browser flow. Takes precedence
                over ``refresh_token`` when both are given.
            refresh_token: Previously issued refresh token.

        Returns:
            The ``access_token`` field of the token response (``None`` if the
            response has none); also stored in ``self.google_access_token``.

        Raises:
            Exception: If neither ``code`` nor ``refresh_token`` is provided.
        """
        if code:
            form = {
                'code': code,
                'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
                'client_id': NEST_IOS_CLIENT_ID,
                'grant_type': 'authorization_code',
            }
        elif refresh_token:
            form = {
                'refresh_token': refresh_token,
                'client_id': NEST_IOS_CLIENT_ID,
                'grant_type': 'refresh_token',
            }
        else:
            raise Exception("bad req type")

        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'User-Agent': NEST_USER_AGENT_STRING,
        }
        token_data = self._fetch_json(NEST_TOKEN_URL, 'POST', headers, form)

        if code:
            # Only the authorization-code response is persisted; a refresh
            # response is never written back (presumably because it carries
            # no refresh_token of its own).
            # The file holds credentials, so create it owner-readable only.
            fd = os.open(self.cache_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w', encoding="utf8") as f:
                json.dump(token_data, f)
            # Keep the in-memory view in sync with what was just written.
            self.cache_data = token_data

        self.google_access_token = token_data.get("access_token", None)
        return self.google_access_token

    def get_nest_jwt(self):
        """Exchange the Google access token for a Nest JWT.

        Stores the ``jwt`` field of the response in ``self.access_token``.

        Raises:
            AssertionError: If ``api_hostname`` has not been set.
            Exception: If no Google access token has been obtained yet.
        """
        # Explicit raise instead of `assert` so the check survives `python -O`.
        if not self.api_hostname:
            raise AssertionError("api_hostname must be set before requesting a Nest JWT")
        if not self.google_access_token:
            raise Exception("no Google access token; call get_google_access_token() first")

        form = {
            'embed_google_oauth_access_token': True,
            'expire_after': '3600s',
            'google_oauth_access_token': self.google_access_token,
            'policy_id': 'authproxy-oauth-policy'
        }
        headers = {
            'Authorization': 'Bearer ' + self.google_access_token,
            'User-Agent': NEST_USER_AGENT_STRING,
            'Referer': 'https://' + self.api_hostname,
        }
        jwt_data = self._fetch_json(
            'https://nestauthproxyservice-pa.googleapis.com/v1/issue_jwt',
            'POST', headers, form)
        self.access_token = jwt_data['jwt']

    def request(self, endpoint, req_data=None, method=None):
        """Call a Nest API endpoint with the current access token.

        Args:
            endpoint: Full URL to call.
            req_data: Optional mapping sent form-encoded in the request body.
            method: HTTP method. When omitted, ``GET`` is used if ``req_data``
                is ``None`` and ``POST`` otherwise.

        Returns:
            The JSON-decoded response body.

        Raises:
            Exception: If ``login()`` has not produced an access token yet.
        """
        if not self.access_token:
            raise Exception("not logged in")
        if method is None:
            method = "GET" if req_data is None else "POST"
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'User-Agent': NEST_USER_AGENT_STRING,
            'origin': 'https://home.nest.com',
            'referer': 'https://home.nest.com',
            'Authorization': "Basic %s" % self.access_token,
            'cookie': 'G_ENABLED_IDPS=google; eu_cookie_accepted=1; viewer-volume=0.5; cztoken=' + self.access_token,
        }
        return self._fetch_json(endpoint, method, headers, req_data)
class NestCamera(NestAuth):
    """Client for the Nest camera clip endpoints (time-lapse, list, download, delete)."""

    def __init__(self, cache_file=None):
        """Initialise the auth base class and point it at the camera API host."""
        super().__init__(cache_file)
        self.api_hostname = NEST_CAMERA_API_HOSTNAME

    def __make_timelapse_req_data(self, camera_uuid, title, length, target_length, start_date_utc_seconds, is_public=False, donate_video=False):
        """Build the form fields for a time-lapse request.

        Every value is converted to a string: the start date is formatted
        with three decimal places and the booleans as ``true`` / ``false``.
        """
        data = {
            'uuid': camera_uuid,
            'title': title,
            'start_date': "%0.3f" % start_date_utc_seconds,
            'length': str(length),
            'target_length': str(target_length),
            'is_public': str(is_public).lower(),
            'donate_video': str(donate_video).lower(),
        }
        return data

    def create_timelapse(self, camera_uuid, title, length, target_length, start_date_utc_seconds):
        """Request a new time-lapse clip and return the decoded JSON response.

        NOTE(review): the units of ``length`` / ``target_length`` are not
        visible in this file - confirm against the API before relying on them.
        """
        endpoint = NEST_TIMELAPSE_ENDPOINT
        req_data = self.__make_timelapse_req_data(camera_uuid, title, length, target_length, start_date_utc_seconds)
        return self.request(endpoint, req_data)

    def list_clips(self):
        """Return the decoded JSON response of the clip-listing endpoint."""
        # https://webapi.camera.home.nest.com/api/clips.get_visible_with_quota?_=1650320018004
        # The `_` query parameter carries the current time in milliseconds.
        return self.request(NEST_LIST_CLIPS_ENDPOINT + "?_=" + str(int(time.time()*1000)))

    def download_clip(self, url, dest):
        """Download ``url`` to the local file ``dest`` using the Nest credentials.

        Raises:
            Exception: If ``login()`` has not produced an access token yet.
        """
        if not self.access_token:
            raise Exception("not logged in")
        # Use a private opener rather than request.install_opener(): installing
        # it globally would attach the Nest Authorization/cookie headers to
        # every later urlopen() call in the process, including calls to other
        # hosts.
        opener = request.build_opener()
        opener.addheaders = [
            ('User-agent', NEST_USER_AGENT_STRING),
            ('origin', 'https://home.nest.com'),
            ('referer', 'https://home.nest.com'),
            ('Authorization', "Basic %s" % self.access_token),
            ('cookie', 'G_ENABLED_IDPS=google; eu_cookie_accepted=1; viewer-volume=0.5; cztoken=' + self.access_token),
        ]
        # The request is opened first, so a failed request does not create or
        # truncate `dest`. The timeout is in seconds.
        with opener.open(url, timeout=NEST_API_TIMEOUT_SECONDS) as resp, open(dest, 'wb') as out:
            shutil.copyfileobj(resp, out)

    def delete_clip(self, id):
        """Delete the clip with the given id and return the decoded JSON response."""
        endpoint = NEST_DELETE_CLIP_ENDPOINT
        req_data = {'id': id}
        return self.request(endpoint, req_data, method="DELETE")
def date_to_utc_seconds(d, tz="US/Pacific"):
    """Convert a local date string to a POSIX timestamp.

    Args:
        d: Date text in the ``'%b %d %Y %H'`` format, e.g. ``"May 07 2022 16"``.
        tz: Name of the pytz timezone in which ``d`` is interpreted.

    Returns:
        The timestamp of that local time, as a float.
    """
    zone = pytz.timezone(tz)
    naive = datetime.strptime(d, '%b %d %Y %H')
    aware = zone.localize(naive)
    return aware.timestamp()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment