Skip to content

Instantly share code, notes, and snippets.

@maxfischer2781
Last active January 20, 2020 12:40
Show Gist options
  • Select an option

  • Save maxfischer2781/b8a2b9e21f3737114f4cae5c6e01473b to your computer and use it in GitHub Desktop.

Select an option

Save maxfischer2781/b8a2b9e21f3737114f4cae5c6e01473b to your computer and use it in GitHub Desktop.
Variant of usim.Pipe that allows load monitoring
from usim import Pipe, instant
from usim._primitives.notification import Notification
class MonitoredPipe(Pipe):
    """
    Variant of :py:class:`usim.Pipe` whose load can be observed

    Every call to :py:meth:`_throttle_subscribers` wakes the tasks
    iterating over :py:meth:`load`, which then report the summed
    throughput of all current subscriptions.
    """

    def __init__(self, throughput: float):
        super().__init__(throughput)
        # fired from _throttle_subscribers to resume every ``load`` iterator
        self._monitor = Notification()

    async def load(self):
        r"""
        Asynchronously iterate over the summed subscription throughput

        The first value is produced after a single ``await instant``; each
        further value is produced once the internal monitor notification
        has been triggered again.
        The iteration never finishes on its own.

        .. code:: python3

            async def report_load(pipe: MonitoredPipe):
                async for throughput in pipe.load():
                    print(f'{time.now:6.0f}: {throughput} \t [{throughput / pipe.throughput * 100:03.0f}%]')
        """
        await instant
        while True:
            current_load = sum(self._subscriptions.values())
            yield current_load
            # suspend until _throttle_subscribers signals the next update
            await self._monitor

    def _throttle_subscribers(self):
        """
        Wake all ``load`` observers, then run the base class throttling

        NOTE(review): presumably the base ``Pipe`` calls this hook whenever
        its subscriptions change -- confirm against the usim implementation.
        """
        self._monitor.__awake_all__()
        super()._throttle_subscribers()
if __name__ == '__main__':
    # Demo: several delayed transfers share one pipe while a reporter
    # task prints every load value produced by ``MonitoredPipe.load``.
    from usim import time, run, Scope

    async def report_load(pipe: MonitoredPipe):
        """Print the current time, the load and the load relative to capacity"""
        async for throughput in pipe.load():
            print(f'{time.now:6.0f}: {throughput} \t [{throughput / pipe.throughput * 100:03.0f}%]')

    async def perform_load(pipe: MonitoredPipe, delay, amount):
        """Wait for ``delay``, then transfer ``amount`` through ``pipe``"""
        await (time + delay)
        # second argument is half of the pipe's total throughput
        # NOTE(review): presumably a per-transfer throughput limit -- confirm
        half_capacity = pipe.throughput / 2
        await pipe.transfer(amount, half_capacity)

    async def main():
        """Run the reporter next to four staggered transfers on one pipe"""
        link = MonitoredPipe(128)
        # (delay, amount) pairs, started in exactly this order
        schedule = ((0, 512), (4, 1024), (6, 128), (12, 1024))
        async with Scope() as tasks:
            # NOTE(review): ``volatile=True`` presumably lets the scope end
            # while the never-finishing reporter still runs -- confirm
            tasks.do(report_load(link), volatile=True)
            for delay, amount in schedule:
                tasks.do(perform_load(link, delay, amount))

    run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment