Skip to content

Instantly share code, notes, and snippets.

@72squared
Created October 27, 2014 13:32
Show Gist options
  • Save 72squared/9b6c60db39043f295986 to your computer and use it in GitHub Desktop.
Save 72squared/9b6c60db39043f295986 to your computer and use it in GitHub Desktop.
git diff rediscluster/client.py
diff --git a/rediscluster/client.py b/rediscluster/client.py
index 7405dad..aeee4a8 100644
--- a/rediscluster/client.py
+++ b/rediscluster/client.py
@@ -60,6 +60,28 @@ class RedisCluster(StrictRedis):
"""
RedisClusterRequestTTL = 16
+
+ # Multi-key commands that prevent parallel execution;
+ # these have to fall back to serial execution.
+ _rewritten_commands = [
+ 'mget',
+ 'mset',
+ 'msetnx',
+ # 'rename',
+ # 'renamenx',
+ # 'brpoplpush',
+ # 'rpoplpush',
+ # 'sort',
+ # 'sdiff',
+ # 'sdiffstore',
+ # 'sinter',
+ # 'sinterstore',
+ # 'smove',
+ # 'sunion',
+ # 'sunionstore',
+ # 'pfmerge',
+ ]
+
NODES_CALLBACKS = dict_merge(
string_keys_to_dict([
"CLIENT SETNAME", "SENTINEL GET-MASTER-ADDR-BY-NAME", 'SENTINEL MASTER', 'SENTINEL MASTERS',
@@ -238,6 +260,11 @@ class RedisCluster(StrictRedis):
"""
Send a command to a node in the cluster
"""
+ command = args[0].lower()
+ if command in self._rewritten_commands:
+ cmd = getattr(self, '_rewrite_'+ command)
+ return cmd(*args[1:], **kwargs)
+
if self.refresh_table_asap:
self.initialize_slots_cache()
@@ -317,7 +344,13 @@ class RedisCluster(StrictRedis):
for item in data:
yield item
- def mget(self, keys, *args):
+
+ @staticmethod
+ def _get_pairs(l):
+ for i in xrange(0, len(l), 2):
+ yield l[i:i+2]
+
+ def _rewrite_mget(self, *keys):
"""
Returns a list of values ordered identically to ``keys``
@@ -325,24 +358,20 @@ class RedisCluster(StrictRedis):
This will go a lot slower than a normal mget call in StrictRedis.
This method is no longer atomic.
"""
- return [self.get(arg) for arg in list_or_args(keys, args)]
+ return [self.get(k) for k in keys]
- def mset(self, *args, **kwargs):
+ def _rewrite_mset(self, *args):
"""
Sets key/values from a flat argument list of alternating keys and
values (key1, value1, key2, value2, ...).
Cluster impl: Iterate over all items and do SET on each (k,v) pair
"""
- if args:
- if len(args) != 1 or not isinstance(args[0], dict):
- raise RedisError('MSET requires **kwargs or a single dict arg')
- kwargs.update(args[0])
- for pair in iteritems(kwargs):
+ for pair in self._get_pairs(args):
self.set(pair[0], pair[1])
return True
- def msetnx(self, *args, **kwargs):
+ def _rewrite_msetnx(self, *args):
"""
Sets key/values if none of the keys are already set.
Keys and values are supplied as a flat, alternating argument list (key1, value1, ...).
@@ -351,17 +380,12 @@ class RedisCluster(StrictRedis):
Cluster impl: Iterate over all items and do GET to determine if all keys do not exist.
If true then call mset() on all keys.
"""
- if args:
- if len(args) != 1 or not isinstance(args[0], dict):
- raise RedisError('MSETNX requires **kwargs or a single dict arg')
- kwargs.update(args[0])
-
- # Itterate over all items and fail fast if one value is True.
- for k, v in kwargs.items():
+ keys = [pair[0] for pair in self._get_pairs(args)]
+ for k in keys:
if self.get(k):
return False
- return self.mset(**kwargs)
+ return self._rewrite_mset(*args)
def rename(self, src, dst):
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment