Last active
November 21, 2018 20:09
-
-
Save cpcloud/8c507395f5286c5afd669ac32f630353 to your computer and use it in GitHub Desktop.
A really slow database implemented using generators
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
import copy | |
from collections import defaultdict | |
from pprint import pprint | |
from typing import ( | |
Any, | |
Callable, | |
Dict, | |
Hashable, | |
Iterable, | |
Iterator, | |
List, | |
Optional, | |
Sequence, | |
Tuple, | |
Type, | |
Union, | |
) | |
import toolz | |
# What an operator's ``operate`` hands back: a plain mapping of column
# name -> value, or ``None`` when the row should be dropped.
RawRow = Union[Dict[str, Any], None]

# Ordered (column name, Python type) pairs. NOTE(review): presumably a table
# schema; it is not referenced anywhere in the visible code.
Schema = List[Tuple[str, Type]]
class Row(dict):
    """A single record: a ``dict`` of column name -> value with a readable repr."""

    def __init__(self, **kwargs: Any) -> None:
        # Only keyword arguments are accepted, so every column name is a str.
        super().__init__(**kwargs)

    @property
    def columns(self):
        """Column names, in insertion order."""
        return [*self]

    def __repr__(self) -> str:
        fields = ", ".join(f"{name}={value!r}" for name, value in self.items())
        return f"Row({fields})"


# A stream of rows, as consumed and produced by the relations below.
Rows = Iterable[Row]
class Relation(abc.ABC):
    """Base class for a node in a lazy, pull-based pipeline of row operators.

    Row-at-a-time subclasses set ``self.child`` to their upstream relation
    and override :meth:`operate`, which maps one input row to an output
    mapping, or to ``None`` to drop the row. ``Table`` and ``GroupBy``
    override :meth:`produce` instead.
    """

    child: "Relation"  # upstream relation; assigned by subclass __init__

    def next(self, row: Row) -> Optional[Row]:
        """Apply :meth:`operate` to ``row``; wrap a non-``None`` result in a ``Row``."""
        result = self.operate(row)
        return Row(**result) if result is not None else None

    def operate(self, row: Row) -> RawRow:
        """Transform a single row; return ``None`` to drop it.

        The base implementation returns ``None``.
        """

    def produce(self, rows: Rows) -> Iterator[Row]:
        """Lazily pull rows from ``self.child`` and yield every kept result.

        Only ``None`` marks a dropped row. An empty ``Row()`` (for example,
        from projecting onto zero columns) is falsy but still a real row,
        so results must not be filtered by truthiness.
        """
        for row in self.child.produce(rows):
            result = self.next(row)
            if result is not None:
                yield result
class Table(Relation):
    """Leaf relation: the rows handed to ``produce`` are its contents."""

    def operate(self, row: Row) -> RawRow:
        # Identity transform. Only reached if ``next`` is called directly;
        # ``produce`` below never calls it.
        return row

    def produce(self, rows: Rows) -> Iterator[Row]:
        # A table has no child, so pass the caller's rows straight through.
        yield from rows
class Projection(Relation):
    """Keep only the listed columns, emitted in the listed order."""

    def __init__(self, child: Relation, columns: Sequence[str]) -> None:
        self.child = child
        self.columns = columns  # column names to keep, in output order

    def operate(self, row: Row) -> RawRow:
        # A listed column that is absent from ``row`` raises KeyError.
        projected = {}
        for name in self.columns:
            projected[name] = row[name]
        return projected
class Rename(Relation):
    """Rename columns given as ``old_name="new_name"`` keyword arguments.

    Columns keep their original positions, and columns not mentioned pass
    through unchanged. Values are not copied; this matches ``Projection``
    and ``Selection``, which also make shallow copies.
    """

    def __init__(self, child: Relation, **columns: str) -> None:
        self.child = child
        self.columns = columns  # old column name -> new column name

    def operate(self, row: Row) -> RawRow:
        """Return a renamed copy of ``row``.

        All renames are applied simultaneously, so swaps such as
        ``a="b", b="a"`` work. If a new name collides with a column that is
        not itself being renamed, the renamed column's value wins.

        Raises:
            KeyError: if a column to be renamed is absent from ``row``.
        """
        mapping = self.columns
        for old_name in mapping:
            if old_name not in row:
                raise KeyError(old_name)

        renamed = {}
        for name, value in row.items():
            if name in mapping:
                # A renamed column overwrites anything already holding its
                # target name.
                renamed[mapping[name]] = value
            elif name not in renamed:
                # Keep an untouched column unless an earlier rename already
                # claimed its name.
                renamed[name] = value
        return renamed
class Selection(Relation):
    """Keep only the rows for which ``predicate(row)`` is truthy."""

    def __init__(
        self, child: Relation, predicate: Callable[[Row], bool]
    ) -> None:
        self.child = child
        self.predicate = predicate  # row -> truthy to keep the row

    def operate(self, row: Row) -> RawRow:
        if not self.predicate(row):
            return None  # rejected; dropped by Relation.produce
        # Hand back a shallow, plain-dict copy of the accepted row.
        return dict(row)
class Aggregate(abc.ABC):
    """Interface for an incremental aggregate: feed values, then finalize."""

    @abc.abstractmethod
    def step(self, *values: Any) -> None:
        """Accumulate one input; ``GroupBy`` passes a single value per row."""

    @abc.abstractmethod
    def finalize(self) -> Any:
        """Return the aggregate result for everything passed to ``step``."""
# Maps each grouping column name to a function extracting that column's
# (hashable) key value from a row. GroupBy iterates this with ``.items()``,
# so it is a dict of callables, not a callable returning a dict.
GroupingKey = Dict[str, Callable[[Row], Hashable]]

# Maps each output aggregate column name to ``(AggregateClass, value_fn)``.
# GroupBy creates one AggregateClass() per group and feeds it value_fn(row).
AggregateSpecification = Dict[
    str, Tuple[Type[Aggregate], Callable[[Row], Any]]
]
class GroupBy(Relation):
    """Group rows by computed keys and compute aggregates for each group.

    ``group_by`` maps each output key column to a function extracting it
    from a row. ``aggregates`` maps each output aggregate column to
    ``(AggregateClass, value_function)``. This operator consumes its whole
    input before yielding any rows.
    """

    def __init__(
        self,
        child: Relation,
        group_by: GroupingKey,
        aggregates: AggregateSpecification,
    ) -> None:
        self.child = child
        self.group_by = group_by
        self.aggregates = aggregates

    def produce(self, rows: Rows) -> Iterator[Row]:
        """Yield one ``Row`` per distinct group key, in first-seen order.

        Each output row holds the grouping columns followed by the finalized
        aggregate columns; an aggregate named like a grouping column
        overrides it. Empty input yields no rows.
        """

        def new_group_state():
            # Fresh aggregate instances for a group seen for the first time.
            return {
                name: (agg_cls(), func)
                for name, (agg_cls, func) in self.aggregates.items()
            }

        groups = defaultdict(new_group_state)
        for row in self.child.produce(rows):
            # A tuple of (column name, key value) pairs is hashable and also
            # remembers the grouping column names for the output row.
            group_key = tuple(
                (name, func(row)) for name, func in self.group_by.items()
            )
            for agg, func in groups[group_key].values():
                agg.step(func(row))

        for group_key, state in groups.items():
            finalized = {
                name: agg.finalize() for name, (agg, _) in state.items()
            }
            # Merge into one dict first; passing both as separate ** maps
            # would raise TypeError on a shared column name.
            yield Row(**{**dict(group_key), **finalized})
class Sum(Aggregate):
    """SQL-style SUM: ignores ``None``; yields ``None`` if no value was seen."""

    def __init__(self) -> None:
        self.total = 0  # running sum of accepted values
        self.count: int = 0  # number of accepted (non-None) values

    def step(self, value: Any) -> None:
        if value is None:
            return  # NULLs are skipped entirely
        self.total += value
        self.count += 1

    def finalize(self) -> Optional[Union[int, float]]:
        if not self.count:
            return None
        return self.total
class Mean(Aggregate):
    """SQL-style AVG: mean of the non-``None`` values, or ``None`` if none."""

    def __init__(self) -> None:
        self.total: float = 0  # running sum of accepted values
        self.count: int = 0  # number of accepted (non-None) values

    def step(self, value: Union[int, float]) -> None:
        if value is None:
            return  # NULLs contribute to neither the sum nor the count
        self.total += value
        self.count += 1

    def finalize(self) -> Optional[float]:
        return None if self.count <= 0 else self.total / self.count
# Demo input: seven rows with columns z, a, b.
data = [
    Row(z=z, a=a, b=b)
    for z, a, b in [
        ('a', 1, 2),
        ('b', 2, -1),
        ('a', 3, 4),
        ('a', 4, -3),
        ('a', 1, -3),
        ('b', 2, -3),
        ('b', 3, -3),
    ]
]

# Pipeline: keep a, b, z -> rename a->c, b->d -> keep every row ->
# group by (c, z) computing SUM(d) and MEAN(d).
t = Table()
proj = Projection(t, ["a", "b", "z"])
ren = Rename(proj, a="c", b="d")
filt = Selection(ren, lambda row: True)  # type: ignore
gb = GroupBy(
    filt,
    {"c": lambda row: row["c"], "z": lambda row: row["z"]},
    {
        "total": (Sum, lambda row: row["d"]),  # type: ignore
        "mean": (Mean, lambda row: row["d"]),  # type: ignore
    },
)

rowz = gb.produce(data)
pprint(list(rowz))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment