Skip to content

Instantly share code, notes, and snippets.

@frodo821
Last active January 7, 2020 06:38
Show Gist options
  • Select an option

  • Save frodo821/a39ee24cea52e96a9495574a0e25bade to your computer and use it in GitHub Desktop.

Select an option

Save frodo821/a39ee24cea52e96a9495574a0e25bade to your computer and use it in GitHub Desktop.
reimplementation of javascript Promise API with Python
from functools import wraps
from time import sleep, time
from concurrent.futures import ThreadPoolExecutor
__all__ = ['Cancelled', 'Promise', 'asyncfun', 'promisify']
class Cancelled(BaseException):
pass
def asyncfun(cor):
"""
Create promise async function from Python coroutine function.
```
@asyncfun
async def heavyTask(somearg, somearg2):
return do_something()
heavyTask(some, some1)\
.then(someResultHandler)\
.catch(someErrorHandler)
```
"""
@wraps(cor)
def wait(*args, **kwargs):
@wraps(cor)
def _internal(res, rej):
from asyncio import new_event_loop
try:
res(new_event_loop().run_until_complete(cor(*args, **kwargs)))
except BaseException as err:
rej(err)
return Promise(_internal)
return wait
def promisify(fun):
"""
Create promise async function from Python function.
```
@promisify
def heavyTask(somearg, somearg2):
return do_something()
heavyTask(some, some1)\
.then(someResultHandler)\
.catch(someErrorHandler)
```
"""
@wraps(fun)
def wait(*args, **kwargs):
@wraps(fun)
def _internal(res, rej):
try:
res(fun(*args, **kwargs))
except BaseException as err:
rej(err)
return Promise(_internal)
return wait
class Promise:
"""
Asynchronously execute heavy tasks.
This class is alternative for any means of async operations of Python standard library.
constructor parameters:
task: callable takes two argments, first is resolve function and second is reject function.
examples:
```
def heavy_task(resolve, reject):
result = do_heavy_task()
# 'resolves' with result.
# When you call this function, promise object
# will be changed state to 'resolved'
# In other words, the task will marked as 'successfully done'
resolve(result)
task = Promise(heavy_task)
task.then(print)
def heavy_task2(resolve, reject):
result = do_heavy_task2()
if result == "some unexpected value":
# 'rejects' with reason.
# When you call this function, promise object
# will be changed state to 'rejected'
# This means the task will marked as 'failed'
reject(RuntimeError("got unexpected value"))
return
resolve(result)
task = Promise(heavy_task)
# 'catch' method is used to set error handler.
# And 'then' method and 'catch' method can be chained.
task.then(print).catch(print)
```
"""
pool = ThreadPoolExecutor()
def __init__(self, task):
self.task = task
self.state = { '.': 'prepared', 'result': None }
self.resolve_handlers = []
self.reject_handlers = []
self._future = Promise.pool.submit(self._run)
@property
def resolved(self):
return self.state['.'] == 'done'
@property
def rejected(self):
return self.state['.'] == 'rejected'
@property
def pending(self):
return self.state['.'] == 'pending'
@property
def cancelled(self):
return self.state['.'] == 'cancelled'
@property
def result(self):
return self.state['result']
def _run(self):
self.state['.'] = 'pending'
try:
self.task(self.resolve, self.reject)
except BaseException as e:
self.reject(e)
async def __await__(self):
while self.pending:
pass
if self.resolved:
return self.state['result']
raise self.state['result']
def resolve(self, result=None):
if not self.pending:
return
self.state['.'] = 'done'
self.state['result'] = result
for hdr in self.resolve_handlers:
hdr(result)
def reject(self, error):
if not self.pending:
return
self.state['.'] = 'rejected'
self.state['result'] = error
for hdr in self.reject_handlers:
hdr(error)
def then(self, handler, errorHandler=None):
"""
Return new Promise which is rejected
when this Promise is marked as rejected,
or resolved when this Promise is marked as resolved.
If the result handler returns a new Promise,
returned Promise is resolved when the Promise is marked as resolved,
or rejected when this Promise or the Promise is marked as rejected.
"""
if errorHandler:
self.catch(errorHandler)
if self.resolved:
handler(self.state['result'])
return self
prom = Promise.never()
def _handler(value):
try:
val = handler(value)
except BaseException as err:
prom.reject(err)
return
if isinstance(val, Promise):
val.then(prom.resolve).catch(prom.reject)
return
prom.resolve(value)
self.resolve_handlers.append(_handler)
self.reject_handlers.append(prom.reject)
return prom
def catch(self, handler):
"""
Return new Promise which is rejected
when this Promise is marked as rejected,
or resolved when this Promise is marked as resolved.
"""
if self.rejected:
handler(self.state['result'])
return self
prom = Promise.never()
def _handler(err):
try:
prom.reject(err)
except:
pass
handler(err)
self.reject_handlers.append(_handler)
self.resolve_handlers.append(prom.resolve)
return prom
def finally_(self, handler):
handler = lambda _: handler()
return self.catch(handler).then(handler)
def cancel(self):
if not self.pending:
raise ValueError(f"task '{self.task.__qualname__}' is already {self.state['.']}.")
self._future.cancel()
try:
self.reject(Cancelled('task was cancelled from main or another thread.'))
except Cancelled:
pass
self.state['.'] = 'cancelled'
def wait(self):
self.catch(lambda x: None)
while self.pending:
sleep(0.01)
if self.rejected:
raise self.result
return self.result
def wait_until_timeout(self, timeout=5):
self.catch(lambda x: None)
start = time()
while self.pending:
if time() < start + timeout * 1000:
self.cancel()
raise TimeoutError()
sleep(0.01)
if self.rejected:
raise self.result
return self.result
def __repr__(self):
return f"<Promise object({self.state['.']}) on task '{self.task.__qualname__}' at {hex(id(self))}>"
def __str__(self):
return f"<Promise object({self.state['.']}) on task '{self.task.__qualname__}' at {hex(id(self))}>"
@classmethod
def never(cls):
return Promise(lambda *_: None)
@classmethod
def none(cls):
return cls(lambda res, _: res())
@classmethod
def fail(cls):
return cls(lambda x, rej: rej(Exception()))
@classmethod
def all(cls, promises):
def wait(resolve, reject):
error = None
def catch(err):
nonlocal error
error = err
for p in promises:
p.catch(catch)
while not error and any(p.pending for p in promises):
pass
if error:
reject(error)
else:
resolve([p.result for p in promises])
return cls(wait)
@classmethod
def allSettled(cls, promises):
def wait(resolve, reject):
for p in promises:
p.catch(lambda _: None)
while any(p.pending for p in promises):
pass
resolve([{ "settle": p.state['.'], "result": p.result } for p in promises])
return cls(wait)
@classmethod
def race(cls, promises):
def wait(resolve, reject):
for p in promises:
p.then(resolve).catch(reject)
return cls(wait)
from promise import Promise, promisify
@promisify
def test_task_1():
a = 0
for i in range(1000):
for j in range(1000):
a += i * j
return a
p1 = test_task_1().then(print)
@promisify
def test_task_2(value):
x = hex(value)[2:]
a = []
for c in x:
for d in x:
for e in x:
a.append(c+d+e)
return a
p2 = test_task_2(8043).then(print)
p3 = test_task_1().then(test_task_2).then(print)
Promise.all([p1, p2, p3]).wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment