Created
July 13, 2015 19:01
-
-
Save jck/0d3549435a8e60e11995 to your computer and use it in GitHub Desktop.
Experiment to implement MyHDL style simulations using python 3.5 and asyncio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import heapq | |
class StopSim(BaseException): | |
"""Raised to stop the simulation""" | |
class SimEventLoop(asyncio.AbstractEventLoop): | |
def __init__(self): | |
self._time = 0 | |
self._scheduled = [] | |
self._ready = [] | |
self._debug = False | |
def get_debug(self): | |
return self._debug | |
def create_task(self, coro): | |
task = asyncio.Task(coro, loop=self) | |
return task | |
def time(self): | |
return self._time | |
def call_soon(self, callback, *args): | |
# print('call soon', callback) | |
handle = asyncio.Handle(callback, args, self) | |
handle._run() | |
def call_later(self, delay, callback, *args): | |
# print('call_later', delay, callback) | |
timer = self.call_at(self.time() + delay, callback, *args) | |
return timer | |
def call_at(self, when, callback, *args): | |
# print('call_at', when, callback) | |
timer = asyncio.TimerHandle(when, callback, args, self) | |
heapq.heappush(self._scheduled, timer) | |
timer._scheduled = True | |
return timer | |
def _timer_handle_cancelled(self, handle): | |
# print('hc') | |
pass | |
def _run_once(self): | |
# print(self.time(), self._scheduled) | |
while self._scheduled: | |
handle = self._scheduled[0] | |
if handle._when > self._time: | |
break | |
handle = heapq.heappop(self._scheduled) | |
handle._scheduled = False | |
handle._run() | |
self._time += 1 | |
def run_forever(self): | |
while True: | |
try: | |
self._run_once() | |
except StopSim: | |
break | |
def run_until_complete(self, future): | |
new_task = not isinstance(future, asyncio.Future) | |
future = asyncio.ensure_future(future, loop=self) | |
if new_task: | |
future._log_destroy_pending = False | |
def lol(fut): | |
raise StopSim | |
# print('done') | |
future.add_done_callback(lol) | |
try: | |
self.run_forever() | |
except: | |
if new_task and future.done() and not future.cancelled(): | |
print('canc') | |
future.exception() | |
raise | |
future.remove_done_callback(lol) | |
return future.result() | |
def delay(time, result=None, *, loop=None): | |
async def _delay(): | |
"""Coroutine that completes after a given time (in seconds).""" | |
# based on asyncio.sleep | |
future = asyncio.Future(loop=loop) | |
h = future._loop.call_later(time, | |
future._set_result_unless_cancelled, result) | |
try: | |
return (await future) | |
finally: | |
h.cancel() | |
return _delay | |
def now(loop=None): | |
if loop is None: | |
loop = asyncio.get_event_loop() | |
return loop.time() | |
def always(cond): | |
def decorator(func): | |
async def wrapper(): | |
while True: | |
await cond() | |
func() | |
return wrapper | |
return decorator | |
@always(delay(3)) | |
def hi(): | |
t = asyncio.get_event_loop().time() | |
print(now(), 'hi') | |
def main(): | |
loop = SimEventLoop() | |
asyncio.set_event_loop(loop) | |
loop.run_until_complete(hi()) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi jck, I recently had the same idea to attempt our TLM with asyncio, mostly to gain access to the existing features like locks and semaphores. Did this experiment work? Any chance that myhdl might move to asyncio?