Created
March 24, 2015 03:10
-
-
Save haridas/e7db5d28536fab4ebe52 to your computer and use it in GitHub Desktop.
Ketama based Consistent hashing implementation of python-memcache library.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
To test this script, first start 8 memcached servers (one per port) using this command.
$ memcached -d -p {PortNumber} | |
PortNumber: | |
11211 | |
11212 | |
11213 | |
11214 | |
11215 | |
11216 | |
11217 | |
11218 | |
Once all the servers are running, execute this program:
$ python new_memecached.py | |
""" | |
import bisect
import random
import string
from binascii import crc32

import memcache
class MemcacheClient(memcache.Client):
    """A ``memcache.Client`` subclass that can take a new host at run time.

    Key-to-server selection is left entirely to the parent class. As the
    original author observed, adding a host at run time effectively wipes the
    cache: presumably because the parent picks a bucket from the key hash and
    the number of buckets, so growing the bucket list sends most existing keys
    to a different server (see the python-memcache source to confirm).
    """

    def _get_server(self, key):
        """Return the parent's choice of ``(server, key)`` for ``key``.

        Kept as an explicit override only to make clear that the stock
        python-memcache selection is what this client uses.
        """
        return super(MemcacheClient, self)._get_server(key)

    def add_server(self, server):
        """Add a host to the client at run time.

        :param server: Host in ``<IP>:<PORT>`` format, or a tuple of
                       ``(<IP>:<PORT>, weight)``.
        """
        # Build the host entry with the same connection settings as the
        # hosts this client already has.
        host = memcache._Host(
            server, self.debug, dead_retry=self.dead_retry,
            socket_timeout=self.socket_timeout,
            flush_on_reconnect=self.flush_on_reconnect
        )

        # Register it with the client's server list.
        self.servers.append(host)

        # Give the host one bucket per unit of weight so a weighted host added
        # at run time gets the same share as one passed at construction. A
        # plain "<IP>:<PORT>" string has weight 1, i.e. a single bucket.
        self.buckets.extend([host] * host.weight)
class KetamaMemcacheClient(memcache.Client):
    """Memcache client that selects servers with "ketama" consistent hashing.

    Each server is hashed onto a fixed-size ring at ``weight`` positions
    (slots). A key is hashed onto the same ring and is served by the owner of
    the first slot at or after the key's position, wrapping around to the
    start of the ring. Adding or removing a server therefore only remaps the
    keys that sit just before that server's slots, which keeps the number of
    cache misses caused by a topology change small.
    """

    # Number of ring slots given to a server when no explicit weight is
    # passed. The original author recommends a value between 100 and 200;
    # adjust it to see how the cache-miss rate changes.
    DEFAULT_SERVER_WEIGHT = 200

    # Total number of positions on the ring. The original author reports
    # roughly 1 to 5 percent cache misses when a node is added or removed
    # with this configuration.
    RING_SIZE = 2 ** 16

    def __init__(self, *args, **kwargs):
        """Create the ring state, then defer to ``memcache.Client``.

        The ring containers are created before the parent constructor runs so
        that they already exist if the parent populates the server pool
        (through ``set_servers``) during construction.
        """
        # Mapping of ring slot -> server object owning that slot.
        self._ketama_server_ring = {}
        # Sorted list of every occupied ring slot.
        self._ketama_server_slots = []

        super(KetamaMemcacheClient, self).__init__(*args, **kwargs)

    def _get_server(self, key):
        """Return the first reachable server responsible for ``key``.

        The search starts at the first slot at or after the key's ring
        position and walks the whole ring once (wrapping around), so a dead
        server is skipped in favour of the next one on the ring.

        :param key: The cache key being looked up.
        :return: A tuple ``(server_obj, key)``, or ``(None, None)`` when the
                 ring is empty or no server on it accepts a connection.
        """
        slots = self._ketama_server_slots
        if not slots:
            return None, None

        # Index of the first slot >= the key's position. It equals len(slots)
        # when the key lies past the last slot; the modulo below wraps it to 0.
        start = bisect.bisect_left(slots, self._generate_ring_slot(key))

        slot_count = len(slots)
        attempted = set()
        for offset in range(slot_count):
            slot = slots[(start + offset) % slot_count]
            server = self._ketama_server_ring[slot]
            # A server owns many slots; try to connect to each one only once.
            if id(server) in attempted:
                continue
            attempted.add(id(server))
            if server.connect():
                return server, key

        return None, None

    def add_server(self, server):
        """Add a new server to the client at run time.

        :param server: Server host in ``<IP>:<PORT>`` format, or a tuple of
                       ``(<IP>:<PORT>, weight)``.
        """
        host = self._make_host(server)
        # Keep the client's server list in sync with the ring so that
        # anything iterating ``self.servers`` also sees the new host.
        self.servers.append(host)
        self._place_server_on_ring(host)

    def set_servers(self, servers):
        """Replace the client's server pool and rebuild the ring.

        :param servers: List of server hosts in ``<IP>:<PORT>`` format, or a
                        list of ``(<IP>:<PORT>, weight)`` tuples.
        """
        self.servers = [self._make_host(s) for s in servers]

        # Start from an empty ring so a repeated call replaces the previous
        # pool instead of leaving its slots behind.
        self._ketama_server_ring = {}
        self._ketama_server_slots = []

        # Explicit loop: ``map`` is lazy on Python 3 and would place nothing.
        for host in self.servers:
            self._place_server_on_ring(host)

    def _make_host(self, server):
        """Build a ``memcache._Host`` for ``server`` with this client's settings.

        :param server: ``<IP>:<PORT>`` string, or ``(<IP>:<PORT>, weight)``.
                       A bare string gets ``DEFAULT_SERVER_WEIGHT``.
        :return: An instance of :class:`memcache._Host`.
        """
        if not isinstance(server, tuple):
            server = (server, self.DEFAULT_SERVER_WEIGHT)
        return memcache._Host(
            server, self.debug, dead_retry=self.dead_retry,
            socket_timeout=self.socket_timeout,
            flush_on_reconnect=self.flush_on_reconnect)

    def _place_server_on_ring(self, server):
        """Place the given server on the ring.

        :param server: An instance of :class:`memcache._Host`.
        """
        ring = self._ketama_server_ring
        for slot in self._get_server_slots_on_ring(server):
            # A slot that is already taken stays with its current owner; the
            # colliding server simply ends up with one slot fewer.
            # TODO: resolve collisions instead of discarding them.
            if slot not in ring:
                ring[slot] = server
                self._ketama_server_slots.append(slot)

        # Lookups rely on the slot list being sorted.
        self._ketama_server_slots.sort()

    def _get_server_slots_on_ring(self, server):
        """Return the candidate ring slots for the given server.

        One slot is derived per unit of ``server.weight`` by hashing
        ``"<ip>:<port>_<index>"``. The result may contain duplicates or slots
        already owned by another server; ``_place_server_on_ring`` decides
        what to do with those.

        :param server: An object of :class:`memcache._Host`.
        :return: List of slots on the ring.
        """
        endpoint = "{}:{}".format(server.ip, server.port)
        return [
            self._generate_ring_slot("{}_{}".format(endpoint, index))
            for index in range(server.weight)
        ]

    def _generate_ring_slot(self, key):
        """Map ``key`` to a position on the ring using CRC32.

        :param key: Key which needs to be mapped to the hash space.
        :type key: str
        :return: Ring slot in ``range(RING_SIZE)`` corresponding to ``key``.
        """
        # crc32 needs a byte string; encode text keys so this works on both
        # Python 2 (unicode) and Python 3 (str).
        if not isinstance(key, bytes):
            key = key.encode("utf-8")

        # Mask to an unsigned 32-bit value first (crc32 can be negative on
        # Python 2), then fold onto the ring.
        return (crc32(key) & 0xffffffff) % self.RING_SIZE
def random_key(size):
    """Generate a random key made of ``size`` ASCII letters.

    ``string.ascii_letters`` is used instead of ``string.letters`` because the
    latter is locale-dependent on Python 2 and does not exist on Python 3.

    :param size: Number of characters in the key.
    :return: A string of ``size`` randomly chosen ASCII letters.
    """
    return ''.join(random.choice(string.ascii_letters) for _ in range(size))
if __name__ == '__main__':
    # Seven memcached servers are expected on ports 11211-11217; each demo
    # below then adds an eighth one (port 11218) at run time.
    servers = ['127.0.0.1:1121%d' % i for i in range(1, 8)]

    def _hit_percentage(client, keys):
        """Return the percentage of ``keys`` that ``client`` can read back."""
        return (len(client.get_multi(keys)) / float(len(keys))) * 100

    def _naive_memcache_client(servers):
        """Measure key survival after adding a server with stock hashing."""
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("============= USING NAIVE HASHING ALGORITHM ================")

        # 100 random keys to spread across the servers.
        keys = [random_key(10) for _ in range(100)]

        client = MemcacheClient(servers=servers)

        # Distribute the keys on our servers.
        for key in keys:
            client.set(key, 1)

        # Check how many keys come back.
        print('%s percent of keys matched' % _hit_percentage(client, keys))

        # Add another server and see how many keys are still reachable.
        client.add_server('127.0.0.1:11218')
        print('Added new server')

        print('%s percent of keys stil matched' % _hit_percentage(client, keys))

    def _ketama_memcache_client(servers):
        """Measure key survival after adding a server with ketama hashing."""
        print("============= USING KETAMA HASHING ALGORITHM ================")

        # 100 random keys to spread across the servers.
        keys = [random_key(10) for _ in range(100)]

        ketama_client = KetamaMemcacheClient(servers=servers)

        # Distribute the keys on our servers.
        for key in keys:
            ketama_client.set(key, 1)

        # Check how many keys come back.
        print('%s percent of keys matched' % _hit_percentage(ketama_client, keys))

        # Add another server and see how many keys are still reachable.
        ketama_client.add_server('127.0.0.1:11218')
        print('Added new server')

        print('%s percent of keys stil matched' % _hit_percentage(ketama_client, keys))

    _naive_memcache_client(servers)
    _ketama_memcache_client(servers)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment