@@ -397,7 +400,8 @@ class StrictRedis(object): charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, - ssl_cert_reqs=None, ssl_ca_certs=None): + ssl_cert_reqs=None, ssl_ca_certs=None, + max_connections=None): if not connection_pool: if charset is not None: warnings.warn(DeprecationWarning( @@ -415,7 +419,8 @@ class StrictRedis(object): 'encoding': encoding, 'encoding_errors': encoding_errors, 'decode_responses': decode_responses, - 'retry_on_timeout': retry_on_timeout + 'retry_on_timeout': retry_on_timeout, + 'max_connections': max_connections } # based on input, setup appropriate connection args if unix_socket_path is not None: @@ -476,6 +481,7 @@ class StrictRedis(object): """ shard_hint = kwargs.pop('shard_hint', None) value_from_callable = kwargs.pop('value_from_callable', False) + watch_delay = kwargs.pop('watch_delay', None) with self.pipeline(True, shard_hint) as pipe: while 1: try: @@ -485,6 +491,8 @@ class StrictRedis(object): exec_value = pipe.execute() return func_value if value_from_callable else exec_value except WatchError: + if watch_delay is not None and watch_delay > 0: + time.sleep(watch_delay) continue def lock(self, name, timeout=None, sleep=0.1, blocking_timeout=None, @@ -1799,12 +1832,12 @@ class StrictRedis(object): "Adds the specified elements to the specified HyperLogLog." return self.execute_command('PFADD', name, *values) - def pfcount(self, name): + def pfcount(self, *sources): """ Return the approximated cardinality of - the set observed by the HyperLogLog at key. + the set observed by the HyperLogLog at key(s). """ - return self.execute_command('PFCOUNT', name) + return self.execute_command('PFCOUNT', *sources) def pfmerge(self, dest, *sources): "Merge N different HyperLogLogs into a single one." @@ -2142,10 +2175,10 @@ class PubSub(object): # previously listening to return command(*args) - def parse_response(self, block=True): + def parse_response(self, block=True, timeout=0): "Parse the response from a publish/subscribe command" connection = self.connection - if not block and not connection.can_read(): + if not block and not connection.can_read(timeout=timeout): return None return self._execute(connection, connection.read_response) --- a/redis/connection.py +++ b/redis/connection.py @@ -79,7 +80,9 @@ class Token(object): class BaseParser(object): EXCEPTION_CLASSES = { - 'ERR': ResponseError, + 'ERR': { + 'max number of clients reached': ConnectionError + }, 'EXECABORT': ExecAbortError, 'LOADING': BusyLoadingError, 'NOSCRIPT': NoScriptError, @@ -91,7 +94,10 @@ class BaseParser(object): error_code = response.split(' ')[0] if error_code in self.EXCEPTION_CLASSES: response = response[len(error_code) + 1:] - return self.EXCEPTION_CLASSES[error_code](response) + exception_class = self.EXCEPTION_CLASSES[error_code] + if isinstance(exception_class, dict): + exception_class = exception_class.get(response, ResponseError) + return exception_class(response) return ResponseError(response) @@ -542,11 +548,12 @@ class Connection(object): e = sys.exc_info()[1] self.disconnect() if len(e.args) == 1: - _errno, errmsg = 'UNKNOWN', e.args[0] + errno, errmsg = 'UNKNOWN', e.args[0] else: - _errno, errmsg = e.args + errno = e.args[0] + errmsg = e.args[1] raise ConnectionError("Error %s while writing to socket. %s." % - (_errno, errmsg)) + (errno, errmsg)) except: self.disconnect() raise @@ -555,13 +562,14 @@ class Connection(object): "Pack and send a command to the Redis server" self.send_packed_command(self.pack_command(*args)) - def can_read(self): + def can_read(self, timeout=0): "Poll the socket to see if there's data that can be read." sock = self._sock if not sock: self.connect() sock = self._sock - return bool(select([sock], [], [], 0)[0]) or self._parser.can_read() + return self._parser.can_read() or \ + bool(select([sock], [], [], timeout)[0]) def read_response(self): "Read the response from a previously sent command" diff --git a/redis/sentinel.py b/redis/sentinel.py index 2f30062..3fb89ce 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -129,6 +129,8 @@ class SentinelConnectionPool(ConnectionPool): self.disconnect() self.reset() self.__init__(self.service_name, self.sentinel_manager, + is_master=self.is_master, + check_connection=self.check_connection, connection_class=self.connection_class, max_connections=self.max_connections, **self.connection_kwargs)