Last active
January 8, 2021 20:24
-
-
Save nurettin/14e077f88b8c1edb22bc810bdbbac49e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from asyncio.tasks import Task | |
from typing import Dict | |
import asyncio | |
from uuid import uuid4 | |
class PiQueConnection: | |
def __init__(self, pique: 'PiQue', uid: str, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): | |
self.pique = pique | |
self.uid = uid | |
self.reader = reader | |
self.writer = writer | |
self.sub_tasks: Dict[str, Task] = {} | |
print(f"Client({self.uid}) connected.") | |
async def recv(self): | |
while True: | |
bytes = await self.reader.readline() | |
if bytes == b"": | |
self.pique.clients.pop(self.uid) | |
for name, task in list(self.sub_tasks.items()): | |
task.cancel() | |
self.sub_tasks.pop(name, None) | |
print(f"Client({self.uid}) disconnected.") | |
break | |
data = bytes.decode() | |
print(f"Client({self.uid}) -> {data}") | |
if data.startswith("CREATE_QUEUE "): | |
_, name = data.split() | |
self.pique.create_queue(name=name) | |
elif data.startswith("QUEUE "): | |
_, name, message = data.split() | |
self.pique.queue(name=name, message=message) | |
elif data.startswith("SUB "): | |
_, name = data.split() | |
task = self.sub_tasks.get(name) | |
if task is None: | |
queue = self.pique.queues.get(name) | |
if queue is not None: | |
self.sub_tasks[name] = self.pique.loop.create_task( | |
self.sub(name, queue)) | |
elif data.startswith("UNSUB "): | |
_, name = data.split() | |
task = self.sub_tasks.get(name) | |
if task is not None: | |
task.cancel() | |
self.sub_tasks.pop(name, None) | |
async def sub(self, name: str, queue: asyncio.Queue): | |
while True: | |
data = await queue.get() | |
await self.send(message=f"QUEUE {name} {data}") | |
async def send(self, message: str): | |
print(f"Client({self.uid}) <- {message}") | |
self.writer.write(message.encode()) | |
await self.writer.drain() | |
class PiQue: | |
def __init__(self, host: str, port: str, loop=None): | |
self.host = host | |
self.port = port | |
self.loop = asyncio.get_event_loop() if loop is None else loop | |
self.queues: Dict[str, asyncio.Queue] = {} | |
self.clients: Dict[str, PiQueConnection] = {} | |
def run(self): | |
print(f"Starting PiQue v3.141592 {self.host}:{self.port}") | |
self.loop.create_task(asyncio.start_server(client_connected_cb=self.client_connected_cb, | |
host=self.host, port=self.port, loop=self.loop)) | |
self.loop.create_task(self.ping()) | |
self.loop.run_forever() | |
def client_connected_cb(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): | |
new_client = PiQueConnection(pique=self, uid=str( | |
uuid4()), reader=reader, writer=writer) | |
self.clients[new_client.uid] = new_client | |
self.loop.create_task(new_client.recv()) | |
async def ping(self): | |
while True: | |
for _, client in list(self.clients.items()): | |
await client.send("ping") | |
await asyncio.sleep(30) | |
def create_queue(self, name: str): | |
queue = self.queues.get(name) | |
if queue is None: | |
self.queues[name] = asyncio.Queue() | |
def queue(self, name: str, message: str): | |
queue = self.queues.get(name) | |
if queue is not None: | |
queue.put_nowait(message) | |
if __name__ == "__main__": | |
pique = PiQue(host="127.0.0.1", port="31415") | |
try: | |
pique.run() | |
except KeyboardInterrupt: | |
print("Bye.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
tab 1: telnet localhost 31415
tab 2: telnet localhost 31415
tab 1: