Created
October 8, 2009 03:37
-
-
Save Suhail/204700 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
client module for memcached (memory cache daemon) | |
Overview | |
======== | |
See U{the MemCached homepage<http://www.danga.com/memcached>} for more about memcached. | |
Usage summary | |
============= | |
This should give you a feel for how this module operates:: | |
import memcache | |
mc = memcache.Client(['127.0.0.1:11211'], debug=0) | |
mc.set("some_key", "Some value") | |
value = mc.get("some_key") | |
mc.set("another_key", 3) | |
mc.delete("another_key") | |
mc.set("key", "1") # note that the key used for incr/decr must be a string. | |
mc.incr("key") | |
mc.decr("key") | |
The standard way to use memcache with a database is like this:: | |
key = derive_key(obj) | |
obj = mc.get(key) | |
if not obj: | |
obj = backend_api.get(...) | |
mc.set(obj) | |
# we now have obj, and future passes through this code | |
# will use the object from the cache. | |
Detailed Documentation | |
====================== | |
More detailed documentation is available in the L{Client} class. | |
""" | |
import sys | |
import socket | |
import time | |
import os | |
import re | |
import types | |
try: | |
import cPickle as pickle | |
except ImportError: | |
import pickle | |
try: | |
from zlib import compress, decompress | |
_supports_compress = True | |
except ImportError: | |
_supports_compress = False | |
# quickly define a decompress just in case we recv compressed data. | |
def decompress(val): | |
raise _Error("received compressed data but I don't support compession (import error)") | |
try: | |
from cStringIO import StringIO | |
except ImportError: | |
from StringIO import StringIO | |
from binascii import crc32 # zlib version is not cross-platform | |
serverHashFunction = crc32 | |
__author__ = "Evan Martin <[email protected]>" | |
__version__ = "1.44" | |
__copyright__ = "Copyright (C) 2003 Danga Interactive" | |
__license__ = "Python" | |
SERVER_MAX_KEY_LENGTH = 250 | |
# Storing values larger than 1MB requires recompiling memcached. If you do, | |
# this value can be changed by doing "memcache.SERVER_MAX_VALUE_LENGTH = N" | |
# after importing this module. | |
SERVER_MAX_VALUE_LENGTH = 1024*1024 | |
class _Error(Exception): | |
pass | |
try: | |
# Only exists in Python 2.4+ | |
from threading import local | |
except ImportError: | |
# TODO: add the pure-python local implementation | |
class local(object): | |
pass | |
class Client(local): | |
""" | |
Object representing a pool of memcache servers. | |
See L{memcache} for an overview. | |
In all cases where a key is used, the key can be either: | |
1. A simple hashable type (string, integer, etc.). | |
2. A tuple of C{(hashvalue, key)}. This is useful if you want to avoid | |
making this module calculate a hash value. You may prefer, for | |
example, to keep all of a given user's objects on the same memcache | |
server, so you could use the user's unique id as the hash value. | |
@group Setup: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog | |
@group Insertion: set, add, replace, set_multi | |
@group Retrieval: get, get_multi | |
@group Integers: incr, decr | |
@group Removal: delete, delete_multi | |
@sort: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog,\ | |
set, set_multi, add, replace, get, get_multi, incr, decr, delete, delete_multi | |
""" | |
_FLAG_PICKLE = 1<<0 | |
_FLAG_INTEGER = 1<<1 | |
_FLAG_LONG = 1<<2 | |
_FLAG_COMPRESSED = 1<<3 | |
_SERVER_RETRIES = 10 # how many times to try finding a free server. | |
# exceptions for Client | |
class MemcachedKeyError(Exception): | |
pass | |
class MemcachedKeyLengthError(MemcachedKeyError): | |
pass | |
class MemcachedKeyCharacterError(MemcachedKeyError): | |
pass | |
class MemcachedKeyNoneError(MemcachedKeyError): | |
pass | |
class MemcachedKeyTypeError(MemcachedKeyError): | |
pass | |
class MemcachedStringEncodingError(Exception): | |
pass | |
def __init__(self, servers, debug=0, pickleProtocol=0, | |
pickler=pickle.Pickler, unpickler=pickle.Unpickler, | |
pload=None, pid=None): | |
""" | |
Create a new Client object with the given list of servers. | |
@param servers: C{servers} is passed to L{set_servers}. | |
@param debug: whether to display error messages when a server can't be | |
contacted. | |
@param pickleProtocol: number to mandate protocol used by (c)Pickle. | |
@param pickler: optional override of default Pickler to allow subclassing. | |
@param unpickler: optional override of default Unpickler to allow subclassing. | |
@param pload: optional persistent_load function to call on pickle loading. | |
Useful for cPickle since subclassing isn't allowed. | |
@param pid: optional persistent_id function to call on pickle storing. | |
Useful for cPickle since subclassing isn't allowed. | |
""" | |
local.__init__(self) | |
self.set_servers(servers) | |
self.debug = debug | |
self.stats = {} | |
# Allow users to modify pickling/unpickling behavior | |
self.pickleProtocol = pickleProtocol | |
self.pickler = pickler | |
self.unpickler = unpickler | |
self.persistent_load = pload | |
self.persistent_id = pid | |
# figure out the pickler style | |
file = StringIO() | |
try: | |
pickler = self.pickler(file, protocol = self.pickleProtocol) | |
self.picklerIsKeyword = True | |
except TypeError: | |
self.picklerIsKeyword = False | |
def set_servers(self, servers): | |
""" | |
Set the pool of servers used by this client. | |
@param servers: an array of servers. | |
Servers can be passed in two forms: | |
1. Strings of the form C{"host:port"}, which implies a default weight of 1. | |
2. Tuples of the form C{("host:port", weight)}, where C{weight} is | |
an integer weight value. | |
""" | |
self.servers = [_Host(s, self.debuglog) for s in servers] | |
self._init_buckets() | |
def get_stats(self): | |
'''Get statistics from each of the servers. | |
@return: A list of tuples ( server_identifier, stats_dictionary ). | |
The dictionary contains a number of name/value pairs specifying | |
the name of the status field and the string value associated with | |
it. The values are not converted from strings. | |
''' | |
data = [] | |
for s in self.servers: | |
if not s.connect(): continue | |
if s.family == socket.AF_INET: | |
name = '%s:%s (%s)' % ( s.ip, s.port, s.weight ) | |
else: | |
name = 'unix:%s (%s)' % ( s.address, s.weight ) | |
s.send_cmd('stats') | |
serverData = {} | |
data.append(( name, serverData )) | |
readline = s.readline | |
while 1: | |
line = readline() | |
if not line or line.strip() == 'END': break | |
stats = line.split(' ', 2) | |
serverData[stats[1]] = stats[2] | |
return(data) | |
def get_slabs(self): | |
data = [] | |
for s in self.servers: | |
if not s.connect(): continue | |
if s.family == socket.AF_INET: | |
name = '%s:%s (%s)' % ( s.ip, s.port, s.weight ) | |
else: | |
name = 'unix:%s (%s)' % ( s.address, s.weight ) | |
serverData = {} | |
data.append(( name, serverData )) | |
s.send_cmd('stats items') | |
readline = s.readline | |
while 1: | |
line = readline() | |
if not line or line.strip() == 'END': break | |
item = line.split(' ', 2) | |
#0 = STAT, 1 = ITEM, 2 = Value | |
slab = item[1].split(':', 2) | |
#0 = items, 1 = Slab #, 2 = Name | |
if not serverData.has_key(slab[1]): | |
serverData[slab[1]] = {} | |
serverData[slab[1]][slab[2]] = item[2] | |
return data | |
def flush_all(self): | |
'Expire all data currently in the memcache servers.' | |
for s in self.servers: | |
if not s.connect(): continue | |
s.send_cmd('flush_all') | |
s.expect("OK") | |
def debuglog(self, str): | |
if self.debug: | |
sys.stderr.write("MemCached: %s\n" % str) | |
def _statlog(self, func): | |
if not self.stats.has_key(func): | |
self.stats[func] = 1 | |
else: | |
self.stats[func] += 1 | |
def forget_dead_hosts(self): | |
""" | |
Reset every host in the pool to an "alive" state. | |
""" | |
for s in self.servers: | |
s.deaduntil = 0 | |
def _init_buckets(self): | |
self.buckets = [] | |
for server in self.servers: | |
for i in range(server.weight): | |
self.buckets.append(server) | |
def _get_server(self, key): | |
if type(key) == types.TupleType: | |
serverhash, key = key | |
else: | |
serverhash = serverHashFunction(key) | |
for i in range(Client._SERVER_RETRIES): | |
server = self.buckets[serverhash % len(self.buckets)] | |
if server.connect(): | |
#print "(using server %s)" % server, | |
return server, key | |
serverhash = serverHashFunction(str(serverhash) + str(i)) | |
return None, None | |
def disconnect_all(self): | |
for s in self.servers: | |
s.close_socket() | |
def delete_multi(self, keys, time=0, key_prefix=''): | |
''' | |
Delete multiple keys in the memcache doing just one query. | |
>>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'}) | |
>>> mc.get_multi(['key1', 'key2']) == {'key1' : 'val1', 'key2' : 'val2'} | |
1 | |
>>> mc.delete_multi(['key1', 'key2']) | |
1 | |
>>> mc.get_multi(['key1', 'key2']) == {} | |
1 | |
This method is recommended over iterated regular L{delete}s as it reduces total latency, since | |
your app doesn't have to wait for each round-trip of L{delete} before sending | |
the next one. | |
@param keys: An iterable of keys to clear | |
@param time: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay. | |
@param key_prefix: Optional string to prepend to each key when sending to memcache. | |
See docs for L{get_multi} and L{set_multi}. | |
@return: 1 if no failure in communication with any memcacheds. | |
@rtype: int | |
''' | |
self._statlog('delete_multi') | |
server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(keys, key_prefix) | |
# send out all requests on each server before reading anything | |
dead_servers = [] | |
rc = 1 | |
for server in server_keys.iterkeys(): | |
bigcmd = [] | |
write = bigcmd.append | |
if time != None: | |
for key in server_keys[server]: # These are mangled keys | |
write("delete %s %d\r\n" % (key, time)) | |
else: | |
for key in server_keys[server]: # These are mangled keys | |
write("delete %s\r\n" % key) | |
try: | |
server.send_cmds(''.join(bigcmd)) | |
except socket.error, msg: | |
rc = 0 | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
dead_servers.append(server) | |
# if any servers died on the way, don't expect them to respond. | |
for server in dead_servers: | |
del server_keys[server] | |
notstored = [] # original keys. | |
for server, keys in server_keys.iteritems(): | |
try: | |
for key in keys: | |
server.expect("DELETED") | |
except socket.error, msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
rc = 0 | |
return rc | |
def delete(self, key, time=0): | |
'''Deletes a key from the memcache. | |
@return: Nonzero on success. | |
@param time: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay. | |
@rtype: int | |
''' | |
check_key(key) | |
server, key = self._get_server(key) | |
if not server: | |
return 0 | |
self._statlog('delete') | |
if time != None: | |
cmd = "delete %s %d" % (key, time) | |
else: | |
cmd = "delete %s" % key | |
try: | |
server.send_cmd(cmd) | |
server.expect("DELETED") | |
except socket.error, msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
return 0 | |
return 1 | |
def incr(self, key, delta=1): | |
""" | |
Sends a command to the server to atomically increment the value for C{key} by | |
C{delta}, or by 1 if C{delta} is unspecified. Returns None if C{key} doesn't | |
exist on server, otherwise it returns the new value after incrementing. | |
Note that the value for C{key} must already exist in the memcache, and it | |
must be the string representation of an integer. | |
>>> mc.set("counter", "20") # returns 1, indicating success | |
1 | |
>>> mc.incr("counter") | |
21 | |
>>> mc.incr("counter") | |
22 | |
Overflow on server is not checked. Be aware of values approaching | |
2**32. See L{decr}. | |
@param delta: Integer amount to increment by (should be zero or greater). | |
@return: New value after incrementing. | |
@rtype: int | |
""" | |
return self._incrdecr("incr", key, delta) | |
def decr(self, key, delta=1): | |
""" | |
Like L{incr}, but decrements. Unlike L{incr}, underflow is checked and | |
new values are capped at 0. If server value is 1, a decrement of 2 | |
returns 0, not -1. | |
@param delta: Integer amount to decrement by (should be zero or greater). | |
@return: New value after decrementing. | |
@rtype: int | |
""" | |
return self._incrdecr("decr", key, delta) | |
def _incrdecr(self, cmd, key, delta): | |
check_key(key) | |
server, key = self._get_server(key) | |
if not server: | |
return 0 | |
self._statlog(cmd) | |
cmd = "%s %s %d" % (cmd, key, delta) | |
try: | |
server.send_cmd(cmd) | |
line = server.readline() | |
return int(line) | |
except socket.error, msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
return None | |
def add(self, key, val, time = 0, min_compress_len = 0): | |
''' | |
Add new key with value. | |
Like L{set}, but only stores in memcache if the key doesn't already exist. | |
@return: Nonzero on success. | |
@rtype: int | |
''' | |
return self._set("add", key, val, time, min_compress_len) | |
def append(self, key, val, time=0, min_compress_len=0): | |
'''Append the value to the end of the existing key's value. | |
Only stores in memcache if key already exists. | |
Also see L{prepend}. | |
@return: Nonzero on success. | |
@rtype: int | |
''' | |
return self._set("append", key, val, time, min_compress_len) | |
def prepend(self, key, val, time=0, min_compress_len=0): | |
'''Prepend the value to the beginning of the existing key's value. | |
Only stores in memcache if key already exists. | |
Also see L{append}. | |
@return: Nonzero on success. | |
@rtype: int | |
''' | |
return self._set("prepend", key, val, time, min_compress_len) | |
def replace(self, key, val, time=0, min_compress_len=0): | |
'''Replace existing key with value. | |
Like L{set}, but only stores in memcache if the key already exists. | |
The opposite of L{add}. | |
@return: Nonzero on success. | |
@rtype: int | |
''' | |
return self._set("replace", key, val, time, min_compress_len) | |
def set(self, key, val, time=0, min_compress_len=0): | |
'''Unconditionally sets a key to a given value in the memcache. | |
The C{key} can optionally be an tuple, with the first element | |
being the server hash value and the second being the key. | |
If you want to avoid making this module calculate a hash value. | |
You may prefer, for example, to keep all of a given user's objects | |
on the same memcache server, so you could use the user's unique | |
id as the hash value. | |
@return: Nonzero on success. | |
@rtype: int | |
@param time: Tells memcached the time which this value should expire, either | |
as a delta number of seconds, or an absolute unix time-since-the-epoch | |
value. See the memcached protocol docs section "Storage Commands" | |
for more info on <exptime>. We default to 0 == cache forever. | |
@param min_compress_len: The threshold length to kick in auto-compression | |
of the value using the zlib.compress() routine. If the value being cached is | |
a string, then the length of the string is measured, else if the value is an | |
object, then the length of the pickle result is measured. If the resulting | |
attempt at compression yeilds a larger string than the input, then it is | |
discarded. For backwards compatability, this parameter defaults to 0, | |
indicating don't ever try to compress. | |
''' | |
return self._set("set", key, val, time, min_compress_len) | |
def _map_and_prefix_keys(self, key_iterable, key_prefix): | |
"""Compute the mapping of server (_Host instance) -> list of keys to stuff onto that server, as well as the mapping of | |
prefixed key -> original key. | |
""" | |
# Check it just once ... | |
key_extra_len=len(key_prefix) | |
if key_prefix: | |
check_key(key_prefix) | |
# server (_Host) -> list of unprefixed server keys in mapping | |
server_keys = {} | |
prefixed_to_orig_key = {} | |
# build up a list for each server of all the keys we want. | |
for orig_key in key_iterable: | |
if type(orig_key) is types.TupleType: | |
# Tuple of hashvalue, key ala _get_server(). Caller is essentially telling us what server to stuff this on. | |
# Ensure call to _get_server gets a Tuple as well. | |
str_orig_key = str(orig_key[1]) | |
server, key = self._get_server((orig_key[0], key_prefix + str_orig_key)) # Gotta pre-mangle key before hashing to a server. Returns the mangled key. | |
else: | |
str_orig_key = str(orig_key) # set_multi supports int / long keys. | |
server, key = self._get_server(key_prefix + str_orig_key) | |
# Now check to make sure key length is proper ... | |
check_key(str_orig_key, key_extra_len=key_extra_len) | |
if not server: | |
continue | |
if not server_keys.has_key(server): | |
server_keys[server] = [] | |
server_keys[server].append(key) | |
prefixed_to_orig_key[key] = orig_key | |
return (server_keys, prefixed_to_orig_key) | |
def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0): | |
''' | |
Sets multiple keys in the memcache doing just one query. | |
>>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'}) | |
>>> mc.get_multi(['key1', 'key2']) == {'key1' : 'val1', 'key2' : 'val2'} | |
1 | |
This method is recommended over regular L{set} as it lowers the number of | |
total packets flying around your network, reducing total latency, since | |
your app doesn't have to wait for each round-trip of L{set} before sending | |
the next one. | |
@param mapping: A dict of key/value pairs to set. | |
@param time: Tells memcached the time which this value should expire, either | |
as a delta number of seconds, or an absolute unix time-since-the-epoch | |
value. See the memcached protocol docs section "Storage Commands" | |
for more info on <exptime>. We default to 0 == cache forever. | |
@param key_prefix: Optional string to prepend to each key when sending to memcache. Allows you to efficiently stuff these keys into a pseudo-namespace in memcache: | |
>>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'}, key_prefix='subspace_') | |
>>> len(notset_keys) == 0 | |
True | |
>>> mc.get_multi(['subspace_key1', 'subspace_key2']) == {'subspace_key1' : 'val1', 'subspace_key2' : 'val2'} | |
True | |
Causes key 'subspace_key1' and 'subspace_key2' to be set. Useful in conjunction with a higher-level layer which applies namespaces to data in memcache. | |
In this case, the return result would be the list of notset original keys, prefix not applied. | |
@param min_compress_len: The threshold length to kick in auto-compression | |
of the value using the zlib.compress() routine. If the value being cached is | |
a string, then the length of the string is measured, else if the value is an | |
object, then the length of the pickle result is measured. If the resulting | |
attempt at compression yeilds a larger string than the input, then it is | |
discarded. For backwards compatability, this parameter defaults to 0, | |
indicating don't ever try to compress. | |
@return: List of keys which failed to be stored [ memcache out of memory, etc. ]. | |
@rtype: list | |
''' | |
self._statlog('set_multi') | |
server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(mapping.iterkeys(), key_prefix) | |
# send out all requests on each server before reading anything | |
dead_servers = [] | |
for server in server_keys.iterkeys(): | |
bigcmd = [] | |
write = bigcmd.append | |
try: | |
for key in server_keys[server]: # These are mangled keys | |
store_info = self._val_to_store_info(mapping[prefixed_to_orig_key[key]], min_compress_len) | |
write("set %s %d %d %d\r\n%s\r\n" % (key, store_info[0], time, store_info[1], store_info[2])) | |
server.send_cmds(''.join(bigcmd)) | |
except socket.error, msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
dead_servers.append(server) | |
# if any servers died on the way, don't expect them to respond. | |
for server in dead_servers: | |
del server_keys[server] | |
# short-circuit if there are no servers, just return all keys | |
if not server_keys: return(mapping.keys()) | |
notstored = [] # original keys. | |
for server, keys in server_keys.iteritems(): | |
try: | |
for key in keys: | |
line = server.readline() | |
if line == 'STORED': | |
continue | |
else: | |
notstored.append(prefixed_to_orig_key[key]) #un-mangle. | |
except (_Error, socket.error), msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
return notstored | |
def _val_to_store_info(self, val, min_compress_len): | |
""" | |
Transform val to a storable representation, returning a tuple of the flags, the length of the new value, and the new value itself. | |
""" | |
flags = 0 | |
if isinstance(val, str): | |
pass | |
elif isinstance(val, int): | |
flags |= Client._FLAG_INTEGER | |
val = "%d" % val | |
# force no attempt to compress this silly string. | |
min_compress_len = 0 | |
elif isinstance(val, long): | |
flags |= Client._FLAG_LONG | |
val = "%d" % val | |
# force no attempt to compress this silly string. | |
min_compress_len = 0 | |
else: | |
flags |= Client._FLAG_PICKLE | |
file = StringIO() | |
if self.picklerIsKeyword: | |
pickler = self.pickler(file, protocol = self.pickleProtocol) | |
else: | |
pickler = self.pickler(file, self.pickleProtocol) | |
if self.persistent_id: | |
pickler.persistent_id = self.persistent_id | |
pickler.dump(val) | |
val = file.getvalue() | |
lv = len(val) | |
# We should try to compress if min_compress_len > 0 and we could | |
# import zlib and this string is longer than our min threshold. | |
if min_compress_len and _supports_compress and lv > min_compress_len: | |
comp_val = compress(val) | |
# Only retain the result if the compression result is smaller | |
# than the original. | |
if len(comp_val) < lv: | |
flags |= Client._FLAG_COMPRESSED | |
val = comp_val | |
# silently do not store if value length exceeds maximum | |
if len(val) >= SERVER_MAX_VALUE_LENGTH: return(0) | |
return (flags, len(val), val) | |
def _set(self, cmd, key, val, time, min_compress_len = 0): | |
check_key(key) | |
server, key = self._get_server(key) | |
if not server: | |
return 0 | |
self._statlog(cmd) | |
store_info = self._val_to_store_info(val, min_compress_len) | |
if not store_info: return(0) | |
fullcmd = "%s %s %d %d %d\r\n%s" % (cmd, key, store_info[0], time, store_info[1], store_info[2]) | |
try: | |
server.send_cmd(fullcmd) | |
return(server.expect("STORED") == "STORED") | |
except socket.error, msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
return 0 | |
def get(self, key): | |
'''Retrieves a key from the memcache. | |
@return: The value or None. | |
''' | |
check_key(key) | |
server, key = self._get_server(key) | |
if not server: | |
return None | |
self._statlog('get') | |
try: | |
server.send_cmd("get %s" % key) | |
rkey, flags, rlen, = self._expectvalue(server) | |
if not rkey: | |
return None | |
value = self._recv_value(server, flags, rlen) | |
server.expect("END") | |
except (_Error, socket.error), msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
return None | |
return value | |
def get_multi(self, keys, key_prefix=''): | |
''' | |
Retrieves multiple keys from the memcache doing just one query. | |
>>> success = mc.set("foo", "bar") | |
>>> success = mc.set("baz", 42) | |
>>> mc.get_multi(["foo", "baz", "foobar"]) == {"foo": "bar", "baz": 42} | |
1 | |
>>> mc.set_multi({'k1' : 1, 'k2' : 2}, key_prefix='pfx_') == [] | |
1 | |
This looks up keys 'pfx_k1', 'pfx_k2', ... . Returned dict will just have unprefixed keys 'k1', 'k2'. | |
>>> mc.get_multi(['k1', 'k2', 'nonexist'], key_prefix='pfx_') == {'k1' : 1, 'k2' : 2} | |
1 | |
get_mult [ and L{set_multi} ] can take str()-ables like ints / longs as keys too. Such as your db pri key fields. | |
They're rotored through str() before being passed off to memcache, with or without the use of a key_prefix. | |
In this mode, the key_prefix could be a table name, and the key itself a db primary key number. | |
>>> mc.set_multi({42: 'douglass adams', 46 : 'and 2 just ahead of me'}, key_prefix='numkeys_') == [] | |
1 | |
>>> mc.get_multi([46, 42], key_prefix='numkeys_') == {42: 'douglass adams', 46 : 'and 2 just ahead of me'} | |
1 | |
This method is recommended over regular L{get} as it lowers the number of | |
total packets flying around your network, reducing total latency, since | |
your app doesn't have to wait for each round-trip of L{get} before sending | |
the next one. | |
See also L{set_multi}. | |
@param keys: An array of keys. | |
@param key_prefix: A string to prefix each key when we communicate with memcache. | |
Facilitates pseudo-namespaces within memcache. Returned dictionary keys will not have this prefix. | |
@return: A dictionary of key/value pairs that were available. If key_prefix was provided, the keys in the retured dictionary will not have it present. | |
''' | |
self._statlog('get_multi') | |
server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(keys, key_prefix) | |
# send out all requests on each server before reading anything | |
dead_servers = [] | |
for server in server_keys.iterkeys(): | |
try: | |
server.send_cmd("get %s" % " ".join(server_keys[server])) | |
except socket.error, msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
dead_servers.append(server) | |
# if any servers died on the way, don't expect them to respond. | |
for server in dead_servers: | |
del server_keys[server] | |
retvals = {} | |
for server in server_keys.iterkeys(): | |
try: | |
line = server.readline() | |
while line and line != 'END': | |
rkey, flags, rlen = self._expectvalue(server, line) | |
# Bo Yang reports that this can sometimes be None | |
if rkey is not None: | |
val = self._recv_value(server, flags, rlen) | |
retvals[prefixed_to_orig_key[rkey]] = val # un-prefix returned key. | |
line = server.readline() | |
except (_Error, socket.error), msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
server.mark_dead(msg) | |
return retvals | |
def _expectvalue(self, server, line=None): | |
if not line: | |
line = server.readline() | |
if line[:5] == 'VALUE': | |
resp, rkey, flags, len = line.split() | |
flags = int(flags) | |
rlen = int(len) | |
return (rkey, flags, rlen) | |
else: | |
return (None, None, None) | |
def _recv_value(self, server, flags, rlen): | |
rlen += 2 # include \r\n | |
buf = server.recv(rlen) | |
if len(buf) != rlen: | |
raise _Error("received %d bytes when expecting %d" % (len(buf), rlen)) | |
if len(buf) == rlen: | |
buf = buf[:-2] # strip \r\n | |
if flags & Client._FLAG_COMPRESSED: | |
buf = decompress(buf) | |
if flags == 0 or flags == Client._FLAG_COMPRESSED: | |
# Either a bare string or a compressed string now decompressed... | |
val = buf | |
elif flags & Client._FLAG_INTEGER: | |
val = int(buf) | |
elif flags & Client._FLAG_LONG: | |
val = long(buf) | |
elif flags & Client._FLAG_PICKLE: | |
try: | |
file = StringIO(buf) | |
unpickler = self.unpickler(file) | |
if self.persistent_load: | |
unpickler.persistent_load = self.persistent_load | |
val = unpickler.load() | |
except Exception, e: | |
self.debuglog('Pickle error: %s\n' % e) | |
val = None | |
else: | |
self.debuglog("unknown flags on get: %x\n" % flags) | |
return val | |
class _Host: | |
_DEAD_RETRY = 30 # number of seconds before retrying a dead server. | |
_SOCKET_TIMEOUT = 3 # number of seconds before sockets timeout. | |
def __init__(self, host, debugfunc=None): | |
if isinstance(host, types.TupleType): | |
host, self.weight = host | |
else: | |
self.weight = 1 | |
# parse the connection string | |
m = re.match(r'^(?P<proto>unix):(?P<path>.*)$', host) | |
if not m: | |
m = re.match(r'^(?P<proto>inet):' | |
r'(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host) | |
if not m: m = re.match(r'^(?P<host>[^:]+):(?P<port>[0-9]+)$', host) | |
if not m: | |
raise ValueError('Unable to parse connection string: "%s"' % host) | |
hostData = m.groupdict() | |
if hostData.get('proto') == 'unix': | |
self.family = socket.AF_UNIX | |
self.address = hostData['path'] | |
else: | |
self.family = socket.AF_INET | |
self.ip = hostData['host'] | |
self.port = int(hostData.get('port', 11211)) | |
self.address = ( self.ip, self.port ) | |
if not debugfunc: | |
debugfunc = lambda x: x | |
self.debuglog = debugfunc | |
self.deaduntil = 0 | |
self.socket = None | |
self.buffer = '' | |
def _check_dead(self): | |
if self.deaduntil and self.deaduntil > time.time(): | |
return 1 | |
self.deaduntil = 0 | |
return 0 | |
def connect(self): | |
if self._get_socket(): | |
return 1 | |
return 0 | |
def mark_dead(self, reason): | |
self.debuglog("MemCache: %s: %s. Marking dead." % (self, reason)) | |
self.deaduntil = time.time() + _Host._DEAD_RETRY | |
self.close_socket() | |
def _get_socket(self): | |
if self._check_dead(): | |
return None | |
if self.socket: | |
return self.socket | |
s = socket.socket(self.family, socket.SOCK_STREAM) | |
if hasattr(s, 'settimeout'): s.settimeout(self._SOCKET_TIMEOUT) | |
try: | |
s.connect(self.address) | |
except socket.timeout, msg: | |
self.mark_dead("connect: %s" % msg) | |
return None | |
except socket.error, msg: | |
if type(msg) is types.TupleType: msg = msg[1] | |
self.mark_dead("connect: %s" % msg[1]) | |
return None | |
self.socket = s | |
self.buffer = '' | |
return s | |
def close_socket(self): | |
if self.socket: | |
self.socket.close() | |
self.socket = None | |
def send_cmd(self, cmd): | |
self.socket.sendall(cmd + '\r\n') | |
def send_cmds(self, cmds): | |
""" cmds already has trailing \r\n's applied """ | |
self.socket.sendall(cmds) | |
def readline(self): | |
buf = self.buffer | |
recv = self.socket.recv | |
while True: | |
index = buf.find('\r\n') | |
if index >= 0: | |
break | |
data = recv(4096) | |
if not data: | |
self.mark_dead('Connection closed while reading from %s' | |
% repr(self)) | |
break | |
buf += data | |
if index >= 0: | |
self.buffer = buf[index+2:] | |
buf = buf[:index] | |
else: | |
self.buffer = '' | |
return buf | |
def expect(self, text): | |
line = self.readline() | |
if line != text: | |
self.debuglog("while expecting '%s', got unexpected response '%s'" % (text, line)) | |
return line | |
def recv(self, rlen): | |
self_socket_recv = self.socket.recv | |
buf = self.buffer | |
while len(buf) < rlen: | |
foo = self_socket_recv(4096) | |
buf += foo | |
if len(foo) == 0: | |
raise _Error, ( 'Read %d bytes, expecting %d, ' | |
'read returned 0 length bytes' % ( len(buf), rlen )) | |
self.buffer = buf[rlen:] | |
return buf[:rlen] | |
def __str__(self): | |
d = '' | |
if self.deaduntil: | |
d = " (dead until %d)" % self.deaduntil | |
if self.family == socket.AF_INET: | |
return "inet:%s:%d%s" % (self.address[0], self.address[1], d) | |
else: | |
return "unix:%s%s" % (self.address, d) | |
def check_key(key, key_extra_len=0): | |
"""Checks sanity of key. Fails if: | |
Key length is > SERVER_MAX_KEY_LENGTH (Raises MemcachedKeyLength). | |
Contains control characters (Raises MemcachedKeyCharacterError). | |
Is not a string (Raises MemcachedStringEncodingError) | |
Is an unicode string (Raises MemcachedStringEncodingError) | |
Is not a string (Raises MemcachedKeyError) | |
Is None (Raises MemcachedKeyError) | |
""" | |
if type(key) == types.TupleType: key = key[1] | |
if not key: | |
raise Client.MemcachedKeyNoneError, ("Key is None") | |
if isinstance(key, unicode): | |
raise Client.MemcachedStringEncodingError, ("Keys must be str()'s, not " | |
"unicode. Convert your unicode strings using " | |
"mystring.encode(charset)!") | |
if not isinstance(key, str): | |
raise Client.MemcachedKeyTypeError, ("Key must be str()'s") | |
if isinstance(key, basestring): | |
if len(key) + key_extra_len > SERVER_MAX_KEY_LENGTH: | |
raise Client.MemcachedKeyLengthError, ("Key length is > %s" | |
% SERVER_MAX_KEY_LENGTH) | |
for char in key: | |
if ord(char) < 32 or ord(char) == 127: | |
raise Client.MemcachedKeyCharacterError, "Control characters not allowed" | |
def _doctest(): | |
import doctest, memcache | |
servers = ["127.0.0.1:11211"] | |
mc = Client(servers, debug=1) | |
globs = {"mc": mc} | |
return doctest.testmod(memcache, globs=globs) | |
if __name__ == "__main__": | |
print "Testing docstrings..." | |
_doctest() | |
print "Running tests:" | |
serverList = [["127.0.0.1:11211"]] | |
if '--do-unix' in sys.argv: | |
serverList.append([os.path.join(os.getcwd(), 'memcached.socket')]) | |
for servers in serverList: | |
mc = Client(servers, debug=1) | |
def to_s(val): | |
if not isinstance(val, types.StringTypes): | |
return "%s (%s)" % (val, type(val)) | |
return "%s" % val | |
def test_setget(key, val): | |
print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)), | |
mc.set(key, val) | |
newval = mc.get(key) | |
if newval == val: | |
print "OK" | |
return 1 | |
else: | |
print "FAIL" | |
return 0 | |
class FooStruct: | |
def __init__(self): | |
self.bar = "baz" | |
def __str__(self): | |
return "A FooStruct" | |
def __eq__(self, other): | |
if isinstance(other, FooStruct): | |
return self.bar == other.bar | |
return 0 | |
test_setget("a_string", "some random string") | |
test_setget("an_integer", 42) | |
if test_setget("long", long(1<<30)): | |
print "Testing delete ...", | |
if mc.delete("long"): | |
print "OK" | |
else: | |
print "FAIL" | |
print "Testing get_multi ...", | |
print mc.get_multi(["a_string", "an_integer"]) | |
print "Testing get(unknown value) ...", | |
print to_s(mc.get("unknown_value")) | |
f = FooStruct() | |
test_setget("foostruct", f) | |
print "Testing incr ...", | |
x = mc.incr("an_integer", 1) | |
if x == 43: | |
print "OK" | |
else: | |
print "FAIL" | |
print "Testing decr ...", | |
x = mc.decr("an_integer", 1) | |
if x == 42: | |
print "OK" | |
else: | |
print "FAIL" | |
# sanity tests | |
print "Testing sending spaces...", | |
try: | |
x = mc.set("this has spaces", 1) | |
except Client.MemcachedKeyCharacterError, msg: | |
print "OK" | |
else: | |
print "FAIL" | |
print "Testing sending control characters...", | |
try: | |
x = mc.set("this\x10has\x11control characters\x02", 1) | |
except Client.MemcachedKeyCharacterError, msg: | |
print "OK" | |
else: | |
print "FAIL" | |
print "Testing using insanely long key...", | |
try: | |
x = mc.set('a'*SERVER_MAX_KEY_LENGTH + 'aaaa', 1) | |
except Client.MemcachedKeyLengthError, msg: | |
print "OK" | |
else: | |
print "FAIL" | |
print "Testing sending a unicode-string key...", | |
try: | |
x = mc.set(u'keyhere', 1) | |
except Client.MemcachedStringEncodingError, msg: | |
print "OK", | |
else: | |
print "FAIL", | |
try: | |
x = mc.set((u'a'*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1) | |
except: | |
print "FAIL", | |
else: | |
print "OK", | |
import pickle | |
s = pickle.loads('V\\u4f1a\np0\n.') | |
try: | |
x = mc.set((s*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1) | |
except Client.MemcachedKeyLengthError: | |
print "OK" | |
else: | |
print "FAIL" | |
print "Testing using a value larger than the memcached value limit...", | |
x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH) | |
if mc.get('keyhere') == None: | |
print "OK", | |
else: | |
print "FAIL", | |
x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH + 'aaa') | |
if mc.get('keyhere') == None: | |
print "OK" | |
else: | |
print "FAIL" | |
print "Testing set_multi() with no memcacheds running", | |
mc.disconnect_all() | |
errors = mc.set_multi({'keyhere' : 'a', 'keythere' : 'b'}) | |
if errors != []: | |
print "FAIL" | |
else: | |
print "OK" | |
print "Testing delete_multi() with no memcacheds running", | |
mc.disconnect_all() | |
ret = mc.delete_multi({'keyhere' : 'a', 'keythere' : 'b'}) | |
if ret != 1: | |
print "FAIL" | |
else: | |
print "OK" | |
# vim: ts=4 sw=4 et : |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment