Last active
September 30, 2015 01:47
-
-
Save gchiam/9a8e25cc0679d1ad8e8c to your computer and use it in GitHub Desktop.
Simple Workflow Engine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
import logging | |
class SimpleWorkflowEngine(object):
    """A simple workflow engine.

    A workflow is an ordered iterable of callables. ``run`` passes
    ``initial`` to the first task, then passes each task's return value to
    the next task as its only positional argument, and returns the result
    of the last task.

    Usage:

        from functools import partial

        def initialize(number):
            return range(number)

        def square(numbers):
            return [x * x for x in numbers]

        def multiply(numbers, multiplier=1):
            return [x * multiplier for x in numbers]

        workflow = [
            initialize,
            square,
            partial(multiply, multiplier=2),
            sum
        ]

        engine = SimpleWorkflowEngine(workflow, initial=6)
        engine.run()  # -> 110
    """

    def __init__(self, workflow, initial=None):
        """Store the tasks and the seed value.

        Args:
            workflow: iterable of callables; each one is called with the
                previous task's result as its single positional argument.
            initial: value handed to the first task.
        """
        self.initial = initial
        self.workflow = workflow
        self.logger = logging.getLogger(__name__)

    def run(self):
        """Execute the workflow and return the last task's result.

        Returns ``initial`` unchanged when the workflow contains no tasks.
        """
        # functools.reduce is available on Python 2.6+ and on Python 3,
        # whereas the bare ``reduce`` builtin exists only on Python 2.
        # Passing the seed as the initializer (instead of concatenating it
        # onto a list) lets the workflow be any iterable, e.g. a tuple.
        return functools.reduce(
            lambda data, func: self._task_wrapper(func)(data),
            self.workflow,
            self.initial
        )

    @staticmethod
    def _get_func_name(func):
        """Return a display name for ``func`` without ever raising.

        Uses ``__name__`` when present; ``functools.partial`` objects are
        unwrapped (repeatedly, for nested partials) until a named callable
        is found. Falls back to ``str(func)`` when no name is available.
        """
        target = func
        while True:
            name = getattr(target, '__name__', None)
            if name is not None:
                return str(name)
            if not isinstance(target, functools.partial):
                return str(func)
            target = target.func

    @staticmethod
    def _format_for_display(arg, max_len=80):
        """Return ``str(arg)`` cut to ``max_len`` characters plus '...'."""
        display = str(arg)
        if len(display) > max_len:
            display = display[:max_len] + '...'
        return display

    def _task_wrapper(self, func):
        """Wrap ``func`` so each call is logged at DEBUG level.

        The returned callable forwards its arguments to ``func`` and
        returns ``func``'s result unchanged.
        """
        def func_wrapper(*args, **kwargs):
            # Stringifying arguments and results can be expensive for big
            # payloads, so only do it when DEBUG records would be emitted.
            debug_enabled = self.logger.isEnabledFor(logging.DEBUG)
            func_name = None
            if debug_enabled:
                func_name = self._get_func_name(func)
                self.logger.debug(
                    'executing [%s], args[%s], kwargs[%s]',
                    func_name,
                    self._format_for_display(args),
                    self._format_for_display(kwargs),
                )
            results = func(*args, **kwargs)
            if debug_enabled:
                self.logger.debug(
                    'executed [%s], results[%s]',
                    func_name,
                    self._format_for_display(results),
                )
            return results
        return func_wrapper
def test():
    """Run the sample workflow from the class docstring and print the result.

    The pipeline builds range(6), squares each element, doubles each
    element and sums them, so it prints 110.
    """
    from functools import partial

    def initialize(number):
        return range(number)

    def square(numbers):
        return [x * x for x in numbers]

    def multiply(numbers, multiplier=1):
        return [x * multiplier for x in numbers]

    workflow = [
        initialize,
        square,
        partial(multiply, multiplier=2),
        sum
    ]

    engine = SimpleWorkflowEngine(workflow, initial=6)
    # Call form of print works on both Python 2 (single argument) and
    # Python 3; the statement form is a SyntaxError on Python 3.
    print(engine.run())
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment