Skip to content

Instantly share code, notes, and snippets.

@dipu-bd
Created February 2, 2022 07:22
Show Gist options
  • Save dipu-bd/b2d3a30501770cfe67bbc27bdae4d784 to your computer and use it in GitHub Desktop.
Save dipu-bd/b2d3a30501770cfe67bbc27bdae4d784 to your computer and use it in GitHub Desktop.
Protobuf encoder and decoder test
class Protobuf:
def __init__(self, raw: bytes) -> None:
self.raw = raw
self.offset = 0
@property
def fields(self) -> dict:
if not hasattr(self, '__fields__'):
self.__fields__ = {}
while self.has_next():
i, d = self.next_field()
self.__fields__[i] = d
return self.__fields__
def has_next(self) -> bool:
return self.offset < len(self.raw)
def _read(self, size = 1) -> bytes:
b = self.offset
e = self.offset + size
s = self.raw[b:e]
self.offset = e
return s
def next_byte(self) -> int:
return self._read(1)[0]
def next_int(self, signed = False) -> int:
n = 0
p = 0
b = 0xFF
while b >> 7:
b = self.next_byte()
n |= (b & 0x7F) << p
p += 7
if signed:
s = n & 1
n >>= 1
if s:
n = -(n + 1)
return n
def next_string(self) -> bytes:
return self._read(self.next_int())
def next_field(self):
f = self.next_int()
field_number = f >> 3
wire_type = f & 0b111
if wire_type == 0:
return field_number, self.next_int()
elif wire_type == 2:
return field_number, self.next_string()
elif wire_type == 1:
self.offset += 8
return field_number, None
elif wire_type == 1:
self.offset += 4
return field_number, None
else:
self.offset = len(self.raw)
return field_number, None
class ProtobufWriter:
def __init__(self) -> None:
self._buf = []
def write(self, data: bytes) -> 'ProtobufWriter':
self._buf += list(data)
return self
def write_int(self, number: int = 0, signed = False) -> 'ProtobufWriter':
if number < 0 or signed:
if number < 0:
number = ((-number) << 1) - 1
else:
number <<= 1
b = []
while number:
if b:
b[-1] |= 0x80
b.append(number & 0x7F)
number >>= 7
self._buf += b
return self
def write_string(self, string: str) -> 'ProtobufWriter':
self.write_int(len(string))
return self.write(string.encode('utf8'))
def to_bytes(self) -> bytes:
return bytes(self._buf)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment