Skip to content

Instantly share code, notes, and snippets.

@oeway
Last active December 26, 2020 19:57
Show Gist options
  • Save oeway/9a2d5ca5b5920c2929cb05c9ff78e43d to your computer and use it in GitHub Desktop.
Save oeway/9a2d5ca5b5920c2929cb05c9ff78e43d to your computer and use it in GitHub Desktop.
import js
import asyncio
from webloop import WebLoop
# set the event loop explicitly
loop = WebLoop()
asyncio.set_event_loop(loop)
# or get the event loop directly
# loop = asyncio.get_event_loop()
async def sleep_task():
print('start sleeping for 5s')
await asyncio.sleep(5)
print('sleeping done')
loop.run_until_complete(sleep_task())
js.eval('''
async function asyncFetch(url){
const response = await fetch(url)
const data = await response.text()
return data
}
''')
def wrap_promise(promise):
loop = asyncio.get_event_loop()
fut = loop.create_future()
def set_exception(e):
fut.set_exception(Exception(str(e)))
promise.then(fut.set_result).catch(set_exception)
return fut
async def fetch_task():
print('fetching data...')
promise = asyncFetch('https://raw.githubusercontent.com/imjoy-team/ImJoy/master/web/package.json')
result = await wrap_promise(promise)
print('finished: ',dict(result))
# loop.run_until_complete(fetch_task())
asyncio.ensure_future(fetch_task())
loop.run_forever()
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# flake8: noqa
"""
A custom event loop for running asyncio in Pyodide
Version: 0.1.2
Author: Wei OUYANG (@oeway)
Adapted from the EventSimulator made by @damonjw
https://gist.github.com/damonjw/35aac361ca5d313ee9bf79e00261f4ea
Version History:
* support contextvars
* add event loop policy
* throw error when the loop is already running
* support Python 3.8
* fix run_until_complete
"""
import heapq
import asyncio
import time
import traceback
import contextvars
import js
from typing import Dict, Tuple, Optional, Awaitable, Callable
class WebLoop(asyncio.AbstractEventLoop):
"""A custom event loop for running asyncio in Pyodide
It works by utilizing the browser event loop via the setTimeout function
"""
def __init__(self, debug: Optional[bool] = False, interval: Optional[int] = 10):
"""
Instatiate the web loop
"""
self._running = False
self._immediate = []
self._scheduled = []
self._futures = []
self._next_handle = None
self._debug = debug
self._stop = False
self._interval = interval
self._timeout_promise = js.eval(
"self._timeoutPromise = function(time){return new Promise((resolve)=>{setTimeout(resolve, time);});}"
)
self._exception_handler = None
self._task_factory = self._default_task_factory
self._until_complete = None
self._result = None
self._exception = None
def get_debug(self):
"""
Get the debug mode (bool) of the event loop.
"""
return self._debug
def set_debug(self, enabled: bool):
"""
Set the debug mode of the event loop.
"""
self._debug = enabled
def time(self):
"""
Return the current time, as a float value, according to the time module (time.time()).
"""
return time.time()
def run_forever(self):
"""
Run the event loop until stop() is called.
Note that this function is different from the standard asyncio loop implementation, it won't block the execution
"""
if self._running:
raise RuntimeError("This event loop is already running")
self._stop = False
if asyncio.get_event_loop() == self:
asyncio._set_running_loop(self)
if not self._running:
self._do_tasks(forever=True)
def run_until_complete(self, future: Awaitable):
"""
Run until the future (an instance of Future) has completed.
Note that this function is different from the standard asyncio loop in two ways:
1) It won't block the execution
2) It returns a Promise object
Parameters
----------
future
A future or coroutine object
Returns
-------
A Promise object with the returned result or exception
"""
if self._running:
raise RuntimeError("This event loop is already running")
def run(resolve, reject):
asyncio.ensure_future(future)
if asyncio.get_event_loop() == self:
asyncio._set_running_loop(self)
self._stop = False
if not self._running:
self._do_tasks(
until_complete=(
resolve,
reject,
)
)
return js.Promise.new(run)
def _do_tasks(
self,
until_complete: Optional[Tuple] = None,
forever: Optional[bool] = False,
):
"""
Do the tasks
"""
self._until_complete = until_complete
self._exception = None
self._result = None
self._running = True
if self._stop:
self._quit_running()
return
while len(self._immediate) > 0:
h = self._immediate[0]
self._immediate = self._immediate[1:]
if not h._cancelled:
h._run()
if self._stop:
self._quit_running()
return
if self._next_handle is not None:
if self._next_handle._cancelled:
self._next_handle = None
if self._scheduled and self._next_handle is None:
h = heapq.heappop(self._scheduled)
h._scheduled = True
self._next_handle = h
if self._next_handle is not None and self._next_handle._when <= self.time():
h = self._next_handle
self._next_handle = None
self._immediate.append(h)
not_finished = (
self._immediate or self._scheduled or self._next_handle or self._futures
)
if forever or (until_complete and not_finished):
self._timeout_promise(self._interval).then(
lambda x: self._do_tasks(until_complete=until_complete, forever=forever)
)
else:
self._quit_running()
def _quit_running(self):
"""
Quit running
"""
if asyncio.get_event_loop() == self:
asyncio._set_running_loop(None)
self._running = False
if self._until_complete:
resolve, reject = self._until_complete
self._until_complete = None
if self._exception:
reject(self._exception)
else:
resolve(self._result)
def _timer_handle_cancelled(self, handle: Callable):
"""
Timer handle canlled
"""
pass
def is_running(self) -> bool:
"""
Return True if the event loop is currently running.
"""
return self._running
def is_closed(self) -> bool:
"""
Return True if the event loop was closed.
"""
return not self._running
def stop(self):
"""
Stop the event loop.
"""
self._stop = True
self._quit_running()
def close(self):
"""
Close the event loop.
"""
self._stop = True
self._quit_running()
def shutdown_asyncgens(self):
raise NotImplementedError
def shutdown_default_executor(self):
raise NotImplementedError
def default_exception_handler(self, context: Dict) -> Callable:
"""
Default exception handler.
"""
js.console.error(context.get("message"))
def call_exception_handler(self, context: Dict):
"""
Call the current event loop exception handler.
Parameters
----------
context
context is a dict object containing the following keys (new keys may be introduced in future Python versions):
‘message’: Error message;
‘exception’ (optional): Exception object;
‘future’ (optional): asyncio.Future instance;
‘handle’ (optional): asyncio.Handle instance;
‘protocol’ (optional): Protocol instance;
‘transport’ (optional): Transport instance;
‘socket’ (optional): socket.socket instance.
"""
if self._exception_handler:
self._exception_handler(self, context)
else:
self.default_exception_handler(context)
def set_exception_handler(self, handler: Optional[Callable]):
"""
Set handler as the new event loop exception handler.
Parameters
----------
handler
If handler is None, the default exception handler will be set.
Otherwise, handler must be a callable with the signature matching (loop, context),
where loop is a reference to the active event loop, and context is a dict object
containing the details of the exception (see call_exception_handler() documentation
for details about context).
"""
self._exception_handler = handler
def get_exception_handler(self):
"""
Return the current exception handler, or None if no custom exception handler was set.
"""
return self._exception_handler
def call_soon(self, callback: Callable, *args, context: contextvars.Context = None):
"""
Schedule the callback callback to be called with args arguments at the next iteration of the event loop.
"""
h = asyncio.Handle(callback, args, self, context=context)
self._immediate.append(h)
return h
def call_soon_threadsafe(
callback: Callable, *args, context: contextvars.Context = None
):
"""
A thread-safe variant of call_soon().
Note this function is different from the standard asyncio loop implementation, it is current exactly the same as call_soon
"""
return self.call_soon(callback, *args, context=context)
def call_later(
self,
delay: float,
callback: Callable,
*args,
context: contextvars.Context = None
):
"""
Schedule callback to be called after the given delay number of seconds (can be either an int or a float).
"""
if delay < 0:
raise Exception("Can't schedule in the past")
return self.call_at(self.time() + delay, callback, *args, context=context)
def call_at(
self,
when: float,
callback: Callable,
*args,
context: contextvars.Context = None
):
"""
Schedule callback to be called at the given absolute timestamp when (an int or a float), using the same time reference as loop.time().
"""
if when < self.time():
raise Exception("Can't schedule in the past")
h = asyncio.TimerHandle(when, callback, args, self, context=context)
heapq.heappush(self._scheduled, h)
h._scheduled = True
return h
def create_task(self, coro: Awaitable, name: Optional[str] = None) -> asyncio.Task:
"""
Schedule the execution of a Coroutines. Return a Task object.
"""
return self._task_factory(self, coro, name=name)
def _default_task_factory(
self,
loop: asyncio.AbstractEventLoop,
coro: Awaitable,
name: Optional[str] = None,
):
"""
The default task factory
"""
async def wrapper():
try:
self._result = await coro
except Exception as e:
self._exception = e
self.call_exception_handler(
{"message": traceback.format_exc(), "exception": e}
)
return asyncio.Task(wrapper(), loop=self, name=name)
def create_future(self):
"""
Create an asyncio.Future object attached to the event loop.
"""
fut = asyncio.Future(loop=self)
def remove_fut(*args):
self._futures.remove(fut)
fut.add_done_callback(remove_fut)
self._futures.append(fut)
return fut
def set_task_factory(self, factory: Callable):
"""
Set the task factory
"""
self._task_factory = factory
def get_task_factory(
self,
):
"""
Return a task factory or None if the default one is in use.
"""
if self._task_factory == self._default_task_factory:
return None
return self._task_factory
class WebLoopPolicy(asyncio.DefaultEventLoopPolicy):
"""
A simple event loop policy for managing WebLoop based event loops.
"""
def __init__(self):
"""
Instantiate the web loop policy
"""
self._default_loop = None
def get_event_loop(self):
"""
Get the current event loop
"""
if self._default_loop is None:
self._default_loop = WebLoop()
return self._default_loop
def new_event_loop(self):
"""
Create a new event loop
"""
self._default_loop = WebLoop()
return self._default_loop
def set_event_loop(self, loop: asyncio.AbstractEventLoop):
"""
Set the current event loop
"""
self._default_loop = loop
def get_child_watcher(self):
"""
Get the child watcher
"""
raise NotImplementedError
def set_child_watcher(self):
"""
Set the child wather
"""
raise NotImplementedError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment