Skip to content

Instantly share code, notes, and snippets.

@SXHRYU
Forked from vskrachkov/pipe.py
Last active August 14, 2023 11:37
Show Gist options
  • Save SXHRYU/8bc8d364c849a394fd71143db62503c8 to your computer and use it in GitHub Desktop.
Save SXHRYU/8bc8d364c849a394fd71143db62503c8 to your computer and use it in GitHub Desktop.
python, pipe, chain, monad, functor
from __future__ import annotations
from typing import Callable, Union
class Pipe:
def __init__(self, callable_: Callable):
self.callable_ = callable_
def __rshift__(self, then: Callable | Pipe) -> Pipe:
if not isinstance(then, Pipe):
then = Pipe(then)
then(self.result)
return then
def __call__(self, *args, **kwargs) -> Pipe:
self.result = self.callable_(*args, **kwargs)
return self
@Pipe
def put(v):
return v
def pipe(source, *chain: Callable):
for next_ in chain:
source = next_(source)
return source
def pipe_factory(source, *chain: Callable):
def evaluate():
return pipe(source, *chain)
return evaluate
def main():
put(3) >> (lambda x: (x ** 2, x * 10)) >> sum >> print
pipe(3, lambda x: (x ** 2, x * 10), sum, print)
new_pipe = pipe_factory(3, lambda x: (x ** 2, x * 10), sum, print)
new_pipe()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment