Skip to content

Instantly share code, notes, and snippets.

@senderista
Created March 22, 2018 22:16
Show Gist options
  • Save senderista/aa40844cc5198b734ed1da5b6cab4fd6 to your computer and use it in GitHub Desktop.
Save senderista/aa40844cc5198b734ed1da5b6cab4fd6 to your computer and use it in GitHub Desktop.
An experimental algorithm that uses quasirandomness (a bit-reversal permutation) to assign nodes to positions on a consistent-hashing ring, deterministically bounding skew to at most a factor of 2.
#!/usr/bin/env python
import sys
import math
from bisect import *
from collections import defaultdict
from collections import Counter
"""Find rightmost value less than or equal to x"""
def find_le(a, x):
i = bisect_right(a, x)
if i:
return a[i-1]
raise ValueError
def brp(num_bits, n):
    """Bit-reversal permutation: reverse the binary digits of *n*.

    *n* is rendered in binary, zero-padded to at least *num_bits* digits,
    and that digit string is read back in reverse order.  For
    0 <= n < 2**num_bits this is a permutation of range(2**num_bits) and is
    its own inverse (brp(b, brp(b, n)) == n).
    """
    padded_bits = format(n, '0%db' % num_bits)
    return int(padded_bits[::-1], 2)
def vnode_to_worker(vnode):
    """Map a vnode's ring position back to the id of the worker that owns it.

    Ring positions are the bit-reversed sequential vnode ids built in main.
    Applying brp again undoes the reversal, because the permutation is its
    own inverse.  Sequential ids were handed out VNODES_PER_WORKER at a time
    per worker, so integer division recovers the worker id.
    """
    real_vnode = brp(NUM_BITS, vnode)
    # Floor division: plain '/' is true division on Python 3 (and under
    # `from __future__ import division`), which would yield float "worker
    # ids" such as 1.5 whenever VNODES_PER_WORKER > 1.
    return real_vnode // VNODES_PER_WORKER
def find_vnode_owning_partition(part_id):
    """Return the ring position of the vnode that owns partition *part_id*.

    A vnode owns the arc from its own position (inclusive) up to the next
    vnode's position (exclusive).  The owner is therefore the nearest vnode
    at or to the left of the partition, wrapping around the ring if needed.
    """
    try:
        return find_le(SORTED_VNODES, part_id)
    except ValueError:
        # No vnode at or left of part_id: keep walking left, wrap past 0 to
        # the top of the ring, and the first vnode met there is the largest.
        # (SORTED_VNODES[0] would be the nearest vnode to the *right*, which
        # contradicts the ownership rule above.)
        return SORTED_VNODES[-1]
def find_worker_owning_partition(part_id):
    """Return the id of the worker whose vnode owns partition *part_id*."""
    return vnode_to_worker(find_vnode_owning_partition(part_id))
def print_partition_assignments(scale=2000):
    """Print a text histogram of each worker's share of the partitions.

    For every worker in PARTITION_ASSIGNMENTS, prints one line of '*'
    characters.  Its length is int(share * scale), where share is the
    worker's partition count divided by PARTITIONS.  Workers are printed in
    ascending id order so the output is deterministic.

    scale: number of stars a worker owning every partition would get
        (defaults to 2000, the value previously hard-coded).
    """
    for worker in sorted(PARTITION_ASSIGNMENTS):
        share = float(len(PARTITION_ASSIGNMENTS[worker])) / float(PARTITIONS)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("*" * int(share * scale))
def update_partition_assignments():
    """Recompute PARTITION_ASSIGNMENTS from SORTED_VNODES.

    Each vnode owns the partitions from its own position (inclusive) up to
    the next vnode's position (exclusive).  The last vnode's arc runs to the
    end of the ring (PARTITIONS) and wraps around to also cover any
    partitions below the first vnode.  The result is a defaultdict mapping
    worker id -> ascending list of owned partition ids; only workers that
    own at least one partition get an entry.
    """
    global PARTITION_ASSIGNMENTS
    partition_assignments = defaultdict(list)
    if SORTED_VNODES and SORTED_VNODES[0] > 0:
        # Wrapped-around tail of the last vnode's arc: [0, first vnode).
        # Handled first so every worker's list stays in ascending order.
        wrap_worker = vnode_to_worker(SORTED_VNODES[-1])
        partition_assignments[wrap_worker].extend(range(SORTED_VNODES[0]))
    # Pair each vnode with the exclusive upper bound of its arc.
    upper_bounds = SORTED_VNODES[1:] + [PARTITIONS]
    for vnode, upper in zip(SORTED_VNODES, upper_bounds):
        if vnode < upper:
            # Vnodes are visited in ascending order, so appending keeps each
            # worker's list sorted without a per-element insort.
            worker = vnode_to_worker(vnode)
            partition_assignments[worker].extend(range(vnode, upper))
    PARTITION_ASSIGNMENTS = partition_assignments
def main(args):
    """Build a bit-reversal ring and report how evenly it splits partitions.

    args[0] is NUM_BITS (the ring has 2**NUM_BITS partitions) and args[1] is
    the number of workers.  The function prints a per-worker share histogram,
    then a table of how many workers own each partition count.

    Returns 2 (usage error) when fewer than two arguments are given.  Calls
    sys.exit(1) when there are more vnodes than partitions.  Otherwise it
    returns None.
    """
    global NUM_BITS
    global WORKERS
    global PARTITIONS
    global VNODES_PER_WORKER
    global SORTED_VNODES
    if len(args) < 2:
        sys.stderr.write("usage: %s NUM_BITS WORKERS\n" % sys.argv[0])
        return 2
    NUM_BITS = int(args[0])
    WORKERS = int(args[1])
    PARTITIONS = 2 ** NUM_BITS
    # One vnode per worker.  Other values experimented with: NUM_BITS,
    # PARTITIONS // WORKERS, and int(math.log(WORKERS, 2) + 1).
    VNODES_PER_WORKER = 1
    total_vnodes = WORKERS * VNODES_PER_WORKER
    if total_vnodes > PARTITIONS:
        sys.stderr.write("Too many vnodes (%d) for %d partitions\n"
                         % (total_vnodes, PARTITIONS))
        sys.exit(1)
    # Worker i gets the sequential vnode ids
    # i*VNODES_PER_WORKER .. (i+1)*VNODES_PER_WORKER - 1, i.e. together
    # range(total_vnodes).  Each id is placed on the ring at its
    # bit-reversed position.
    SORTED_VNODES = sorted(brp(NUM_BITS, vnode) for vnode in range(total_vnodes))
    update_partition_assignments()
    print_partition_assignments()
    format_str = "{: <16} {: <11}"
    print(format_str.format('OWNED_PARTITIONS', 'NUM_WORKERS'))
    print(format_str.format('----------------', '-----------'))
    counts = Counter(len(parts) for parts in PARTITION_ASSIGNMENTS.values())
    # Sorted so the table reads in ascending order of partitions owned.
    for owned, num_workers in sorted(counts.items()):
        print(format_str.format(owned, num_workers))
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment