Created
October 9, 2021 22:45
-
-
Save bmwalters/668e19cbbc068d386331e7d36b040482 to your computer and use it in GitHub Desktop.
First attempt at a skip list
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List, Optional | |
import random | |
class Node: | |
value: str | |
timestamp: int | |
next: Optional['Node'] | |
lower: Optional['Node'] | |
def __init__(self, value: str, timestamp: int): | |
self.value = value | |
self.timestamp = timestamp | |
self.next = None | |
self.lower = None | |
class SkipList: | |
def __init__(self, P: int, value: str, timestamp: int): | |
assert P > 0 | |
self.P = P | |
self.head = Node(value, timestamp) | |
current = self.head | |
for _ in range(P - 1): | |
new = Node(value, timestamp) | |
current.lower = new | |
current = new | |
def insert(self, value: str, timestamp: int) -> None: | |
# helper function: propagate upwards | |
def do_insert(last_visited, current, value, timestamp): | |
new = Node(value, timestamp) | |
new.next = current.next | |
current.next = new | |
# propagate upwards probabilistically | |
for i in range(self.P - 2, -1, -1): | |
if not random.choice((True, False)): | |
break | |
upper_new = Node(value, timestamp) | |
upper_new.next = last_visited[i].next | |
last_visited[i].next = upper_new | |
upper_new.lower = new | |
new = upper_new | |
if timestamp < self.head.timestamp: | |
old_head_value = self.head.value | |
old_head_timestamp = self.head.timestamp | |
# update special head nodes | |
current = self.head | |
level = 0 | |
heads: List[Optional[Node]] = [None for _ in range(self.P)] | |
while current is not None: | |
heads[level] = current | |
current.value = value | |
current.timestamp = timestamp | |
current = current.lower | |
level += 1 | |
# re-insert old head node | |
do_insert(heads, heads[len(heads) - 1], old_head_value, old_head_timestamp) | |
else: | |
current = self.head | |
level = 0 | |
last_visited: List[Optional[Node]] = [None for _ in range(self.P)] | |
# at this point we know timestamp >= current.timestamp | |
while True: | |
if current.next is None or timestamp < current.next.timestamp: | |
if current.lower is None: | |
do_insert(last_visited, current, value, timestamp) | |
break | |
else: | |
last_visited[level] = current | |
current = current.lower | |
level += 1 | |
else: | |
current = current.next | |
def query(self, timestamp: int) -> Optional[str]: | |
current = self.head | |
while True: | |
if current.timestamp <= timestamp: | |
if current.next is None or current.next.timestamp > timestamp: | |
if current.lower is None: | |
return current.value | |
else: | |
current = current.lower | |
else: | |
current = current.next | |
else: | |
return None | |
def __str__(self) -> str: | |
out = "" | |
head = self.head | |
while head is not None: | |
current = head | |
while current is not None: | |
out += str(current.timestamp) + " " | |
current = current.next | |
head = head.lower | |
out += "\n" | |
return out | |
if __name__ == "__main__": pass | |
if __name__ == "__main__": | |
ns = iter([16, 1, 15, 3, 9, 8, 2, 20]) | |
fn = next(ns) | |
print("inserting", fn) | |
l = SkipList(4, str(fn), fn) | |
for n in ns: | |
print("inserting", n) | |
l.insert(str(n), n) | |
print(str(l)) | |
for i in range(21): | |
print(i, l.query(i)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment