Skip to content

Instantly share code, notes, and snippets.

@vndee
Created December 3, 2024 13:20
Show Gist options
  • Save vndee/e1b4a72f3753f85a19487e86c39728a5 to your computer and use it in GitHub Desktop.
Save vndee/e1b4a72f3753f85a19487e86c39728a5 to your computer and use it in GitHub Desktop.
def _parse_bulk_string(self) -> RESPBulkString:
    """Parse one RESP bulk string from the front of ``self.buffer``.

    The expected layout is ``$<length>CRLF<payload>CRLF``. A length of
    ``-1`` denotes the null bulk string, which has no payload. On success
    the consumed bytes are removed from ``self.buffer``.

    Returns:
        A ``RESPBulkString`` whose ``value`` is the UTF-8 decoded payload,
        or ``None`` for the null bulk string.

    Raises:
        IncompleteResponseError: The length line or the payload (including
            its trailing CRLF) is not fully buffered yet. The buffer is
            left untouched so the caller can retry after reading more.
        ValueError: The length is not an integer, is negative but not
            ``-1``, or the payload is not followed by CRLF.
        UnicodeDecodeError: The payload is not valid UTF-8.
    """
    # Locate the end of the "$<length>" header line.
    header_end = self.buffer.find(b'\r\n')
    if header_end == -1:
        raise IncompleteResponseError()

    # Skip the one-byte type prefix; int() raises ValueError on garbage.
    length = int(self.buffer[1:header_end])

    # Null bulk string: only the header line is consumed.
    if length == -1:
        self.buffer = self.buffer[header_end + 2:]
        return RESPBulkString(value=None)

    # Any other negative length would make the slice arithmetic below
    # silently consume the wrong bytes, so reject it explicitly.
    if length < 0:
        raise ValueError(f"invalid bulk string length: {length}")

    string_start = header_end + 2
    string_end = string_start + length
    frame_end = string_end + 2  # payload plus its trailing CRLF

    # Wait until the whole frame has been buffered.
    if len(self.buffer) < frame_end:
        raise IncompleteResponseError()

    # A mismatched length prefix would otherwise desynchronise the stream.
    if self.buffer[string_end:frame_end] != b'\r\n':
        raise ValueError("bulk string payload is not terminated by CRLF")

    # NOTE: decoding is strict, so binary (non-UTF-8) payloads raise here
    # and the buffer is left unconsumed.
    value = self.buffer[string_start:string_end].decode('utf-8')
    self.buffer = self.buffer[frame_end:]
    return RESPBulkString(value=value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment