Last active
June 16, 2024 05:25
-
-
Save mightymercado/4efba1f070a6ba6526c3e237f0eb0443 to your computer and use it in GitHub Desktop.
Send data to a multiprocessing.Process running an asyncio event loop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A rare scenario: communicate with a child process that is running an asyncio event loop
import asyncio | |
from asyncio import StreamReader, StreamReaderProtocol | |
from multiprocessing import Process | |
import os | |
class Worker:
    """Feed task numbers to a child process that runs an asyncio event loop.

    The parent keeps the write end of an OS pipe. The child wraps the read
    end in an asyncio ``StreamReader`` and schedules one ``do_task``
    coroutine for every newline-terminated integer it receives.
    """

    def __init__(self):
        self.read_fd, self.write_fd = os.pipe()
        # Unbuffered, so every ``add`` reaches the pipe immediately.
        self.writer = os.fdopen(self.write_fd, 'wb', 0)
        # Set by ``start``; lets ``close`` wait for the child.
        self.process = None

    def start(self):
        """Launch the child process and release the parent's read end."""
        # Function-scope import: the module only imports ``Process`` by name.
        from multiprocessing import get_context

        os.set_inheritable(self.read_fd, True)
        # Raw descriptor numbers are only valid in the child under the
        # "fork" start method, so request it explicitly instead of relying
        # on the platform default.
        self.process = get_context('fork').Process(
            target=Worker.worker,
            args=(self.read_fd, self.write_fd),
        )
        self.process.start()
        # The child owns its own copy of the read end now.
        os.close(self.read_fd)

    def close(self):
        """Close the write end so the child sees EOF, then wait for it."""
        if not self.writer.closed:
            self.writer.close()
        if self.process is not None:
            self.process.join()

    @staticmethod
    async def do_task(task_number):
        """Handle one task; placeholder that does nothing."""
        pass

    @staticmethod
    async def accept_tasks(read_fd):
        """Read task numbers from ``read_fd`` until EOF and run them.

        Each line is parsed with ``int`` and handed to ``do_task`` as a new
        asyncio task. Returns once the pipe reaches EOF and every scheduled
        task has finished.

        Raises:
            ValueError: if a received line is not a valid integer.
        """
        loop = asyncio.get_running_loop()
        reader = StreamReader()
        transport, _ = await loop.connect_read_pipe(
            lambda: StreamReaderProtocol(reader),
            os.fdopen(read_fd, 'rb', 0),
        )
        # Strong references keep scheduled tasks alive until they finish.
        pending = set()
        try:
            while True:
                line = await reader.readline()
                if not line:
                    # Empty bytes means every write end has been closed.
                    break
                task = loop.create_task(Worker.do_task(int(line)))
                pending.add(task)
                task.add_done_callback(pending.discard)
            if pending:
                # ``wait`` does not re-raise task exceptions; they are still
                # reported by asyncio's default handler, as before.
                await asyncio.wait(pending)
        finally:
            transport.close()

    @staticmethod
    def worker(read_fd, write_fd=None):
        """Child-process entry point: consume tasks from ``read_fd``.

        Args:
            read_fd: read end of the pipe inherited from the parent.
            write_fd: optional inherited copy of the write end. It is closed
                first so that EOF is delivered once the parent closes its
                own writer.
        """
        if write_fd is not None:
            os.close(write_fd)
        asyncio.run(Worker.accept_tasks(read_fd))

    def add(self, task):
        """Send ``task`` to the child as one newline-terminated line."""
        self.writer.write((str(task) + '\n').encode())
if __name__ == '__main__':
    # Guarded so that importing this module (which multiprocessing itself
    # does in the child under the "spawn" and "forkserver" start methods)
    # does not start another worker.
    worker = Worker()
    worker.start()
    worker.add('1')
    worker.add('2')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment