Created
November 10, 2019 06:35
-
-
Save james4388/0fbb24d20663257f87427499cfdc2969 to your computer and use it in GitHub Desktop.
Skip list data structure, used for sliding window. Implement by Raymond Hettinger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# http://code.activestate.com/recipes/576930-efficient-running-median-using-an-indexable-skipli/ | |
# Updated to run with python | |
from random import random | |
from math import log, ceil | |
class Node(object): | |
__slots__ = 'value', 'next', 'width' | |
def __init__(self, value, next, width): | |
self.value, self.next, self.width = value, next, width | |
class End(object): | |
'Sentinel object that always compares greater than another object' | |
def __cmp__(self, other): | |
return 1 | |
def __lte__(self, other): | |
return False | |
def __gt__(self, other): | |
return True | |
NIL = Node(End(), [], []) # Singleton terminator node | |
class IndexableSkiplist: | |
'Sorted collection supporting O(lg n) insertion, removal, and lookup by rank.' | |
def __init__(self, expected_size=100): | |
self.size = 0 | |
self.maxlevels = int(1 + log(expected_size, 2)) | |
self.head = Node('HEAD', [NIL]*self.maxlevels, [1]*self.maxlevels) | |
def __len__(self): | |
return self.size | |
def __getitem__(self, i): | |
node = self.head | |
i += 1 | |
for level in reversed(range(self.maxlevels)): | |
while node.width[level] <= i: | |
i -= node.width[level] | |
node = node.next[level] | |
return node.value | |
def insert(self, value): | |
# find first node on each level where node.next[levels].value > value | |
chain = [None] * self.maxlevels | |
steps_at_level = [0] * self.maxlevels | |
node = self.head | |
for level in reversed(range(self.maxlevels)): | |
while node.next[level].value <= value: | |
steps_at_level[level] += node.width[level] | |
node = node.next[level] | |
chain[level] = node | |
# insert a link to the newnode at each level | |
d = min(self.maxlevels, 1 - int(log(random(), 2.0))) | |
newnode = Node(value, [None]*d, [None]*d) | |
steps = 0 | |
for level in range(d): | |
prevnode = chain[level] | |
newnode.next[level] = prevnode.next[level] | |
prevnode.next[level] = newnode | |
newnode.width[level] = prevnode.width[level] - steps | |
prevnode.width[level] = steps + 1 | |
steps += steps_at_level[level] | |
for level in range(d, self.maxlevels): | |
chain[level].width[level] += 1 | |
self.size += 1 | |
def remove(self, value): | |
# find first node on each level where node.next[levels].value >= value | |
chain = [None] * self.maxlevels | |
node = self.head | |
for level in reversed(range(self.maxlevels)): | |
while node.next[level].value < value: | |
node = node.next[level] | |
chain[level] = node | |
if value != chain[0].next[0].value: | |
raise KeyError('Not Found') | |
# remove one link at each level | |
d = len(chain[0].next[0].next) | |
for level in range(d): | |
prevnode = chain[level] | |
prevnode.width[level] += prevnode.next[level].width[level] - 1 | |
prevnode.next[level] = prevnode.next[level].next[level] | |
for level in range(d, self.maxlevels): | |
chain[level].width[level] -= 1 | |
self.size -= 1 | |
def __iter__(self): | |
'Iterate over values in sorted order' | |
node = self.head.next[0] | |
while node is not NIL: | |
yield node.value | |
node = node.next[0] | |
from collections import deque | |
from itertools import islice | |
class RunningMedian: | |
'Fast running median with O(lg n) updates where n is the window size' | |
def __init__(self, n, iterable): | |
self.it = iter(iterable) | |
self.queue = deque(islice(self.it, n)) | |
self.skiplist = IndexableSkiplist(n) | |
for elem in self.queue: | |
self.skiplist.insert(elem) | |
def __iter__(self): | |
queue = self.queue | |
skiplist = self.skiplist | |
midpoint = len(queue) // 2 | |
yield skiplist[midpoint] | |
for newelem in self.it: | |
oldelem = queue.popleft() | |
skiplist.remove(oldelem) | |
queue.append(newelem) | |
skiplist.insert(newelem) | |
yield skiplist[midpoint] | |
if __name__ == '__main__': | |
######################################################################### | |
# Demonstrate the RunningMedian() class | |
# Compare results to alternative class implemented using a regular list | |
from bisect import insort | |
from random import randrange | |
class RunningMedianSlow: | |
'Slow running-median with O(n) updates where n is the window size' | |
def __init__(self, n, iterable): | |
self.it = iter(iterable) | |
self.queue = deque(islice(self.it, n)) | |
self.sortedlist = sorted(self.queue) | |
def __iter__(self): | |
queue = self.queue | |
sortedlist = self.sortedlist | |
midpoint = len(queue) // 2 | |
yield sortedlist[midpoint] | |
for newelem in self.it: | |
oldelem = queue.popleft() | |
sortedlist.remove(oldelem) | |
queue.append(newelem) | |
insort(sortedlist, newelem) | |
yield sortedlist[midpoint] | |
M, N, window = 5000, 8000, 1001 | |
data = [randrange(M) for i in range(N)] | |
result = list(RunningMedian(window, data)) | |
expected = list(RunningMedianSlow(window, data)) | |
assert result == expected | |
print 'Successful test of RunningMedian() with', N, | |
print 'items and a window of size', window, '\n' | |
######################################################################### | |
# Demonstrate and test the IndexableSkiplist() collection class | |
from random import shuffle | |
demo = list('FOURSQUARE') | |
excluded = list('XZ') | |
print 'Building an indexable skiplist containing %r:' % ''.join(demo) | |
s = IndexableSkiplist(len(demo)) # exercise __init__() | |
shuffle(demo) | |
for char in demo: | |
s.insert(char) # check insert() | |
print 'Adding: ', char, list(s), len(s) | |
for char in excluded: | |
print 'Attempting to remove:', char, '-->', | |
try: | |
s.remove(char) # check remove() item not in skiplist | |
except KeyError: | |
print 'Successfully not found' | |
assert len(s) == len(demo) # check __len__() | |
assert list(s) == sorted(demo) # check __iter__() | |
assert [s[i] for i in range(len(s))] == sorted(demo) # check __getitem__() | |
shuffle(demo) | |
for char in demo: | |
s.remove(char) # check remove() item in skiplist | |
print 'Removing: ', char, list(s), len(s) | |
assert list(s) == [] and len(s) == 0 | |
############################################################################# | |
# Show the IndexableSkiplist's internal structure and describe its invariants | |
def iter_levels(skiplist, level): | |
'Iterate through nodes on a given level.' | |
node = skiplist.head | |
while node is not NIL: | |
yield node | |
node = node.next[level] | |
demo = list('HOPSCOTCH') # create a sample skiplist | |
s = IndexableSkiplist(len(demo)) | |
for char in demo: | |
s.insert(char) | |
print '\nStructure of an indexable skiplist containing %r:' % ''.join(demo) | |
for i in reversed(range(s.maxlevels)): # show the link structure on each level | |
print 'Level %d: ' % i, | |
line = '' | |
for n in iter_levels(s, i): | |
line += '%s%d %s' % (n.value, n.width[i], ' ' * (n.width[i] - 1)) | |
print line + 'NIL' | |
print '\nNote the following structure invariants:' | |
print '* Level 0 has widths all equal to one.' | |
print '* The sum of widths on each level == len(skiplist)+1.' | |
print '* Entries on higher levels are subsets of the lower levels.' | |
print '* Widths of entries on higer levels sum to the widths below.' | |
print ' For example, a P --> T link of two steps on one level' | |
print ' corresponds to P --> S --> T links of one step each on level 0.' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment