Skip to content

Instantly share code, notes, and snippets.

@fulmicoton
Last active September 19, 2017 02:57
Show Gist options
  • Save fulmicoton/21c0433901bfd0309d3238e4fab29baa to your computer and use it in GitHub Desktop.
Save fulmicoton/21c0433901bfd0309d3238e4fab29baa to your computer and use it in GitHub Desktop.
from heapq import heappush, heappop
"""
Our internal analytics system relies heavily on producing sorted streams of
(key: string, payload: bytearray).
In the following, I will ignore the payloads, as they are not important here.
We will only consider a stream of sorted keys.
Different shards typically compute them, send them through the network to another
server that will in turn merge them.
We use delta-encoding as a cheap means of compression for these streams.
Namely, we take advantage of the fact that these keys are sorted,
so that we know that they often share a prefix (think about a set of 10_000_000 ctks
for instance).
Instead of encoding the keys directly, we
encode each key as:
(common_prefix_len, added_suffix)
Where
- common_prefix_len is an integer that describes how many characters
from the previous key can be kept. We will trim the previous key
to this number of chars.
- added_suffix is the suffix that needs to be appended to this
trimmed key in order to form our new key.
For instance
abcd
abcdef
ag
becomes
(0, abcd)
(4, ef)
(1, g)
The problem consists of merging different streams encoded using this scheme
in an efficient way.
Currently, we decode the stream, merge it using a binary heap,
and reencode the result.
The idea would be to avoid decoding the streams and merge the
encoded streams directly.
I attached a working implementation; it is a bit ugly because I
don't have a proper grasp of the right way to implement the
solution.
Your mission is to find a nicer way to implement this.
"""
def compute_common_prefix_len(s1, s2):
    """Count how many leading items ``s1`` and ``s2`` have in common.

    Comparison walks both sequences in lockstep and stops at the first
    mismatch, or when the shorter one runs out.
    """
    matched = 0
    for left, right in zip(s1, s2):
        if left != right:
            return matched
        matched += 1
    return matched
def encode(g):
    """Delta-encode an iterable of strings.

    Returns a generator that yields one ``(common_prefix_len, suffix)``
    tuple per key. ``common_prefix_len`` is the length of the prefix the
    key shares with the key before it (the first key is compared with
    ``""``). ``suffix`` is whatever remains of the key after that prefix.
    """
    last = ""
    for current in g:
        shared = compute_common_prefix_len(last, current)
        last = current
        yield shared, current[shared:]
def decode(g):
    """Rebuild the original strings from a delta-encoded stream.

    ``g`` yields ``(common_prefix_len, suffix)`` pairs as produced by
    ``encode``. Each output key is the previous output key cut down to
    ``common_prefix_len`` characters, with ``suffix`` appended.
    Returns a generator of strings.
    """
    current = ""
    for keep, tail in g:
        current = current[:keep] + tail
        yield current
def most_significant_bit(bitset):
    """Return the index of the highest set bit of ``bitset``.

    Returns -1 when ``bitset`` is 0.
    In Rust, we can use "bsr" for this.

    Raises:
        ValueError: if ``bitset`` is negative.
    """
    if bitset < 0:
        raise ValueError("bitset must be non-negative, got %r" % (bitset,))
    # int.bit_length() replaces a halving loop that relied on Python 2
    # integer division. Under Python 3, `/=` produces floats and the loop
    # returned a wrong index.
    return bitset.bit_length() - 1
class Queues:
    """One min-heap of pending entries per common-prefix length ("level").

    ``push(level, el)`` sets bit ``level`` of ``bitset``, so
    ``max_common_prefix_len()`` can find the deepest level pushed to.
    Clearing a level's bit once its heap has been drained is left to the
    caller, which modifies ``bitset`` directly.
    """

    def __init__(self, capacity=100):
        self.bitset = 0
        # Pre-allocated levels. _level() grows this list on demand, so keys
        # sharing `capacity` or more characters no longer raise IndexError.
        self.queues = [[] for _ in range(capacity)]

    def _level(self, common_prefix_len):
        """Return the heap for ``common_prefix_len``, growing storage if needed."""
        missing = common_prefix_len + 1 - len(self.queues)
        if missing > 0:
            self.queues.extend([] for _ in range(missing))
        return self.queues[common_prefix_len]

    def get(self, common_prefix_len):
        """Return the (mutable) heap list holding entries at this level."""
        return self._level(common_prefix_len)

    def push(self, common_prefix_len, el):
        """Add ``el`` to the heap of its level and mark that level as used."""
        self.bitset |= 1 << common_prefix_len
        heappush(self._level(common_prefix_len), el)

    def max_common_prefix_len(self):
        """Return the highest level whose bit is set, or -1 if none is."""
        # Index of the highest set bit; bit_length() is 0 for an empty set.
        return self.bitset.bit_length() - 1
def merge(encoded_streams):
    """Merge delta-encoded streams (see encode) without decoding them.

    Args:
        encoded_streams: iterable of streams. Each stream is any iterable of
            ``(common_prefix_len, suffix)`` pairs, as produced by ``encode``
            on a sorted list of keys.

    Returns:
        A generator of ``(common_prefix_len, suffix)`` pairs: the delta
        encoding of all input keys merged in sorted order, with duplicates
        kept.

    Every pending stream head is stored in ``Queues`` under the number of
    characters it shares with the last emitted key. The head with the
    largest shared prefix, and within that level the smallest suffix, is
    the next key in sorted order.
    """
    exhausted = object()  # sentinel for next(); real stream items are pairs
    queues = Queues()
    for stream_id, stream in enumerate(encoded_streams):
        stream = iter(stream)
        # Empty streams contribute nothing. A bare next() here would let
        # StopIteration escape the generator. That silently ends the whole
        # merge on Python 2 and raises RuntimeError on Python 3.7+ (PEP 479).
        head = next(stream, exhausted)
        if head is exhausted:
            continue
        common_prefix_len, key_prefix = head
        # stream_id breaks ties between equal suffixes, so the heap never has
        # to compare the stream iterators themselves (a TypeError on Python 3).
        queues.push(common_prefix_len, (key_prefix, stream_id, stream))

    while True:
        common_prefix_len = queues.max_common_prefix_len()
        if common_prefix_len == -1:
            break
        q = queues.get(common_prefix_len)
        root_key_prefix, stream_id, stream = heappop(q)
        # The stream's next pair is already encoded relative to the key being
        # emitted (its own previous key), so it can be queued as-is.
        head = next(stream, exhausted)
        if head is not exhausted:
            next_prefix_len, next_key_prefix = head
            queues.push(next_prefix_len, (next_key_prefix, stream_id, stream))
        yield (common_prefix_len, root_key_prefix)

        # Entries left at this level whose suffix starts like the emitted
        # suffix now share more characters with the emitted key, so they move
        # to the deeper level. Because the emitted suffix was the heap
        # minimum, those entries are the smallest ones left; stop at the
        # first entry that shares nothing.
        while q:
            key_prefix, other_id, other_stream = q[0]
            extra = compute_common_prefix_len(key_prefix, root_key_prefix)
            if extra == 0:
                break
            heappop(q)
            queues.push(common_prefix_len + extra,
                        (key_prefix[extra:], other_id, other_stream))
        if not q:
            # Level drained: clear its bit so max_common_prefix_len() skips it.
            queues.bitset &= ~(1 << common_prefix_len)
if __name__ == "__main__":
    # Randomized self-check: merging the encoded shards of a sorted key set
    # must produce exactly the encoding of the unsplit set.
    # Written to run on both Python 2 and Python 3.
    try:
        from itertools import zip_longest
    except ImportError:  # Python 2 spelling
        from itertools import izip_longest as zip_longest
    import random

    def random_key():
        """Return a random key of 2 to 9 characters drawn from "ABCDEF"."""
        length = random.randint(2, 9)
        return ''.join(random.choice("ABCDEF") for _ in range(length))

    strings = sorted(random_key() for _ in range(1000))
    num_splits = 3
    splits = [[] for _ in range(num_splits)]
    for s in strings:
        splits[random.randrange(num_splits)].append(s)
    assert sorted(sum(splits, [])) == strings
    splits_encoded = [encode(split) for split in splits]
    # zip_longest pads with None, so a length mismatch fails the assert.
    pairs = zip_longest(merge(splits_encoded), encode(strings))
    for encode_after_merge, encode_without_split in pairs:
        assert encode_after_merge == encode_without_split
        print(encode_after_merge)
@rclaude
Copy link

rclaude commented Aug 14, 2017

Hello :)
I'm currently learning Rust with the goal of contributing to Tantivy!
Without internet access to help me with Rust syntax and libraries (during a train trip), I fell back to Java.
It's not necessarily more readable, and certainly not more efficient, but I'm dropping it here for perspective.
`
/**
 * A FIFO of delta-encoded keys (see EncodedString) that can be merged with
 * other such queues without decoding the keys.
 *
 * <p>Queues compare by their head element; an empty queue sorts after any
 * non-empty one.
 */
class PrefixEncodedStringQueue extends LinkedList<EncodedString>
        implements Comparable<PrefixEncodedStringQueue> {

    public PrefixEncodedStringQueue(EncodedString... encodedStrings) {
        for (EncodedString encodedString : encodedStrings) {
            this.add(encodedString);
        }
    }

    /**
     * Compare queues by their heads. Empty queues sort last.
     */
    @Override
    public int compareTo(PrefixEncodedStringQueue that) {
        if (this.isEmpty()) {
            return that.isEmpty() ? 0 : 1;
        }
        if (that.isEmpty()) {
            return -1;
        }
        return this.peek().compareTo(that.peek());
    }

    /**
     * Merges this queue with {@code queues} into a new, delta-encoded queue.
     *
     * <p>The inputs are consumed: elements are popped from them, and their
     * head elements are rewritten in place (see {@link EncodedString#baseEdit})
     * so they stay relative to the last key emitted.
     */
    public PrefixEncodedStringQueue merge(PrefixEncodedStringQueue... queues) {
        PrefixEncodedStringQueue mergedQueue = new PrefixEncodedStringQueue();

        List<PrefixEncodedStringQueue> toMergeQueue = new ArrayList<>();
        toMergeQueue.add(this);
        toMergeQueue.addAll(Arrays.asList(queues));
        toMergeQueue.removeIf(List::isEmpty);

        while (!toMergeQueue.isEmpty()) {
            Collections.sort(toMergeQueue);

            Iterator<PrefixEncodedStringQueue> sortedInQueuesIterator = toMergeQueue.iterator();

            // The queue with the smallest head holds the next key.
            PrefixEncodedStringQueue smallestHead = sortedInQueuesIterator.next();
            EncodedString mergedHead = smallestHead.pop();
            if (smallestHead.isEmpty()) {
                sortedInQueuesIterator.remove();
            }

            mergedQueue.add(mergedHead);

            // The popped queue's new head is already relative to mergedHead.
            // Every other head must be re-expressed relative to it.
            sortedInQueuesIterator.forEachRemaining(queue -> queue.rebaseHead(mergedHead));
        }

        return mergedQueue;
    }

    private void rebaseHead(EncodedString interleavedString) {
        this.peek().baseEdit(interleavedString);
    }
}

/**
 * One delta-encoded key: the previous key truncated to
 * {@code commonPrefixLen} characters, followed by {@code addedSuffix}.
 *
 * <p>Ordering: a larger {@code commonPrefixLen} sorts first; equal
 * lengths are ordered by comparing the suffixes lexicographically.
 */
class EncodedString implements Comparable<EncodedString> {
    private int commonPrefixLen;
    private String addedSuffix;

    public EncodedString(int commonPrefixLen, String addedSuffix) {
        this.commonPrefixLen = commonPrefixLen;
        this.addedSuffix = addedSuffix;
    }

    @Override
    public int compareTo(EncodedString that) {
        if (this.commonPrefixLen != that.commonPrefixLen) {
            // Descending on purpose: sharing more with the previous key
            // means the key is smaller.
            return Integer.compare(that.commonPrefixLen, this.commonPrefixLen);
        }
        return this.addedSuffix.compareTo(that.addedSuffix);
    }

    /**
     * Re-expresses this encoding relative to {@code base}, the key emitted
     * just before it. At equal prefix lengths, the characters both suffixes
     * share move from the suffix into the prefix length. At a smaller prefix
     * length nothing changes.
     *
     * @throws IllegalStateException if this prefix length exceeds base's
     */
    public void baseEdit(EncodedString base) {
        if (this.commonPrefixLen > base.commonPrefixLen) {
            throw new IllegalStateException("Try to edit base of " + this + " with a bigger value " + base);
        }
        if (this.commonPrefixLen == base.commonPrefixLen) {
            int extraCommon = commonPrefixLength(this.addedSuffix, base.addedSuffix);
            this.commonPrefixLen += extraCommon;
            this.addedSuffix = this.addedSuffix.substring(extraCommon);
        }
        // this.commonPrefixLen < base.commonPrefixLen: no change needed.
    }

    /** Number of leading chars {@code a} and {@code b} share (replaces commons-lang StringUtils). */
    private static int commonPrefixLength(String a, String b) {
        int limit = Math.min(a.length(), b.length());
        int i = 0;
        while (i < limit && a.charAt(i) == b.charAt(i)) {
            i++;
        }
        return i;
    }

    @Override
    public String toString() {
        return "(" + commonPrefixLen + ", " + addedSuffix + ")";
    }
}
`

@fulmicoton
Copy link
Author

Oops, I hadn't seen your answer; sending you an email.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment