Last active
March 1, 2024 16:46
-
-
Save mosquito/d54547f1dc8d2eb32fd8b41c4690cde3 to your computer and use it in GitHub Desktop.
Asyncio PIPE
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import fcntl | |
import os | |
from functools import partial | |
class AsyncPIPE:
    """Future-based asynchronous access to an anonymous ``os.pipe()`` pair.

    ``write(data)`` and ``read(size)`` each return a future created on
    ``loop``. Requests are served in FIFO order from the event loop's
    reader/writer callbacks.

    The file descriptors are only watched while there is at least one
    pending request. A pipe's write end is almost always writable, so a
    permanently registered writer would make the loop spin.
    """

    @staticmethod
    def create_pipe():
        """Create a pipe and return ``(read_fd, write_fd)``, both non-blocking."""
        read_fd, write_fd = os.pipe()
        for fd in (read_fd, write_fd):
            # OR the flag in instead of overwriting the existing status flags.
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        return read_fd, write_fd

    def __init__(self, loop=None):
        """Create the pipe and bind it to ``loop`` (default: current loop)."""
        # Assigned first so close()/__del__ are safe if anything below raises.
        self.read_fd = self.write_fd = None
        self.loop = loop or asyncio.get_event_loop()
        self._write_futures = []
        self._read_futures = []
        self._reading = False  # True while read_fd is registered on the loop
        self._writing = False  # True while write_fd is registered on the loop
        self.read_fd, self.write_fd = self.create_pipe()

    @staticmethod
    def _discard_done(queue):
        """Drop leading requests whose future is already done (e.g. cancelled)."""
        while queue and queue[0][0].done():
            queue.pop(0)

    def _closed_future(self):
        """Return a future that already failed because the pipe is closed."""
        f = self.loop.create_future()
        f.set_exception(ValueError('I/O operation on closed pipe'))
        return f

    def _watch_reader(self):
        if not self._reading:
            self.loop.add_reader(self.read_fd, self.on_read)
            self._reading = True

    def _unwatch_reader(self):
        if self._reading:
            self.loop.remove_reader(self.read_fd)
            self._reading = False

    def _watch_writer(self):
        if not self._writing:
            self.loop.add_writer(self.write_fd, self.on_write)
            self._writing = True

    def _unwatch_writer(self):
        if self._writing:
            self.loop.remove_writer(self.write_fd)
            self._writing = False

    def on_read(self):
        """Loop callback: serve the oldest pending read request."""
        self._discard_done(self._read_futures)
        if not self._read_futures:
            self._unwatch_reader()
            return
        f, size = self._read_futures[0]
        try:
            buff = os.read(self.read_fd, size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; keep the request queued.
            return
        except OSError as exc:
            self._read_futures.pop(0)
            f.set_exception(exc)
        else:
            self._read_futures.pop(0)
            f.set_result(buff)
        if not self._read_futures:
            self._unwatch_reader()

    def on_write(self):
        """Loop callback: push the oldest pending payload into the pipe.

        A non-blocking write may accept only part of the payload. The
        remainder stays at the head of the queue, and the future resolves
        only once every byte has been written.
        """
        self._discard_done(self._write_futures)
        if not self._write_futures:
            self._unwatch_writer()
            return
        f, data = self._write_futures[0]
        try:
            written = os.write(self.write_fd, data)
        except (BlockingIOError, InterruptedError):
            # Pipe is full; retry on the next writability notification.
            return
        except OSError as exc:
            self._write_futures.pop(0)
            f.set_exception(exc)
        else:
            if written < len(data):
                # memoryview avoids re-copying the tail on every short write.
                self._write_futures[0] = (f, memoryview(data)[written:])
                return
            self._write_futures.pop(0)
            f.set_result(True)
        if not self._write_futures:
            self._unwatch_writer()

    def __del__(self):
        self.close()

    def close(self):
        """Unregister and close both descriptors; cancel pending requests.

        Safe to call more than once, and on a partially constructed object.
        """
        read_fd = getattr(self, 'read_fd', None)
        if read_fd is None:
            return
        write_fd = self.write_fd
        self.read_fd, self.write_fd = None, None
        try:
            if self._reading:
                self.loop.remove_reader(read_fd)
            if self._writing:
                self.loop.remove_writer(write_fd)
        finally:
            self._reading = self._writing = False
            try:
                os.close(read_fd)
            finally:
                os.close(write_fd)
        pending = self._read_futures + self._write_futures
        self._read_futures.clear()
        self._write_futures.clear()
        # Cancelling schedules callbacks on the loop, which a closed loop
        # would reject.
        if not self.loop.is_closed():
            for f, _ in pending:
                if not f.done():
                    f.cancel()

    def write(self, data: bytes):
        """Queue ``data`` for writing.

        Returns a future that resolves to ``True`` once all of ``data`` has
        been written, or fails with ``ValueError`` if the pipe is closed.
        """
        if self.write_fd is None:
            return self._closed_future()
        f = self.loop.create_future()
        self._write_futures.append((f, data))
        self._watch_writer()
        return f

    def read(self, size):
        """Queue a read of at most ``size`` bytes.

        Returns a future that resolves to the bytes returned by a single
        ``os.read`` call, or fails with ``ValueError`` if the pipe is closed.
        """
        if self.read_fd is None:
            return self._closed_future()
        f = self.loop.create_future()
        self._read_futures.append((f, size))
        self._watch_reader()
        return f
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
# A dedicated loop: calling asyncio.get_event_loop() at module level with no
# running loop is deprecated.
loop = asyncio.new_event_loop()


async def main():
    """Round-trip a payload through an AsyncPIPE and print the first KiB read."""
    pipe = AsyncPIPE(loop)
    try:
        await pipe.write(b'foo' * 1024)
        print(await pipe.read(1024))
    finally:
        # Release the descriptors even if the write or read fails.
        pipe.close()


task = loop.create_task(main())
try:
    # Stop as soon as main() finishes; run_forever() would never return.
    loop.run_until_complete(task)
except KeyboardInterrupt:
    task.cancel()
    try:
        # Let the task process the cancellation and run its cleanup.
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
finally:
    loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment