@jaycosaur
Created January 27, 2020 21:39
Bridging asyncio code and synchronous workers is usually a pain. Janus and this converter make it straightforward. This gist covers Sync -> Async; the Async -> Sync direction is also trivial to implement.
from queue import Queue
from threading import Thread
from typing import cast
import asyncio
import janus
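# Note: the forwarding thread is never cleaned up. You will need to implement your own shutdown logic.
def sync_to_async_queue(queue: Queue, loop: asyncio.AbstractEventLoop) -> asyncio.Queue:
    # janus.Queue exposes a blocking side (sync_q) and an awaitable side (async_q).
    # Passing `loop=` matches the janus API when this was written (January 2020); check your installed version.
    conversion_queue: janus.Queue = janus.Queue(loop=loop)

    def listen_and_forward(queue_to_watch: Queue) -> None:
        # Block on the plain sync queue and push each item into the janus sync side.
        while True:
            try:
                conversion_queue.sync_q.put(queue_to_watch.get())
            except Exception:
                # Any exception silently ends the forwarding thread.
                return

    thread = Thread(target=listen_and_forward, args=(queue,), daemon=True)
    thread.start()
    return cast(asyncio.Queue, conversion_queue.async_q)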
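
The snippet above leaves shutdown to the reader. One possible shape for it is sketched below, under stated assumptions: it swaps janus for the standard library's `loop.call_soon_threadsafe`, so it is a different technique from the gist's, and the names `_STOP`, `forward_sync_to_async`, and `demo` are hypothetical. Putting the sentinel on the sync queue tells the forwarding thread to exit.

```python
import asyncio
import queue
import threading
from typing import Any, List, Tuple

# Hypothetical sentinel: putting it on the sync queue stops the forwarder.
_STOP = object()


def forward_sync_to_async(
    source: "queue.Queue[Any]", loop: asyncio.AbstractEventLoop
) -> Tuple["asyncio.Queue[Any]", threading.Thread]:
    """Forward items from a blocking queue into an asyncio.Queue.

    Call this from inside the running loop so the asyncio.Queue is
    created in the loop that will consume it.
    """
    target: "asyncio.Queue[Any]" = asyncio.Queue()

    def pump() -> None:
        while True:
            item = source.get()  # blocks only this worker thread
            # asyncio.Queue is not thread-safe, so the put is scheduled
            # on the event loop instead of being called directly.
            loop.call_soon_threadsafe(target.put_nowait, item)
            if item is _STOP:
                return  # sentinel forwarded, thread exits cleanly

    thread = threading.Thread(target=pump, daemon=True)
    thread.start()
    return target, thread


async def demo() -> List[Any]:
    source: "queue.Queue[Any]" = queue.Queue()
    target, thread = forward_sync_to_async(source, asyncio.get_running_loop())
    for value in (1, 2, 3):
        source.put(value)
    source.put(_STOP)  # request shutdown

    received: List[Any] = []
    while True:
        item = await target.get()
        if item is _STOP:
            break
        received.append(item)

    thread.join(timeout=1)  # the thread has already seen the sentinel
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sentinel is forwarded to the async side as well, so the consumer learns that the stream has ended without needing a separate signal.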
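
The description calls the Async -> Sync direction trivial but does not show it. A minimal sketch of one way to do it follows, using only the standard library rather than janus; `_DONE`, `forward_async_to_sync`, and `demo` are hypothetical names, and the target queue is assumed to be unbounded so that `put_nowait` never blocks the event loop.

```python
import asyncio
import queue
import threading
from typing import Any, List

# Hypothetical end-of-stream marker shared by producer and worker.
_DONE = object()


async def forward_async_to_sync(
    source: "asyncio.Queue[Any]", target: "queue.Queue[Any]"
) -> None:
    """Copy items from an asyncio.Queue into a thread-safe queue.Queue."""
    while True:
        item = await source.get()
        # queue.Queue is thread-safe, and put_nowait cannot block here
        # because the target is assumed to be unbounded. A bounded target
        # would need run_in_executor to keep the event loop responsive.
        target.put_nowait(item)
        if item is _DONE:
            return


async def demo() -> List[Any]:
    source: "asyncio.Queue[Any]" = asyncio.Queue()
    target: "queue.Queue[Any]" = queue.Queue()
    seen: List[Any] = []

    def worker() -> None:
        # Plain synchronous consumer running in its own thread.
        while True:
            item = target.get()
            if item is _DONE:
                return
            seen.append(item)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    forwarder = asyncio.create_task(forward_async_to_sync(source, target))

    for value in ("a", "b"):
        await source.put(value)
    await source.put(_DONE)
    await forwarder

    # Wait for the worker without blocking the event loop.
    await asyncio.get_running_loop().run_in_executor(None, thread.join)
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

No extra thread is needed for the forwarding itself in this direction, because the forwarder is just another coroutine on the loop; only the synchronous consumer runs in a thread.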