Created
March 22, 2018 22:16
-
-
Save senderista/aa40844cc5198b734ed1da5b6cab4fd6 to your computer and use it in GitHub Desktop.
An experimental algorithm that uses quasirandomness (a bit-reversal permutation) to assign nodes to positions on the consistent hashing ring, deterministically bounding skew to at most a factor of 2.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import math | |
from bisect import * | |
from collections import defaultdict | |
from collections import Counter | |
def find_le(a, x):
    """Find the rightmost value in sorted list ``a`` less than or equal to ``x``.

    Raises:
        ValueError: if no element of ``a`` is <= ``x`` (including when ``a``
            is empty).
    """
    # bisect_right returns the insertion point *after* any entries equal to x,
    # so the element just before that point is the largest value <= x.
    i = bisect_right(a, x)
    if not i:
        raise ValueError("no value less than or equal to %r" % (x,))
    return a[i - 1]
# bit-reversal permutation
def brp(num_bits, n):
    """Return ``n`` with its ``num_bits``-bit binary representation reversed.

    For a fixed ``num_bits`` this is a permutation of ``range(2**num_bits)``
    and is its own inverse: ``brp(b, brp(b, n)) == n``.

    Raises:
        ValueError: if ``n`` does not fit in ``num_bits`` bits. Such a value
            would format to more than ``num_bits`` digits and its reversal
            would collide with the image of an in-range value.
    """
    if not 0 <= n < (1 << num_bits):
        raise ValueError("%r does not fit in %r bits" % (n, num_bits))
    # Zero-pad to exactly num_bits binary digits, then read them backwards.
    bits = format(n, "0%db" % num_bits)
    return int(bits[::-1], 2)
def vnode_to_worker(vnode):
    """Return the index of the worker that owns ring position ``vnode``.

    ``vnode`` is a position as stored in SORTED_VNODES, i.e. the bit-reversed
    form of a sequential vnode index. Reads the module globals NUM_BITS and
    VNODES_PER_WORKER.
    """
    # Reversing the bits a second time undoes the permutation and recovers
    # the sequential vnode index.
    real_vnode = brp(NUM_BITS, vnode)
    # vnodes are assigned sequentially one worker at a time; floor division
    # keeps the worker index an int regardless of the division mode in effect.
    return real_vnode // VNODES_PER_WORKER
def find_vnode_owning_partition(part_id):
    """Return the vnode (ring position) that owns partition ``part_id``.

    The owner is the first vnode on or to the left of the partition in the
    module global SORTED_VNODES. When no such vnode exists, the lookup wraps
    around and returns the first vnode on the circle.
    """
    try:
        owner = find_le(SORTED_VNODES, part_id)
    except ValueError:
        # No vnode on or to the left of the partition: wrap around.
        # NOTE(review): with "nearest vnode to the left" ownership the
        # wrap-around owner would normally be the last vnode
        # (SORTED_VNODES[-1]); confirm that picking the first is intended.
        owner = SORTED_VNODES[0]
    return owner
def find_worker_owning_partition(part_id):
    """Return the index of the worker that owns partition ``part_id``."""
    # Resolve the partition to its owning vnode, then the vnode to its worker.
    return vnode_to_worker(find_vnode_owning_partition(part_id))
def print_partition_assignments():
    """Print one bar of asterisks per worker, scaled to its partition share.

    Reads the module globals PARTITIONS and PARTITION_ASSIGNMENTS. Each bar is
    ``int(share * 2000)`` characters long, where ``share`` is the fraction of
    all partitions assigned to that worker.
    """
    total = float(PARTITIONS)
    # Only the per-worker partition lists are needed here, not the worker ids.
    for partitions in PARTITION_ASSIGNMENTS.values():
        share = len(partitions) / total
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("*" * int(share * 2000))
def update_partition_assignments():
    """Rebuild the global PARTITION_ASSIGNMENTS from SORTED_VNODES.

    Walks clockwise around the circle: every partition from a vnode
    (inclusive) up to the next vnode (exclusive) goes to the worker behind
    that vnode. The last vnode's span ends at PARTITIONS. The result maps
    worker index -> sorted list of partition ids.
    """
    global PARTITION_ASSIGNMENTS
    # Exclusive end of each vnode's span: the following vnode, or PARTITIONS
    # for the final vnode on the circle.
    span_ends = SORTED_VNODES[1:] + [PARTITIONS]
    by_worker = defaultdict(list)
    for span_start, span_end in zip(SORTED_VNODES, span_ends):
        worker = vnode_to_worker(span_start)
        for partition in range(span_start, span_end):
            insort(by_worker[worker], partition)
    PARTITION_ASSIGNMENTS = by_worker
def main(args):
    """Build the vnode ring, assign partitions, and print the distribution.

    Args:
        args: command-line arguments; ``args[0]`` is the number of bits
            (the ring has ``2**NUM_BITS`` partitions) and ``args[1]`` is the
            number of workers.

    Sets the module globals NUM_BITS, WORKERS, PARTITIONS, VNODES_PER_WORKER
    and SORTED_VNODES, then prints a per-worker bar chart followed by a table
    of how many workers own each partition count. Exits with status 2 when
    arguments are missing and status 1 when there are more vnodes than
    partitions.
    """
    global NUM_BITS
    global WORKERS
    global PARTITIONS
    global VNODES_PER_WORKER
    global SORTED_VNODES
    if len(args) < 2:
        sys.stderr.write("usage: %s NUM_BITS WORKERS\n" % sys.argv[0])
        sys.exit(2)
    NUM_BITS = int(args[0])
    WORKERS = int(args[1])
    PARTITIONS = 2 ** NUM_BITS
    # Alternative vnode counts that were experimented with:
    # VNODES_PER_WORKER = NUM_BITS
    # VNODES_PER_WORKER = PARTITIONS / WORKERS
    # VNODES_PER_WORKER = int(math.log(WORKERS, 2)+1)
    VNODES_PER_WORKER = 1
    total_vnodes = WORKERS * VNODES_PER_WORKER
    if total_vnodes > PARTITIONS:
        print("Too many vnodes (%d) for %d partitions" % (total_vnodes, PARTITIONS))
        sys.exit(1)
    # Worker i owns sequential vnode indices i*VNODES_PER_WORKER .. +VNODES_PER_WORKER-1,
    # so together the workers cover exactly range(total_vnodes). Each index is
    # placed on the ring at its bit-reversed position.
    SORTED_VNODES = sorted(brp(NUM_BITS, vnode) for vnode in range(total_vnodes))
    update_partition_assignments()
    print_partition_assignments()
    format_str = "{: <16} {: <11}"
    print(format_str.format('OWNED_PARTITIONS', 'NUM_WORKERS'))
    print(format_str.format('----------------', '-----------'))
    # Histogram: number of partitions owned -> number of workers owning that
    # many. Sorted so the table rows come out in a stable order.
    histogram = Counter(len(owned) for owned in PARTITION_ASSIGNMENTS.values())
    for item, count in sorted(histogram.items()):
        print(format_str.format(item, count))
# Script entry point: forward the command-line arguments (minus the program
# name) to main() and use its return value as the process exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment