Skip to content

Instantly share code, notes, and snippets.

@Wh1terat
Last active July 20, 2024 17:21
Show Gist options
  • Save Wh1terat/a9a151d69538abb9d5fe317dee0ffe23 to your computer and use it in GitHub Desktop.
Save Wh1terat/a9a151d69538abb9d5fe317dee0ffe23 to your computer and use it in GitHub Desktop.
mitmproxy contentview for Mi RC4 messages
from typing import Optional
from mitmproxy import contentviews, flow, http, ctx
from base64 import b64decode, b64encode
import hashlib
import json
# Base64-encoded ``ssecurity`` session secret. It stays None until response()
# captures it from a login reply; the content view does nothing while it is falsy.
ssecurity = None
class RC4:
    """Minimal RC4 stream cipher whose keystream position persists across calls.

    Encryption and decryption are the same operation (XOR with the keystream),
    so ``crypt`` serves both directions. Successive ``crypt`` calls on one
    instance continue the keystream where the previous call stopped.
    """

    _idx = 0
    _jdx = 0
    _ksa: list

    def __init__(self, pwd):
        """Create a cipher keyed with ``pwd`` (bytes-like, or ``str``)."""
        self.init_key(pwd)

    def init_key(self, pwd):
        """Run the key schedule for ``pwd`` and reset the keystream position.

        Args:
            pwd: Key material as a bytes-like object. A ``str`` is encoded
                with the default codec, mirroring what ``crypt`` does for
                ``str`` data.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If the key is empty.
        """
        if isinstance(pwd, str):
            pwd = pwd.encode()
        cnt = len(pwd)
        if cnt == 0:
            # Without this guard the modulo below fails with an opaque
            # ZeroDivisionError.
            raise ValueError("RC4 key must not be empty")
        ksa = list(range(256))
        j = 0
        for i in range(256):
            j = (j + ksa[i] + pwd[i % cnt]) & 255
            ksa[i], ksa[j] = ksa[j], ksa[i]
        self._ksa = ksa
        self._idx = 0
        self._jdx = 0
        return self

    def crypt(self, data):
        """XOR ``data`` with the next keystream bytes and advance the state.

        Args:
            data: Bytes-like input (or any iterable of ints 0-255). A ``str``
                is encoded with the default codec first.

        Returns:
            A ``bytearray`` holding one output byte per input byte.
        """
        if isinstance(data, str):
            data = data.encode()
        ksa = self._ksa
        i = self._idx
        j = self._jdx
        out = bytearray()
        for byt in data:
            i = (i + 1) & 255
            j = (j + ksa[i]) & 255
            ksa[i], ksa[j] = ksa[j], ksa[i]
            out.append(byt ^ ksa[(ksa[i] + ksa[j]) & 255])
        # ``ksa`` is the instance's own list mutated in place, so only the two
        # indices need to be written back.
        self._idx = i
        self._jdx = j
        return out

    def init1024(self):
        """Advance the keystream by 1024 bytes, discarding them.

        Returns:
            ``self``, so calls can be chained.
        """
        self.crypt(bytes(1024))
        return self
def response(flow: flow.Flow):
    """mitmproxy ``response`` hook: capture ``ssecurity`` from a login reply.

    Only responses to ``/pass/serviceLoginAuth2`` whose body mentions
    ``ssecurity`` are inspected. The ``&&&START&&&`` marker is stripped, the
    remainder is parsed as JSON, and its ``ssecurity`` value is stored in the
    module-level ``ssecurity`` global for the content view to use.
    """
    global ssecurity
    if flow.request.path != "/pass/serviceLoginAuth2":
        return

    # Read the decoded body once; it is None when the response has no content,
    # which would make the substring test below raise TypeError.
    body = flow.response.text
    if not body or "ssecurity" not in body:
        return

    try:
        token = json.loads(body.replace("&&&START&&&", ""))["ssecurity"]
    except (ValueError, KeyError, TypeError) as exc:
        # The word "ssecurity" appeared in the body, but not as a top-level
        # key of a valid JSON object; keep any previously captured token.
        ctx.log(f"miotrc4 - could not extract ssecurity token: {exc!r}")
        return

    ssecurity = token
    ctx.log(f"miotrc4 - ssecurity token: {ssecurity}")
def request(flow: http.HTTPFlow):
    """mitmproxy ``request`` hook: drop the ``miot-accept-encoding`` header.

    The header is removed from every outgoing request that carries it.
    """
    # quick hack. needs looking into.
    # NOTE(review): presumably this keeps the server from choosing a response
    # encoding the view cannot decode - confirm.
    flow.request.headers.pop('miot-accept-encoding', None)
class ViewMiOTRC4(contentviews.View):
    """mitmproxy content view that decrypts ``ENCRYPT-RC4`` MiOT bodies.

    The view is active only once the module-level ``ssecurity`` token is set
    and the request carries a ``miot-encrypt-algorithm: ENCRYPT-RC4`` header.
    The RC4 key is derived from ``ssecurity`` and the request's ``_nonce``
    form field; the decrypted payload is rendered as pretty-printed JSON.
    """

    name = "miotrc4"

    @staticmethod
    def create_key(ssecurity, nonce):
        """Return base64(SHA-256(b64decode(ssecurity) + b64decode(nonce)))."""
        digest = hashlib.sha256(b64decode(ssecurity) + b64decode(nonce)).digest()
        return b64encode(digest)

    @staticmethod
    def decrypt_data(pwd, data):
        """Decrypt base64 ``data`` with the base64 RC4 key ``pwd``.

        The cipher discards its first 1024 keystream bytes before decrypting.
        The plaintext is returned as a UTF-8 decoded ``str``.
        """
        cipher = RC4(b64decode(pwd)).init1024()
        return cipher.crypt(b64decode(data)).decode('utf-8')

    @staticmethod
    def _encrypt_algorithm(flow):
        """Return the request's ``miot-encrypt-algorithm`` header, or None.

        None is also returned when ``flow`` is missing or is not an HTTP flow,
        so callers never touch ``flow.request`` on an object that lacks it.
        """
        if not isinstance(flow, http.HTTPFlow):
            return None
        return flow.request.headers.get("miot-encrypt-algorithm")

    def __call__(
        self,
        data: bytes,
        *,
        content_type: Optional[str] = None,
        flow: Optional[flow.Flow] = None,
        http_message: Optional[http.Message] = None,
        **unknown_metadata,
    ) -> contentviews.TViewResult:
        """Render ``data``, decrypting it when the flow uses ENCRYPT-RC4.

        For a request, the ``data`` form field is decrypted and shown next to
        the other form fields under ``data (decrypted)``. For a response, the
        whole body is decrypted. In every other case the input is rendered
        unchanged under the ``Raw`` label.

        Decryption or JSON errors propagate to the caller.
        """
        enctype = self._encrypt_algorithm(flow) if ssecurity else None
        if enctype is None:
            return "Raw", contentviews.format_text(data)

        ctx.log(f"{self.name} - miot-encrypt-algorithm: {enctype}")
        if enctype != "ENCRYPT-RC4":
            ctx.log(f"{self.name} - Unknown miot-encrypt-algorithm ({enctype})")
            return "Raw", contentviews.format_text(data)

        form = flow.request.urlencoded_form
        # Test for a missing nonce *before* encoding; encoding first would
        # raise AttributeError on None and never reach the check.
        nonce = form.get("_nonce")
        if not nonce:
            return "Raw", contentviews.format_text(data)
        nonce = nonce.encode()

        key = self.create_key(ssecurity, nonce)
        ctx.log(f"{self.name} - Key: {key}, Nonce: {nonce}")

        if isinstance(http_message, http.Request):
            payload = form.get("data")
            if payload is None:
                # No encrypted form field to decode; show the body as-is.
                return "Raw", contentviews.format_text(data)
            payload = payload.encode()
            ctx.log(f"{self.name} - [REQ] data: {payload}")
            decoded_data = self.decrypt_data(key, payload)
            ctx.log(f"{self.name} - [REQ] decoded_data: {decoded_data}")
            pretty = json.dumps(
                json.loads(decoded_data), indent=4, sort_keys=True
            )
            # Work on a plain dict copy: assigning into the live
            # ``urlencoded_form`` view writes through to the request body,
            # so merely displaying a flow would alter what gets sent.
            form_data = dict(form.items())
            form_data["data (decrypted)"] = "\n" + pretty
            return self.name, list(contentviews.base.format_dict(form_data))

        if isinstance(http_message, http.Response):
            ctx.log(f"{self.name} - [RES] data: {data}")
            decoded_data = self.decrypt_data(key, data)
            ctx.log(f"{self.name} - [RES] decoded_data: {decoded_data}")
            pretty = json.dumps(
                json.loads(decoded_data), indent=4, sort_keys=True
            )
            return self.name, list(contentviews.base.format_text(pretty))

        return "Raw", contentviews.format_text(data)

    def render_priority(
        self,
        data: bytes,
        *,
        content_type: Optional[str] = None,
        flow: Optional[flow.Flow] = None,
        **unknown_metadata,
    ) -> float:
        """Return 1 when this view applies to ``flow``, otherwise 0.

        The view applies when an ``ssecurity`` token has been captured and the
        request's ``miot-encrypt-algorithm`` header equals ``ENCRYPT-RC4``.
        """
        if ssecurity and self._encrypt_algorithm(flow) == "ENCRYPT-RC4":
            return 1
        return 0
# Single shared view instance: registered by load() and removed again by done().
view = ViewMiOTRC4()
def load(l):
    """mitmproxy ``load`` hook: register the content view.

    The ``l`` argument is accepted but not used.
    """
    contentviews.add(view)
def done():
    """mitmproxy ``done`` hook: unregister the content view added by ``load``."""
    contentviews.remove(view)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment