Skip to content

Instantly share code, notes, and snippets.

@nakamuray
Last active March 2, 2016 13:56
Show Gist options
  • Save nakamuray/8c59e6fb79ade6540e80 to your computer and use it in GitHub Desktop.
Save nakamuray/8c59e6fb79ade6540e80 to your computer and use it in GitHub Desktop.
asyncio な関数を同期的なコードから呼びやすくするようなやつ
'''library to support calling asyncio functions from synchronous code
'''
import asyncio
import functools
import queue
import threading
def create_and_start_loop():
    """Create a fresh asyncio event loop and run it forever on a daemon thread.

    Returns:
        The newly created event loop. The thread that drives it has already
        been started by the time this function returns.
    """
    background_loop = asyncio.new_event_loop()

    def _serve():
        # Make the loop the current one for the worker thread, then hand the
        # thread over to it.
        asyncio.set_event_loop(background_loop)
        background_loop.run_forever()

    # Daemon thread: the still-running loop does not keep the process alive.
    worker = threading.Thread(target=_serve, daemon=True)
    worker.start()
    return background_loop
# Module-wide background event loop. NOTE: this runs at import time, so merely
# importing the module starts the loop's daemon thread.
loop = create_and_start_loop()
class AsyncResult(object):
    """Handle for one coroutine call scheduled on the module-level ``loop``.

    Constructing an instance immediately schedules ``target(*args, **kwargs)``
    on the background loop; ``wait()`` then blocks the calling (synchronous)
    thread until the outcome is available.

    Args:
        target: Callable returning an awaitable (typically an ``async def``).
        args: Positional arguments for ``target``, as a sequence.
        kwargs: Keyword arguments for ``target``, as a mapping.
    """

    def __init__(self, target, args, kwargs):
        self._target = target
        self._args = args
        self._kwargs = kwargs
        # Set once the call has finished, whether it succeeded or failed.
        self._done = threading.Event()
        self._result = None
        self._error = None
        self._task = None
        # The loop lives on another thread, so scheduling must go through the
        # thread-safe entry point.
        loop.call_soon_threadsafe(self._run)

    def _run(self):
        """Start the coroutine; executed inside the event loop thread."""
        # Keep a strong reference so the task cannot be garbage-collected
        # while it is still pending.
        self._task = asyncio.ensure_future(self._async_run())

    async def _async_run(self):
        """Await the target and record its result or the exception raised."""
        try:
            self._result = await self._target(*self._args, **self._kwargs)
        except BaseException as exc:
            # Hand the failure to the waiting thread instead of losing it
            # (which would leave wait() blocked forever).
            self._error = exc
            if not isinstance(exc, Exception):
                # Cancellation / interpreter-exit signals must still propagate.
                raise
        finally:
            self._done.set()

    def wait(self, timeout=None):
        """Block until the call has finished and return its result.

        May be called any number of times, from any thread; every call
        observes the same outcome.

        Args:
            timeout: Maximum number of seconds to wait, or ``None`` to wait
                indefinitely.

        Returns:
            The value produced by the awaited target.

        Raises:
            queue.Empty: If the outcome is not available within ``timeout``.
            BaseException: Whatever the target raised is re-raised here.
        """
        if not self._done.wait(timeout):
            raise queue.Empty
        if self._error is not None:
            raise self._error
        return self._result
def run_in_async(func):
    """Decorator: make calls to ``func`` return an ``AsyncResult``.

    The decorated callable does not invoke ``func`` itself; it passes ``func``
    and the call arguments to ``AsyncResult`` and returns that object.
    """
    def dispatch(*positional, **keywords):
        return AsyncResult(func, positional, keywords)

    # Copy name, docstring, etc. from ``func`` onto the replacement.
    return functools.update_wrapper(dispatch, func)
def test():
    """Manual demo of ``run_in_async``; not an automated test.

    Starts 21 sleeping coroutines, waits for all of them and prints the sum
    of their results, then downloads ``http://localhost/`` with aiohttp and
    prints the body. Requires aiohttp and an HTTP server on localhost.
    """
    @run_in_async
    async def heavy_process():
        await asyncio.sleep(3)
        return 2

    # Every call is scheduled before any wait() happens.
    hs = [heavy_process() for _ in range(21)]
    print('waiting...')
    results = [h.wait() for h in hs]
    print(sum(results))

    # Imported here so aiohttp is only needed when the demo is run.
    import aiohttp

    @run_in_async
    async def get_page(url):
        # The module-level aiohttp.get() helper no longer exists; use a
        # ClientSession, and let ``async with`` release the response and
        # close the session.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.read()

    html = get_page('http://localhost/').wait()
    print(html.decode('utf-8'))
if __name__ == '__main__':
    # Run the demo on a separate thread rather than calling test() directly
    # from the main thread.
    demo_thread = threading.Thread(target=test)
    demo_thread.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment