Created
December 28, 2020 11:23
-
-
Save socram8888/739f9add392f58f72b9ab21800daeba0 to your computer and use it in GitHub Desktop.
Tp-Link Tapo C200 video relaying test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env pyton3 | |
import uuid | |
import requests | |
import ssl | |
import socket | |
import http.client | |
import json | |
import sys | |
# Tp-Link mail | |
cloudmail = '--redacted--' | |
# Tp-Link password | |
cloudpass = '--redacted--' | |
# User-assigned camera name | |
cloudcamera = 'Entrance' | |
# 'VGA' or 'HD' for my Tapo C200s | |
resolution = 'HD' | |
# Hardcoded in application | |
accountserver = 'https://n-wap-gw.tplinkcloud.com' | |
#certs = 'tplinkcloud-com.pem' | |
certs = False | |
clientuuid = str(uuid.uuid4()) | |
loginbody = { | |
'method': 'login', | |
'params': { | |
'appType': 'Tapo_Android', | |
'cloudUserName': cloudmail, | |
'cloudPassword': cloudpass, | |
'terminalUUID': clientuuid | |
} | |
} | |
loginreply = requests.post(accountserver, json=loginbody, verify=certs).json() | |
if loginreply['error_code'] != 0: | |
print('Login error', file=sys.stderr) | |
print(loginreply, file=sys.stderr) | |
sys.exit(1) | |
token = loginreply['result']['token'] | |
devices = [] | |
totaldevs = 9001 | |
while len(devices) < totaldevs: | |
devlistparams = { | |
'token': token | |
} | |
devlistbody = { | |
'method': 'getDeviceListByPage', | |
'params': { | |
'deviceTypeList': [ | |
'SMART.IPCAMERA' | |
], | |
'index': len(devices), | |
'limit': 20 | |
} | |
} | |
devlistreply = requests.post(accountserver, json=devlistbody, params=devlistparams, verify=certs).json() | |
if devlistreply['error_code'] != 0: | |
print('Listing error', file=sys.stderr) | |
print(devlistreply, file=sys.stderr) | |
sys.exit(1) | |
totaldevs = devlistreply['result']['totalNum'] | |
devices.extend(devlistreply['result']['deviceList']) | |
camerainfo = [x for x in devices if x['alias'] == cloudcamera] | |
if len(camerainfo) != 1: | |
print('Cannot find camera "%s"' % cloudcamera, file=sys.stderr) | |
print('Linked devices:', file=sys.stderr) | |
print(devices, file=sys.stderr) | |
sys.exit(1) | |
camerainfo = camerainfo[0] | |
# This is the logic in the app, no kidding | |
relayserver = 'use1-relay-dcipc.i.tplinknbu.com' | |
if 'aps' in camerainfo['appServerUrl']: | |
relayserver = 'aps1-relay-dcipc.i.tplinknbu.com' | |
elif 'euw' in camerainfo['appServerUrl']: | |
relayserver = 'euw1-relay-dcipc.i.tplinknbu.com' | |
ptparams = { | |
'token': token | |
} | |
ptbody = { | |
'method': 'passthrough', | |
'params': { | |
'deviceId': camerainfo['deviceId'], | |
'requestData': { | |
'method':'do', | |
'relay': { | |
'request_relay': { | |
'token': token, | |
'version': '1.3', | |
'stream_type': 0, | |
'protocol': 0, | |
'relay_server': relayserver, | |
'relay_port': 80, | |
'relays_port': 443, | |
'relay_req_url': '/relayservice?deviceid=' + camerainfo['deviceId'] + '&type=video&resolution=' + resolution, | |
'local_req_url': '/stream' | |
} | |
} | |
} | |
} | |
} | |
ptreply = requests.post(camerainfo['appServerUrl'], params=ptparams, json=ptbody, verify=certs).json() | |
context = ssl.create_default_context() | |
context.check_hostname = False | |
context.verify_mode = ssl.CERT_NONE | |
with open('body.bin', 'wb') as f: | |
with socket.create_connection((relayserver, 443)) as raw_sock: | |
with context.wrap_socket(raw_sock, server_hostname=relayserver) as ssl_sock: | |
ssl_file = ssl_sock.makefile('rwb') | |
# Write relay header | |
relayrequestheaders = ( | |
'POST /relayservice?deviceid=' + camerainfo['deviceId'] + '&type=video&resolution=' + resolution + ' HTTP/1.1\r\n' + | |
'User-Agent: Client=TP-Link_Tapo_Android Android 2.2.36/1.3\r\n' + | |
'Keep-Relay: 3600\r\n' + | |
'Accept: */*\r\n' + | |
'Host: ' + relayserver + ':443\r\n' + | |
'Content-Type: multipart/mixed;boundary=--client-stream-boundary--\r\n' + | |
'Content-Length: 9223372036854775807\r\n' + | |
'X-token: ' + token + '\r\n' + | |
'X-Client-Model: SM-A202F\r\n' + | |
'X-Client-UUID: ' + clientuuid + '\r\n' + | |
'X-Client-SessionID: ' + ptreply['result']['responseData']['result']['sid'] + '\r\n' + | |
'X-Redirect-Times: 0\r\n' + | |
'Cookie: ' + ptreply['result']['responseData']['result']['elb_cookie'] + '\r\n' + | |
'\r\n' | |
) | |
relayrequestheaders = relayrequestheaders.encode('ascii') | |
ssl_file.write(relayrequestheaders) | |
# Write boundary marker | |
ssl_file.write(b'----client-stream-boundary--\r\n') | |
# Write stream header | |
streamrequestbody = { | |
"type": "request", | |
"seq": 1, | |
"params": { | |
"method": "get", | |
"preview": { | |
"channels": [0], | |
"resolutions": [resolution], | |
"audio": ["default"] | |
} | |
} | |
} | |
streamrequestbody = json.dumps(streamrequestbody).encode('ascii') | |
streamrequestheader = ( | |
'Content-Type: application/json\r\n' + | |
'Content-Length: ' + str(len(streamrequestbody)) + '\r\n' + | |
'\r\n' | |
) | |
streamrequestheader = streamrequestheader.encode('ascii') | |
ssl_file.write(streamrequestheader) | |
ssl_file.write(streamrequestbody) | |
# Flush as the virtual file is buffered | |
ssl_file.flush() | |
# Read relay header | |
relaystatus = ssl_file.readline().decode('ascii').rstrip('\r\n') | |
if relaystatus != 'HTTP/1.1 200 OK': | |
print('Unexpected relay status: ' + relaystatus, file=sys.stderr) | |
sys.exit(1) | |
relayreplyheaders = http.client.parse_headers(ssl_file) | |
while True: | |
boundaryline = ssl_file.readline().decode('ascii') | |
if len(boundaryline) == 0: | |
print('Reached EOL', file=sys.stderr) | |
break | |
if boundaryline != '--' + relayreplyheaders.get_boundary() + '\r\n': | |
print('Unexpected boundary: %s' % boundaryline, file=sys.stderr) | |
sys.exit(1) | |
chunkheaders = http.client.parse_headers(ssl_file) | |
chunktype = chunkheaders.get_content_type() | |
chunklength = int(chunkheaders.get('content-length')) | |
if not chunktype.startswith('video/'): | |
print('Ignoring ' + chunktype, file=sys.stderr) | |
ssl_file.read(chunklength) | |
else: | |
sys.stdout.buffer.write(ssl_file.read(chunklength)) | |
boundaryline = ssl_file.readline().decode('ascii') | |
if boundaryline != '\r\n': | |
print('End of chunk new line missing', file=sys.stderr) | |
print(boundaryline, file=sys.stderr) | |
break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment