Skip to content

Instantly share code, notes, and snippets.

@pedro-psb
Created April 10, 2025 10:45
Show Gist options
  • Save pedro-psb/36bb643a530fb697f90504a5c9f760e0 to your computer and use it in GitHub Desktop.
Save pedro-psb/36bb643a530fb697f90504a5c9f760e0 to your computer and use it in GitHub Desktop.
Experiment integrating async with select
import asyncio
import os
import select
import time
import socket
from multiprocessing import Process
ITERATIONS = 10
def main(server_r, loop, server_proc):
async_r, async_w = socket.socketpair()
def task_done(task: asyncio.Task):
task_name = task.get_name()
async_w.send(task_name.encode())
task = loop.create_task(immediate_task())
task.add_done_callback(task_done)
done = False
while not done:
print("HEARTBEAT")
readable, _, _ = select.select([server_r, async_r], [], [], 1)
tasks = asyncio.all_tasks(loop)
pending_tasks = {task for task in tasks if not task.done()}
loop.run_until_complete(asyncio.sleep(0))
if not readable and not pending_tasks:
done = True
if server_r in readable:
data = os.read(server_r, 1024)
print(f"Got socket data from server: {data}")
if async_r in readable:
task_name = async_r.recv(1024).decode()
print(f"Got data from async coroutine: {task_name}")
print("DONE")
def mock_server():
r, w = os.pipe()
os.set_blocking(r, False)
os.set_blocking(w, False)
def activity():
for i in range(ITERATIONS):
time.sleep(1)
data = f"server: {i + 1}"
os.write(w, data.encode())
process = Process(target=activity)
process.start()
return r, process
async def immediate_task():
for i in range(ITERATIONS):
print(f"count {i + 1}")
if i % 2 == 0:
await asyncio.sleep(1)
else:
time.sleep(1)
return "task-result"
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
server_r, process = mock_server()
main(server_r, loop, process)
finally:
process.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment