from heapq import heappush, heappop


"""
Our internal analytics system relies heavily on producing sorted streams of
(key: string, payload: bytearray).

In the following, I will ignore the payloads, as they are not important here.
We will only consider a stream of sorted keys.

Different shards typically compute these streams and send them over the
network to another server, which in turn merges them.

We use delta-encoding as a cheap means of compression for these streams.
Namely, we take advantage of the fact that the keys are sorted,
so we know that they often share a prefix (think about a set of 10_000_000
ctks for instance).

Instead of encoding the keys directly, we encode each key as:

    (common_prefix_len, added_suffix)

where
- common_prefix_len is an integer that describes how many characters
  from the previous key can be kept. We will trim the previous key
  to this number of chars.
- added_suffix is the suffix that needs to be appended to this
  trimmed key in order to form our new key.

For instance

    abcd
    abcdef
    ag

becomes

    (0, abcd)
    (4, ef)
    (1, g)

The problem consists of merging different streams encoded with this scheme
in an efficient way.

Currently, we decode the streams, merge them using a binary heap,
and re-encode the result. The idea would be to avoid decoding the streams
and merge the encoded streams directly.

I have attached a working implementation that is a bit ugly, because I
don't have a proper grasp of the right way to implement the solution.
Your mission is to find a nicer way to implement this.
"""


def compute_common_prefix_len(s1, s2):
    """ Returns the length of the longest shared
    prefix of s1 and s2.
    """
    s = 0
    for (l1, l2) in zip(s1, s2):
        if l1 == l2:
            s += 1
        else:
            break
    return s


def encode(g):
    """
    Takes an iterable of sorted strings and delta-encodes it.
    The encoded stream is a generator of
    (common_prefix_len, added_suffix) pairs.
    """
    prev_key = ""
    for key in g:
        common_prefix_len = compute_common_prefix_len(prev_key, key)
        yield (common_prefix_len, key[common_prefix_len:])
        prev_key = key


def decode(g):
    """
    Takes a delta-encoded generator (see encode)
    and decodes it back to a generator of strings.
    """
    key = ""
    for (common_prefix_len, key_update) in g:
        key = key[:common_prefix_len] + key_update
        yield key
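
# Illustrative sanity check: the docstring's example round-trips
# through encode and decode.
#
#   >>> list(encode(["abcd", "abcdef", "ag"]))
#   [(0, 'abcd'), (4, 'ef'), (1, 'g')]
#   >>> list(decode(encode(["abcd", "abcdef", "ag"])))
#   ['abcd', 'abcdef', 'ag']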


def most_significant_bit(bitset):
    """ Returns the index of the most significant bit,
    or -1 if bitset is 0.
    In Rust, we can use "bsr" for this."""
    c = 0
    while bitset:
        bitset >>= 1
        c += 1
    return c - 1
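
# Note: in Python this is equivalent to bitset.bit_length() - 1,
# e.g. most_significant_bit(0b10100) == 4.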


class Queues:
    """ An array of heaps indexed by common_prefix_len, with a bitset
    tracking which indices have been pushed to, so that the largest
    index can be found with a single most-significant-bit lookup.
    """

    def __init__(self):
        self.bitset = 0
        self.queues = [[] for _ in range(100)]

    def get(self, common_prefix_len):
        return self.queues[common_prefix_len]

    def push(self, common_prefix_len, el):
        self.bitset |= (1 << common_prefix_len)
        heappush(self.queues[common_prefix_len], el)

    def max_common_prefix_len(self):
        return most_significant_bit(self.bitset)
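
# Illustrative usage: the bitset records which queues have received
# elements, and max_common_prefix_len() returns the largest such index.
#
#   >>> qs = Queues()
#   >>> qs.push(3, ("ef", None))
#   >>> qs.push(0, ("zz", None))
#   >>> qs.max_common_prefix_len()
#   3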


def merge(encoded_streams):
    """
    Merges a list of encoded streams (see encode)
    and returns an encoded stream.
    """
    queues = Queues()
    # Prime the queues with the first element of each stream. The first
    # element of a stream always has common_prefix_len == 0.
    # stream_ord breaks ties between equal suffixes, so that the heap
    # never has to compare the generator objects themselves.
    for (stream_ord, encoded) in enumerate(encoded_streams):
        try:
            (common_prefix_len, key_prefix) = next(encoded)
        except StopIteration:
            continue  # skip empty streams
        queues.push(common_prefix_len, (key_prefix, stream_ord, encoded))
    while True:
        # Invariant: queue i holds streams whose next key shares exactly
        # i characters with the last key emitted.
        common_prefix_len = queues.max_common_prefix_len()
        if common_prefix_len == -1:
            break
        q = queues.get(common_prefix_len)
        (root_key_prefix, stream_ord, encoded) = heappop(q)
        try:
            (next_common_prefix_len, next_key_prefix) = next(encoded)
            queues.push(next_common_prefix_len, (next_key_prefix, stream_ord, encoded))
        except StopIteration:
            pass
        yield (common_prefix_len, root_key_prefix)
        while q:
            # Peek at the smallest remaining suffix in this queue. If it
            # shares extra characters with the key just emitted, move it
            # to the matching deeper queue, trimming the shared part.
            (key_prefix, stream_ord, encoded) = q[0]
            extra_common_prefix_len = compute_common_prefix_len(key_prefix, root_key_prefix)
            if extra_common_prefix_len > 0:
                heappop(q)
                queues.push(common_prefix_len + extra_common_prefix_len,
                            (key_prefix[extra_common_prefix_len:], stream_ord, encoded))
            else:
                break
        else:
            # The queue was fully drained: clear its bit in the bitset.
            queues.bitset ^= (1 << common_prefix_len)
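
# Illustrative example of merging two encoded streams directly,
# without decoding them:
#
#   >>> left = encode(["abc", "abd"])    # yields (0, 'abc'), (2, 'd')
#   >>> right = encode(["abcd"])         # yields (0, 'abcd')
#   >>> list(merge([left, right]))
#   [(0, 'abc'), (3, 'd'), (2, 'd')]
#
# which is exactly list(encode(["abc", "abcd", "abd"])).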


if __name__ == "__main__":
    from itertools import zip_longest
    import random

    def random_key():
        N = random.randrange(2, 10)
        return ''.join(random.choice("ABCDEF") for _ in range(N))

    strings = sorted(random_key() for _ in range(1000))
    # Dispatch the sorted keys randomly over a few splits,
    # keeping each split sorted.
    num_splits = 3
    splits = [[] for _ in range(num_splits)]
    for s in strings:
        splits[random.randrange(num_splits)].append(s)
    assert sorted(sum(splits, [])) == strings
    splits_encoded = [encode(split) for split in splits]
    # Merging the encoded splits must yield exactly the same encoded
    # stream as encoding the full sorted list directly.
    for (encode_after_merge, encode_without_split) in zip_longest(merge(splits_encoded), encode(strings)):
        assert encode_after_merge == encode_without_split
        print(encode_after_merge)
Oops, I hadn't seen your answer; sending you an email.
Hello :)
I'm currently learning Rust with the goal of contributing to Tantivy!
Without internet access to help me out with Rust syntax & libs (during a train trip), I fell back to Java.
It's not necessarily more readable and certainly not more efficient, but I drop it here for perspective.
```java
class PrefixEncodedStringQueue extends LinkedList implements Comparable {
}
```