Last active
July 20, 2024 17:21
-
-
Save Wh1terat/a9a151d69538abb9d5fe317dee0ffe23 to your computer and use it in GitHub Desktop.
mitmproxy contentview for Mi RC4 messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Optional | |
from mitmproxy import contentviews, flow, http, ctx | |
from base64 import b64decode, b64encode | |
import hashlib | |
import json | |
ssecurity = None | |
class RC4: | |
_idx = 0 | |
_jdx = 0 | |
_ksa: list | |
def __init__(self, pwd): | |
self.init_key(pwd) | |
def init_key(self, pwd): | |
cnt = len(pwd) | |
ksa = list(range(256)) | |
j = 0 | |
for i in range(256): | |
j = (j + ksa[i] + pwd[i % cnt]) & 255 | |
ksa[i], ksa[j] = ksa[j], ksa[i] | |
self._ksa = ksa | |
self._idx = 0 | |
self._jdx = 0 | |
return self | |
def crypt(self, data): | |
if isinstance(data, str): | |
data = data.encode() | |
ksa = self._ksa | |
i = self._idx | |
j = self._jdx | |
out = [] | |
for byt in data: | |
i = (i + 1) & 255 | |
j = (j + ksa[i]) & 255 | |
ksa[i], ksa[j] = ksa[j], ksa[i] | |
out.append(byt ^ ksa[(ksa[i] + ksa[j]) & 255]) | |
self._idx = i | |
self._jdx = j | |
self._ksa = ksa | |
return bytearray(out) | |
def init1024(self): | |
self.crypt(bytes(1024)) | |
return self | |
def response(flow: flow.Flow): | |
if flow.request.path == "/pass/serviceLoginAuth2": | |
if "ssecurity" in flow.response.text: | |
resjs = json.loads(flow.response.text.replace("&&&START&&&", "")) | |
global ssecurity | |
ssecurity = resjs["ssecurity"] | |
ctx.log(f"miotrc4 - ssecurity token: {ssecurity}") | |
def request(flow: http.HTTPFlow): | |
# quick hack. needs looking into. | |
if 'miot-accept-encoding' in flow.request.headers: | |
del flow.request.headers['miot-accept-encoding'] | |
class ViewMiOTRC4(contentviews.View): | |
name = "miotrc4" | |
@staticmethod | |
def create_key(ssecurity, nonce): | |
return b64encode(hashlib.sha256(b64decode(ssecurity) + b64decode(nonce)).digest()) | |
@staticmethod | |
def decrypt_data(pwd, data): | |
return RC4(b64decode(pwd)).init1024().crypt(b64decode(data)).decode('utf-8') | |
def __call__( | |
self, | |
data: bytes, | |
*, | |
content_type: Optional[str] = None, | |
flow: Optional[flow.Flow] = None, | |
http_message: Optional[http.Message] = None, | |
**unknown_metadata, | |
) -> contentviews.TViewResult: | |
global ssecurity | |
if ssecurity: | |
if "miot-encrypt-algorithm" in flow.request.headers: | |
enctype = flow.request.headers["miot-encrypt-algorithm"] | |
ctx.log(f"{self.name} - miot-encrypt-algorithm: {enctype}") | |
if enctype == "ENCRYPT-RC4": | |
nonce = flow.request.urlencoded_form.get("_nonce", None).encode() | |
if nonce: | |
key = self.create_key(ssecurity, nonce) | |
ctx.log(f"{self.name} - Key: {key}, Nonce: {nonce}") | |
if isinstance(http_message, http.Request): | |
data = flow.request.urlencoded_form.get( | |
"data", data | |
).encode() | |
ctx.log(f"{self.name} - [REQ] data: {data}") | |
decoded_data = self.decrypt_data(key, data) | |
ctx.log(f"{self.name} - [REQ] decoded_data: {decoded_data}") | |
form_data = flow.request.urlencoded_form | |
pretty = json.dumps( | |
json.loads(decoded_data), indent=4, sort_keys=True | |
) | |
form_data["data (decrypted)"] = "\n" + pretty | |
return self.name, list( | |
contentviews.base.format_dict(form_data) | |
) | |
elif isinstance(http_message, http.Response): | |
ctx.log(f"{self.name} - [RES] data: {data}") | |
decoded_data = self.decrypt_data(key, data) | |
ctx.log(f"{self.name} - [RES] decoded_data: {decoded_data}") | |
pretty = json.dumps( | |
json.loads(decoded_data), indent=4, sort_keys=True | |
) | |
return self.name, list( | |
contentviews.base.format_text(pretty) | |
) | |
else: | |
ctx.log(f"{self.name} - Unknown miot-encrypt-algorithm ({enctype})") | |
return "Raw", contentviews.format_text(data) | |
def render_priority( | |
self, | |
data: bytes, | |
*, | |
content_type: Optional[str] = None, | |
flow: Optional[flow.Flow] = None, | |
**unknown_metadata, | |
) -> float: | |
global ssecurity | |
if ssecurity: | |
if "miot-encrypt-algorithm" in flow.request.headers: | |
if flow.request.headers["miot-encrypt-algorithm"] == "ENCRYPT-RC4": | |
return 1 | |
return 0 | |
view = ViewMiOTRC4() | |
def load(l): | |
contentviews.add(view) | |
def done(): | |
contentviews.remove(view) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment