Skip to content

Instantly share code, notes, and snippets.

@matanper
Created January 30, 2025 22:01
Show Gist options
  • Save matanper/c499694cccfe3d18004327dcd171f65c to your computer and use it in GitHub Desktop.
Save matanper/c499694cccfe3d18004327dcd171f65c to your computer and use it in GitHub Desktop.
Prefixed Valkey/Redis client
import valkey
class PrefixedValkey(valkey.Valkey):  # Use `ValkeyCluster` if needed
    """Valkey client that namespaces every key argument with a fixed prefix.

    Every call routed through ``execute_command`` has its key arguments
    rewritten to ``"<key_prefix>:<key>"`` before being sent to the server.
    Key positions are taken from the server's ``COMMAND INFO`` reply.
    """

    def __init__(self, *args, key_prefix: str, **kwargs):
        """Create the client.

        Args:
            *args: Positional arguments forwarded to ``valkey.Valkey``.
            key_prefix: Namespace placed (followed by ``:``) before each key.
            **kwargs: Keyword arguments forwarded to ``valkey.Valkey``.
        """
        super().__init__(*args, **kwargs)
        self.key_prefix = key_prefix

    def _prefix_key(self, key):
        """Return *key* with ``"<key_prefix>:"`` prepended.

        ``bytes`` keys get an encoded prefix and ``str`` keys a text prefix.
        Any other type is returned unchanged.
        """
        if isinstance(key, bytes):  # Handle binary keys
            return f"{self.key_prefix}:".encode() + key
        if isinstance(key, str):
            return f"{self.key_prefix}:{key}"
        return key  # Not a str/bytes value: leave it untouched

    def execute_command(self, command, *args, **options):
        """Run *command*, prefixing the arguments that are keys.

        Args:
            command: Command name, e.g. ``"GET"``.
            *args: Command arguments, without the command name itself.
            **options: Forwarded unchanged to the parent ``execute_command``.

        Returns:
            Whatever the parent ``execute_command`` returns for the command.

        Raises:
            NotImplementedError: For commands flagged ``pubsub`` or
                ``movablekeys``, or commands that declare subcommands; their
                key positions cannot be derived from the fixed
                first/last/step triple used here.
        """
        # NOTE: this costs one extra COMMAND INFO round-trip per call.
        command_info = super().execute_command(
            "COMMAND", "INFO", command.upper()
        )[command.lower()]

        flags = command_info["flags"]
        if (
            "pubsub" in flags
            or "movablekeys" in flags
            # Look at the command's metadata, not at the command-name string.
            or command_info.get("subcommands")
        ):
            raise NotImplementedError(
                f"Key prefixing is not supported for command {command!r}"
            )

        step_count = command_info["step_count"]
        if not step_count:
            # A zero step means the command takes no keys (e.g. PING);
            # range() would raise ValueError on a zero step.
            return super().execute_command(command, *args, **options)

        # Taken from valkey._parsers.commands.CommandsParser.
        # Positions count the command name as position 0, whereas `args`
        # excludes it, hence the "+ 1" and the 1-based enumerate below.
        last_key_pos = command_info["last_key_pos"]
        if last_key_pos < 0:
            last_key_pos = len(args) + 1 - abs(last_key_pos)
        key_positions = range(
            command_info["first_key_pos"], last_key_pos + 1, step_count
        )

        new_args = [
            self._prefix_key(arg) if pos in key_positions else arg
            for pos, arg in enumerate(args, start=1)
        ]
        return super().execute_command(command, *new_args, **options)
# Example usage: a client whose str/bytes keys are sent as "prefix:<key>".
# NOTE(review): this rebinds the name `valkey`, shadowing the imported module
# for any code that runs after this line — consider a different variable name.
valkey = PrefixedValkey(key_prefix='prefix')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment