-
-
Save bahorn/9bebbbf37c2167f7057aea0244ff2d92 to your computer and use it in GitHub Desktop.
import requests | |
import hashlib | |
import time | |
import uuid | |
import os | |
import copy | |
import json | |
# This is based on my personal implementation but stripped down to only what is | |
# needed to verify it. | |
# This should correct anywhere I was unclear in my original post. | |
# I developed against a Mobile API key, which is why I used the mobile methods | |
# in my test cases. | |
# Signing works the same with the Cloud API. You get a "NO_PERMISSION" if your | |
# keys aren't valid for the request, but if you sign wrong, you'll get a error | |
# related to signing instead, which is checked earlier on their end. | |
# The signing process is also leaked in the mobile apps (which are all | |
# rebrandings, no joke) in the Android logs. | |
## For mobile app keys: | |
## Be careful about which host you use. If you are registered in the US, use the | |
## US host. | |
host = "https://a1.tuyaeu.com/api.json" | |
# Random and not really checked. Should keep persistent if you are using | |
# sessions. | |
device_id = os.urandom(32).encode('hex') | |
sid = "" # Used to mobile requests that require login. | |
# Needs to be set, but they don't care what it is. | |
os = ("TEST", "0.1.2", "TEST") | |
# Your API keys. | |
appKey = "<BLANKED>" | |
appSecret = "<BLANKED>" | |
# This is likely a cause of confusion. They decided for this and only this to | |
# rearrange the hash. Not their normal md5-mid which they use a lot in their | |
# MQTT client. | |
def post_data_hash_transform(post_data): | |
h = hashlib.md5() | |
h.update(post_data) | |
pre_hash = h.hexdigest() | |
return pre_hash[8:16] + pre_hash[0:8] + pre_hash[24:32] + pre_hash[16:24] | |
# This is the implementation of "sign". | |
def generate_request_sign(pairs): | |
# This are the values that get "signed" in a request, worth checking if I | |
# missed one in this list. | |
values_to_hash = ["a", "v", "lat", "lon", "lang", "deviceId", "imei", | |
"imsi", "appVersion", "ttid", "isH5", "h5Token", "os", | |
"clientId", "postData", "time", "n4h5", "sid", "sp"] | |
out = [] | |
sorted_pairs = sorted(pairs) | |
for item in sorted_pairs: | |
if item not in values_to_hash: | |
continue | |
if pairs[item] == "": | |
continue | |
if item == "postData": | |
out += [item + "=" + post_data_hash_transform(pairs["postData"])] | |
else: | |
out += [item + "=" + str(pairs[item])] | |
sign_request = "||".join(out) + "||" + appSecret | |
h = hashlib.md5() | |
h.update(sign_request) | |
return h.hexdigest() | |
# This will give you a dict with all the request parameters. | |
def url_generator(action, version, post_data=None, sid=None, time_param=None): | |
client_id = appKey | |
ttid = "sdk_google1@{}".format(appKey) | |
timezone_id = "America/New_York" | |
lang = "en" | |
if not time_param: | |
time_param = int(time.time()) | |
request_id = uuid.uuid4() | |
pairs = { | |
"sdkVersion": "1.11.11", | |
"platform": os[2], | |
"ttid": ttid, | |
"a": action, | |
"timeZoneId": timezone_id, | |
"deviceId": device_id, | |
"osSystem": os[1], | |
"os": os[0], | |
"v": version, | |
"appVersion": "2.9.1", | |
"clientId": client_id, | |
"lang": lang, | |
"requestId": request_id, | |
"time": time_param, | |
"appRnVersion": "2.9", | |
} | |
if sid: | |
pairs['sid'] = sid | |
to_sign = copy.deepcopy(pairs) | |
if post_data: | |
to_sign['postData'] = post_data | |
pairs['sign'] = generate_request_sign(to_sign) | |
return pairs | |
# Call the endpoint. | |
# * action is the name as defined in the tuya docs | |
# * version is the version they say to provide, normally "1.0" | |
# * data is the JSON data you want to pass to the action | |
# * requires_sid means it adds a session_id to the parameters. Used in mobile endpoints. | |
def preform_action(action, version, data=None, requires_sid=False): | |
if requires_sid is True and not sid: | |
return None | |
params = url_generator(action, version, data, sid) | |
print params | |
endpoint = host | |
headers = { | |
# Maybe set a user agent or something here. | |
} | |
if data: | |
r = requests.post(endpoint, params=params, data={"postData":data}, | |
headers=headers) | |
else: | |
r = requests.post(endpoint, params=params, headers=headers) | |
return r.status_code, r.json(), r.headers | |
if __name__ == "__main__": | |
# Should return a list of countries, doesn't use post data. | |
print preform_action("tuya.m.country.list", "1.0") | |
# Attempt to login with a fake email, takes POST data. | |
attempt = { | |
"email":"fake_email_attempt@fake_email.com", | |
"countryCode":1 | |
} | |
print preform_action("tuya.m.user.email.token.create", "1.0", | |
json.dumps(attempt)) |
Thankyou so much for post_data_hash_transform (rearanging md5 info)!
Apart from that I can see Tuya uses different signing and encryption algorithms for different aps or api versions. I my case (thermostats) i spent hours figuring this:
- Instead of md5 to sign the request I had to use HMAC-SHA256:
sign_request = "||".join(out) #no appsecret here
hmac_key = "A_"+tuya_bmpkey+"_"+tuya_appsecret # here you have to use secret2 (encoded in the image file) and standard secret
signature = hmac.new(key=hmac_key.encode('utf-8'),msg=feed.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()
encrypted postData as base64 still has to be put to above sign_request using peculiar "post_data_hash_transform"
- the postData field has to be encrypted with AES in MODE_GMC with 12 bytes of random nonce as prefix and 16 bytes of validation MAC as suffix. The key is derived from request_id using HMAC-SHA256 with key obtained by contatenation of various secret values:
def encryptPostData(postData, requestId):
#create key from requestid and ecode. ecode is created together with session id upon login, as far as i can see it is valid undefinietly,
#so it's easier to sniff it than to request it
keyparts = "A_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode # secret1, secret2 and ecode used here
#generate key from request_id and secrets
keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()
shortKey = keyHex[0:16].encode('utf-8') #yes! you use only the first 16 characters of hexadecimal form as AES key
postDataStr = json.dumps(postData)
nonce = os.urandom(12)
plainBytes = postDataStr.encode('utf-8')
encryptedPostData, mac = AES.new(shortKey, AES.MODE_GCM, nonce).encrypt_and_digest(plainBytes)
encryptedPostDataWithNonce = nonce+encryptedPostData+mac
encryptedPostDataBase64 = base64.b64encode(encryptedPostDataWithNonce).decode("utf-8")
return encryptedPostDataBase64
- The same method is used to decrypt the response. In json response, the "result" field is encrypted:
def decryptResult(result, requestId):
#create key from requestid and ecode
keyparts = "A_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
#generate key from request_id and ecode
keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()
shortKey = keyHex[0:16].encode('utf-8')
encryptedBytes = base64.b64decode(result)
nonce = encryptedBytes[0:12]
encryptedPayload = encryptedBytes[12:]
decrypted = AES.new(shortKey, AES.MODE_GCM, nonce).decrypt(encryptedPayload[:-16]) #drop last 16 bytes, it's MAC signature
return decrypted.decode("utf-8")
hope it'll help someone!
hey @panjanek , also here, do you have a full script avaible with new md5 ? Also that appsecret2 , do you have that for me? Seems its taken from the app itself? Smartlife or Tuya ?
thnx!!
hey @bahorn or @panjanek , is it possible to update the script to use the keys below, to use smartlife or app, the appsecret2 / certsign are now needed, would be great to have a python vesion... i know use this javascript version, but want a python version of it...
https://github.com/TuyaAPI/cloud/blob/master/index.js
For Tuya:
{
"key": "3fjrekuxank9eaej3gcx",
"secret": "aq7xvqcyqcnegvew793pqjmhv77rneqc",
"secret2": "vay9g59g9g99qf3rtqptmc3emhkanwkx",
"certSign": "93:21:9F:C2:73:E2:20:0F:4A:DE:E5:F7:19:1D:C6:56:BA:2A:2D:7B:2F:F5:D2:4C:D5:5C:4B:61:55:00:1E:40"
}
For Smartlife:
{
"key": "ekmnwp9f5pnh3trdtpgy",
"secret": "r3me7ghmxjevrvnpemwmhw3fxtacphyg",
"secret2": "jfg5rs5kkmrj5mxahugvucrsvw43t48x",
"certSign": "0F:C3:61:99:9C:C0:C3:5B:A8:AC:A5:7D:AA:55:93:A2:0C:F5:57:27:70:2E:A8:5A:D7:B3:22:89:49:F8:88:FE"
}
@pergolafabio It should work
Here is my tool for tuyasmart app (I don't like to post secrets, seems illicit):
import requests
import hashlib
import time
import uuid
import os
import copy
import json
import urllib3
import logging
import hmac
import base64
import string
import sys
from Crypto.Cipher import AES
tuya_appid = "***"
tuya_appsecret = "***"
tuya_bmpkey = "***"
tuya_endpoint = "https://a1.tuyaeu.com/api.json"
tuya_sid = "***"
tuya_ecode = "***"
tuya_gid = "***"
tuya_certsign = "***"
urllib3.disable_warnings()
def addTuyaSignature(params):
if not "sign" in params:
values_to_hash = ["a", "v", "lat", "lon", "et", "lang", "deviceId", "imei",
"imsi", "appVersion", "ttid", "isH5", "h5Token", "os",
"clientId", "postData", "time", "n4h5", "sid", "sp", "requestId"]
sorted_params = sorted(params)
out = []
for key in sorted_params:
if key not in values_to_hash:
continue
if params[key] == "":
continue
value = str(params[key])
if key == "postData":
h = hashlib.md5()
h.update(value.encode('utf-8'))
value=h.hexdigest()
value = value[8:16] + value[0:8] + value[24:32] + value[16:24]
out += [key + "=" + value]
feed = "||".join(out)
hmac_key = tuya_certsign + "_"+tuya_bmpkey+"_"+tuya_appsecret
signature = hmac.new(key=hmac_key.encode('utf-8'),msg=feed.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()
params["sign"] = signature
return params
def decryptResult(result, requestId):
#create key from requestid and ecode
keyparts = tuya_certsign + "_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
#generate key from request_id and ecode
keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()
shortKey = keyHex[0:16].encode('utf-8')
encryptedBytes = base64.b64decode(result)
nonce = encryptedBytes[0:12]
encryptedPayload = encryptedBytes[12:]
decrypted = AES.new(shortKey, AES.MODE_GCM, nonce).decrypt(encryptedPayload[:-16])
return decrypted.decode("utf-8")
def encryptPostData(postData, requestId):
#create key from requestid and ecode
keyparts = tuya_certsign + "_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
#generate key from request_id and ecode
keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()
shortKey = keyHex[0:16].encode('utf-8')
postDataStr = json.dumps(postData)
nonce = os.urandom(12)
plainBytes = postDataStr.encode('utf-8')
encryptedPostData, mac = AES.new(shortKey, AES.MODE_GCM, nonce).encrypt_and_digest(plainBytes)
encryptedPostDataWithNonce = nonce+encryptedPostData+mac
encryptedPostDataBase64 = base64.b64encode(encryptedPostDataWithNonce).decode("utf-8")
return encryptedPostDataBase64
def callTuyaApi(action, postData):
time_param = int(time.time())
request_id = str(uuid.uuid4())
params = {
"a":action,
"appRnVersion":"5.44",
"appVersion":"3.33.5",
"channel":"oem",
"clientId":tuya_appid,
"deviceCoreVersion":"3.29.5",
"deviceId":"f1cf817055401e82b60fa5f74d8779e64133a59215b7",
"et":"3",
"lang":"pl_PL",
"os":"Android",
"osSystem":"7.1.1",
"platform":"ONEPLUS A5000",
"requestId":request_id,
"sdkVersion":"3.29.5",
"sid":tuya_sid,
"time": str(time_param),
"timeZoneId":"Europe/Warsaw",
"ttid":"sdk_tuya_international",
"v":"1.0" ,
"gid": tuya_gid
}
#print(postData)
params["postData"] = encryptPostData(postData, request_id)
params = addTuyaSignature(params)
headers = { 'User-Agent' : 'Android/com.google.android.gms/203615023 (OnePlus5 NMF26X)' }
#print(params)
response = requests.post(tuya_endpoint, params=params, headers=headers, verify=False)
encryptedResult = response.json()["result"]
encryptedBytes = base64.b64decode(encryptedResult)
jsonStr = decryptResult(encryptedResult, request_id)
return json.loads(jsonStr)
def getDeviceDetails(devId):
response = callTuyaApi("tuya.m.device.get", { "devId" :devId})
#print(json.dumps(response, indent=4, sort_keys=True,ensure_ascii=False))
dps = response["result"]["dps"]
result = {}
result["id"] = response["result"]["devId"]
result["online"] = response["result"]["isOnline"]
result["active"] = response["result"]["isActive"]
result["localKey"] = response["result"]["localKey"]
result["name"] = response["result"]["name"]
for key in dps:
result["dps_"+key] = dps[key]
if "1" in dps and isinstance(dps["1"], bool):
result["1"] = dps["1"]
if "2" in dps and isinstance(dps["2"], bool):
result["2"] = dps["2"]
if "3" in dps and isinstance(dps["3"], bool):
result["3"] = dps["3"]
if "7" in dps and dps["7"]!=0:
result["usb"] = dps["7"]
if "20" in dps:
result["voltage"] = float(dps["20"]) / 10.0
if "18" in dps:
result["current"] = float(dps["18"]) / 1000.0
if "19" in dps:
result["power"] = float(dps["19"]) / 10.0
return result
if __name__ == "__main__":
if len(sys.argv) <= 1:
#print("usage: python tuyasmart.py <device-id> [1|2|3|usb] [on|off]")
#print("devices:")
devices = callTuyaApi("tuya.m.device.ext.prop.list", {})
all = []
for dev in devices["result"]:
devId = dev["devId"]
device = getDeviceDetails(devId)
all.append(device)
print(json.dumps(all, indent=4, sort_keys=True,ensure_ascii=False))
elif len(sys.argv) == 2:
devId = sys.argv[1]
device = getDeviceDetails(devId)
print(json.dumps(device, indent=4, sort_keys=True,ensure_ascii=False))
else:
devId = sys.argv[1]
socket = sys.argv[2]
if len(sys.argv) == 3:
device = getDeviceDetails(devId)
s = "1" if device[socket] else "0"
print(s)
else:
state = sys.argv[3]
if state == "on" or state == "off":
if socket == "usb":
socket = "7"
postData = {}
postData["devId"] = devId
postData["dps"] = {}
postData["dps"][str(socket)] = state == "on"
response = callTuyaApi("tuya.m.device.dp.publish", postData)
print(json.dumps(response, indent=4, sort_keys=True,ensure_ascii=False))
elif state == "set":
value = sys.argv[4]
if value == "true":
value = True
elif value == "false":
value = False
elif value.isnumeric():
value = int(value)
postData = {}
postData["devId"] = devId
postData["dps"] = {}
postData["dps"][str(socket)] = value
response = callTuyaApi("tuya.m.device.dp.publish", postData)
print(json.dumps(response, indent=4, sort_keys=True,ensure_ascii=False))
Here is the tool for thermostats:
import requests
import hashlib
import time
import uuid
import os
import copy
import json
import urllib3
import logging
import hmac
import base64
import string
import sys
from Crypto.Cipher import AES
tuya_appid = "***"
tuya_appsecret = "***"
tuya_bmpkey = "***"
tuya_endpoint = "https://a1.tuyaeu.com/api.json"
tuya_sid = "***"
tuya_ecode = "***"
tuya_gid = "***"
#"1" - on/off, "2" - target temp * 2, "3" - current temp * 2, "102" - heater temp *2
#/bin/sh
#apk add gcc
#sudo pip istall pycrypto
urllib3.disable_warnings()
def addTuyaSignature(params):
if not "sign" in params:
values_to_hash = ["a", "v", "lat", "lon", "et", "lang", "deviceId", "imei",
"imsi", "appVersion", "ttid", "isH5", "h5Token", "os",
"clientId", "postData", "time", "n4h5", "sid", "sp", "requestId"]
sorted_params = sorted(params)
out = []
for key in sorted_params:
if key not in values_to_hash:
continue
if params[key] == "":
continue
value = str(params[key])
if key == "postData":
h = hashlib.md5()
h.update(value.encode('utf-8'))
value=h.hexdigest()
value = value[8:16] + value[0:8] + value[24:32] + value[16:24]
out += [key + "=" + value]
feed = "||".join(out)
hmac_key = "A_"+tuya_bmpkey+"_"+tuya_appsecret
signature = hmac.new(key=hmac_key.encode('utf-8'),msg=feed.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()
params["sign"] = signature
return params
def decryptResult(result, requestId):
#create key from requestid and ecode
keyparts = "A_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
#generate key from request_id and ecode
keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()
shortKey = keyHex[0:16].encode('utf-8')
encryptedBytes = base64.b64decode(result)
nonce = encryptedBytes[0:12]
encryptedPayload = encryptedBytes[12:]
decrypted = AES.new(shortKey, AES.MODE_GCM, nonce).decrypt(encryptedPayload[:-16])
return decrypted.decode("utf-8")
def encryptPostData(postData, requestId):
#create key from requestid and ecode
keyparts = "A_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
#generate key from request_id and ecode
keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()
shortKey = keyHex[0:16].encode('utf-8')
postDataStr = json.dumps(postData)
nonce = os.urandom(12)
plainBytes = postDataStr.encode('utf-8')
#encryptedPostData = AES.new(shortKey, AES.MODE_GCM, nonce).encrypt(plainBytes)
encryptedPostData, mac = AES.new(shortKey, AES.MODE_GCM, nonce).encrypt_and_digest(plainBytes)
encryptedPostDataWithNonce = nonce+encryptedPostData+mac
encryptedPostDataBase64 = base64.b64encode(encryptedPostDataWithNonce).decode("utf-8")
return encryptedPostDataBase64
def callTuyaApi(action, postData):
time_param = int(time.time())
request_id = str(uuid.uuid4())
params = {
"a":action,
"appRnVersion":"5.44",
"appVersion":"2.0.3",
"channel":"oem",
"clientId":tuya_appid,
"deviceCoreVersion":"3.29.5",
"deviceId":"0cbe6a9f082de4beb1e184c84bce097cef0dde8312bc",
"et":"3",
"lang":"pl_PL",
"os":"Android",
"osSystem":"7.1.1",
"platform":"ONEPLUS A5000",
"requestId":request_id,
"sdkVersion":"3.29.5",
"sid":tuya_sid,
"time": str(time_param),
"timeZoneId":"Europe/Warsaw",
"ttid":"sdk_tuya_international@"+tuya_appid,
"v":"1.0",
"gid": tuya_gid
}
params["postData"] = encryptPostData(postData, request_id)
params = addTuyaSignature(params)
headers = { 'User-Agent' : 'Android/com.google.android.gms/203615023 (OnePlus5 NMF26X)' }
response = requests.post(tuya_endpoint, params=params, headers=headers, verify=False)
encryptedResult = response.json()["result"]
encryptedBytes = base64.b64decode(encryptedResult)
jsonStr = decryptResult(encryptedResult, request_id)
return json.loads(jsonStr)
def getDeviceDetails(devId):
device = callTuyaApi("tuya.m.device.get", { "devId" :devId})
result = {}
result["id"] = devId
result["name"] = device["result"]["name"]
result["online"] = device["result"]["isOnline"]
result["switch"] = device["result"]["dps"]["1"]
result["target"] = float(device["result"]["dps"]["2"]) / 2.0
result["current"] = float(device["result"]["dps"]["3"]) / 2.0
result["heater"] = float(device["result"]["dps"]["102"]) / 2.0
return result
if len(sys.argv) <= 1:
#list all
devices = callTuyaApi("tuya.m.my.group.device.sort.list", {})
all = []
for dev in devices["result"]:
devId = dev["bizId"]
device = getDeviceDetails(devId)
all.append(device)
print(json.dumps(all, indent=4, sort_keys=True,ensure_ascii=False))
elif len(sys.argv) == 2:
devId = sys.argv[1]
device = getDeviceDetails(devId)
print(json.dumps(device, indent=4, sort_keys=True,ensure_ascii=False))
else:
devId = sys.argv[1]
result=None
if sys.argv[2]=="on":
result = callTuyaApi("tuya.m.device.dp.publish", { "devId":devId, "dps":{"1":True}})
elif sys.argv[2]=="off":
result = callTuyaApi("tuya.m.device.dp.publish", { "devId":devId, "dps":{"1":False}})
else:
target_temp = float(sys.argv[2])
result = callTuyaApi("tuya.m.device.dp.publish", { "devId":devId, "dps":{"2":int(target_temp*2)}})
time.sleep(1)
print(json.dumps(getDeviceDetails(devId), indent=4, sort_keys=True,ensure_ascii=False))
aha, thats great, thnx for sharing!!
@panjanek thanks for sharing your findings. You saved me a lot of time and frustration :) thanks!
Just removed a reference to self that I accidentally left in after stripping preform_action from the class I took it from. Would have caused issues when using a session ID.