Created
September 27, 2017 17:39
-
-
Save djmunro/55aaddef25efe5ac9edb7af61505fe82 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import ctypes
import sys
import threading
from asyncio.base_futures import _PENDING
from functools import wraps
from typing import Callable, List, Generic, TypeVar

import aioprocessing
class timeout:
    """Asynchronous timeout context manager.

    Useful in cases when you want to apply timeout logic around a block
    of code or in cases when asyncio.wait_for is not suitable. For example:

    >>> async with timeout(0.001):
    ...     async with aiohttp.get('https://github.com') as r:
    ...         await r.text()

    Only the asynchronous protocol (``__aenter__``/``__aexit__``) is
    implemented, so the manager must be entered with ``async with``.

    timeout - value in seconds; None or 0 disables the timeout logic
    loop - asyncio compatible event loop
    """

    def __init__(self, timeout, *, loop=None):
        # A delay of 0 is treated exactly like None: no timer is armed.
        if timeout == 0:
            timeout = None
        self._timeout = timeout
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._task = None             # task guarded by the timer
        self._cancelled = False       # True once our timer cancelled the task
        self._cancel_handler = None   # handle returned by loop.call_later

    async def __aenter__(self):
        """Arm the timer for the current task and return the manager.

        Raises RuntimeError when a timeout is set but no task is running.
        """
        if self._timeout is not None:
            self._task = current_task(self._loop)
            if self._task is None:
                raise RuntimeError('Timeout context manager should be used '
                                   'inside a task')
            self._cancel_handler = self._loop.call_later(
                self._timeout, self._cancel_task)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disarm the timer, or report our own cancellation as a timeout.

        Raises asyncio.TimeoutError when the block is left with a
        CancelledError that was triggered by this manager's timer.
        """
        if exc_type is asyncio.CancelledError and self._cancelled:
            # The timer already fired, so there is nothing left to cancel.
            self._cancel_handler = None
            self._task = None
            raise asyncio.TimeoutError from None
        if self._timeout is not None and self._cancel_handler is not None:
            self._cancel_handler.cancel()
            self._cancel_handler = None
        self._task = None

    def _cancel_task(self):
        """Timer callback: cancel the guarded task and record the outcome."""
        self._cancelled = self._task.cancel()
def current_task(loop):
    """Return the task currently running on *loop*, or None.

    ``asyncio.Task.current_task`` was removed in Python 3.9, so the
    module-level ``asyncio.current_task`` is preferred when it exists and
    the classmethod is only used on older interpreters.

    loop - event loop to inspect; None means the running loop
    """
    get_current = getattr(asyncio, 'current_task', None)
    if get_current is None:
        # Older interpreters only offer the Task classmethod.
        task = asyncio.Task.current_task(loop=loop)
    else:
        try:
            task = get_current(loop=loop)
        except RuntimeError:
            # loop is None and no loop is running: there is no current task.
            task = None
    # Some loop implementations expose their own current_task() accessor.
    if task is None and hasattr(loop, 'current_task'):
        task = loop.current_task()
    return task
class Timeout(object):
    """Async context manager that cancels a task after ``seconds``.

    The task to cancel is the one that is current when the object is
    constructed. On expiry that task is cancelled, so the guarded code
    sees asyncio.CancelledError.

    seconds - delay before the task is cancelled
    """

    def __init__(self, seconds):
        self.seconds = seconds
        # asyncio.Task.current_task() was removed in Python 3.9; prefer the
        # module-level function and keep the classmethod for old versions.
        get_current = getattr(asyncio, 'current_task', None)
        if get_current is None:
            self.task = asyncio.Task.current_task()
        else:
            try:
                self.task = get_current()
            except RuntimeError:
                # Constructed outside a running loop: no task to guard.
                self.task = None

    async def __aenter__(self):
        """Start the background timer that cancels the captured task."""
        async def _timer():
            await asyncio.sleep(self.seconds)
            # The task is cancelled rather than given a TimeoutError.
            if self.task is not None:
                self.task.cancel()
        self._timer = asyncio.ensure_future(_timer())

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Stop the timer; exceptions from the block are not suppressed."""
        # NOTE(review): diagnostic trace kept from the original code.
        print('leaving!')
        self._timer.cancel()
T = TypeVar('T')


class Emitter(Generic[T]):
    """Fan a message out to registered callbacks on an event loop.

    loop - event loop on which the callbacks' awaitables are scheduled
    """

    def __init__(self, loop):
        self.listeners: List[Emitter._Listener] = []
        self.loop = loop

    def emit(self, message: T):
        """Call every listener's callback with *message* and schedule the result.

        Each callback is invoked in the caller's context; the object it
        returns is handed to asyncio.ensure_future on ``self.loop``, so it
        must be a coroutine or other awaitable.
        """
        # Iterate over a snapshot: scheduling goes through
        # call_soon_threadsafe, so emit() is presumably called from other
        # threads while listeners are added or removed on the loop.
        for listener in tuple(self.listeners):
            # asyncio.async no longer parses ('async' is a keyword since
            # Python 3.7); ensure_future is its replacement.
            self.loop.call_soon_threadsafe(
                asyncio.ensure_future, listener.callback(message))

    def listen(self, callback: Callable[[T], None]):
        """Return an async context manager that registers *callback*."""
        return Emitter._Listener(self.listeners, callback)

    class _Listener(object):
        """Registration handle: subscribed between __aenter__ and __aexit__."""

        def __init__(self, listeners, callback):
            self.callback: Callable[[T], None] = callback
            self.listeners: List[Emitter._Listener] = listeners

        async def __aenter__(self):
            self.listeners.append(self)

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            self.listeners.remove(self)
def async_timeout(s):
    """Decorator factory: run the decorated coroutine function under a timeout.

    s - timeout value passed to the ``timeout`` context manager

    The returned wrapper awaits the original coroutine inside
    ``async with timeout(s)`` and returns its result.
    """
    def decorator(f):
        @wraps(f)
        async def wrapper(*args, **kwargs):
            async with timeout(s):
                return await f(*args, **kwargs)
        return wrapper
    return decorator
class AioProcessCancel2(asyncio.Future):
    """Future whose cancellation also terminates an attached process.

    The process is expected in the ``p`` attribute; nothing in this class
    assigns it, so the creator of the future has to attach it.
    """

    def cancel(self, *args, **kwargs):
        """Terminate the attached process, then cancel the future.

        Returns the result of ``asyncio.Future.cancel``, so callers get the
        usual boolean and the future really ends up cancelled.
        """
        process = getattr(self, 'p', None)
        if process is not None:
            process.terminate()
        return super().cancel(*args, **kwargs)
class AioProcessCancel(object):
    """Run ``target`` in a child process and fetch its result asynchronously.

    target - callable executed in the child via ``_process_wrapper``
    args, kwargs - forwarded to ``target``

    The process is started immediately by the constructor.
    """

    def __init__(self, target, *args, **kwargs):
        self.queue = aioprocessing.AioQueue()
        # The wrapper receives the target and the queue ahead of the
        # caller's positional arguments.
        self.p = aioprocessing.AioProcess(
            target=_process_wrapper,
            args=[target, self.queue] + list(args),
            kwargs=kwargs,
            daemon=True,
        )
        self.p.start()

    async def get(self):
        """Await one item from the queue and return it.

        The queue is closed afterwards, whether or not the wait succeeded.
        """
        try:
            return await self.queue.coro_get()
        finally:
            self.queue.close()

    def terminate(self):
        """Terminate the process, join it, and close the queue."""
        self.p.terminate()
        self.p.join()
        self.queue.close()
def _process_wrapper(f, queue, *args, **kwargs):
    """Call ``f(*args, **kwargs)`` and put its return value on ``queue``.

    f - callable to execute
    queue - object with a ``put`` method that receives the result

    If ``f`` raises, the exception propagates and nothing is enqueued.
    """
    queue.put(f(*args, **kwargs))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment