Skip to content

Instantly share code, notes, and snippets.

@djmunro
Created September 27, 2017 17:39
Show Gist options
  • Save djmunro/55aaddef25efe5ac9edb7af61505fe82 to your computer and use it in GitHub Desktop.
Save djmunro/55aaddef25efe5ac9edb7af61505fe82 to your computer and use it in GitHub Desktop.
import asyncio
import ctypes
import sys
import threading
from asyncio.base_futures import _PENDING
from functools import wraps
from typing import Callable, List, Generic, TypeVar
import aioprocessing
class timeout:
    """Asynchronous timeout context manager.

    Useful when timeout logic should wrap a block of code, or in cases
    where asyncio.wait_for is not suitable. It only implements the
    asynchronous protocol, so it must be entered with ``async with``::

        async with timeout(0.001):
            async with aiohttp.get('https://github.com') as r:
                await r.text()

    timeout - value in seconds; None (or 0) disables timeout logic
    loop - asyncio compatible event loop; defaults to
           asyncio.get_event_loop()

    When the deadline passes, the task that entered the block is cancelled
    and the resulting asyncio.CancelledError is re-raised from the block as
    asyncio.TimeoutError.
    """

    def __init__(self, timeout, *, loop=None):
        # A timeout of 0 is treated exactly like None: no deadline at all.
        if timeout is not None and timeout == 0:
            timeout = None
        self._timeout = timeout
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._task = None
        self._cancelled = False
        self._cancel_handler = None

    async def __aenter__(self):
        """Arm the timer for the current task and return this manager.

        Raises RuntimeError when a timeout is set but no task is running.
        """
        # Forget the outcome of any earlier use; otherwise a reused instance
        # that once timed out would report a later, unrelated cancellation
        # as a TimeoutError.
        self._cancelled = False
        if self._timeout is not None:
            self._task = current_task(self._loop)
            if self._task is None:
                raise RuntimeError('Timeout context manager should be used '
                                   'inside a task')
            self._cancel_handler = self._loop.call_later(
                self._timeout, self._cancel_task)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disarm the timer; translate our own cancellation to TimeoutError.

        Any other exception (including a cancellation this manager did not
        request) propagates unchanged because None is returned.
        """
        if exc_type is asyncio.CancelledError and self._cancelled:
            # The timer already fired, so there is nothing left to cancel.
            self._cancel_handler = None
            self._task = None
            raise asyncio.TimeoutError from None
        if self._timeout is not None and self._cancel_handler is not None:
            self._cancel_handler.cancel()
            self._cancel_handler = None
        self._task = None

    def _cancel_task(self):
        """Timer callback: cancel the guarded task and record the result."""
        # Task.cancel() returns whether the cancellation was requested; that
        # flag is what lets __aexit__ recognise the timeout.
        self._cancelled = self._task.cancel()
def current_task(loop):
    """Return the task currently running on ``loop``, or None if there is none.

    ``asyncio.Task.current_task`` was deprecated in Python 3.7 and removed in
    3.9, so the module-level ``asyncio.current_task`` is preferred and the
    classmethod is only used on interpreters that lack it.

    loop - event loop to inspect
    """
    get_current = getattr(asyncio, 'current_task', None)
    if get_current is None:
        # Python < 3.7: only the classmethod spelling exists.
        get_current = asyncio.Task.current_task
    task = get_current(loop=loop)
    if task is None:
        # Some loop implementations expose their own current_task() method;
        # ask the loop itself before giving up.
        if hasattr(loop, 'current_task'):
            task = loop.current_task()
    return task
class Timeout(object):
    """Async context manager that cancels the owning task after ``seconds``.

    On expiry the task is cancelled, so the guarded block sees
    asyncio.CancelledError (not a TimeoutError). Leaving the block cancels
    the pending timer.

    seconds - delay passed to asyncio.sleep before the task is cancelled
    """

    def __init__(self, seconds):
        self.seconds = seconds
        # The task to cancel. May be None when constructed outside of a
        # running task; __aenter__ fills it in that case.
        self.task = self._running_task()

    @staticmethod
    def _running_task():
        """Return the task executing right now, or None outside of a task."""
        get_current = getattr(asyncio, 'current_task', None)
        if get_current is None:
            # Python < 3.7: asyncio.Task.current_task is the only spelling
            # (it was removed in 3.9).
            return asyncio.Task.current_task()
        try:
            return get_current()
        except RuntimeError:
            # asyncio.current_task() raises when no event loop is running.
            return None

    async def __aenter__(self):
        """Start the timer task and return this manager."""
        if self.task is None:
            self.task = self._running_task()

        async def _timer():
            await asyncio.sleep(self.seconds)
            self.task.cancel()

        self._timer = asyncio.ensure_future(_timer())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Stop the timer; exceptions from the block propagate unchanged."""
        print('leaving!')
        self._timer.cancel()
T = TypeVar('T')


class Emitter(Generic[T]):
    """Fan messages out to the callbacks of all currently active listeners.

    A listener is active only while the object returned by ``listen`` is
    entered with ``async with``.

    loop - event loop on which the callbacks' results are scheduled
    """

    def __init__(self, loop):
        self.listeners: List[Emitter._Listener] = []
        self.loop = loop

    def emit(self, message : T):
        """Deliver ``message`` to every active listener.

        Each callback is invoked immediately by the caller of ``emit``; the
        object it returns is then handed to asyncio.ensure_future on
        ``self.loop`` via call_soon_threadsafe.
        """
        for listener in self.listeners:
            # NOTE(review): ensure_future needs a coroutine or other
            # awaitable, so callbacks are presumably coroutine functions
            # even though the annotation says they return None - confirm.
            # ``asyncio.async`` cannot be used here: ``async`` is a reserved
            # keyword since Python 3.7, which makes that spelling a
            # SyntaxError; ensure_future is its replacement.
            self.loop.call_soon_threadsafe(
                asyncio.ensure_future, listener.callback(message))

    def listen(self, callback : Callable[[T], None]):
        """Return an async context manager that registers ``callback``."""
        return Emitter._Listener(self.listeners, callback)

    class _Listener(object):
        """Registration handle: listed on enter, removed again on exit."""

        def __init__(self, listeners, callback):
            self.callback : Callable[[T], None] = callback
            # The emitter's own list, shared so enter/exit can mutate it.
            self.listeners : List[Emitter._Listener] = listeners

        async def __aenter__(self):
            self.listeners.append(self)

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            self.listeners.remove(self)
def async_timeout(s):
    """Build a decorator that runs a coroutine function under ``timeout(s)``.

    s - timeout value handed to ``timeout`` on every call of the wrapped
        coroutine function
    """
    def apply_timeout(func):
        async def guarded(*call_args, **call_kwargs):
            # A fresh timeout manager is created for each invocation.
            async with timeout(s):
                return await func(*call_args, **call_kwargs)

        # Copy name, docstring and friends from the wrapped function.
        return wraps(func)(guarded)

    return apply_timeout
class AioProcessCancel2(asyncio.Future):
    """Future whose cancellation also terminates an attached process.

    This class never assigns the process itself; the owner is expected to
    attach it as the ``p`` attribute.
    """

    def cancel(self, *args, **kwargs):
        """Terminate the attached process (if any), then cancel the future.

        All arguments are forwarded to ``asyncio.Future.cancel``.

        Returns the result of ``asyncio.Future.cancel``: True when the
        future was cancelled, False when it was already done.
        """
        process = getattr(self, 'p', None)
        if process is not None:
            process.terminate()
        # Without delegating, the future would never reach the cancelled
        # state and callers would receive None instead of a bool.
        return super().cancel(*args, **kwargs)
class AioProcessCancel(object):
    """Run ``target`` in a daemon AioProcess and hand back its result.

    The process executes ``_process_wrapper``, which receives ``target``,
    the result queue and the remaining positional/keyword arguments.
    """

    def __init__(self, target, *args, **kwargs):
        self.queue = aioprocessing.AioQueue()
        # The wrapper expects the callable and the queue ahead of the
        # caller's own positional arguments.
        wrapper_args = [target, self.queue, *args]
        self.p = aioprocessing.AioProcess(
            target=_process_wrapper,
            args=wrapper_args,
            kwargs=kwargs,
            daemon=True,
        )
        self.p.start()

    async def get(self):
        """Await one item from the queue; the queue is closed afterwards."""
        try:
            item = await self.queue.coro_get()
        finally:
            # Closed whether or not the read succeeded.
            self.queue.close()
        return item

    def terminate(self):
        """Terminate the process, join it, then close the queue."""
        process = self.p
        process.terminate()
        process.join()
        self.queue.close()
def _process_wrapper(f, queue, *args, **kwargs):
    """Call ``f(*args, **kwargs)`` and put its return value on ``queue``."""
    outcome = f(*args, **kwargs)
    queue.put(outcome)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment