-
-
Save system1357/286be2e055724ca3a3aa8a9f94fc4143 to your computer and use it in GitHub Desktop.
mitmproxy contentview for Mi RC4 messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Optional | |
from mitmproxy import contentviews, flow, http, ctx | |
from base64 import b64decode, b64encode | |
import hashlib | |
import json | |
ssecurity = None | |
class RC4: | |
_idx = 0 | |
_jdx = 0 | |
_ksa: list | |
def __init__(self, pwd): | |
self.init_key(pwd) | |
def init_key(self, pwd): | |
cnt = len(pwd) | |
ksa = list(range(256)) | |
j = 0 | |
for i in range(256): | |
j = (j + ksa[i] + pwd[i % cnt]) & 255 | |
ksa[i], ksa[j] = ksa[j], ksa[i] | |
self._ksa = ksa | |
self._idx = 0 | |
self._jdx = 0 | |
return self | |
def crypt(self, data): | |
if isinstance(data, str): | |
data = data.encode() | |
ksa = self._ksa | |
i = self._idx | |
j = self._jdx | |
out = [] | |
for byt in data: | |
i = (i + 1) & 255 | |
j = (j + ksa[i]) & 255 | |
ksa[i], ksa[j] = ksa[j], ksa[i] | |
out.append(byt ^ ksa[(ksa[i] + ksa[j]) & 255]) | |
self._idx = i | |
self._jdx = j | |
self._ksa = ksa | |
return bytearray(out) | |
def init1024(self): | |
self.crypt(bytes(1024)) | |
return self | |
def response(flow: flow.Flow): | |
if "/pass/serviceLogin" in flow.request.path: | |
set_pragma_str = flow.response.headers.get("extension-pragma", "") | |
if "The URL has moved" in flow.response.text: | |
resjs = json.loads(set_pragma_str) | |
global ssecurity | |
ssecurity = resjs["ssecurity"] | |
ctx.log(f"miotrc4 - ssecurity token: {ssecurity}") | |
def request(flow: http.HTTPFlow): | |
# quick hack. needs looking into. | |
if 'miot-accept-encoding' in flow.request.headers: | |
del flow.request.headers['miot-accept-encoding'] | |
class ViewMiOTRC4(contentviews.View): | |
name = "miotrc4" | |
@staticmethod | |
def create_key(ssecurity, nonce): | |
return b64encode(hashlib.sha256(b64decode(ssecurity) + b64decode(nonce)).digest()) | |
@staticmethod | |
def decrypt_data(pwd, data): | |
return RC4(b64decode(pwd)).init1024().crypt(b64decode(data)).decode('utf-8') | |
def __call__( | |
self, | |
data: bytes, | |
*, | |
content_type: Optional[str] = None, | |
flow: Optional[flow.Flow] = None, | |
http_message: Optional[http.Message] = None, | |
**unknown_metadata, | |
) -> contentviews.TViewResult: | |
global ssecurity | |
if ssecurity: | |
if flow.request.host == "sg.api.io.mi.com": | |
nonce = flow.request.urlencoded_form.get("_nonce", None).encode() | |
if nonce: | |
key = self.create_key(ssecurity, nonce) | |
ctx.log(f"{self.name} - Key: {key}, Nonce: {nonce}") | |
if isinstance(http_message, http.Request): | |
data = flow.request.urlencoded_form.get( | |
"data", data | |
).encode() | |
ctx.log(f"{self.name} - [REQ] data: {data}") | |
decoded_data = self.decrypt_data(key, data) | |
ctx.log(f"{self.name} - [REQ] decoded_data: {decoded_data}") | |
form_data = flow.request.urlencoded_form | |
pretty = json.dumps( | |
json.loads(decoded_data), indent=4, sort_keys=True | |
) | |
form_data["data (decrypted)"] = "\n" + pretty | |
return self.name, list( | |
contentviews.base.format_dict(form_data) | |
) | |
elif isinstance(http_message, http.Response): | |
ctx.log(f"{self.name} - [RES] data: {data}") | |
decoded_data = self.decrypt_data(key, data) | |
ctx.log(f"{self.name} - [RES] decoded_data: {decoded_data}") | |
pretty = json.dumps( | |
json.loads(decoded_data), indent=4, sort_keys=True | |
) | |
return self.name, list( | |
contentviews.base.format_text(pretty) | |
) | |
return "Raw", contentviews.format_text(data) | |
def render_priority( | |
self, | |
data: bytes, | |
*, | |
content_type: Optional[str] = None, | |
flow: Optional[flow.Flow] = None, | |
**unknown_metadata, | |
) -> float: | |
global ssecurity | |
if ssecurity: | |
if flow.request.host == "sg.api.io.mi.com": | |
return 1 | |
return 0 | |
view = ViewMiOTRC4() | |
def load(l): | |
contentviews.add(view) | |
def done(): | |
contentviews.remove(view) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment