Last active
March 7, 2024 09:27
-
-
Save josephg/053c7e42ffebf7e91e966fe3b0cdcebc to your computer and use it in GitHub Desktop.
Graph diff
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
from enum import Enum | |
import heapq | |
Version = set[int] # The numbers in this set are indexes into the list of events in the graph. | |
@dataclass | |
class GraphEntry: | |
parents: set[int] # Set of indexes... | |
id: any | |
# And the event data. | |
@dataclass | |
class DiffResult: | |
a_only: list[int] | |
b_only: list[int] | |
class CausalGraph: | |
entries: list[GraphEntry] | |
def diff(self, a: Version, b: Version) -> DiffResult: | |
class DiffFlag(Enum): | |
A = 0 | |
B = 1 | |
SHARED = 2 | |
flags: dict[int, DiffFlag] = {} | |
# We need a max queue, so all entries are negative. | |
queue: list[int] = [] | |
# number of items in the queue in state `SHARED` | |
num_shared = 0 | |
def enqueue(lv: int, f: DiffFlag): | |
nonlocal num_shared, flags | |
if lv not in flags: | |
heapq.heappush(queue, -lv) | |
flags[lv] = f | |
if f == DiffFlag.SHARED: | |
num_shared += 1 | |
else: | |
current_flag = flags[lv] | |
if f != current_flag and current_flag != DiffFlag.SHARED: | |
# This is sneaky. If the two types are different they have to be {A,B}, | |
# {A,Shared} or {B,Shared}. In any of those cases the final result is | |
# Shared. If the current type isn't shared, set it as such. | |
flags[lv] = DiffFlag.SHARED | |
num_shared += 1 | |
for v in a: | |
enqueue(v, DiffFlag.A) | |
for v in b: | |
enqueue(v, DiffFlag.B) | |
a_only: list[int] = [] | |
b_only: list[int] = [] | |
while len(queue) > num_shared: | |
v = -heapq.heappop(queue) | |
flag = flags[v] | |
if flag == DiffFlag.SHARED: | |
num_shared -= 1 | |
# If there are other heap entries pointing to the same place, consume them. | |
while queue and queue[0] == -v: | |
heapq.heappop(queue) | |
if flag == DiffFlag.SHARED: | |
num_shared -= 1 | |
if flag != DiffFlag.SHARED: | |
if flag == DiffFlag.A: | |
a_only.append(v) | |
else: | |
b_only.append(v) | |
e = self.entries[v] | |
for p in e.parents: | |
enqueue(p, flag) | |
a_only.reverse() | |
b_only.reverse() | |
return DiffResult(a_only, b_only) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment