Skip to content

Instantly share code, notes, and snippets.

View vndee's full-sized avatar
🎯
Focusing

Duy Huynh vndee

🎯
Focusing
View GitHub Profile
from redis_data_structures import HashMap
from datetime import datetime
from pydantic import BaseModel
class UserProfile(BaseModel):
name: str
last_active: datetime
preferences: dict[str, Any]
hash_map = HashMap()
from redis_data_structures import RingBuffer, BloomFilter
# Ring Buffer for metrics collection
metrics = RingBuffer(capacity=1000)
metrics.push("system.metrics", {
"cpu_usage": 75.5,
"memory_used": 1024 * 1024 * 1024,
"timestamp": datetime.now()
})
# Traditional local structure - simple but not distributed
local_queue = []
local_queue.append(task) # What happens if our process crashes?
# Raw Redis approach - distributed but verbose
redis_conn.lpush("queue", json.dumps(task)) # How do we handle types?
task = json.loads(redis_conn.lpop("queue")) # What about error handling?
# Redis Data Structures - best of both worlds
from redis_data_structures import Queue
def _read_length(self, f: BinaryIO) -> int:
"""Read a length-encoded integer"""
first_byte = f.read(1)[0]
# Check the first two bits (00, 01, 10, 11)
bits = first_byte >> 6
if bits == 0: # 00xxxxxx
return first_byte & 0x3F
elif bits == 1: # 01xxxxxx
class RDBOperationType(IntEnum):
EOF = 0xFF # End of file marker
SELECTDB = 0xFE # Switch to new database
EXPIRETIME = 0xFD # Key with seconds-precision expire
EXPIRETIME_MS = 0xFC # Key with milliseconds-precision expire
RESIZEDB = 0xFB # Database size metadata
AUX = 0xFA # Auxiliary metadata
class RDBParser:
def __init__(self, file_path: str):
self.file_path = file_path
self.data: Dict[int, Dict[str, Any]] = {} # database -> key -> value
self.aux_fields: Dict[str, str] = {} # Store auxiliary fields
self.current_db = 0
def parse(self) -> Dict[int, Dict[str, Any]]:
"""Parse the RDB file and return the data structure"""
with open(self.file_path, "rb") as f:
async def handle_command(self, reader, writer, data: RESPObject):
command = data.value[0].value.lower()
match command:
case RedisCommand.SET:
key = data.value[1].value
value = data.value[2].value
response = self.__data_store.set(key, value)
await self.__send_data(
writer,
async def start(self):
server = await asyncio.start_server(
self.handle_client,
self.host,
self.port
)
async with server:
await server.serve_forever()
def _parse_bulk_string(self) -> RESPBulkString:
# Find the length marker
pos = self.buffer.find(b'\r\n')
if pos == -1:
raise IncompleteResponseError()
# Parse length (excluding the '$' prefix)
length = int(self.buffer[1:pos])
# Handle null strings
@vndee
vndee / resp_002.py
Last active December 4, 2024 15:22
class RESPParser:
def __init__(
self, protocol_version: RESPProtocolVersion = RESPProtocolVersion.RESP2
):
self.protocol_version = protocol_version
def parse(self, data: bytes) -> List[RESPObject]:
"""
Parse RESP data into a list of RESPObjects.
Each complete command will be one RESPObject in the list.