Last active
March 20, 2025 02:58
-
-
Save Wh1terat/a9a151d69538abb9d5fe317dee0ffe23 to your computer and use it in GitHub Desktop.
mitmproxy contentview for Mi RC4 messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Optional, List | |
from mitmproxy import contentviews, flow, http, ctx | |
from mitmproxy.addonmanager import Loader | |
from Crypto.Cipher import ARC4 | |
from base64 import b64decode | |
import hashlib | |
import json | |
class MiOTRC4Viewer(contentviews.View):
    """Content view for MiOT RC4 encrypted messages.

    A body is decrypted when the owning addon holds an ssecurity token, the
    flow is an HTTP flow, and the request carries the header
    ``miot-encrypt-algorithm: ENCRYPT-RC4``. In every other case, and when
    decryption fails, rendering is delegated to the "auto" view.
    """

    name = "miotrc4"

    # Object exposing the current ``ssecurity`` token. MiOTRC4Addon assigns
    # itself here; the None default keeps the view usable (as a no-op) until
    # that happens instead of raising AttributeError.
    addon: Optional["MiOTRC4Addon"] = None

    @staticmethod
    def _generate_key(ssecurity: str, nonce: str) -> bytes:
        """Return the SHA-256 digest of decoded ``ssecurity`` + decoded ``nonce``.

        Both arguments are base64 strings; ``b64decode`` raises
        ``binascii.Error`` (a ``ValueError``) when one of them is malformed.
        """
        return hashlib.sha256(b64decode(ssecurity) + b64decode(nonce)).digest()

    def _should_decrypt(self, candidate: Optional[flow.Flow]) -> bool:
        """Tell whether ``candidate`` is an RC4-encrypted MiOT HTTP flow we can decrypt."""
        if not getattr(self.addon, "ssecurity", None):
            return False
        if not isinstance(candidate, http.HTTPFlow):
            return False
        return candidate.request.headers.get("miot-encrypt-algorithm") == "ENCRYPT-RC4"

    def __call__(
        self,
        data: bytes,
        *,
        flow: Optional[flow.Flow] = None,
        http_message: Optional[http.Message] = None,
        **unknown_metadata
    ) -> contentviews.TViewResult:
        """Render ``data``, decrypting it first when the flow is MiOT RC4 traffic.

        For a request message the ciphertext is taken from the ``data`` form
        field; for anything else the raw ``data`` bytes are used. The nonce
        always comes from the request's ``_nonce`` form field.
        """
        if self._should_decrypt(flow):
            ssecurity = self.addon.ssecurity
            nonce_str = flow.request.urlencoded_form.get("_nonce", "")
            if nonce_str:
                try:
                    # Key derivation lives inside the try so that a malformed
                    # ssecurity/nonce takes the same logged fallback path as a
                    # failed decryption.
                    key = self._generate_key(ssecurity, nonce_str)
                    if isinstance(http_message, http.Request):
                        encrypted_data = flow.request.urlencoded_form.get("data", "")
                    else:
                        encrypted_data = data
                    cipher = ARC4.new(key, drop=1024)
                    # RC4 is symmetric, so encrypt() also decrypts.
                    decrypted_data = cipher.encrypt(b64decode(encrypted_data))
                    decrypted_json = json.loads(decrypted_data.decode("utf-8"))
                    return self.name, contentviews.json.format_json(decrypted_json)
                except (UnicodeDecodeError, json.JSONDecodeError, ValueError):
                    ctx.log.error(f"Decryption failed, ssecurity might be invalid or expired: {ssecurity}")
        # Fall back to auto in case of decryption error or non-MiOT traffic.
        # Only ``data`` and the remaining metadata are forwarded; ``flow`` is
        # not (NOTE(review): presumably so the auto view does not select this
        # view again - confirm before changing).
        return contentviews.get("auto")(data, **unknown_metadata)

    def render_priority(
        self,
        data: bytes,
        *,
        flow: Optional[flow.Flow] = None,
        **unknown_metadata
    ) -> float:
        """Return 1.0 for flows this view can decrypt, otherwise 0.0."""
        return 1.0 if self._should_decrypt(flow) else 0.0
class MiOTRC4Addon:
    """Mitmproxy addon for MiOT RC4 decryption.

    Holds the ``ssecurity`` token (from the ``ssecurity`` option or captured
    from a login response) and registers a MiOTRC4Viewer that reads it.
    """

    # Path of the login endpoint whose JSON response may carry "ssecurity".
    _AUTH_PATH = "/pass/serviceLoginAuth2"

    def __init__(self):
        self.ssecurity: Optional[str] = None
        self.viewer = MiOTRC4Viewer()
        # The viewer reads the token through this back-reference.
        self.viewer.addon = self

    def load(self, loader: Loader) -> None:
        """Register the ``ssecurity`` option and the content view, then seed the token."""
        loader.add_option(
            name="ssecurity",
            typespec=str,
            default="",
            help="ssecurity token for MiOT RC4 decryption"
        )
        contentviews.add(self.viewer)
        # NOTE(review): presumably a value given on the command line before
        # this option was registered sits in ``deferred`` - confirm against
        # the mitmproxy version in use.
        if 'ssecurity' in ctx.options.deferred:
            self.ssecurity = ctx.options.deferred['ssecurity'].val[0]
        elif ctx.options.ssecurity:
            self.ssecurity = ctx.options.ssecurity

    def done(self) -> None:
        """Unregister the content view."""
        contentviews.remove(self.viewer)

    def request(self, flow: http.HTTPFlow) -> None:
        """Strip the ``miot-accept-encoding`` request header if present."""
        flow.request.headers.pop("miot-accept-encoding", None)

    def response(self, flow: http.HTTPFlow) -> None:
        """Capture ``ssecurity`` from a ``/pass/serviceLoginAuth2`` response body."""
        # request.path carries the query string as well, so compare only the
        # part before "?"; otherwise a login URL with parameters is missed.
        endpoint = flow.request.path.partition("?")[0]
        if endpoint != self._AUTH_PATH:
            return
        body = flow.response.text
        if not body:
            return
        try:
            # The body is JSON with an "&&&START&&&" marker that must be removed.
            data = json.loads(body.replace("&&&START&&&", ""))
        except json.JSONDecodeError:
            ctx.log.error("Failed to parse auth response")
            return
        # Only a JSON object can hold the key; other payload types are
        # treated like a missing key rather than raising on the lookup.
        if isinstance(data, dict) and "ssecurity" in data:
            self.ssecurity = data["ssecurity"]
            ctx.log.info(f"Captured ssecurity: {self.ssecurity}")
        else:
            ctx.log.debug("No ssecurity key found in response data")
# Module-level list of addon instances for mitmproxy to load from this script.
addons = [MiOTRC4Addon()]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment