Skip to content

Instantly share code, notes, and snippets.

@vndee
Last active December 4, 2024 15:22
Show Gist options
  • Save vndee/592291acf552953db9afd32c1c3b09e0 to your computer and use it in GitHub Desktop.
Save vndee/592291acf552953db9afd32c1c3b09e0 to your computer and use it in GitHub Desktop.
class RESPParser:
def __init__(
self, protocol_version: RESPProtocolVersion = RESPProtocolVersion.RESP2
):
self.protocol_version = protocol_version
def parse(self, data: bytes) -> List[RESPObject]:
"""
Parse RESP data into a list of RESPObjects.
Each complete command will be one RESPObject in the list.
"""
if not data:
return []
objects = []
remaining = data
while remaining:
try:
object_size = self._get_object_size(remaining)
if object_size <= 0:
break
obj = self._parse_single(remaining[:object_size])
if obj:
objects.append(obj)
if (
isinstance(obj, RESPSimpleString)
and len(remaining) - object_size >= 2
and remaining[object_size : object_size + 2] == b"\r\n"
):
object_size += 2
remaining = remaining[object_size:]
except (ValueError, IndexError):
break
return objects
def _get_object_size(self, data: bytes) -> int:
"""
Calculate the size of the next complete RESP object in bytes.
"""
if len(data) < 4: # Minimum valid RESP object size
return 0
try:
type_byte = data[0:1].decode()
resp_type = RESPObjectType(type_byte)
line_end = data.find(b"\r\n")
if line_end == -1:
return 0
if resp_type in {
RESPObjectType.SIMPLE_STRING,
RESPObjectType.SIMPLE_ERROR,
RESPObjectType.INTEGER,
}:
return line_end + 2
elif resp_type == RESPObjectType.BULK_STRING:
length = int(data[1:line_end])
if length == -1:
return line_end + 2
if (
len(data) - line_end + length + 2 > 0
and data[line_end + 2 + length : line_end + 2 + length + 2]
== b"\r\n"
):
return (
line_end + 2 + length + 2
) # Size of the bulk string header + length + CRLF
return line_end + 2 + length
elif resp_type == RESPObjectType.ARRAY:
total_size = line_end + 2 # Size of the array header
elements_count = int(data[1:line_end])
if elements_count == 0:
return total_size
remaining = data[total_size:]
for _ in range(elements_count):
element_size = self._get_object_size(remaining)
if element_size <= 0:
return 0
total_size += element_size
remaining = remaining[element_size:]
return total_size
except (ValueError, IndexError):
return 0
return 0
def _parse_single(self, data: bytes) -> Optional[RESPObject]:
"""
Parse a single complete RESP object.
"""
if not data:
return None
try:
type_byte = data[0:1].decode()
resp_type = RESPObjectType(type_byte)
is_not_end_with_crlf = not data.endswith(b"\r\n")
lines = data.split(b"\r\n")
if resp_type == RESPObjectType.SIMPLE_STRING:
return RESPSimpleString(
value=lines[0][1:].decode(), bytes_length=len(data)
)
elif resp_type == RESPObjectType.SIMPLE_ERROR:
return RESPError(value=lines[0][1:].decode(), bytes_length=len(data))
elif resp_type == RESPObjectType.INTEGER:
return RESPInteger(value=int(lines[0][1:]), bytes_length=len(data))
elif resp_type == RESPObjectType.BULK_STRING:
length = int(lines[0][1:])
if length == -1:
return RESPBulkString(value=None)
return (
RESPBulkBytes(value=lines[1], bytes_length=len(data))
if is_not_end_with_crlf
else RESPBulkString(value=lines[1].decode(), bytes_length=len(data))
)
elif resp_type == RESPObjectType.ARRAY:
length = int(lines[0][1:])
if length == 0:
return RESPArray(value=[], bytes_length=len(data))
elements = []
current_data = data[data.find(b"\r\n") + 2 :]
for _ in range(length):
element_size = self._get_object_size(current_data)
if element_size <= 0:
break
element = self._parse_single(current_data[:element_size])
if element:
elements.append(element)
current_data = current_data[element_size:]
return RESPArray(value=elements, bytes_length=len(data))
except (ValueError, IndexError) as e:
print(f"Error parsing RESP data: {e}")
return None
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment