Created
March 22, 2017 16:46
-
-
Save sancau/eb4591de2dfe3d6ce3c0e72a7ac5227a to your computer and use it in GitHub Desktop.
Simple class for building async / blocking execution pipelines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8
"""
Minimalistic util for building async pipelines of functions in Python
(c) github.com/sancau
"""
from multiprocessing.dummy import Pool as ThreadPool | |
from multiprocessing import Pool as ProcessPool | |
# Execution modes accepted under config['type'].
# THREAD and PROCESS are pool classes: PipeLine instantiates the chosen one
# with config['factor'] and maps list steps over it.
THREAD = ThreadPool
PROCESS = ProcessPool
# Sentinel for "no pool": any value other than THREAD/PROCESS leaves the
# pipeline without a pool, so list steps run one after another.
BLOCKING = object()
class PipeLine:
    """Run a schema of callables in order, passing a stream value along.

    Each schema entry is either a callable or a list of callables. A callable
    is invoked with the pipeline itself and its return value becomes
    ``self.stream``. A list is mapped over the worker pool when one is
    configured; otherwise its members run one after another.

    An exception raised by a step does not abort the run: it is appended to
    ``self.errors`` and ``self.stream`` keeps its previous value.

    The pipeline can be used as a context manager so that the worker pool is
    shut down on exit.

    NOTE(review): in PROCESS mode the bound method handed to ``pool.map``
    has to be pickled, and assignments made inside a worker process would not
    be visible on this object -- confirm PROCESS mode is usable before
    relying on it.
    """

    def __init__(self, schema, config):
        """Store the schema and build the worker pool described by *config*.

        *schema* is an iterable of steps (callables or lists of callables).
        *config* must provide ``'type'`` and ``'initial_state'``; when
        ``'type'`` is THREAD or PROCESS it must also provide ``'factor'``,
        which is passed to the pool constructor.
        """
        self.config = config
        self.schema = schema
        self.state = self.config['initial_state']
        self.stream = None
        # Exceptions raised by steps are collected here. It must exist before
        # any step runs, otherwise the handler in __perform would itself fail.
        self.errors = []
        if self.config['type'] in [THREAD, PROCESS]:
            self.pool = self.config['type'](self.config['factor'])
        else:
            self.pool = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        """Shut down the worker pool, if any.

        After closing, list steps fall back to sequential execution.
        Calling this more than once is harmless.
        """
        pool, self.pool = self.pool, None
        if pool is not None:
            pool.close()
            pool.join()

    def __perform(self, foo):
        """Call one step with the pipeline; record its result or its error."""
        try:
            self.stream = foo(self)
        except Exception as e:
            self.errors.append(e)

    def __execute_step(self, step):
        """Dispatch a single schema entry.

        Raises ValueError when *step* is neither a list nor a callable.
        """
        if isinstance(step, list):
            if self.pool is not None:
                self.pool.map(self.__perform, step)
            else:
                for foo in step:
                    self.__perform(foo)
        elif callable(step):
            self.__perform(step)
        else:
            raise ValueError('Inproper pipeline schema configuration.')

    def execute(self, stream=None):
        """Run every step of the schema and return ``(stream, state)``.

        When *stream* is not None it replaces the current stream value before
        the first step runs. Errors collected in ``self.errors`` accumulate
        across calls.
        """
        if stream is not None:
            self.stream = stream
        for step in self.schema:
            self.__execute_step(step)
        return self.stream, self.state
""" | |
EXAMPLE USAGE | |
""" | |
from time import sleep | |
def one(pipe):
    """Demo step: announce itself, mark the state as 'one', return stream + 1."""
    print('\n executing one')
    pipe.state = 'one'
    bumped = pipe.stream + 1
    return bumped
def two(pipe):
    """Demo step: announce itself, mark the state as 'two', return stream + 1."""
    print('\n executing two')
    pipe.state = 'two'
    bumped = pipe.stream + 1
    return bumped
def three(pipe):
    """Demo step: announce itself, mark the state as 'three', return stream + 1."""
    print('\n executing three')
    pipe.state = 'three'
    bumped = pipe.stream + 1
    return bumped
def four(pipe):
    """Demo step: announce itself, mark the state as 'four', return stream + 1."""
    print('\n executing four')
    pipe.state = 'four'
    bumped = pipe.stream + 1
    return bumped
# Pool of seven worker threads; 'INIT' is the state reported until a step
# overwrites it.
config = {'type': THREAD, 'factor': 7, 'initial_state': 'INIT'}

# Nested lists are mapped over the pool; bare callables run inline.
schema = [[one, three], three, [two, two, two], four, one]

pipe = PipeLine(schema, config)
stream, state = pipe.execute(stream=0)

print()
print('Pipeline DONE')
print()
print('Stream: ', stream)
print('State: ', state)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment