Skip to content

Instantly share code, notes, and snippets.

@Wh1terat
Last active March 20, 2025 02:58
Show Gist options
  • Save Wh1terat/a9a151d69538abb9d5fe317dee0ffe23 to your computer and use it in GitHub Desktop.
Save Wh1terat/a9a151d69538abb9d5fe317dee0ffe23 to your computer and use it in GitHub Desktop.
mitmproxy content view for decrypting Mi (MiOT) RC4-encrypted messages
from typing import Optional, List
from mitmproxy import contentviews, flow, http, ctx
from mitmproxy.addonmanager import Loader
from Crypto.Cipher import ARC4
from base64 import b64decode
import hashlib
import json
class MiOTRC4Viewer(contentviews.View):
    """Content view for MiOT RC4 encrypted messages.

    The view applies to HTTP flows whose request carries the header
    ``miot-encrypt-algorithm: ENCRYPT-RC4``, and only once an ``ssecurity``
    value is known. The ``addon`` attribute is not created by this class:
    ``MiOTRC4Addon.__init__`` assigns itself to ``viewer.addon``, and the
    current ``ssecurity`` is read from that object on every call.
    """

    name = "miotrc4"

    @staticmethod
    def _generate_key(ssecurity: str, nonce: str) -> bytes:
        """Return the RC4 key: SHA-256 of decoded ``ssecurity`` + ``nonce``.

        Both arguments are base64 text. ``b64decode`` raises
        ``binascii.Error`` (a ``ValueError`` subclass) if either one is
        malformed.
        """
        return hashlib.sha256(b64decode(ssecurity) + b64decode(nonce)).digest()

    def _is_rc4_flow(self, candidate: Optional[flow.Flow]) -> bool:
        """Return True if an ssecurity is set and *candidate* is RC4-flagged.

        Shared by ``__call__`` and ``render_priority`` so both methods agree
        on when this view applies.
        """
        return bool(
            self.addon.ssecurity
            and isinstance(candidate, http.HTTPFlow)
            and candidate.request.headers.get("miot-encrypt-algorithm") == "ENCRYPT-RC4"
        )

    def __call__(
        self,
        data: bytes,
        *,
        flow: Optional[flow.Flow] = None,
        http_message: Optional[http.Message] = None,
        **unknown_metadata
    ) -> contentviews.TViewResult:
        """Decrypt the message and render it as formatted JSON.

        The nonce always comes from the request's ``_nonce`` form field. The
        ciphertext is the request's ``data`` form field when *http_message*
        is the request, otherwise the raw *data* bytes passed in.

        Falls back to the "auto" view when the flow is not RC4-flagged, when
        no nonce is present, or when decryption/decoding fails (the failure
        is logged first).
        """
        if self._is_rc4_flow(flow):
            nonce_str = flow.request.urlencoded_form.get("_nonce", "")
            if nonce_str:
                try:
                    # Key derivation is inside the try: a malformed base64
                    # nonce or ssecurity raises binascii.Error (a ValueError)
                    # and must take the same logged fallback path as any
                    # other decryption failure.
                    key = self._generate_key(self.addon.ssecurity, nonce_str)
                    if isinstance(http_message, http.Request):
                        encrypted_data = flow.request.urlencoded_form.get("data", "")
                    else:
                        encrypted_data = data
                    cipher = ARC4.new(key, drop=1024)
                    # RC4 is a symmetric stream cipher, so encrypt() on the
                    # ciphertext yields the plaintext.
                    decrypted_data = cipher.encrypt(b64decode(encrypted_data))
                    decrypted_json = json.loads(decrypted_data.decode("utf-8"))
                    return self.name, contentviews.json.format_json(decrypted_json)
                except (UnicodeDecodeError, json.JSONDecodeError, ValueError):
                    ctx.log.error(f"Decryption failed, ssecurity might be invalid or expired: {self.addon.ssecurity}")
        # Fall back to auto when nothing was decrypted.
        # NOTE(review): flow/http_message are intentionally not forwarded;
        # presumably so that this view's render_priority returns 0.0 and the
        # auto view does not select this view again - confirm before changing.
        return contentviews.get("auto")(data, **unknown_metadata)

    def render_priority(
        self,
        data: bytes,
        *,
        flow: Optional[flow.Flow] = None,
        **unknown_metadata
    ) -> float:
        """Return 1.0 for RC4-flagged HTTP flows with a known ssecurity, else 0.0."""
        return 1.0 if self._is_rc4_flow(flow) else 0.0
class MiOTRC4Addon:
    """Mitmproxy addon for MiOT RC4 decryption.

    Holds the current ``ssecurity`` value, registers a ``MiOTRC4Viewer``
    content view, and updates ``ssecurity`` from the ``ssecurity`` option or
    from a captured ``/pass/serviceLoginAuth2`` response.
    """

    def __init__(self):
        self.ssecurity: Optional[str] = None
        self.viewer = MiOTRC4Viewer()
        # The viewer reads ``addon.ssecurity`` at render time, so it needs a
        # back-reference to this addon.
        self.viewer.addon = self

    def load(self, loader: Loader) -> None:
        """Register the ``ssecurity`` option and the content view.

        The initial ``ssecurity`` is taken from the deferred options when the
        key is present there, otherwise from the regular option if non-empty.
        """
        loader.add_option(
            name="ssecurity",
            typespec=str,
            default="",
            help="ssecurity token for MiOT RC4 decryption"
        )
        contentviews.add(self.viewer)
        if 'ssecurity' in ctx.options.deferred:
            # NOTE(review): ``.val`` is treated as a sequence of raw values,
            # as in the original code; the emptiness check only avoids an
            # IndexError when no value was supplied.
            deferred_values = ctx.options.deferred['ssecurity'].val
            if deferred_values:
                self.ssecurity = deferred_values[0]
        elif ctx.options.ssecurity:
            self.ssecurity = ctx.options.ssecurity

    def done(self) -> None:
        """Unregister the content view."""
        contentviews.remove(self.viewer)

    def request(self, flow: http.HTTPFlow) -> None:
        """Remove the ``miot-accept-encoding`` request header if present."""
        flow.request.headers.pop("miot-accept-encoding", None)

    def response(self, flow: http.HTTPFlow) -> None:
        """Capture ``ssecurity`` from a ``/pass/serviceLoginAuth2`` response."""
        # mitmproxy's Request.path contains the query string as well, so
        # compare only the part before "?" to still match when the login
        # request carries query parameters.
        if flow.request.path.partition("?")[0] != "/pass/serviceLoginAuth2":
            return
        body = flow.response.text
        if not body:
            return
        try:
            # The body is JSON with the "&&&START&&&" marker removed.
            data = json.loads(body.replace("&&&START&&&", ""))
        except json.JSONDecodeError:
            ctx.log.error("Failed to parse auth response")
            return
        # Only a JSON object can carry the key; other JSON values (list,
        # string, number) are treated as "not found" instead of risking a
        # TypeError on subscripting.
        if isinstance(data, dict) and "ssecurity" in data:
            self.ssecurity = data["ssecurity"]
            ctx.log.info(f"Captured ssecurity: {self.ssecurity}")
        else:
            ctx.log.debug("No ssecurity key found in response data")
# Module-level list of addon instances; mitmproxy reads ``addons`` when it loads this script.
addons = [MiOTRC4Addon()]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment