Last active
December 4, 2024 15:22
-
-
Save vndee/592291acf552953db9afd32c1c3b09e0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class RESPParser:
    """Parse raw RESP (Redis Serialization Protocol) bytes into RESPObjects.

    Supported type markers are the ones handled below: simple strings,
    simple errors, integers, bulk strings and arrays.  ``protocol_version``
    is stored on the instance but is not consulted by any method of this
    class.
    """

    def __init__(
        self, protocol_version: RESPProtocolVersion = RESPProtocolVersion.RESP2
    ):
        self.protocol_version = protocol_version

    def parse(self, data: bytes) -> List[RESPObject]:
        """Parse RESP data into a list of RESPObjects.

        Each complete top-level object (typically one command) becomes one
        entry in the returned list.  Parsing stops at the first incomplete
        or unrecognised object; everything decoded before that point is
        returned.  Empty input yields an empty list.
        """
        if not data:
            return []

        objects = []
        remaining = data
        while remaining:
            try:
                object_size = self._get_object_size(remaining)
                if object_size <= 0:
                    # Incomplete or invalid object: nothing more to decode.
                    break

                obj = self._parse_single(remaining[:object_size])
                # Compare against None explicitly so that a valid object is
                # never dropped merely because it evaluates as falsy.
                if obj is not None:
                    objects.append(obj)

                # A simple string may be followed by one extra, stray CRLF;
                # skip it so the next object starts on its type byte.
                if (
                    isinstance(obj, RESPSimpleString)
                    and remaining[object_size : object_size + 2] == b"\r\n"
                ):
                    object_size += 2

                remaining = remaining[object_size:]
            except (ValueError, IndexError):
                break
        return objects

    def _get_object_size(self, data: bytes) -> int:
        """Return the size in bytes of the next complete RESP object.

        Returns 0 when ``data`` does not start with a complete, recognisable
        object (unknown type byte, missing header CRLF, malformed length, or
        a payload that has not been fully received yet).

        For bulk strings the trailing CRLF is optional: when the declared
        payload is present but is not followed by CRLF, the size covers the
        header and payload only.
        """
        # Smallest possible frame is a type byte plus CRLF (e.g. b"+\r\n").
        if len(data) < 3:
            return 0

        try:
            # An undecodable or unknown type byte raises ValueError
            # (UnicodeDecodeError is a ValueError subclass).
            resp_type = RESPObjectType(data[0:1].decode())

            line_end = data.find(b"\r\n")
            if line_end == -1:
                return 0
            header_size = line_end + 2

            if resp_type in {
                RESPObjectType.SIMPLE_STRING,
                RESPObjectType.SIMPLE_ERROR,
                RESPObjectType.INTEGER,
            }:
                return header_size

            if resp_type == RESPObjectType.BULK_STRING:
                length = int(data[1:line_end])
                if length == -1:
                    # Null bulk string: header only, no payload.
                    return header_size
                if length < 0:
                    # Any other negative length is malformed.
                    return 0

                payload_end = header_size + length
                if len(data) < payload_end:
                    # Payload not fully received yet; report "incomplete"
                    # instead of a size that runs past the buffer.
                    return 0
                if data[payload_end : payload_end + 2] == b"\r\n":
                    return payload_end + 2  # header + payload + CRLF
                return payload_end  # header + payload, no terminator

            if resp_type == RESPObjectType.ARRAY:
                total_size = header_size
                elements_count = int(data[1:line_end])
                if elements_count == 0:
                    return total_size

                remaining = data[total_size:]
                for _ in range(elements_count):
                    element_size = self._get_object_size(remaining)
                    if element_size <= 0:
                        # One element is incomplete, so the array is too.
                        return 0
                    total_size += element_size
                    remaining = remaining[element_size:]
                return total_size
        except (ValueError, IndexError):
            return 0
        return 0

    def _parse_single(self, data: bytes) -> Optional[RESPObject]:
        """Parse a single complete RESP object.

        ``data`` is expected to hold exactly one object, as delimited by
        ``_get_object_size``.  Returns ``None`` for empty input, for an
        unhandled type, or when the object is malformed (in which case the
        error is printed).

        Bulk string payloads are extracted by their declared length, so
        payloads containing CRLF are preserved intact.  A payload followed
        by CRLF is decoded and returned as ``RESPBulkString``; a payload
        without the terminator is returned undecoded as ``RESPBulkBytes``.
        """
        if not data:
            return None

        try:
            resp_type = RESPObjectType(data[0:1].decode())

            line_end = data.find(b"\r\n")
            # Text between the type byte and the first CRLF (or end of data
            # when no CRLF is present).
            header = data[1:] if line_end == -1 else data[1:line_end]

            if resp_type == RESPObjectType.SIMPLE_STRING:
                return RESPSimpleString(
                    value=header.decode(), bytes_length=len(data)
                )

            if resp_type == RESPObjectType.SIMPLE_ERROR:
                return RESPError(value=header.decode(), bytes_length=len(data))

            if resp_type == RESPObjectType.INTEGER:
                return RESPInteger(value=int(header), bytes_length=len(data))

            if resp_type == RESPObjectType.BULK_STRING:
                length = int(header)
                if length == -1:
                    return RESPBulkString(value=None)
                if length < 0:
                    raise ValueError(f"invalid bulk string length {length}")
                if line_end == -1:
                    raise ValueError("bulk string header is missing CRLF")

                payload_start = line_end + 2
                payload_end = payload_start + length
                payload = data[payload_start:payload_end]
                if len(payload) != length:
                    raise ValueError(
                        f"bulk string truncated: expected {length} bytes, "
                        f"got {len(payload)}"
                    )

                # Look for the terminator right after the payload rather
                # than at the end of the buffer, so a payload whose own last
                # bytes are CRLF is not mistaken for a terminated string.
                if data[payload_end : payload_end + 2] == b"\r\n":
                    return RESPBulkString(
                        value=payload.decode(), bytes_length=len(data)
                    )
                return RESPBulkBytes(value=payload, bytes_length=len(data))

            if resp_type == RESPObjectType.ARRAY:
                length = int(header)
                if length == 0:
                    return RESPArray(value=[], bytes_length=len(data))

                elements = []
                current_data = data[line_end + 2 :]
                for _ in range(length):
                    element_size = self._get_object_size(current_data)
                    if element_size <= 0:
                        break
                    element = self._parse_single(current_data[:element_size])
                    if element is not None:
                        elements.append(element)
                    current_data = current_data[element_size:]
                return RESPArray(value=elements, bytes_length=len(data))
        except (ValueError, IndexError) as e:
            # UnicodeDecodeError and invalid enum values are ValueErrors too.
            print(f"Error parsing RESP data: {e}")
            return None
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment