-
-
Save tosunkaya/f2143dad19490c297be867d84b9cf00b to your computer and use it in GitHub Desktop.
Tp-Link Tapo C200 video relaying test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import uuid | |
import requests | |
import ssl | |
import socket | |
import http.client | |
import json | |
import sys | |
# Script configuration. The account and camera settings can be overridden
# through environment variables so that credentials do not have to be stored
# in this file; the literals below are only the fallback values.
import os

# Tp-Link mail (override: TAPO_CLOUD_MAIL)
cloudmail = os.environ.get('TAPO_CLOUD_MAIL', '--redacted--')
# Tp-Link password (override: TAPO_CLOUD_PASS)
cloudpass = os.environ.get('TAPO_CLOUD_PASS', '--redacted--')
# User-assigned camera name (override: TAPO_CAMERA)
cloudcamera = os.environ.get('TAPO_CAMERA', 'Entrance')
# 'VGA' or 'HD' for my Tapo C200s (override: TAPO_RESOLUTION)
resolution = os.environ.get('TAPO_RESOLUTION', 'HD')
# Hardcoded in application
accountserver = 'https://n-wap-gw.tplinkcloud.com'
# Passed as `verify=` to every requests call below.
# NOTE(review): False turns certificate verification off, so the password and
# token travel over unauthenticated TLS - switch to the CA bundle when possible.
#certs = 'tplinkcloud-com.pem'
certs = False
# Random per-run identifier, sent as terminalUUID at login and as X-Client-UUID
# to the relay.
clientuuid = str(uuid.uuid4())
# Log in to the account server with the configured credentials and keep the
# token from the reply; every later cloud request sends it.
logincredentials = {
    'appType': 'Tapo_Android',
    'cloudUserName': cloudmail,
    'cloudPassword': cloudpass,
    'terminalUUID': clientuuid
}
loginbody = {'method': 'login', 'params': logincredentials}
loginresponse = requests.post(accountserver, json=loginbody, verify=certs)
loginreply = loginresponse.json()
# A non-zero error_code means the login was rejected: dump the reply and stop.
if loginreply['error_code'] != 0:
    for loginerrline in ('Login error', loginreply):
        print(loginerrline, file=sys.stderr)
    sys.exit(1)
token = loginreply['result']['token']
# Collect every SMART.IPCAMERA device linked to the account, 20 per request,
# using the number of devices already collected as the next page index.
devices = []
# Placeholder that only has to be larger than zero so the first request is
# made; it is replaced by the server-reported totalNum after each reply.
totaldevs = 9001
# The query string is the same for every page, so build it once.
devlistparams = {
    'token': token
}
while len(devices) < totaldevs:
    devlistbody = {
        'method': 'getDeviceListByPage',
        'params': {
            'deviceTypeList': [
                'SMART.IPCAMERA'
            ],
            'index': len(devices),
            'limit': 20
        }
    }
    devlistreply = requests.post(accountserver, json=devlistbody, params=devlistparams, verify=certs).json()
    if devlistreply['error_code'] != 0:
        print('Listing error', file=sys.stderr)
        print(devlistreply, file=sys.stderr)
        sys.exit(1)
    totaldevs = devlistreply['result']['totalNum']
    devpage = devlistreply['result']['deviceList']
    if not devpage:
        # The server reports more devices than it returns. Without this guard
        # the same index would be requested forever.
        break
    devices.extend(devpage)
# Look up the configured camera by alias. Anything other than exactly one
# match (none, or several with the same alias) is reported together with the
# full device list and ends the script.
matchingcameras = [dev for dev in devices if dev['alias'] == cloudcamera]
if len(matchingcameras) == 1:
    camerainfo = matchingcameras[0]
else:
    print('Cannot find camera "%s"' % cloudcamera, file=sys.stderr)
    print('Linked devices:', file=sys.stderr)
    print(devices, file=sys.stderr)
    sys.exit(1)
# Choose the relay host by substring match on the camera's app server URL,
# first match wins; without a match the use1 host is used.
# This is the logic in the app, no kidding
appserverurl = camerainfo['appServerUrl']
for urlhint, hintedrelay in (
    ('aps', 'aps1-relay-dcipc.i.tplinknbu.com'),
    ('euw', 'euw1-relay-dcipc.i.tplinknbu.com'),
):
    if urlhint in appserverurl:
        relayserver = hintedrelay
        break
else:
    relayserver = 'use1-relay-dcipc.i.tplinknbu.com'
# Send a 'passthrough' request for the selected camera to its app server. The
# embedded request_relay command names the relay server and the relay URL that
# is requested again when the stream is opened.
ptparams = {
    'token': token
}
ptbody = {
    'method': 'passthrough',
    'params': {
        'deviceId': camerainfo['deviceId'],
        'requestData': {
            'method': 'do',
            'relay': {
                'request_relay': {
                    'token': token,
                    'version': '1.3',
                    'stream_type': 0,
                    'protocol': 0,
                    'relay_server': relayserver,
                    'relay_port': 80,
                    'relays_port': 443,
                    'relay_req_url': '/relayservice?deviceid=' + camerainfo['deviceId'] + '&type=video&resolution=' + resolution,
                    'local_req_url': '/stream'
                }
            }
        }
    }
}
ptreply = requests.post(camerainfo['appServerUrl'], params=ptparams, json=ptbody, verify=certs).json()
# Report a rejected request here, in the same way as the login and listing
# steps, instead of failing with a KeyError when the reply is read later on.
if ptreply.get('error_code', 0) != 0:
    print('Passthrough error', file=sys.stderr)
    print(ptreply, file=sys.stderr)
    sys.exit(1)
# Connect to the relay server over TLS, send the relay request followed by a
# preview "get" request as the first multipart part, then read the multipart
# reply and copy the payload of every video/* part to stdout. Parts of any
# other content type are read and discarded.
context = ssl.create_default_context()
# NOTE(review): hostname and certificate checks are disabled, so the token in
# the X-token header is sent to an unauthenticated peer - confirm whether the
# relay certificate can be verified instead.
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Session id and load-balancer cookie come from the passthrough reply.
relaysession = ptreply['result']['responseData']['result']
relayrequrl = '/relayservice?deviceid=' + camerainfo['deviceId'] + '&type=video&resolution=' + resolution
# Boundary announced in the request Content-Type; the marker written on the
# wire is the boundary prefixed with '--'.
clientboundary = '--client-stream-boundary--'
with socket.create_connection((relayserver, 443)) as raw_sock:
    with context.wrap_socket(raw_sock, server_hostname=relayserver) as ssl_sock:
        # The buffered file wrapper is closed (and flushed) with the block.
        with ssl_sock.makefile('rwb') as ssl_file:
            # Write relay header
            relayrequestheaders = (
                'POST ' + relayrequrl + ' HTTP/1.1\r\n' +
                'User-Agent: Client=TP-Link_Tapo_Android Android 2.2.36/1.3\r\n' +
                'Keep-Relay: 3600\r\n' +
                'Accept: */*\r\n' +
                'Host: ' + relayserver + ':443\r\n' +
                'Content-Type: multipart/mixed;boundary=' + clientboundary + '\r\n' +
                'Content-Length: 9223372036854775807\r\n' +
                'X-token: ' + token + '\r\n' +
                'X-Client-Model: SM-A202F\r\n' +
                'X-Client-UUID: ' + clientuuid + '\r\n' +
                'X-Client-SessionID: ' + relaysession['sid'] + '\r\n' +
                'X-Redirect-Times: 0\r\n' +
                'Cookie: ' + relaysession['elb_cookie'] + '\r\n' +
                '\r\n'
            )
            ssl_file.write(relayrequestheaders.encode('ascii'))
            # Write boundary marker
            ssl_file.write(('--' + clientboundary + '\r\n').encode('ascii'))
            # Write stream header
            streamrequestbody = {
                "type": "request",
                "seq": 1,
                "params": {
                    "method": "get",
                    "preview": {
                        "channels": [0],
                        "resolutions": [resolution],
                        "audio": ["default"]
                    }
                }
            }
            streamrequestbody = json.dumps(streamrequestbody).encode('ascii')
            streamrequestheader = (
                'Content-Type: application/json\r\n' +
                'Content-Length: ' + str(len(streamrequestbody)) + '\r\n' +
                '\r\n'
            )
            ssl_file.write(streamrequestheader.encode('ascii'))
            ssl_file.write(streamrequestbody)
            # Flush as the virtual file is buffered
            ssl_file.flush()
            # Read relay header
            relaystatus = ssl_file.readline().decode('ascii').rstrip('\r\n')
            if relaystatus != 'HTTP/1.1 200 OK':
                print('Unexpected relay status: ' + relaystatus, file=sys.stderr)
                sys.exit(1)
            relayreplyheaders = http.client.parse_headers(ssl_file)
            # The boundary does not change between parts, so resolve it once.
            # get_boundary() returns None when the reply declares none.
            relayboundary = relayreplyheaders.get_boundary()
            if relayboundary is None:
                print('Relay reply has no multipart boundary', file=sys.stderr)
                sys.exit(1)
            expectedboundary = '--' + relayboundary + '\r\n'
            while True:
                boundaryline = ssl_file.readline().decode('ascii')
                if not boundaryline:
                    # readline() returned nothing: the relay closed the stream.
                    print('Reached EOL', file=sys.stderr)
                    break
                if boundaryline != expectedboundary:
                    print('Unexpected boundary: %s' % boundaryline, file=sys.stderr)
                    sys.exit(1)
                chunkheaders = http.client.parse_headers(ssl_file)
                chunktype = chunkheaders.get_content_type()
                rawchunklength = chunkheaders.get('content-length')
                try:
                    chunklength = int(rawchunklength)
                except (TypeError, ValueError):
                    chunklength = -1
                if chunklength < 0:
                    # Without a usable length the part cannot be delimited, and
                    # a negative value would make read() consume until EOF.
                    print('Missing or invalid chunk length: %r' % (rawchunklength,), file=sys.stderr)
                    sys.exit(1)
                chunkdata = ssl_file.read(chunklength)
                if chunktype.startswith('video/'):
                    sys.stdout.buffer.write(chunkdata)
                else:
                    print('Ignoring ' + chunktype, file=sys.stderr)
                # Every part payload is followed by an empty line.
                boundaryline = ssl_file.readline().decode('ascii')
                if boundaryline != '\r\n':
                    print('End of chunk new line missing', file=sys.stderr)
                    print(boundaryline, file=sys.stderr)
                    break
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment