Last active
February 20, 2020 17:51
-
-
Save guilledk/cf54fc80a2116ab42b09939b6bbb3c85 to your computer and use it in GitHub Desktop.
Actor queue implementation, with history, and pub-sub
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import trio | |
class AsyncQueue:
    """Unbounded async queue with message history and named sub-queues.

    Every message passed to :meth:`send` is appended to ``history``.  Each
    subscription stores a match callback, one extra argument for that
    callback, a read pointer into ``history`` and its own ``AsyncQueue``.
    A message accepted by at least one subscription is delivered to the
    matching sub-queues; a message accepted by none is delivered to this
    queue's own channel and can be read with :meth:`receive`.
    """

    def __init__(self):
        # Every message ever sent, in order; subscriptions replay from here.
        self.history = []
        # Unbounded memory channel for messages no subscription matched.
        self.inport, self.outport = trio.open_memory_channel(math.inf)
        # name -> {"rptr": int, "match": callable, "*args": object,
        #          "queue": AsyncQueue}
        self.subs = {}

    async def match_sub(self, name) -> bool:
        """Feed the subscription *name* every history entry it has not seen.

        The match callback is called as ``match(msg, args)`` for each entry
        from the subscription's read pointer onwards; accepted entries are
        sent to the subscription's queue.  The read pointer advances past
        every examined entry, matched or not.

        Returns:
            True if at least one entry was forwarded, otherwise False.

        Raises:
            KeyError: if no subscription called *name* exists.
        """
        sub = self.subs[name]
        matched = False
        for msg in self.history[sub["rptr"]:]:
            if sub["match"](msg, sub["*args"]):
                await sub["queue"].send(msg)
                matched = True
            sub["rptr"] += 1
        return matched

    async def sub(self, name, match_cb, args=None):
        """Register subscription *name* and replay the existing history to it.

        Args:
            name: key under which the subscription is stored; an existing
                subscription with the same name is replaced.
            match_cb: callable invoked as ``match_cb(msg, args)``; a truthy
                result routes ``msg`` to the sub-queue.
            args: extra value handed to ``match_cb`` as its second argument.

        Returns:
            The ``AsyncQueue`` that receives the matching messages, so the
            caller can read from it directly.
        """
        queue = AsyncQueue()
        self.subs[name] = {
            "rptr": 0,
            "match": match_cb,
            "*args": args,
            "queue": queue,
        }
        await self.match_sub(name)
        return queue

    def unsub(self, name):
        """Remove subscription *name*; raises KeyError if it does not exist."""
        del self.subs[name]

    async def send(self, msg):
        """Record *msg* and route it to subscriptions or to the main channel.

        Every subscription is serviced on every send.  The message is put on
        this queue's own channel only when no subscription forwarded
        anything.
        """
        self.history.append(msg)
        propagated = False
        # Iterate over a snapshot: match_sub awaits, so other tasks may add or
        # remove subscriptions while this loop is suspended.
        for name in tuple(self.subs):
            if name not in self.subs:
                continue
            # Always call match_sub; folding it into ``propagated or ...``
            # would skip the remaining subscriptions after the first match.
            if await self.match_sub(name):
                propagated = True
        if not propagated:
            await self.inport.send(msg)

    async def receive(self):
        """Return the next message that no subscription matched."""
        return await self.outport.receive()

    async def receive_sub(self, name):
        """Return the next message from the sub-queue of subscription *name*."""
        return await self.subs[name]["queue"].receive()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.8 | |
import trio | |
import AsyncQueue | |
def type_matcher(*args):
    """Return whether the first positional argument is an instance of the second.

    Any arguments beyond the first two are ignored.
    """
    return isinstance(args[0], args[1])
def msg_matcher(*args):
    """Return the result of comparing the first two positional arguments with ``==``.

    Any arguments beyond the first two are ignored.
    """
    return args[0] == args[1]
async def publisher(queue):
    """Send a fixed sequence of ints and stage strings to *queue*, in order.

    *queue* only needs an awaitable ``send(msg)`` method.
    """
    payload = (0, "stage-0", 1, "stage-1", 2, "stage-2", 3, "stage-4", 4)
    for item in payload:
        await queue.send(item)
async def subscriber(queue, end):
    """Print every message received from *queue* until *end* arrives.

    Args:
        queue: object with an awaitable ``receive()`` method.
        end: sentinel value; the first message equal to it stops the loop
            and is not printed.
    """
    while True:
        msg = await queue.receive()
        if msg == end:
            break
        # Print the message next to the queue it came from; printing the
        # queue alone cannot show which message reached which queue.
        print(queue, msg)
async def main():
    """Build a main queue with int and str sub-queues and run the demo tasks.

    One publisher task sends to the main queue; two subscriber tasks read
    from the "integer" and "string" sub-queues until they see ``4`` and
    ``"stage-4"`` respectively.  The function then sleeps until cancelled.
    """
    # The module-level ``import AsyncQueue`` binds the module object, which is
    # not callable; import the class itself for local use.
    from AsyncQueue import AsyncQueue as Queue

    async with trio.open_nursery() as nursery:
        main_queue = Queue()
        await main_queue.sub("integer", type_matcher, args=int)
        await main_queue.sub("string", type_matcher, args=str)
        # Look the sub-queues up by name rather than relying on the return
        # value of sub().
        int_queue = main_queue.subs["integer"]["queue"]
        str_queue = main_queue.subs["string"]["queue"]
        nursery.start_soon(publisher, main_queue)
        nursery.start_soon(subscriber, str_queue, "stage-4")
        nursery.start_soon(subscriber, int_queue, 4)
        # Keep the program alive until it is interrupted.
        await trio.sleep_forever()
# Run the demo only when executed as a script, so importing this module does
# not start the event loop.
if __name__ == "__main__":
    try:
        trio.run(main)
    except KeyboardInterrupt:
        # main() sleeps forever, so Ctrl-C is the expected way to stop it.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This test creates an async queue tree with a main queue and two sub-queues, one for str objects and one for ints.
Then it creates a publisher task that sends some test objects.
Two subscribers are created, one for each sub-queue.
The output should show how the async tree correctly matched the messages with their respective queues.
This is a high-performance-focused async queue system; it can be used for async pub-sub sockets to build a networking library.