Skip to content

Instantly share code, notes, and snippets.

@bewt85
Created March 26, 2015 01:24
Show Gist options
  • Save bewt85/a2b53198c80306a8d405 to your computer and use it in GitHub Desktop.
Save bewt85/a2b53198c80306a8d405 to your computer and use it in GitHub Desktop.
A mix of Scala's Future and Try in Python

What

Some sort of hybrid between Scala's Try and Future, in Python

Why?

Sometimes I like to write scripts which pipeline results from one function into another. It would be nice if some of these steps could run in parallel, and if I could simplify the logic so that an Exception raised while calculating one value only affected the other values that rely on it.

Example

attachment = slowly_download_attachment('cat.jpg')
content = get_email_content()
email_server = connect_to_email_server('localhost')
email = Email(content, attachment)
response = email_server.send(email)
errors = collate_errors(content, email_server, email, response)
error_report = create_error_report(errors)
email_server.send(error_report)
print error_report

What if?

  • we can't get content?
    • email not sent
    • error_report sent
  • we can't connect to the email server?
    • we print the error_report
  • we can't email the error_report?
    • we print the error_report
  • there aren't any errors?
    • there isn't an error report so it isn't sent or printed

Other benefits

It could be easy to say how long we should wait for something to finish before classing it as a Failure. What if I wanted to send the email even if the attachment download failed? Could I write:

response = email_server.send(email, attachment) or email_server.send(email)

TODO

Monkey patch all the things

import functools

import gevent
class Try(object):
    """Base wrapper around the outcome of a computation.

    The wrapped value is stored unchanged on ``result``. The ``Success`` and
    ``Failure`` subclasses defined in this file mark which kind of outcome it
    is.
    """

    def __init__(self, result):
        # Stored as-is; no copying or validation is performed.
        self.result = result
class Success(Try):
    """A ``Try`` whose ``result`` is the value a computation returned."""

    def __repr__(self):
        # Wrap the value in a 1-tuple: a bare tuple on the right-hand side of
        # ``%`` is unpacked as the argument list, so a tuple-valued result
        # would raise TypeError (or lose its parentheses for a 1-tuple).
        return "<Success '%s'>" % (self.result,)
class Failure(Try):
    """A ``Try`` whose ``result`` describes what went wrong.

    Within this file ``result`` is a list of the exceptions that were raised.
    """

    def __repr__(self):
        # Wrap the value in a 1-tuple: a bare tuple on the right-hand side of
        # ``%`` is unpacked as the argument list, so a tuple-valued result
        # would raise TypeError (or lose its parentheses for a 1-tuple).
        return "<Failure '%s'>" % (self.result,)
class Future(object):
    """Run ``func`` in a gevent greenlet once its arguments have been resolved.

    Arguments may themselves be ``Future``, ``Success`` or ``Failure``
    objects, or lists/tuples/dicts containing them. Before ``func`` is called
    every such argument is replaced by its plain value. If any argument turns
    out to be a ``Failure``, ``func`` is not called and the outcome of this
    Future is a ``Failure`` carrying all the collected exceptions.

    After ``join()`` the outcome is available on ``result`` as a ``Success``
    or ``Failure``; ``get()`` unwraps it.
    """

    def __init__(self, func, *args, **kwargs):
        """Schedule ``func(*args, **kwargs)`` to run in a new greenlet."""
        self.result = None
        # gevent.spawn both creates and schedules the greenlet, so no separate
        # start() call is needed.
        self.job = gevent.spawn(self.wait_for, func, *args, **kwargs)

    def wait_for(self, func, *args, **kwargs):
        """Resolve the arguments, call ``func`` and wrap the outcome.

        Returns ``Success(value)`` when ``func`` returns, ``Failure([exc])``
        when it raises, or a ``Failure`` with every exception gathered from
        the arguments when at least one of them failed.
        """
        try_args = self.try_and_join(args)
        try_kwargs = self.try_and_join(kwargs)
        # Merge both outcomes so failures from positional and keyword
        # arguments are reported together.
        combined = self.mappable_handler([try_args, try_kwargs])
        if isinstance(combined, Failure):
            return combined
        try:
            return Success(func(*try_args.result, **try_kwargs.result))
        except Exception as e:
            # Deliberately broad: turning any error into a Failure value is
            # the purpose of this class.
            return Failure([e])

    def success_handler(self, arg):
        """Return an existing ``Success`` unchanged."""
        return arg

    def failure_handler(self, arg):
        """Return an existing ``Failure`` unchanged."""
        return arg

    def future_handler(self, arg):
        """Wait for another Future and return its ``Success``/``Failure``."""
        arg.join()
        return arg.result

    def mappable_handler(self, arg):
        """Resolve every element of an iterable.

        Returns ``Success`` wrapping a list of the plain values, or a
        ``Failure`` wrapping the flattened list of exceptions from all failed
        elements.
        """
        # Real lists are required here: on Python 3 ``map``/``filter`` return
        # lazy iterators, which are always truthy and can be consumed only
        # once.
        outcomes = [self.try_and_join(item) for item in arg]
        failures = self.get_failures(outcomes)
        if failures:
            return Failure(
                [exc for failure in failures for exc in failure.result]
            )
        return Success([outcome.result for outcome in outcomes])

    def dict_handler(self, arg):
        """Resolve every value of a dict, keeping the keys.

        Returns ``Success`` wrapping a new dict of plain values, or the
        ``Failure`` produced while resolving the values.
        """
        # Snapshot the pairs once so keys and resolved values line up.
        pairs = list(arg.items())
        values = self.mappable_handler([value for _, value in pairs])
        if isinstance(values, Failure):
            return values
        return Success(
            {key: value for (key, _), value in zip(pairs, values.result)}
        )

    def default_handler(self, arg):
        """Wrap a plain value in ``Success``."""
        return Success(arg)

    def try_and_join(self, arg):
        """Turn ``arg`` into a ``Success`` or ``Failure``, waiting if needed."""
        # isinstance so subclasses of the wrapper types are recognised too.
        if isinstance(arg, Failure):
            return self.failure_handler(arg)
        if isinstance(arg, Success):
            return self.success_handler(arg)
        if isinstance(arg, Future):
            return self.future_handler(arg)
        # Containers are matched on exact type, so subclasses (for example a
        # namedtuple) are passed through untouched as a plain value.
        container_handlers = {
            tuple: self.mappable_handler,
            list: self.mappable_handler,
            dict: self.dict_handler,
        }
        handler = container_handlers.get(type(arg), self.default_handler)
        return handler(arg)

    def get_failures(self, args):
        """Return a list of the ``Failure`` instances found in ``args``."""
        return [arg for arg in args if isinstance(arg, Failure)]

    def join(self):
        """Block until the greenlet finishes and store its outcome."""
        self.job.join()
        self.result = self.job.get()

    def get(self):
        """Return the plain value, waiting for completion if necessary.

        Raises the recorded exception when the outcome is a ``Failure``; if
        several exceptions were recorded, only the first one is raised.
        """
        if self.result is None:
            self.join()
        if isinstance(self.result, Failure):
            error = self.result.result
            if isinstance(error, list):
                raise error[0]
            raise error
        return self.result.result
def to_future(func):
    """Decorator: make ``func`` return a ``Future`` instead of running inline.

    Each call of the decorated function constructs
    ``Future(func, *args, **kwargs)`` and returns it immediately. The
    decorated function keeps the name and docstring of ``func``.
    """
    # Copy only the metadata ``func`` actually has: on Python 2,
    # functools.wraps raises AttributeError for callables that lack e.g.
    # ``__name__`` (such as functools.partial objects).
    copyable = [
        attr for attr in functools.WRAPPER_ASSIGNMENTS if hasattr(func, attr)
    ]

    @functools.wraps(func, assigned=copyable)
    def create_future(*args, **kwargs):
        return Future(func, *args, **kwargs)

    return create_future
def to_present(func):
    """Decorator: run ``func`` through a ``Future`` but wait for the value.

    Each call of the decorated function constructs
    ``Future(func, *args, **kwargs)`` and returns the outcome of its ``get()``,
    so the caller receives the plain value or the exception is raised. The
    decorated function keeps the name and docstring of ``func``.
    """
    # Copy only the metadata ``func`` actually has: on Python 2,
    # functools.wraps raises AttributeError for callables that lack e.g.
    # ``__name__`` (such as functools.partial objects).
    copyable = [
        attr for attr in functools.WRAPPER_ASSIGNMENTS if hasattr(func, attr)
    ]

    @functools.wraps(func, assigned=copyable)
    def run_and_wait(*args, **kwargs):
        return Future(func, *args, **kwargs).get()

    return run_and_wait
@to_future
def unit(i):
    """Demo task: sleep for ``i`` seconds via gevent, then hand ``i`` back.

    Because of the ``to_future`` decorator, calling ``unit(i)`` returns a
    Future rather than the number itself.
    """
    delay = i
    gevent.sleep(delay)
    return delay
@to_future
def bad_unit(i):
    """Demo task that always fails: sleep for ``i`` seconds, then raise.

    Because of the ``to_future`` decorator, calling ``bad_unit(i)`` returns a
    Future; the exception is raised inside that Future's computation.
    """
    gevent.sleep(i)
    message = "bad '%s'" % i
    raise Exception(message)
@to_future
def add(*args):
    """Demo task: sum all positional arguments.

    Because of the ``to_future`` decorator, calling ``add(...)`` returns a
    Future rather than the total itself.
    """
    total = sum(args)
    return total
# Demo: build a small dependency graph of Futures. Every call below returns
# immediately; the work happens in greenlets.
five = add(unit(1), unit(1), unit(1), unit(1), unit(1))
seven = bad_unit(7)  # always raises, so everything depending on it fails
ten = unit(10)
twelve = add(five, seven)  # depends on the failing `seven`
fifteen = add(five, ten)
nineteen = add(seven, twelve)  # depends on two failed inputs
twenty = add(ten, 10)  # plain values can be mixed with Futures

futures = [five, seven, ten, twelve, fifteen, nineteen, twenty]

# join() waits for each Future and stores its Success/Failure on `.result`.
for n in futures:
    n.join()

# A single parenthesised argument prints identically under the Python 2 print
# statement and the Python 3 print function.
for n in futures:
    print("%s = '%s'" % (n, n.result))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment