Last active
August 19, 2020 07:38
-
-
Save beratdogan/c31b2f74f40672b46f5e485eb2075688 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextvars | |
from pprint import pprint as pp | |
# Context-local slot that holds the pipeline currently being declared.
# It is created without a default, so ``get()`` raises ``LookupError`` until
# a value has been stored with ``set()``.
__pipeline: contextvars.ContextVar = contextvars.ContextVar("pipeline")
class StartingStepAlreadySet(Exception):
    """Raised by ``Pipeline.set_starting_step`` when a starting step already exists."""
class StartingStepNotSet(Exception):
    """Raised by ``Pipeline.render`` when no starting step has been chosen."""
class OutOfPipelineError(Exception):
    """Raised by ``getpipeline`` when no pipeline is stored for the current context."""
def getpipeline():
    """Return the pipeline that is active in the current context.

    Returns:
        The object most recently stored in the module's context variable.

    Raises:
        OutOfPipelineError: If nothing has been stored yet, or if the stored
            value is ``None``.
    """
    try:
        pipeline = __pipeline.get()
    except LookupError as err:
        raise OutOfPipelineError("no pipeline is active") from err
    # ``Pipeline.__exit__`` clears the slot by storing ``None`` rather than by
    # resetting the variable, so ``None`` must be treated as "outside any
    # pipeline" too; otherwise callers receive ``None`` and fail later with
    # an unrelated AttributeError.
    if pipeline is None:
        raise OutOfPipelineError("no pipeline is active")
    return pipeline
def setpipeline(pipeline) -> None:
    """Store *pipeline* in the module's context variable.

    ``Pipeline.__enter__`` passes the pipeline instance itself and
    ``Pipeline.__exit__`` passes ``None``.
    """
    __pipeline.set(pipeline)
class Pipeline:
    """Container for the steps and edges declared inside a ``with`` block.

    Entering the context registers the instance through ``setpipeline`` so
    that ``Step`` objects created inside the block can find it. Leaving the
    context renders the pipeline (when the block finished without an
    exception) and always unregisters it.
    """

    def __init__(self):
        self.starting_step = None  # entry point; may be set only once
        self.steps = []  # everything passed to add_step, in call order
        self.edges = []  # everything passed to add_edge, in call order

    def __enter__(self):
        """Register this pipeline as the active one and return it."""
        setpipeline(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Render on a clean exit and always clear the active pipeline.

        Returns ``None`` so an exception raised inside the block propagates.
        """
        try:
            if exc_type is None:
                self.render()
        finally:
            # Clear even when the block or render() raised; otherwise this
            # instance would stay registered after the ``with`` ended.
            setpipeline(None)

    def __rshift__(self, other):
        """Support ``pipeline >> step``: make *other* the starting step.

        Returns *other* so the expression can be chained further.
        """
        self.set_starting_step(other)
        return other

    def add_step(self, step):
        """Append *step* to ``self.steps``."""
        self.steps.append(step)

    def add_edge(self, edge):
        """Append *edge* to ``self.edges``."""
        self.edges.append(edge)

    def set_starting_step(self, step):
        """Record *step* as the starting step.

        Raises:
            StartingStepAlreadySet: If a starting step was recorded before.
        """
        # Compare against None explicitly so a step object that happens to
        # be falsy still counts as "already set".
        if self.starting_step is not None:
            raise StartingStepAlreadySet()
        self.starting_step = step

    def render(self):
        """Pretty-print the starting step, the steps and the edges.

        Raises:
            StartingStepNotSet: If no starting step has been recorded.
        """
        if self.starting_step is None:
            raise StartingStepNotSet()

        pp(self.starting_step)
        pp(self.steps)
        pp(self.edges)

        # NOTE(review): the previous implementation popped every edge into an
        # unused local, leaving ``self.edges`` empty after rendering. That
        # observable effect is kept here; confirm whether consuming the edge
        # list is intended once real traversal logic is added.
        self.edges.clear()
class Step:
    """A named node of a pipeline, optionally carrying conditions.

    Creating a step looks up the active pipeline with ``getpipeline`` and
    registers the new step on it, so a step can only be created while a
    pipeline is active.
    """

    def __init__(self, name, conditions=None):
        """Create the step and register it on the active pipeline.

        Args:
            name: Label used in ``repr``.
            conditions: Optional conditions attached to the step; ``when``
                passes a dict of keyword arguments here.
        """
        self.name = name
        self.conditions = conditions
        self.pipeline = getpipeline()
        self.pipeline.add_step(self)

    def __repr__(self):
        return f"{self.name} ({self.conditions})"

    def __rshift__(self, other):
        """Support ``step >> other`` as an alias for ``step.next(other)``."""
        return self.next(other)

    def __pos__(self):
        """Support ``+step`` as an alias for ``step.start_here()``.

        Returns the step itself so the expression has a usable value.
        """
        self.start_here()
        return self

    def start_here(self):
        """Make this step the starting step of its pipeline."""
        self.pipeline.set_starting_step(self)

    def when(self, **cond):
        """Return a copy of this step whose conditions are *cond*.

        The copy is built through the constructor, so it is registered on the
        active pipeline as an additional step.
        MAYBE: this adds another step to pipeline, do we want this?
        """
        return self.__class__(name=self.name, conditions=cond)

    def next(self, other):
        """Connect this step to *other* and return *other*.

        Args:
            other: A single target, or a ``list`` of targets. A list adds
                one edge per element, each flagged ``parallel=True``.

        Returns:
            *other* unchanged. For a single target this lets calls be
            chained (``a >> b >> c``), which failed with ``TypeError`` when
            ``None`` was returned.
        """
        if isinstance(other, list):
            # handle parallel runs: one edge per target in the list
            for target in other:
                self.pipeline.add_edge(Edge(self, target, parallel=True))
        else:
            self.pipeline.add_edge(Edge(self, other))
        return other
class Edge:
    """Directed connection from one step to another.

    The constructor stores its arguments unchanged: ``from_`` is the source,
    ``to`` the destination, and ``parallel`` is the flag that ``Step.next``
    sets to ``True`` for edges created from a list of targets.
    """

    def __init__(self, from_, to, parallel=False):
        self.from_, self.to, self.parallel = from_, to, parallel

    def __repr__(self):
        """Render as ``<from_> > <to> <parallel>``."""
        return f"{self.from_} > {self.to} {self.parallel}"
with Pipeline():
    # Step declarations. Each Step registers itself on the pipeline opened
    # above, so they must be created inside this block.
    a = Step(name='a')
    b = Step(name='b')
    b1, b2 = Step(name='b1'), Step(name='b2')
    c = Step(name='c')
    success, fail = Step(name='success'), Step(name='fail')

    # Wiring. ``x.next(y)`` and ``x >> y`` are interchangeable.
    a.when(status=False).next(fail)  # method-call form
    a.when(status=True) >> b  # operator form
    b.next([b1, b2])  # a list target fans out to parallel edges
    b1.next(c)
    b2.next(c)
    c >> success
    c >> fail

    # Mark ``a`` as the starting step. This has to happen before the block
    # ends, because leaving the block renders the pipeline and rendering
    # requires a starting step.
    a.start_here()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
repr: