Skip to content

Instantly share code, notes, and snippets.

@cpcloud
Last active November 21, 2018 20:09
Show Gist options
  • Save cpcloud/8c507395f5286c5afd669ac32f630353 to your computer and use it in GitHub Desktop.
Save cpcloud/8c507395f5286c5afd669ac32f630353 to your computer and use it in GitHub Desktop.
A really slow database implemented using generators
import abc
import copy
from collections import defaultdict
from pprint import pprint
from typing import (
Any,
Callable,
Dict,
Hashable,
Iterable,
Iterator,
List,
Optional,
Sequence,
Tuple,
Type,
Union,
)
import toolz
# NOTE(review): Schema is not referenced anywhere in the visible code;
# presumably a list of (column name, column type) pairs -- confirm before use.
Schema = List[Tuple[str, Type]]
# What Relation.operate hands back: a plain mapping of column name to value,
# or None when the row should be dropped.
RawRow = Union[Dict[str, Any], None]
class Row(dict):
    """A single record: a ``dict`` mapping column names to values.

    Rows are constructed from keyword arguments only, so every column name
    is a string.
    """

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)

    @property
    def columns(self):
        """Return the column names as a list, in insertion order."""
        return [*self.keys()]

    def __repr__(self) -> str:
        fields = ", ".join(
            f"{name}={self[name]!r}" for name in self.columns
        )
        return f"Row({fields})"
# Any iterable of Row objects; this is the input type accepted by the
# ``produce`` methods of the relations below.
Rows = Iterable[Row]
class Relation(abc.ABC):
    """Base class for a node in a query plan.

    A relation pulls rows from ``child``, transforms each one with
    ``operate`` and lazily yields the surviving rows from ``produce``.
    """

    # Upstream relation that this node reads its rows from.
    child: "Relation"

    def next(self, row: Row) -> Optional[Row]:
        """Apply ``operate`` to ``row`` and wrap a non-None result in a Row.

        Returns None when ``operate`` returns None, i.e. the row is dropped.
        """
        result = self.operate(row)
        return Row(**result) if result is not None else None

    def operate(self, row: Row) -> RawRow:
        """Return the transformed row as a mapping, or None to drop it.

        The base implementation returns None, so subclasses that rely on
        the default ``produce`` are expected to override this method.
        """
        ...

    def produce(self, rows: Rows) -> Iterator[Row]:
        """Lazily yield the transformed rows of ``child.produce(rows)``.

        Only rows for which ``next`` returns None are dropped.
        """
        for row in self.child.produce(rows):
            result = self.next(row)
            # Compare against None explicitly: an empty Row is falsy but is
            # still a valid result (e.g. a projection onto zero columns), so
            # a plain truthiness filter would silently discard it.
            if result is not None:
                yield result
class Table(Relation):
    """Leaf relation: hands the input rows through untouched."""

    def operate(self, row: Row) -> RawRow:
        """Return ``row`` itself, unchanged."""
        return row

    def produce(self, rows: Rows) -> Iterator[Row]:
        """Yield every element of ``rows`` as-is; no child is consulted."""
        yield from rows
class Projection(Relation):
    """Keep only the named columns of each row produced by ``child``."""

    def __init__(self, child: Relation, columns: Sequence[str]) -> None:
        self.columns = columns
        self.child = child

    def operate(self, row: Row) -> RawRow:
        """Return a new mapping holding just ``self.columns``.

        Raises KeyError if ``row`` lacks one of the requested columns.
        """
        projected = {}
        for name in self.columns:
            projected[name] = row[name]
        return projected
class Rename(Relation):
    """Rename columns of each row produced by ``child``.

    The mapping is given as keyword arguments ``old_name="new_name"``.
    """

    def __init__(self, child: Relation, **columns: str) -> None:
        self.child = child
        self.columns = columns

    def operate(self, row: Row) -> RawRow:
        """Return a deep copy of ``row`` with the configured columns renamed.

        All renames are applied simultaneously, so swaps (``a="b", b="a"``)
        and chains (``a="b", b="c"``) keep every value. Columns that are not
        renamed come first in their original order, followed by the renamed
        columns in the order they were given.

        Raises KeyError if ``row`` lacks a column that should be renamed.
        """
        # Deep copy so the output never shares mutable values with the input.
        source = copy.deepcopy(row)
        renamed = {
            name: value
            for name, value in source.items()
            if name not in self.columns
        }
        # Always read from the untouched copy: popping and re-inserting on
        # the dict being built lets one rename clobber the source of another.
        for old_name, new_name in self.columns.items():
            renamed[new_name] = source[old_name]
        return renamed
class Selection(Relation):
    """Keep only the rows of ``child`` for which ``predicate`` is truthy."""

    def __init__(
        self, child: Relation, predicate: Callable[[Row], bool]
    ) -> None:
        self.predicate = predicate
        self.child = child

    def operate(self, row: Row) -> RawRow:
        """Return a copy of ``row`` as a plain dict, or None if rejected."""
        if not self.predicate(row):
            return None
        return dict((name, row[name]) for name in row.columns)
class Aggregate(abc.ABC):
    """Interface for an accumulator that is fed values one step at a time."""

    @abc.abstractmethod
    def step(self, *values: Any) -> None:
        """Fold ``values`` into the running state."""

    @abc.abstractmethod
    def finalize(self) -> Any:
        """Return the aggregate computed from everything seen so far."""
# A grouping specification maps each output column name to a function that
# extracts that column's hashable grouping value from a row. GroupBy iterates
# it with ``.items()``, so it must be a mapping of callables rather than a
# single callable returning a mapping.
GroupingKey = Dict[str, Callable[[Row], Hashable]]
# An aggregate specification maps each output column name to a pair of
# (Aggregate subclass to instantiate per group, function extracting the value
# fed to that aggregate from a row).
AggregateSpecification = Dict[
    str, Tuple[Type[Aggregate], Callable[[Row], Any]]
]
class GroupBy(Relation):
    """Group the rows of ``child`` and compute aggregates per group.

    ``group_by`` maps output column names to functions extracting the
    grouping value from a row. ``aggregates`` maps output column names to
    ``(Aggregate subclass, value extractor)`` pairs.
    """

    def __init__(
        self,
        child: Relation,
        group_by: GroupingKey,
        aggregates: AggregateSpecification,
    ) -> None:
        self.child = child
        self.group_by = group_by
        self.aggregates = aggregates

    def produce(self, rows: Rows) -> Iterator[Row]:
        """Yield one Row per distinct grouping key.

        All of ``child.produce(rows)`` is consumed before the first group is
        yielded. Groups come out in the order their key was first seen. Each
        output row holds the grouping columns followed by the aggregate
        columns; on a name clash the aggregate value wins.
        """
        aggregates = self.aggregates
        group_by = self.group_by

        def new_group():
            # Fresh accumulator instances for a key seen for the first time.
            return {
                name: agg_type() for name, (agg_type, _) in aggregates.items()
            }

        groups = defaultdict(new_group)
        for row in self.child.produce(rows):
            key = tuple((name, func(row)) for name, func in group_by.items())
            # Look the group up once per row, outside the aggregate loop, so
            # that a key is registered even when no aggregates are requested
            # (otherwise such a query would yield no rows at all).
            group = groups[key]
            for name, (_, func) in aggregates.items():
                group[name].step(func(row))

        for key, group in groups.items():
            result = dict(key)
            result.update(
                (name, agg.finalize()) for name, agg in group.items()
            )
            yield Row(**result)
class Sum(Aggregate):
    """Running total of the non-None values it is fed."""

    def __init__(self) -> None:
        self.count: int = 0
        self.total = 0

    def step(self, value: Any) -> None:
        """Add ``value`` to the total; None values are skipped."""
        if value is None:
            return
        self.total += value
        self.count += 1

    def finalize(self) -> Optional[Union[int, float]]:
        """Return the total, or None if no non-None value was seen."""
        if not self.count:
            return None
        return self.total
class Mean(Aggregate):
    """Arithmetic mean of the non-None values it is fed."""

    def __init__(self) -> None:
        self.count: int = 0
        self.total: float = 0

    def step(self, value: Union[int, float]) -> None:
        """Fold ``value`` into the running total; None values are skipped."""
        if value is None:
            return
        self.total += value
        self.count += 1

    def finalize(self) -> Optional[float]:
        """Return total / count, or None if no non-None value was seen."""
        return self.total / self.count if self.count > 0 else None
# Sample input rows for the demo query.
data = [
    Row(z="a", a=1, b=2),
    Row(z="b", a=2, b=-1),
    Row(z="a", a=3, b=4),
    Row(z="a", a=4, b=-3),
    Row(z="a", a=1, b=-3),
    Row(z="b", a=2, b=-3),
    Row(z="b", a=3, b=-3),
]

# Query plan, innermost first: scan -> project a, b, z -> rename a to c and
# b to d -> keep every row -> group by (c, z) with the sum and mean of d.
t = Table()
proj = Projection(t, ["a", "b", "z"])
ren = Rename(proj, a="c", b="d")
filt = Selection(ren, lambda row: True)  # type: ignore
gb = GroupBy(
    filt,
    dict(
        c=lambda row: row["c"],
        z=lambda row: row["z"],
    ),
    dict(
        total=(Sum, lambda row: row["d"]),  # type: ignore
        mean=(Mean, lambda row: row["d"]),  # type: ignore
    ),
)

# produce() is lazy; list() drives the whole pipeline before printing.
rowz = gb.produce(data)
pprint(list(rowz))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment