Skip to content

Instantly share code, notes, and snippets.

@guilledk
Last active February 20, 2020 17:51
Show Gist options
  • Save guilledk/cf54fc80a2116ab42b09939b6bbb3c85 to your computer and use it in GitHub Desktop.
Save guilledk/cf54fc80a2116ab42b09939b6bbb3c85 to your computer and use it in GitHub Desktop.
Actor queue implementation, with history, and pub-sub
import math
import trio
class AsyncQueue:
    """Message queue with a replayable history and named, filtered sub-queues.

    Every message passed to ``send`` is appended to ``history``.  Each
    subscription owns a child ``AsyncQueue``, a match callback and a read
    pointer into the history; messages accepted by the callback are forwarded
    to the child queue.  A message that no subscription accepts is placed on
    this queue's own memory channel, where ``receive`` picks it up.
    """

    def __init__(self):
        # Every message ever sent, in order; subscriptions replay from here.
        self.history = []
        # Memory channel opened with an unbounded (math.inf) buffer.
        self.inport, self.outport = trio.open_memory_channel(math.inf)
        # name -> {"rptr": next history index, "match": callback,
        #          "*args": extra callback argument, "queue": child queue}
        self.subs = {}

    async def match_sub(self, name) -> bool:
        """Forward the unread part of the history to subscription *name*.

        The callback is invoked as ``match(msg, args)`` for each message the
        subscription has not inspected yet; accepted messages are sent to the
        subscription's child queue.

        Returns:
            True if at least one message was forwarded during this call.

        Raises:
            KeyError: if *name* is not a registered subscription.
        """
        sub = self.subs[name]
        matched = False
        for msg in self.history[sub["rptr"]:]:
            if sub["match"](msg, sub["*args"]):
                await sub["queue"].send(msg)
                matched = True
            # Advance past every inspected message, matched or not.
            sub["rptr"] += 1
        return matched

    async def sub(self, name, match_cb, args=None):
        """Register subscription *name* and replay the existing history to it.

        Args:
            name: key identifying the subscription; an existing entry with the
                same name is replaced.
            match_cb: callable invoked as ``match_cb(msg, args)``; a truthy
                result routes ``msg`` to the subscription's queue.
            args: second argument handed to ``match_cb``.

        Returns:
            The child ``AsyncQueue`` that receives the matching messages.
        """
        queue = AsyncQueue()
        self.subs[name] = {
            "rptr": 0,
            "match": match_cb,
            "*args": args,
            "queue": queue,
        }
        await self.match_sub(name)
        return queue

    def unsub(self, name):
        """Remove subscription *name*; raises KeyError if it does not exist."""
        del self.subs[name]

    async def send(self, msg):
        """Record *msg* and deliver it to every matching subscription.

        If no subscription accepts anything during this call, the message is
        placed on this queue's own channel instead.
        """
        self.history.append(msg)
        propagated = False
        # Iterate over a snapshot: match_sub awaits, and the subscription
        # table may be modified by another task in the meantime.
        for name in list(self.subs):
            if name not in self.subs:
                continue
            # Always evaluate match_sub; folding it into an `or` expression
            # would skip the remaining subscriptions after the first match.
            if await self.match_sub(name):
                propagated = True
        if not propagated:
            await self.inport.send(msg)

    async def receive(self):
        """Return the next message that no subscription accepted."""
        return await self.outport.receive()

    async def receive_sub(self, name):
        """Return the next message from subscription *name*'s child queue."""
        return await self.subs[name]["queue"].receive()
#!/usr/bin/env python3.8
import trio
import AsyncQueue
def type_matcher(*args):
    """Return whether the first positional argument is an instance of the second.

    Only the first two positional arguments are used; any further ones are
    ignored.
    """
    candidate, expected_type = args[0], args[1]
    return isinstance(candidate, expected_type)
def msg_matcher(*args):
    """Return the result of comparing the first two positional arguments with ``==``.

    Any positional arguments after the second are ignored.
    """
    left, right = args[0], args[1]
    return left == right
async def publisher(queue):
    """Send a fixed sequence of alternating int and str messages to *queue*.

    Messages are sent one at a time, in order, via ``await queue.send(...)``.
    """
    demo_messages = (
        0, "stage-0",
        1, "stage-1",
        2, "stage-2",
        3, "stage-4",
        4,
    )
    for item in demo_messages:
        await queue.send(item)
async def subscriber(queue, end):
    """Print each message received from *queue* until *end* arrives.

    Args:
        queue: object exposing an awaitable ``receive()`` method.
        end: sentinel value; the loop stops (without printing) as soon as a
            received message compares equal to it.
    """
    while True:
        msg = await queue.receive()
        if msg == end:
            break
        # Print the message alongside the queue it came from; printing only
        # the queue object would not show what was actually routed to it.
        print(queue, msg)
async def main():
    """Build a main queue with an int and a str sub-queue and run the demo.

    One publisher task feeds the main queue; one subscriber task per
    sub-queue consumes it until its end sentinel (``"stage-4"`` for strings,
    ``4`` for integers) arrives.
    """
    async with trio.open_nursery() as nursery:
        # NOTE(review): the file-level `import AsyncQueue` binds a module, so
        # this call needs the class itself to be in scope - confirm the import.
        main_queue = AsyncQueue()

        await main_queue.sub("integer", type_matcher, args=int)
        await main_queue.sub("string", type_matcher, args=str)

        # Look the child queues up in the subscription table rather than
        # relying on the value returned by sub().
        int_queue = main_queue.subs["integer"]["queue"]
        str_queue = main_queue.subs["string"]["queue"]

        nursery.start_soon(publisher, main_queue)
        nursery.start_soon(subscriber, str_queue, "stage-4")
        nursery.start_soon(subscriber, int_queue, 4)

        # Keep the nursery open until the run is interrupted from outside.
        await trio.sleep_forever()
# Start the demo only when executed as a script, so importing this module
# does not start the event loop as a side effect.
if __name__ == "__main__":
    try:
        trio.run(main)
    except KeyboardInterrupt:
        # main() sleeps forever, so Ctrl-C is the expected way to stop it.
        pass
@guilledk
Copy link
Author

This test creates an async queue tree with a main queue and two sub-queues, one for str objects and one for ints.

Then it creates a publisher task that sends some test objects.
Two subscribers are created, one for each sub-queue.

The output should show how the async tree correctly matched the messages with their respective queues.

This is a high-performance-focused async queue system; it can be used for async pub-sub sockets to build a networking library.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment