# Created
# September 15, 2017 04:20
# -
# -
# Save henry232323/455c142590e42dd3f4c27d31a982f1d8 to your computer and use it in GitHub Desktop.
# This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters
import time
from collections import deque
from heapq import heappop, heappush
from inspect import iscoroutine
from traceback import print_exc
from types import coroutine
@coroutine
def ayield():
    """Suspend the awaiting coroutine for exactly one step.

    Yields a single bare ``None`` to whatever is driving the coroutine and
    then finishes, so ``await ayield()`` evaluates to ``None``.
    """
    yield
@coroutine
def sleep(seconds):
    """Ask the driving scheduler to resume the awaiter after *seconds*.

    Yields one ``("sleep", deadline)`` tuple, where ``deadline`` is an
    absolute ``time.time()`` timestamp (wall clock, in seconds) computed at
    the moment the coroutine is first stepped. The function itself never
    blocks; honouring the deadline is up to whoever receives the tuple.

    Args:
        seconds: Delay, in seconds, added to the current ``time.time()``.

    Returns:
        ``None`` once resumed (``await sleep(n)`` evaluates to ``None``).
    """
    yield ("sleep", time.time() + seconds)
@coroutine
def sleeperator():
    """Yield control once, then complete with the value ``5``.

    Demonstrates that a generator-based coroutine can hand a return value
    back to its awaiter: ``await sleeperator()`` evaluates to ``5``.
    """
    yield
    return 5
async def sleeper(tim):
    """Demo coroutine: wait, print a marker, and return a fixed string.

    Args:
        tim: Number of seconds passed straight through to ``sleep``.

    Returns:
        The string ``"Bon hommie"``.
    """
    await sleep(tim)
    print("banane")
    return "Bon hommie"
class Task:
    """Bookkeeping wrapper around one coroutine scheduled on a ``Loop``.

    Attributes are plain and public where the loop reads them:

    * ``result``   -- the coroutine's return value, ``None`` until it finishes.
    * ``complete`` -- ``True`` once ``set_result`` has been called.

    ``result`` is deliberately a data attribute (the loop returns
    ``task.result``); use ``get_result()`` when "not finished yet" must be
    told apart from "finished with ``None``".
    """

    def __init__(self, task, data):
        """Wrap coroutine *task*; *data* is the value sent on its next step.

        *data* must be ``None`` for a coroutine that has not started yet,
        because Python rejects a non-``None`` first ``send``.
        """
        self._task = task
        self._data = data
        self.result = None
        self.__set = False
        self.complete = False

    def get_result(self):
        """Return the stored result.

        Raises:
            RuntimeError: if ``set_result`` has not been called yet.
        """
        if not self.__set:
            raise RuntimeError("Result isn't ready!")
        return self.result

    def set_result(self, data):
        """Record *data* as the final result and mark the task complete."""
        self.complete = True
        self.__set = True
        self.result = data


class Loop:
    """Minimal single-threaded scheduler for coroutines.

    Runnable tasks live in a FIFO queue; sleeping tasks live in a heap
    ordered by wake-up deadline. A coroutine talks to the loop through the
    values it yields:

    * ``("sleep", deadline)`` -- park the task until ``time.time()`` reaches
      ``deadline``.
    * a coroutine object -- schedule it as a new, independent task.
    * anything else -- simply give other tasks a turn.

    In every case the yielded value is sent back into the task on its next
    step.
    """

    # True only while run_until_complete() is executing.
    running = False

    def __init__(self):
        self._queue = deque()   # Task objects ready to be stepped
        self._timers = []       # heap of (deadline, sequence, Task)
        # Monotonic counter used as a heap tie-breaker so two tasks with the
        # same deadline are never compared to each other (Task is unordered).
        self._timer_seq = 0

    def run_until_complete(self, task):
        """Run coroutine *task* and everything scheduled alongside it.

        The loop keeps going until no task is runnable and no timer is
        pending, or until ``close()`` is called.

        Args:
            task: The root coroutine object.

        Returns:
            The root coroutine's return value, or ``None`` if it raised
            (the traceback is printed) or the loop was closed first.
        """
        root = Task(task, None)
        self._queue.append(root)
        self.running = True
        try:
            while self.running and (self._queue or self._timers):
                if not self._queue:
                    # Nothing is runnable, so the while condition guarantees
                    # a timer exists: block until the earliest one is due.
                    delay = self._timers[0][0] - time.time()
                    if delay > 0:
                        time.sleep(delay)
                now = time.time()
                while self._timers and self._timers[0][0] <= now:
                    woken = heappop(self._timers)[2]
                    self._queue.append(woken)
                # One fair pass: step each task that was ready at the start
                # of the pass; anything rescheduled waits for the next pass.
                for _ in range(len(self._queue)):
                    self._step(self._queue.popleft())
        finally:
            self.running = False
        return root.result

    def _step(self, task):
        """Advance *task* by one ``send`` and reschedule it as requested."""
        try:
            task._data = task._task.send(task._data)
        except StopIteration as err:
            task.set_result(err.value)
            return
        except Exception:
            # Best effort: report the failure and drop only this task.
            print_exc()
            return
        request = task._data
        if (isinstance(request, (tuple, list)) and len(request) >= 2
                and request[0] == 'sleep'):
            self._timer_seq += 1
            heappush(self._timers, (request[1], self._timer_seq, task))
            return
        if iscoroutine(request):
            # A yielded coroutine becomes its own task, queued ahead of
            # the task that spawned it.
            self._queue.append(Task(request, None))
        self._queue.append(task)

    def create_task(self, task):
        """Schedule coroutine *task* to run on the next ``run_until_complete``.

        Returns:
            The ``Task`` wrapper, so the caller can inspect its result later.
        """
        scheduled = Task(task, None)
        self._queue.append(scheduled)
        return scheduled

    def close(self):
        """Ask a running loop to stop before its next scheduling pass."""
        self.running = False
# Demo: build a loop and drive the sample coroutine to completion.
# NOTE(review): this executes at import time and blocks for roughly five
# seconds while the sleep timer runs; consider an ``if __name__ ==
# "__main__":`` guard if this module is ever imported rather than run.
loop = Loop()
loop.run_until_complete(sleeper(5))
# Sign up for free
# to join this conversation on GitHub.
# Already have an account?
# Sign in to comment