Last active
September 12, 2020 18:45
-
-
Save JayBazuzi/7fcac6f7d2a637901b5873b464522fdd to your computer and use it in GitHub Desktop.
Pipelines in Python proof-of-concept
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
from typing import Callable | |
import unittest | |
import typing | |
import abc | |
import typing | |
class Tests(unittest.TestCase):
    """Smoke test for the pipeline proof-of-concept."""

    def test_canary(self) -> None:
        """A value sent into the input pipe reaches the collector tripled."""

        def triple(value: int) -> int:
            return value * 3

        input_pipe = InputPipe[int]("age")
        # Build the chain: input -> triple -> collector.
        collector = input_pipe.process_function(triple).collect()

        input_pipe.send(7)

        self.assertEqual(21, collector.value)
# Type of the values a pipe stage receives.
TInput = typing.TypeVar("TInput")
# Type of the values a pipe stage sends on to its listeners.
TOutput = typing.TypeVar("TOutput")
class Listener(typing.Generic[TInput], abc.ABC):
    """Abstract receiving end of a pipe.

    Concrete listeners must override ``receive``; the class cannot be
    instantiated until they do.
    """

    @abc.abstractmethod
    def receive(self, value: TInput) -> None:
        """Handle one ``value`` delivered to this listener.

        Raises:
            NotImplementedError: always; this base implementation exists only
                to be overridden.
        """
        # An explicit raise (rather than ``assert``) still fires when Python
        # runs with -O, which strips assert statements.
        raise NotImplementedError
class Collector(Listener[TInput]):
    """Listener that remembers the most recent value it received.

    The ``value`` attribute does not exist until ``receive`` has been called
    at least once.
    """

    # Last value passed to ``receive``; assigned there, not at construction.
    value: TInput

    def receive(self, value: TInput) -> None:
        """Store ``value``, replacing any previously stored one."""
        self.value = value
class Sender(typing.Generic[TOutput], abc.ABC):
    """Upstream end of a pipe: fans each sent value out to its listeners.

    Listeners are attached through the chaining helpers
    ``process_function`` and ``collect``, each of which returns the stage it
    just attached.
    """

    # Result type of the function given to ``process_function``.
    _TOutput2 = typing.TypeVar("_TOutput2")

    def __init__(self) -> None:
        # Attached listeners, in the order they were added.
        self._listeners: typing.List[Listener[TOutput]] = []

    def send(self, value: TOutput) -> None:
        """Hand ``value`` to every attached listener, in attachment order."""
        for subscriber in self._listeners:
            subscriber.receive(value)

    def _add_listener(self, listener: Listener[TOutput]) -> typing.Any:
        """Attach ``listener`` and return that same object."""
        self._listeners.append(listener)
        return listener

    def process_function(
        self, function: typing.Callable[[TOutput], _TOutput2]
    ) -> "FunctionPipe[TOutput, _TOutput2]":
        """Attach a new ``FunctionPipe`` wrapping ``function`` and return it."""
        stage = FunctionPipe(function)
        self._add_listener(stage)
        return stage

    def collect(self) -> Collector[TOutput]:
        """Attach a new ``Collector`` and return it."""
        sink = Collector[TOutput]()
        self._add_listener(sink)
        return sink
class InputPipe(Sender[TOutput]):
    """Named sender with no upstream stage of its own."""

    def __init__(self, name: str) -> None:
        super().__init__()
        # Label supplied by the caller; stored as given.
        self.name = name
class FunctionPipe(Listener[TInput], Sender[TOutput]):
    """Pipe stage that maps each received value through a function."""

    def __init__(self, function: typing.Callable[[TInput], TOutput]) -> None:
        super().__init__()
        self._function = function

    def receive(self, value: TInput) -> None:
        """Apply the stored function to ``value`` and send the result on."""
        transformed = self._function(value)
        self.send(transformed)
# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment