Skip to content

Instantly share code, notes, and snippets.

@diegogslomp
Last active September 15, 2023 23:15
Show Gist options
  • Save diegogslomp/91e43226f769f371418a9b2fd7ba0257 to your computer and use it in GitHub Desktop.
Save diegogslomp/91e43226f769f371418a9b2fd7ba0257 to your computer and use it in GitHub Desktop.
Python worker for sync and async tasks
from asyncio import Queue
import asyncio
import random
import time
async def worker_for_sync_task(name: str, queue: Queue, task) -> None:
while True:
item = await queue.get()
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, task, item)
queue.task_done()
print(f"{item} done by {name}")
async def worker_for_async_task(name: str, queue: Queue, task) -> None:
while True:
item = await queue.get()
await task(item)
queue.task_done()
print(f"{item} done by {name}")
async def main():
queue = Queue()
total_sleep_time = 0
num_of_workers = 5
num_of_tasks = 50
for _ in range(num_of_tasks):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
tasks = []
for i in range(num_of_workers):
task = asyncio.create_task(
worker_for_async_task(f"worker-{i}", queue, asyncio.sleep)
)
tasks.append(task)
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
print("====")
print(f"{num_of_workers} workers slept in parallel for {total_slept_for:.2f} seconds")
print(f"total expected sleep time: {total_sleep_time:.2f} seconds")
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment