-
-
Save tdamsma/37fe65cee356ce6c0fc00801932d044e to your computer and use it in GitHub Desktop.
Event driven simulator in Python, using async/await
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2021 Damon Wischik. See LICENSE for permissions. | |
import asyncio
import collections
import heapq
class EventSimulator(asyncio.AbstractEventLoop):
    '''A simple event-driven simulator, using async/await.

    The simulator is an asyncio event loop whose clock is simulated: time
    only moves when the next scheduled callback is popped, and it jumps
    straight to that callback's timestamp. Coroutines are submitted with
    asyncio.ensure_future() (or loop.create_task()); plain functions with
    the loop.call_*() methods. run_forever() then executes everything until
    no work is left, stop() is called, or a callback/coroutine fails.

    run_until_complete() is not implemented.
    '''

    def __init__(self):
        self._time = 0
        self._running = False
        self._closed = False
        # FIFO of "run now" handles; a deque gives O(1) pops from the left.
        self._immediate = collections.deque()
        # Min-heap of TimerHandles, ordered by their scheduled time.
        self._scheduled = []
        # First exception reported by a callback or task, re-raised by
        # run_forever().
        self._exc = None

    def get_debug(self):
        # A silly hangover from the way asyncio is written
        return False

    def time(self):
        '''Return the current simulated time.'''
        return self._time

    # Methods for running the event loop.
    # For a real asyncio system you need to worry a lot more about running+closed.
    # For a simple simulator, we completely control the passage of time, so most of
    # these functions are trivial.
    #
    # I've split the pending events into _immediate and _scheduled.
    # I could put them all in _scheduled; but if there are lots of
    # _immediate calls there's no need for them to clutter up the heap.

    def run_forever(self):
        '''Run pending callbacks until none remain or stop() is called.

        Raises:
            Exception: whatever exception a callback or wrapped coroutine
                reported; the simulation halts at that point.
        '''
        # asyncio.sleep() and friends look the loop up with
        # get_running_loop(), so register ourselves while we are executing
        # callbacks and restore the previous state afterwards.
        previous_loop = asyncio._get_running_loop()
        asyncio._set_running_loop(self)
        self._running = True
        try:
            while (self._immediate or self._scheduled) and self._running:
                if self._immediate:
                    h = self._immediate.popleft()
                else:
                    h = heapq.heappop(self._scheduled)
                    self._time = h._when
                    h._scheduled = False  # just for asyncio.TimerHandle debugging?
                if not h._cancelled:
                    h._run()
                if self._exc is not None:
                    # Clear the stored exception so a later run does not
                    # raise the same error again.
                    exc, self._exc = self._exc, None
                    raise exc
        finally:
            self._running = False
            asyncio._set_running_loop(previous_loop)

    def run_until_complete(self, future):
        raise NotImplementedError

    def _timer_handle_cancelled(self, handle):
        # We could remove the handle from _scheduled, but instead we'll just skip
        # over it in the "run_forever" method.
        pass

    def is_running(self):
        '''Return True while run_forever() is executing.'''
        return self._running

    def is_closed(self):
        '''Return True once close() has been called.'''
        return self._closed

    def stop(self):
        '''Ask run_forever() to return after the current callback.'''
        self._running = False

    def close(self):
        '''Stop the loop and mark it as closed.'''
        self._running = False
        self._closed = True

    def shutdown_asyncgens(self):
        pass

    def call_exception_handler(self, context):
        # If there is any exception in a callback, this method gets called.
        # I'll store the exception in self._exc, so that the main simulation loop picks it up.
        # Contexts without an 'exception' entry must not wipe out one that
        # is already waiting to be raised.
        exc = context.get('exception')
        if exc is not None:
            self._exc = exc

    # Methods for scheduling jobs.
    #
    # If the job is a coroutine, the end-user should call asyncio.ensure_future(coro()).
    # The asyncio machinery will invoke loop.create_task(). Asyncio will then
    # run the coroutine in pieces, breaking it apart at async/await points, and every time it
    # will construct an appropriate callback and call loop.call_soon(cb).
    #
    # If the job is a plain function, the end-user should call one of the loop.call_*()
    # methods directly.
    #
    # asyncio's Task and Future pass a `context` keyword to call_soon(), so
    # every call_*() method accepts it and hands it on to the handle.

    def call_soon(self, callback, *args, context=None):
        '''Schedule callback(*args) to run at the current simulated time.'''
        h = asyncio.Handle(callback, args, self, context)
        self._immediate.append(h)
        return h

    def call_later(self, delay, callback, *args, context=None):
        '''Schedule callback(*args) to run `delay` time units from now.

        Raises:
            ValueError: if `delay` is negative.
        '''
        if delay < 0:
            raise ValueError("Can't schedule in the past")
        return self.call_at(self._time + delay, callback, *args, context=context)

    def call_at(self, when, callback, *args, context=None):
        '''Schedule callback(*args) to run at simulated time `when`.

        Raises:
            ValueError: if `when` is earlier than the current time.
        '''
        if when < self._time:
            raise ValueError("Can't schedule in the past")
        h = asyncio.TimerHandle(when, callback, args, self, context)
        heapq.heappush(self._scheduled, h)
        h._scheduled = True  # perhaps just for debugging in asyncio.TimerHandle?
        return h

    def create_task(self, coro, **kwargs):
        '''Wrap `coro` in a Task bound to this loop.

        Extra keyword arguments (e.g. name, context) are forwarded to
        asyncio.Task. The task's result is the coroutine's return value, or
        None if the coroutine raised.
        '''
        # Since this is a simulator, I'll run a plain simulated-time event loop, and
        # if there are any exceptions inside any coroutine I'll pass them on to the
        # event loop, so it can halt and print them out. To achieve this, I need the
        # exception to be caught in "async mode" i.e. by a coroutine, and then
        # use self._exc to pass it on to "regular execution mode".
        async def wrapper():
            try:
                return await coro
            except Exception as e:
                print("Wrapped exception")
                self._exc = e

        return asyncio.Task(wrapper(), loop=self, **kwargs)

    def create_future(self):
        # Not sure why this is here rather than in AbstractEventLoop.
        return asyncio.Future(loop=self)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2021 Damon Wischik. See LICENSE for permissions. | |
import asyncio, heapq | |
from eventsim import EventSimulator | |
class ProcessorSharingQueue:
    """A server that shares its capacity equally among all jobs present.

    Each job is submitted with process(), which returns a future. When the
    job's work has been served the future is resolved with the time the job
    spent in the queue (completion time minus submission time).

    Internally the queue tracks `_work_done`: the amount of service time
    every job currently present has received so far. A job arriving with
    service requirement w finishes when `_work_done` reaches its value at
    arrival plus w, so jobs are kept in a heap keyed on that finish level.
    A job whose future was cancelled is not removed; it keeps its share of
    the server until its work is done.

    Args:
        service_rate: work units served per unit of time.
        loop: event loop used for time, futures and timers; defaults to
            asyncio.get_event_loop().
    """

    def __init__(self, service_rate=1, loop=None):
        self._service_rate = service_rate
        self._queue = []
        self._loop = loop if loop is not None else asyncio.get_event_loop()
        self._done = None
        self._work_done = 0
        self._last_time = self._loop.time()
        # Tie-breaker for heap entries, so that two jobs with the same
        # finish level and arrival time never fall through to comparing
        # futures (which are not orderable).
        self._seq = 0

    def process(self, work):
        """Submit a job needing `work` units; return a future for it."""
        t = self._advance_clock()
        fut = self._loop.create_future()
        w = work / self._service_rate
        heapq.heappush(self._queue, (self._work_done + w, t, self._seq, fut))
        self._seq += 1
        # The new arrival changes everyone's share, so the previously
        # computed completion time is no longer valid.
        if self._done:
            self._done.cancel()
        self._schedule()
        return fut

    def complete(self):
        """Timer callback: finish the job at the head of the heap."""
        # Advance first, while the finishing job still counts towards the
        # number of jobs sharing the server.
        t = self._advance_clock()
        _, tstart, _, fut = heapq.heappop(self._queue)
        # A future cancelled by its owner cannot take a result; skipping it
        # keeps the remaining jobs being scheduled.
        if not fut.done():
            fut.set_result(t - tstart)
        self._schedule()

    def _advance_clock(self):
        """Credit the service given since the last update; return now."""
        t = self._loop.time()
        if self._queue:
            self._work_done += (t - self._last_time) / len(self._queue)
        self._last_time = t
        return t

    def _schedule(self):
        """Arm a timer for the next completion, if any job is present."""
        if not self._queue:
            self._done = None
        else:
            w = self._queue[0][0]
            # Remaining per-job service, stretched by the number of jobs
            # sharing the server. Clamp at zero: floating-point rounding
            # can leave a tiny negative remainder.
            dt = max((w - self._work_done) * len(self._queue), 0)
            self._done = self._loop.call_later(dt, self.complete)
class FIFOQueue:
    """A single server that handles jobs one at a time, in arrival order.

    Each job is submitted with process(), which returns a future. When the
    job reaches the head of the queue and its service finishes, the future
    is resolved with the job's service time (work / service_rate); time
    spent waiting behind earlier jobs is not included.

    Args:
        service_rate: work units served per unit of time.
        loop: event loop used for futures and timers; defaults to
            asyncio.get_event_loop().
    """

    def __init__(self, service_rate=1, loop=None):
        self._service_rate = service_rate
        # Pending (service_time, future) pairs; the head is in service.
        self._queue = []
        self._loop = loop if loop is not None else asyncio.get_event_loop()
        # Timer for the job in service, or None when the server is idle.
        self._done = None

    def process(self, work):
        """Submit a job needing `work` units; return a future for it."""
        fut = self._loop.create_future()
        w = work / self._service_rate
        self._queue.append((w, fut))
        # An idle server starts on the new job straight away; otherwise the
        # job is picked up by complete() when it reaches the head.
        if not self._done:
            self._done = self._loop.call_later(w, self.complete)
        return fut

    def complete(self):
        """Timer callback: finish the head job and start the next one."""
        # Remove the head in place rather than copying the rest of the list.
        w, fut = self._queue.pop(0)
        # A future cancelled by its owner cannot take a result; skipping it
        # keeps the server moving on to the next job instead of stalling.
        if not fut.done():
            fut.set_result(w)
        if self._queue:
            next_w, _ = self._queue[0]
            self._done = self._loop.call_later(next_w, self.complete)
        else:
            self._done = None
#------------------------------------------------ | |
# Demo: two jobs wake up at different simulated times and each send 4 units
# of work through one shared processor-sharing queue.
loop = EventSimulator()
asyncio.set_event_loop(loop)
q = ProcessorSharingQueue()


async def queueing_job(i=1):
    """Sleep for `i` time units, push 4 units of work through `q`, report.

    `i` doubles as the job's label in the printed trace.
    """
    started_at = loop.time()
    print(started_at, "Start job {}".format(i))
    await asyncio.sleep(i)
    sending_at = loop.time()
    print(sending_at, "Sending job {}".format(i))
    pending = q.process(work=4)
    # Awaiting the future yields the value the queue resolved it with.
    time_in_queue = await pending
    finished_at = loop.time()
    print(finished_at, "Done job {} in time {}".format(i, time_in_queue))


for _start_delay in (1, 3):
    asyncio.ensure_future(queueing_job(_start_delay))
loop.run_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment