Created
January 30, 2025 22:01
-
-
Save matanper/c499694cccfe3d18004327dcd171f65c to your computer and use it in GitHub Desktop.
Prefixed Valkey/Redis client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import valkey | |
class PrefixedValkey(valkey.Valkey):  # Use `ValkeyCluster` if needed
    """Valkey client that transparently prefixes every key argument.

    Each command passed to :meth:`execute_command` is looked up with
    ``COMMAND INFO`` to learn which argument positions hold keys; those
    arguments are rewritten to ``"<key_prefix>:<key>"`` before the command
    is forwarded to the parent client.

    Commands flagged ``pubsub`` or ``movablekeys``, and commands whose
    ``COMMAND INFO`` entry lists subcommands, are rejected because their key
    positions cannot be derived from the first/last/step triple alone.

    NOTE(review): multi-word command names (e.g. ``"CONFIG GET"``) are looked
    up verbatim, exactly as before; whether ``COMMAND INFO`` resolves them
    depends on the server/parser -- confirm before relying on it.
    """

    def __init__(self, *args, key_prefix: str, **kwargs):
        """Create the client.

        Args:
            *args: Positional arguments forwarded to ``valkey.Valkey``.
            key_prefix: Namespace prepended (followed by ``":"``) to every key.
            **kwargs: Keyword arguments forwarded to ``valkey.Valkey``.
        """
        super().__init__(*args, **kwargs)
        self.key_prefix = key_prefix
        # COMMAND INFO results keyed by lower-cased command name, so the
        # server is asked only once per distinct command.
        self._command_info_cache = {}

    def _prefix_key(self, key):
        """Return *key* with ``"<key_prefix>:"`` prepended.

        ``bytes`` keys get an encoded prefix and stay ``bytes``; ``str`` keys
        stay ``str``. Any other type is returned unchanged.
        """
        if isinstance(key, bytes):  # Handle binary keys
            return f"{self.key_prefix}:".encode() + key
        if isinstance(key, str):
            return f"{self.key_prefix}:{key}"
        return key  # Return unchanged if it's not a key

    def _get_command_info(self, command):
        """Return the parsed ``COMMAND INFO`` entry for *command*, cached."""
        name = command.lower()
        info = self._command_info_cache.get(name)
        if info is None:
            # Call the parent implementation directly so this lookup is not
            # itself intercepted and prefixed.
            info = super().execute_command("COMMAND", "INFO", command.upper())[name]
            self._command_info_cache[name] = info
        return info

    def execute_command(self, command, *args, **options):
        """Intercept a command and apply the prefix to its key arguments.

        Args:
            command: Command name, e.g. ``"GET"``.
            *args: Command arguments (without the command name itself).
            **options: Options forwarded unchanged to the parent client.

        Returns:
            Whatever the parent ``execute_command`` returns.

        Raises:
            NotImplementedError: If the command is flagged ``pubsub`` or
                ``movablekeys``, or its info entry lists subcommands.
        """
        command_info = self._get_command_info(command)
        flags = command_info["flags"]
        if (
            "pubsub" in flags
            or "movablekeys" in flags
            or command_info.get("subcommands")
        ):
            raise NotImplementedError(
                f"Key prefixing is not supported for command {command!r}"
            )

        first_key_pos = command_info["first_key_pos"]
        step_count = command_info["step_count"]
        if first_key_pos <= 0 or step_count <= 0:
            # The command takes no keys (first/last/step are all 0); building
            # range() with a zero step would raise ValueError, so pass through.
            return super().execute_command(command, *args, **options)

        # Taken from valkey._parsers.commands.CommandsParser
        last_key_pos = command_info["last_key_pos"]
        if last_key_pos < 0:
            # Negative positions count from the end of the full argument
            # vector, which includes the command name (hence the + 1).
            last_key_pos = len(args) + 1 - abs(last_key_pos)
        key_positions = set(range(first_key_pos, last_key_pos + 1, step_count))

        # Key positions are 1-based because position 0 is the command name.
        new_args = [
            self._prefix_key(arg) if pos in key_positions else arg
            for pos, arg in enumerate(args, start=1)
        ]
        return super().execute_command(command, *new_args, **options)
# Example usage: keys sent through this client are rewritten to "prefix:<key>".
# NOTE: this rebinds the name `valkey`, shadowing the imported module from
# this point on in the file.
valkey = PrefixedValkey(key_prefix='prefix')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment