Skip to content

Instantly share code, notes, and snippets.

@sancau
Created March 22, 2017 16:46
Show Gist options
  • Save sancau/eb4591de2dfe3d6ce3c0e72a7ac5227a to your computer and use it in GitHub Desktop.
Save sancau/eb4591de2dfe3d6ce3c0e72a7ac5227a to your computer and use it in GitHub Desktop.
Simple class for building async / blocking execution pipelines
# coding=utf-8
"""
Minimalistic util for building async pipelines of functions in Python
(c) github.com/sancau
"""
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import Pool as ProcessPool
THREAD = ThreadPool
PROCESS = ProcessPool
BLOCKING = object()
class PipeLine:
def __init__(self, schema, config):
self.config = config
self.schema = schema
self.state = self.config['initial_state']
self.stream = None
if self.config['type'] in [THREAD, PROCESS]:
self.pool = self.config['type'](self.config['factor'])
else:
self.pool = None
def __perform(self, foo):
try:
self.stream = foo(self)
except Exception as e:
self.errors.append(e)
def __execute_step(self, step):
if isinstance(step, list):
if self.pool:
self.pool.map(self.__perform, step)
else:
for foo in step:
self.__perform(foo)
elif callable(step):
self.__perform(step)
else:
raise ValueError('Inproper pipeline schema configuration.')
def execute(self, stream=None):
if stream is not None:
self.stream = stream
for step in self.schema:
self.__execute_step(step)
return self.stream, self.state
"""
EXAMPLE USAGE
"""
from time import sleep
def one(pipe):
print('\n executing one')
pipe.state = 'one'
return pipe.stream + 1
def two(pipe):
print('\n executing two')
pipe.state = 'two'
return pipe.stream + 1
def three(pipe):
print('\n executing three')
pipe.state = 'three'
return pipe.stream + 1
def four(pipe):
print('\n executing four')
pipe.state = 'four'
return pipe.stream + 1
config = {
'type': THREAD,
'factor': 7,
'initial_state': 'INIT'
}
schema = [
[one, three],
three,
[two, two, two],
four,
one
]
pipe = PipeLine(schema, config)
stream, state = pipe.execute(stream=0)
print()
print('Pipeline DONE')
print()
print('Stream: ', stream)
print('State: ', state)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment