Last active
August 8, 2024 11:05
-
-
Save Hacktivate-TH/cd1345dfe772d8c58741313adf8872ca to your computer and use it in GitHub Desktop.
Mitmproxy extension for editing gRPC messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Author: Hacktivate Co., Ltd. (https://hacktivate.tech) | |
# | |
# Description: This is a mitmproxy extension for editing gRPC messages over HTTP/2.
# Full blog post can be found at: https://hacktivate.tech/2022/10/27/a-hackish-way-to-tamper-grpc-traffic-in-android.html | |
# | |
from concurrent.futures.process import _threads_wakeups | |
import logging | |
import struct | |
from dataclasses import dataclass, field | |
from enum import Enum | |
from typing import Generator, Iterable, Iterator | |
import ast | |
import os | |
import shlex | |
import subprocess | |
import shutil | |
import tempfile | |
from collections.abc import Sequence | |
from typing import Optional, cast | |
from mitmproxy import command | |
from mitmproxy import ctx, flow, http | |
from mitmproxy.tools.console import signals | |
class ProtoParser: | |
@dataclass
class ParserRule:
    """
    A parser rule lists Field definitions which are applied if the filter rule matches the flow.

    Matching on flow-level also means, a match applies to request AND response messages.
    To restrict a rule to requests only use 'ParserRuleRequest', instead.
    To restrict a rule to responses only use 'ParserRuleResponse', instead.
    """

    # Name of this rule, only used for debugging.
    name: str = ""
    # Flowfilter to select which flows to apply to ('~q' and '~s' can not be used to
    # distinguish if the rule should apply to the request or response of a flow. To do so,
    # use ParserRuleRequest or ParserRuleResponse. ParserRule always applies to request
    # and response.)
    filter: str = ""
    # List of field definitions for this rule.
    # This used to be an un-annotated ``field_definitions = []``, i.e. a single list
    # object shared by every rule instead of a dataclass field. It is declared last so
    # that positional construction ``ParserRule(name, filter)`` keeps working.
    field_definitions: list = field(default_factory=list)
@dataclass
class ParserOptions:
    """Options controlling how parsed fields are rendered as text rows."""

    # output should contain wiretype of fields
    include_wiretype: bool = False
    # output should contain the fields which describe nested messages
    # (the nested messages bodies are always included, but the "header fields" could
    # add unnecessary output overhead)
    exclude_message_headers: bool = False
class DecodedTypes(Enum):
    """Protobuf field types a wire value can be rendered as, grouped by wire type."""

    # --- carried by wire type "varint" ---
    int32 = 0
    int64 = 1
    uint32 = 2
    uint64 = 3
    sint32 = 4  # ZigZag encoding
    sint64 = 5  # ZigZag encoding
    bool = 6
    enum = 7

    # --- carried by wire type "bit_32" ---
    fixed32 = 8
    sfixed32 = 9
    float = 10

    # --- carried by wire type "bit_64" ---
    fixed64 = 11
    sfixed64 = 12
    double = 13

    # --- carried by wire type "len_delimited" ---
    string = 14
    bytes = 15
    message = 16

    # --- helper value for "could not be decoded" ---
    unknown = 17
@staticmethod
def _read_base128le(data: bytes) -> tuple[int, int]:
    """
    Decode one base-128 little-endian variable length integer from the start of `data`.

    Returns a tuple ``(consumed_bytes, value)``.
    Raises ValueError if `data` ends before a byte without the continuation bit is seen.

    The Kaitai parser for protobuf supports base128 le values up to 8 groups (bytes);
    as each group contributes 7 bit, that gives a 56 bit value at maximum, which does
    not cover the full 64bit types (int64, uint64, sint64). This decoder puts no
    limitation on the maximum value; values exceeding 64bit have to be handled externally.
    """
    value = 0
    for index, octet in enumerate(data):
        # every byte contributes its low 7 bits, least significant group first
        value |= (octet & 0x7F) << (7 * index)
        if not octet & 0x80:
            # continuation bit cleared: this was the last group
            return index + 1, value
    raise ValueError("varint exceeds bounds of provided data")
@staticmethod
def _read_u32(data: bytes) -> tuple[int, int]:
    """Read a little-endian unsigned 32 bit integer; returns ``(4, value)``."""
    (value,) = struct.unpack("<I", data[:4])
    return 4, value
@staticmethod
def _read_u64(data: bytes) -> tuple[int, int]:
    """Read a little-endian unsigned 64 bit integer; returns ``(8, value)``."""
    (value,) = struct.unpack("<Q", data[:8])
    return 8, value
class WireTypes(Enum):
    """Wire types, i.e. the value stored in the low 3 bits of a field key."""

    varint = 0
    bit_64 = 1
    len_delimited = 2
    group_start = 3  # rejected as deprecated by read_fields
    group_end = 4  # rejected as deprecated by read_fields
    bit_32 = 5
@staticmethod
def read_fields(
    wire_data: bytes,
    parent_field,
    options,
    rules,
):
    """
    Split a raw protobuf message into its fields.

    `wire_data` is the encoded message, `parent_field` the Field this message is
    nested in (None for the root message); `options` and `rules` are handed on to
    every created Field unchanged.

    Returns a list of ProtoParser.Field objects in wire order.
    Raises ValueError for invalid or deprecated wire types, truncated varints and
    length delimited fields which exceed the data; struct.error for truncated
    fixed-width fields.
    """
    wire_types = ProtoParser.WireTypes
    decoded_types = ProtoParser.DecodedTypes
    res = []
    pos = 0
    while pos < len(wire_data):
        # read field key (tag and wire_type)
        offset, key = ProtoParser._read_base128le(wire_data[pos:])
        # casting raises exception for invalid WireTypes
        wt = wire_types(key & 7)
        tag = key >> 3
        pos += offset
        val = None
        preferred_decoding: ProtoParser.DecodedTypes
        if wt == wire_types.varint:
            offset, val = ProtoParser._read_base128le(wire_data[pos:])
            pos += offset
            bl = val.bit_length()
            # This has to be one elif-chain: with independent `if` statements the
            # trailing `else` overwrote the 'unknown' and 'uint64' choices with 'uint32'.
            if bl > 64:
                preferred_decoding = decoded_types.unknown
            elif bl > 32:
                preferred_decoding = decoded_types.uint64
            elif bl == 1:
                # the only 1 bit value is 1, which is presented as boolean
                preferred_decoding = decoded_types.bool
            else:
                preferred_decoding = decoded_types.uint32
        elif wt == wire_types.bit_64:
            offset, val = ProtoParser._read_u64(wire_data[pos:])
            pos += offset
            preferred_decoding = decoded_types.fixed64
        elif wt == wire_types.len_delimited:
            offset, length = ProtoParser._read_base128le(wire_data[pos:])
            pos += offset
            if length > len(wire_data[pos:]):
                raise ValueError("length delimited field exceeds data size")
            val = wire_data[pos : pos + length]
            pos += length
            preferred_decoding = decoded_types.message
        elif wt == wire_types.group_start or wt == wire_types.group_end:
            raise ValueError(f"deprecated field: {wt}")
        elif wt == wire_types.bit_32:
            offset, val = ProtoParser._read_u32(wire_data[pos:])
            pos += offset
            preferred_decoding = decoded_types.fixed32
        else:
            # not reachable as if-else statements contain all possible WireTypes
            # wrong types raise Exception during typecasting in `wt = ProtoParser.WireTypes((key & 7))`
            raise ValueError("invalid WireType for protobuf messsage field")
        field = ProtoParser.Field(
            wire_type=wt,
            preferred_decoding=preferred_decoding,
            options=options,
            rules=rules,
            tag=tag,
            wire_value=val,
            parent_field=parent_field,
        )
        res.append(field)
    return res
class Field: | |
""" | |
Represents a single field of a protobuf message and handles the various encodings.
As mitmproxy sees the data passing by as raw protobuf message, it only knows the | |
WireTypes. Each of the WireTypes could represent different Protobuf field types. | |
The exact Protobuf field type can not be determined from the wire format, thus different | |
options for decoding have to be supported. | |
In addition the parsed WireTypes are (intermediary) stored in Python types, which adds | |
some additional overhead type conversions. | |
WireType represented Protobuf Types Python type (intermediary) | |
0: varint int32, int64, uint32, uint64, enum, int (*) | |
sint32, sint64 (both ZigZag encoded), int | |
bool bool | |
float (**) | |
1: bit_64 fixed64, sfixed64, int (*) | |
double float | |
2: len_delimited string, str | |
message, class 'Message' | |
bytes, bytes (*) | |
packed_repeated_field class 'Message' (fields with same tag) | |
3: group_start unused (deprecated) - | |
4: group_end unused (deprecated) - | |
5: bit_32 fixed32, sfixed32, int (*) | |
float float | |
(*) Note 1: Conversion between WireType and intermediary python representation | |
is handled by Kaitai protobuf decoder and always uses the python | |
representation marked with (*). Converting to alternative representations | |
is handled inside this class. | |
(**) Note 2: Varint is not used to represent floating point values, but some applications | |
store native floats in uint32 protobuf types (or native double in uint64). | |
Thus we allow conversion of varint to floating point values for convenience | |
(a well-known API "hides" GPS latitude and longitude values in varint types;
such things are much easier to spot when rendered as float)
Ref: - https://developers.google.com/protocol-buffers/docs/proto3 | |
- https://developers.google.com/protocol-buffers/docs/encoding | |
""" | |
def __init__(
    self,
    wire_type,
    preferred_decoding,
    tag: int,
    parent_field,
    wire_value,
    options,
    rules,
    is_unpacked_children: bool = False,
) -> None:
    """
    Store the raw wire data of one field and resolve its presentation.

    `parent_field` is the enclosing field (None for root level fields) and is used
    to build the list of parent tags. `rules` are applied once during construction
    and may overwrite the name, the preferred decoding and the unpack flag.
    """
    # raw data as found on the wire
    self.wire_type: ProtoParser.WireTypes = wire_type
    self.wire_value = wire_value
    self.tag: int = tag

    # presentation related state (rules may overwrite name and decoding)
    self.preferred_decoding: ProtoParser.DecodedTypes = preferred_decoding
    self.name: str = ""
    self.options: ProtoParser.ParserOptions = options
    self.rules = rules

    # position in the message hierarchy
    self.parent_field = parent_field
    self.parent_tags: list[int] = (
        []
        if parent_field is None
        else [*parent_field.parent_tags, parent_field.tag]
    )

    # marks field as being a result of unpacking
    self.is_unpacked_children: bool = is_unpacked_children
    # marks field as being parent of successfully unpacked children
    self.is_packed_parent: bool = False

    # rules can overwrite self.try_unpack
    self.try_unpack = False
    self.apply_rules()

    # do not unpack fields which are the result of unpacking
    if parent_field is not None and self.is_unpacked_children:
        self.try_unpack = False
# no tests for only_first_hit=False, as not user-changable | |
def apply_rules(self, only_first_hit=True):
    """
    Apply the field definitions of `self.rules` which match this field's tag path.

    A definition matches if its tag (optionally prepended by one of its tag prefixes)
    equals the dotted tag path of this field. With `only_first_hit` the first match
    sets name, preferred decoding and unpack flag and ends the search; otherwise
    later matches overwrite earlier ones and the merged result is applied at the end.
    Any exception is logged as a warning and not propagated.
    """
    tag_str = self._gen_tag_str()
    merged_name = None
    merged_decoding = None
    merged_packed = False
    try:
        for rule in self.rules:
            for fd in rule.field_definitions:
                if len(fd.tag_prefixes) == 0:
                    matched = fd.tag == tag_str
                else:
                    matched = any(
                        prefix + fd.tag == tag_str for prefix in fd.tag_prefixes
                    )
                if not matched:
                    continue
                if only_first_hit:
                    # only first match
                    self.name = fd.name
                    self.preferred_decoding = fd.intended_decoding
                    self.try_unpack = fd.as_packed
                    return
                # overwrite matches till last rule was inspected
                # (f.e. allows to define name in one rule and intended_decoding in another one)
                merged_name = fd.name or merged_name
                merged_decoding = fd.intended_decoding or merged_decoding
                if fd.as_packed:
                    merged_packed = True
        if merged_name:
            self.name = merged_name
        if merged_decoding:
            self.preferred_decoding = merged_decoding
        self.try_unpack = merged_packed
    except Exception as e:
        logging.warning(e)
def _gen_tag_str(self):
    """Return the dotted tag path of this field: all parent tags followed by its own tag."""
    path = [*self.parent_tags, self.tag]
    return ".".join(map(str, path))
def safe_decode_as(
    self,
    intended_decoding,
    try_as_packed: bool = False,
):
    """
    Tries to decode as intended, applies failover, if not possible.

    Returns a tuple ``(selected_decoding, decoded_value)``.

    Failover per wire type, used when `decode_as` raises:
      varint        --> uint64 if the value needs more than 32 bit, else uint32
      bit_64        --> fixed64
      bit_32        --> fixed32
      len_delimited --> message --> string (valid UTF-8) --> bytes
    Other wire types and an exhausted failover yield ``(DecodedTypes.unknown, wire_value)``.
    """
    wire_types = ProtoParser.WireTypes
    decoded_types = ProtoParser.DecodedTypes
    wire_type = self.wire_type

    if wire_type in (
        wire_types.varint,
        wire_types.bit_64,
        wire_types.bit_32,
        wire_types.len_delimited,
    ):
        try:
            return intended_decoding, self.decode_as(intended_decoding, try_as_packed)
        except Exception:
            # `Exception` instead of a bare `except`, so that KeyboardInterrupt and
            # SystemExit are no longer swallowed by the failover.
            if wire_type == wire_types.varint:
                if int(self.wire_value).bit_length() > 32:
                    # ignore the fact that varint could exceed 64bit (would violate the specs)
                    return decoded_types.uint64, self.wire_value
                return decoded_types.uint32, self.wire_value
            if wire_type == wire_types.bit_64:
                return decoded_types.fixed64, self.wire_value
            if wire_type == wire_types.bit_32:
                return decoded_types.fixed32, self.wire_value

            # failover strategy: message --> string (valid UTF-8) --> bytes
            len_delimited_strategy = (
                decoded_types.message,
                decoded_types.string,
                decoded_types.bytes,  # should always work
            )
            for failover_decoding in len_delimited_strategy:
                if failover_decoding == intended_decoding and not try_as_packed:
                    # don't try same decoding twice, unless first attempt was packed
                    continue
                try:
                    return failover_decoding, self.decode_as(failover_decoding, False)
                except Exception:
                    # this decoding does not fit the data, try the next one
                    continue

    # we should never get here (could not be added to tests)
    return decoded_types.unknown, self.wire_value
def decode_as(
    self, intended_decoding, as_packed: bool = False
):
    """
    Decode the wire value as `intended_decoding` without any failover.

    Returns the decoded value: an int, bool, float, str, a copy of the bytes, or a
    list of Fields for messages.
    Raises TypeError if the decoding does not fit the wire type or the value is too
    large for the requested integer type.
    """
    if as_packed is True:
        # NOTE(review): no `read_packed_fields` is defined on ProtoParser in the visible
        # part of this file, so this path presumably raises AttributeError - confirm.
        return ProtoParser.read_packed_fields(packed_field=self)

    types = ProtoParser.DecodedTypes
    wires = ProtoParser.WireTypes
    raw = self.wire_value

    if self.wire_type == wires.varint:
        assert isinstance(raw, int)
        bits = raw.bit_length()
        if intended_decoding == types.bool:
            # clamp result to 64bit
            return raw & 0xFFFFFFFFFFFFFFFF != 0
        if intended_decoding == types.int32:
            if bits > 32:
                raise TypeError("wire value too large for int32")
            # reinterpret the unsigned value as signed
            return struct.unpack("!i", struct.pack("!I", raw))[0]
        if intended_decoding == types.int64:
            if bits > 64:
                raise TypeError("wire value too large for int64")
            return struct.unpack("!q", struct.pack("!Q", raw))[0]
        if intended_decoding == types.uint32:
            if bits > 32:
                raise TypeError("wire value too large for uint32")
            return raw  # already 'int' which was parsed as unsigned
        if intended_decoding in (types.uint64, types.enum):
            if bits > 64:
                raise TypeError("wire value too large")
            return raw  # already 'int' which was parsed as unsigned
        if intended_decoding == types.sint32:
            if bits > 32:
                raise TypeError("wire value too large for sint32")
            return (raw >> 1) ^ -(raw & 1)  # zigzag_decode
        if intended_decoding == types.sint64:
            if bits > 64:
                raise TypeError("wire value too large for sint64")
            # ZigZag decode
            # Ref: https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba
            return (raw >> 1) ^ -(raw & 1)
        if intended_decoding in (types.float, types.double):
            # special case, not complying to protobuf specs
            return self._wire_value_as_float()
    elif self.wire_type == wires.bit_64:
        if intended_decoding == types.fixed64:
            return raw
        if intended_decoding == types.sfixed64:
            return struct.unpack("!q", struct.pack("!Q", raw))[0]
        if intended_decoding == types.double:
            return self._wire_value_as_float()
    elif self.wire_type == wires.bit_32:
        if intended_decoding == types.fixed32:
            return raw
        if intended_decoding == types.sfixed32:
            return struct.unpack("!i", struct.pack("!I", raw))[0]
        if intended_decoding == types.float:
            return self._wire_value_as_float()
    elif self.wire_type == wires.len_delimited:
        assert isinstance(raw, bytes)
        if intended_decoding == types.string:
            # According to specs, a protobuf string HAS TO be UTF-8 parsable
            # throw exception on invalid UTF-8 chars, but escape linebreaks
            return self.wire_value_as_utf8(escape_newline=True)
        if intended_decoding == types.bytes:
            # always works, assure to hand back a copy
            return raw[:]
        if intended_decoding == types.message:
            return ProtoParser.read_fields(
                wire_data=raw,
                parent_field=self,
                options=self.options,
                rules=self.rules,
            )

    # if here, there is no valid decoding
    raise TypeError("intended decoding mismatches wire type")
def encode_from(inputval, intended_encoding):
    """Placeholder for re-encoding a value; always raises NotImplementedError."""
    reason = "Future work, needed to manipulate and re-encode protobuf message, with respect to given wire types"
    raise NotImplementedError(reason)
def _wire_value_as_float(self) -> float:
    """
    Handles double (64bit) and float (32bit).
    Assumes Network Byte Order (big endian).

    Usable for:
    WireType --> Protobuf Type:
    ----------------------------
    varint        --> double/float (not intended by ProtoBuf, but used in the wild)
    bit_32        --> float
    bit_64        --> double
    len_delimited --> 4 bytes: float / 8 bytes: double

    Raises TypeError if the value is neither 4 nor 8 bytes long.
    """
    raw = self._value_as_bytes()
    # the byte length selects between single and double precision
    fmt = {4: "!f", 8: "!d"}.get(len(raw))
    if fmt is None:
        raise TypeError("can not be converted to floatingpoint representation")
    return struct.unpack(fmt, raw)[0]
def _value_as_bytes(self) -> bytes:
    """
    Return the wire value as bytes.

    Bytes are handed back unchanged. Integers are packed in network byte order (to
    assure consistent results across architectures): 8 bytes if the value needs more
    than 32 bit, 4 bytes otherwise.
    Raises ValueError for integers exceeding 64 bit and for any other type.
    """
    value = self.wire_value
    if isinstance(value, bytes):
        return value
    if not isinstance(value, int):
        # should never happen, no tests
        raise ValueError("can not be converted to bytes")

    width = value.bit_length()
    if width > 64:
        # source for a python int are wiretypes varint/bit_32/bit64 and should never convert to int values 64bit
        # currently avoided by kaitai decoder (can not be added to tests)
        raise ValueError("value exceeds 64bit, violating protobuf specs")
    return struct.pack("!Q" if width > 32 else "!I", value)
def _wire_type_str(self):
    """Return the text of `self.wire_type` after its last dot."""
    return str(self.wire_type).rpartition(".")[2]
def _decoding_str(self, decoding):
    """Return the text of `decoding` after its last dot."""
    return str(decoding).rpartition(".")[2]
def wire_value_as_utf8(self, escape_newline=True) -> str:
    """
    Return the wire value as text.

    Bytes are decoded as UTF-8 (raises UnicodeDecodeError if invalid); with
    `escape_newline` every linebreak is replaced by a literal backslash-n.
    Values which are not bytes are converted with `str()`.
    """
    raw = self.wire_value
    if not isinstance(raw, bytes):
        return str(raw)
    text = raw.decode("utf-8")
    if escape_newline:
        return text.replace("\n", "\\n")
    return text
def gen_flat_decoded_field_dicts(self) -> Generator[dict, None, None]:
    """
    Yield this field as a dict with the keys "tag", "wireType", "decoding", "name" and "val".

    The value is decoded first (based on the preferred decoding and the failover
    strategy of `safe_decode_as`). If decoding results in a list of sub-fields (nested
    message or packed fields), the dicts of all sub-fields are yielded afterwards, which
    flattens the field hierarchy recursively.
    """
    decoding, value = self.safe_decode_as(self.preferred_decoding, self.try_unpack)
    row = dict(
        tag=self._gen_tag_str(),
        wireType=self._wire_type_str(),
        decoding=self._decoding_str(decoding),
        name=self.name,
    )

    if not isinstance(value, list):
        # plain value: a single row
        row["val"] = value
        yield row
        return

    is_message_header = (
        decoding == ProtoParser.DecodedTypes.message  # field is a message with subfields
        and not self.is_packed_parent  # field is a message, but replaced by packed fields
    )
    if is_message_header:
        # Field is a message, not packed, thus include it as message header
        row["val"] = ""
        yield row

    # add sub-fields of messages or packed fields
    for child in value:
        yield from child.gen_flat_decoded_field_dicts()
def __init__(
    self,
    data: bytes,
    rules = None,
    parser_options: ParserOptions = None,
) -> None:
    """
    Parse `data` as a protobuf message.

    `rules` defaults to an empty list and `parser_options` to default ParserOptions.
    The top-level fields end up in `self.root_fields`.
    Raises ValueError if `data` can not be parsed as protobuf message.
    """
    self.data: bytes = data
    self.options = (
        ProtoParser.ParserOptions() if parser_options is None else parser_options
    )
    self.rules = [] if rules is None else rules

    try:
        self.root_fields = ProtoParser.read_fields(
            wire_data=self.data,
            options=self.options,
            parent_field=None,
            rules=self.rules,
        )
    except Exception as e:
        raise ValueError("not a valid protobuf message") from e
def gen_flat_decoded_field_dicts(self) -> Generator[dict, None, None]:
    """Yield the flattened field dicts of all root fields, one root field after the other."""
    for root_field in self.root_fields:
        flattened = root_field.gen_flat_decoded_field_dicts()
        yield from flattened
def gen_str_rows(self) -> Generator[tuple[str, ...], None, None]:
    """
    Yield every decoded field as a row of four strings:
    ``("[<decoding>]", <name>, <tag path>, <value>)``.

    With `options.include_wiretype` the first column reads "[<wire type>-><decoding>]";
    with `options.exclude_message_headers` rows decoded as "message" are skipped.
    """
    for row in self.gen_flat_decoded_field_dicts():
        if self.options.exclude_message_headers and row["decoding"] == "message":
            continue

        if self.options.include_wiretype:
            label = f"[{row['wireType']}->{row['decoding']}]"
        else:
            label = f"[{row['decoding']}]"

        # the name is an empty string if not set (consumes no space)
        yield label, row["name"], row["tag"], str(row["val"])
def parse_grpc_messages(
    data, compression_scheme
) -> Generator[tuple[bool, bytes], None, None]:
    """
    Iterate over the length-prefixed gRPC messages contained in a HTTP body.

    Every message consists of a 1 byte compressed-flag, a 4 byte big-endian length and
    the message itself. For each message a tuple ``(was_compressed, message)`` is
    yielded, where `message` is already decompressed.

    `compression_scheme` names the compression of flagged messages: "deflate" is
    handled with zlib, every other name (including unknown ones) is treated as gzip.

    Raises ValueError if the framing is invalid or a message can not be decompressed.
    Empty or missing `data` yields nothing.
    """
    # Local imports keep this function self-contained; it used to call a `decode`
    # helper which is not defined in this file, so every compressed message failed.
    import gzip
    import zlib

    while data:
        try:
            # the length prefix is unsigned
            msg_is_compressed, length = struct.unpack("!?I", data[:5])
            # fails for truncated messages, as the format demands exactly `length` bytes
            decoded_message = struct.unpack("!%is" % length, data[5 : 5 + length])[0]
        except Exception as e:
            raise ValueError("invalid gRPC message") from e
        if msg_is_compressed:
            try:
                if str(compression_scheme).strip().lower() == "deflate":
                    decoded_message = zlib.decompress(decoded_message)
                else:
                    decoded_message = gzip.decompress(decoded_message)
            except Exception as e:
                raise ValueError("Failed to decompress gRPC message with gzip") from e
        yield msg_is_compressed, decoded_message
        data = data[5 + length :]
class ProtoSerializer:
    """
    Re-encodes text rows back into protobuf wire format and gRPC framing.

    A row is a 4-tuple of strings ``("[<decoded type>]", <name>, <tag path>, <value>)``,
    f.e. ``("[uint32]", "", "1.2", "150")``. The decoded type selects the wire type,
    the last component of the dotted tag path is the field number and rows following a
    "[message]" row belong to that message as long as their tag path starts with the
    tag path of the message followed by a dot.
    """

    # wire types (low 3 bits of a field key)
    _WIRE_VARINT = 0
    _WIRE_64BIT = 1
    _WIRE_LEN_DELIMITED = 2
    _WIRE_32BIT = 5

    _MASK_64 = 0xFFFFFFFFFFFFFFFF

    # decoded type names grouped by the wire type they are written with
    _VARINT_TYPES = frozenset(
        {"int32", "int64", "uint32", "uint64", "sint32", "sint64", "bool", "enum"}
    )
    _FIXED32_TYPES = frozenset({"fixed32", "sfixed32", "float"})
    _FIXED64_TYPES = frozenset({"fixed64", "sfixed64", "double"})
    _KNOWN_TYPES = (
        _VARINT_TYPES
        | _FIXED32_TYPES
        | _FIXED64_TYPES
        | frozenset({"string", "bytes", "message", "unknown"})
    )

    @staticmethod
    def _decoded_type(proto_tuple: tuple[str, ...]) -> str:
        """Return the decoded type name of a row; raises KeyError for unknown names."""
        # strip the brackets, tolerate the "[<wire type>-><decoding>]" form
        name = proto_tuple[0][1:-1].split("->")[-1]
        if name not in ProtoSerializer._KNOWN_TYPES:
            raise KeyError(name)
        return name

    @staticmethod
    def _key(proto_tuple: tuple[str, ...], wire_type: int) -> bytes:
        """Return the encoded field key ``(field_number << 3) | wire_type`` of a row."""
        # the field number is the last component of the dotted tag path
        field_number = int(proto_tuple[2].split(".")[-1])
        # the key is a varint itself: field numbers >= 16 need more than one byte
        return ProtoSerializer._int_to_base128le((field_number << 3) | wire_type)

    @staticmethod
    def _write_base128le(proto_tuple: tuple[str, ...]) -> bytearray:
        """
        Encode a row as varint field (key followed by the value).

        bool accepts "True"/"1" (case-insensitive) as true, sint32/sint64 are ZigZag
        encoded, negative values of other types are written as 64 bit two's complement.
        Raises ValueError if the value does not fit into 64 bit.
        """
        # 0 = decoded type, 1 = name, 2 = tag, 3 = value
        decoded_type = ProtoSerializer._decoded_type(proto_tuple)
        res = bytearray(ProtoSerializer._key(proto_tuple, ProtoSerializer._WIRE_VARINT))
        if decoded_type == "bool":
            val = 1 if proto_tuple[3].strip().lower() in ("true", "1") else 0
        else:
            val = int(proto_tuple[3])
            if val < -(1 << 63):
                raise ValueError("value does not fit into 64 bit")
            if decoded_type in ("sint32", "sint64"):
                val = (val << 1) ^ (val >> 63)  # zigzag_encode
            elif val < 0:
                val &= ProtoSerializer._MASK_64
        if val.bit_length() > 64:
            raise ValueError("value does not fit into 64 bit")
        res += ProtoSerializer._int_to_base128le(val)
        return res

    @staticmethod
    def _write_u32(proto_tuple: tuple[str, ...]) -> bytearray:
        """Encode a row as little-endian 32 bit field (fixed32, sfixed32 or float)."""
        # 0 = decoded type, 1 = name, 2 = tag, 3 = value
        decoded_type = ProtoSerializer._decoded_type(proto_tuple)
        res = bytearray(ProtoSerializer._key(proto_tuple, ProtoSerializer._WIRE_32BIT))
        if decoded_type == "float":
            payload = struct.pack("<f", float(proto_tuple[3]))
        elif decoded_type == "sfixed32":
            payload = int(proto_tuple[3]).to_bytes(4, "little", signed=True)
        else:
            payload = int(proto_tuple[3]).to_bytes(4, "little")
        return res + payload

    @staticmethod
    def _write_u64(proto_tuple: tuple[str, ...]) -> bytearray:
        """Encode a row as little-endian 64 bit field (fixed64, sfixed64 or double)."""
        # 0 = decoded type, 1 = name, 2 = tag, 3 = value
        decoded_type = ProtoSerializer._decoded_type(proto_tuple)
        res = bytearray(ProtoSerializer._key(proto_tuple, ProtoSerializer._WIRE_64BIT))
        if decoded_type == "double":
            payload = struct.pack("<d", float(proto_tuple[3]))
        elif decoded_type == "sfixed64":
            payload = int(proto_tuple[3]).to_bytes(8, "little", signed=True)
        else:
            payload = int(proto_tuple[3]).to_bytes(8, "little")
        return res + payload

    @staticmethod
    def _write_length_delimited(proto_tuple: tuple[str, ...]) -> bytearray:
        """
        Encode a row as length delimited field (key, payload length, payload).

        "[string]" values are encoded as UTF-8. For every other type the value is
        expected to hold one character per byte and is encoded as latin1.
        """
        # 0 = decoded type, 1 = name, 2 = tag, 3 = value
        decoded_type = ProtoSerializer._decoded_type(proto_tuple)
        res = bytearray(
            ProtoSerializer._key(proto_tuple, ProtoSerializer._WIRE_LEN_DELIMITED)
        )
        payload = proto_tuple[3].encode("utf-8" if decoded_type == "string" else "latin1")
        # the length prefix counts the encoded bytes, not the characters
        return res + ProtoSerializer._int_to_base128le(len(payload)) + payload

    @staticmethod
    def _int_to_base128le(val: int) -> bytes:
        """
        Encode a non-negative integer as base-128 little-endian varint.

        Raises ValueError for negative values.
        """
        if val < 0:
            raise ValueError("varint can not encode negative values")
        res = bytearray()
        while True:
            group = val & 0x7F
            val >>= 7
            if val:
                # more groups follow: set the continuation bit
                res.append(group | 0x80)
            else:
                res.append(group)
                return bytes(res)

    @staticmethod
    def serializeProto(proto_tuples: list[tuple[str, ...]]):
        """
        Encode a flat list of rows as protobuf message and return the bytes.

        Raises KeyError for unknown decoded type names and ValueError for rows of type
        "unknown", which can not be encoded.
        """
        res = bytearray()
        total = len(proto_tuples)
        index = 0
        while index < total:
            proto_tuple = proto_tuples[index]
            index += 1
            # 0 = decoded type, 1 = name, 2 = tag, 3 = value
            decoded_type = ProtoSerializer._decoded_type(proto_tuple)
            if decoded_type in ProtoSerializer._VARINT_TYPES:
                res += ProtoSerializer._write_base128le(proto_tuple)
            elif decoded_type in ProtoSerializer._FIXED64_TYPES:
                res += ProtoSerializer._write_u64(proto_tuple)
            elif decoded_type in ProtoSerializer._FIXED32_TYPES:
                res += ProtoSerializer._write_u32(proto_tuple)
            elif decoded_type == "string":
                res += ProtoSerializer._write_length_delimited(proto_tuple)
            elif decoded_type == "bytes":
                tmp = list(proto_tuple)
                # removing prefix b' and suffix ', then resolve the escape sequences
                tmp[3] = proto_tuple[3][2:-1].encode("utf-8").decode("unicode_escape")
                res += ProtoSerializer._write_length_delimited(tuple(tmp))
            elif decoded_type == "message":
                # all directly following rows below this tag path form the message body
                child_prefix = proto_tuple[2] + "."
                first_child = index
                while index < total and proto_tuples[index][2].startswith(child_prefix):
                    index += 1
                body = ProtoSerializer.serializeProto(proto_tuples[first_child:index])
                res += ProtoSerializer._key(
                    proto_tuple, ProtoSerializer._WIRE_LEN_DELIMITED
                )
                res += ProtoSerializer._int_to_base128le(len(body))
                res += body
            else:
                # dropping the row silently would corrupt the message
                raise ValueError(f"can not serialize field of type {decoded_type!r}")
        return bytes(res)

    @staticmethod
    def serializeGrpc(proto: bytes):
        """Prepend the gRPC framing: compressed-flag 0 and the 4 byte big-endian length."""
        logging.debug("serializing gRPC message with %d payload bytes", len(proto))
        return b"\x00" + len(proto).to_bytes(4, "big") + proto
def string_to_tuples_list(text: str):
    """
    Parse text holding one Python tuple literal per line into a list of tuples.

    Blank lines and surrounding whitespace are ignored. The literals are evaluated
    with `ast.literal_eval`, so the text can not execute code.
    Raises SyntaxError or ValueError if a line is not a valid literal.
    """
    # Splitting into lines first (instead of replacing every linebreak by a comma)
    # keeps blank lines from producing an invalid ",," sequence.
    rows = [line.strip() for line in text.split("\n")]
    return ast.literal_eval("[" + ",".join(row for row in rows if row) + "]")
class MyAddon:
    """mitmproxy addon which adds the `grpc.edit` command for editing gRPC messages as text."""

    def __init__(self):
        # not read anywhere in this class
        self.num = 0

    def get_editor(self) -> str:
        """
        Return the editor command to use.

        Order of preference: the environment variables MITMPROXY_EDITOR and EDITOR, then
        the first of sensible-editor/nano/vim found on the PATH, then notepad (Windows)
        or vi.
        """
        # based upon https://github.com/pallets/click/blob/main/src/click/_termui_impl.py
        for variable in ("MITMPROXY_EDITOR", "EDITOR"):
            if m := os.environ.get(variable):
                return m
        for editor in ("sensible-editor", "nano", "vim"):
            if shutil.which(editor):
                return editor
        return "notepad" if os.name == "nt" else "vi"

    @command.command("grpc.edit")
    def edit(self, flow: flow.Flow):
        """
        Open the gRPC messages of a flow in an external editor and write the edited
        messages back.

        The response body is edited if the flow has a response, the request body
        otherwise. Every protobuf message is shown as a "=== Protobuf message #n ==="
        header followed by one row per field. The flow is left untouched if the editor
        can not be started or the edited text can not be re-encoded.
        """
        http_flow = cast(http.HTTPFlow, flow)
        if http_flow.response is not None:
            message = http_flow.response
        else:
            message = http_flow.request

        # name of the compression used for messages flagged as compressed
        compression = message.headers.get("grpc-encoding", "gzip")

        sections = []
        for i, (_compressed, pb_message) in enumerate(
            ProtoParser.parse_grpc_messages(message.content, compression)
        ):
            rows = ProtoParser(
                data=pb_message, parser_options=None, rules=None
            ).gen_str_rows()
            pb_text = "".join(str(row) + "\n" for row in rows)
            sections.append("=== Protobuf message #" + str(i) + " ===\n" + pb_text)
        data = "".join(sections)

        modified_data = None
        fd, name = tempfile.mkstemp("", "mitmproxy", text=True)
        try:
            # explicit encoding: the rows may contain non-ASCII characters
            with open(fd, "w", encoding="utf-8") as f:
                f.write(data)
            c = self.get_editor()
            cmd = shlex.split(c)
            cmd.append(name)
            with ctx.master.uistopped():
                try:
                    subprocess.call(cmd)
                except Exception:
                    signals.status_message.send(message="Can't start editor: %s" % c)
                else:
                    with open(name, "r", encoding="utf-8") as f:
                        modified_data = f.read()
        finally:
            # remove the temporary file even if writing or the editor failed
            os.unlink(name)

        if modified_data is None:
            # the editor could not be started, there is nothing to write back
            return

        try:
            # splitting at "===" gives ['', header, body, header, body, ...]
            modified_body = b"".join(
                ProtoSerializer.serializeGrpc(
                    ProtoSerializer.serializeProto(string_to_tuples_list(pb_message))
                )
                for pb_message in modified_data.split("===")[2::2]
            )
        except Exception as e:
            # malformed edits must not replace the body with a partial result
            signals.status_message.send(
                message="Can't re-encode edited gRPC message: %s" % e
            )
            return

        message.content = modified_body
        ctx.master.window.focus_changed()
# Addon instances provided by this script (presumably picked up by mitmproxy's
# script loader through this module-level name - confirm against the mitmproxy docs).
addons = [
    MyAddon(),
]
Hi @wtfiwtz,
Unfortunately, so far we haven't implemented the support for compressed gRPC messages. It seems like you are dealing with compressed gRPC so the error occurs.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Great tool... however, I get this:
Looks like I have to disable the compression somehow.