Skip to content

Instantly share code, notes, and snippets.

@ericdevries
Created October 30, 2019 13:41
Show Gist options
  • Select an option

  • Save ericdevries/c0c7b0c63c5f07b147c464a0a1b0c03a to your computer and use it in GitHub Desktop.

Select an option

Save ericdevries/c0c7b0c63c5f07b147c464a0a1b0c03a to your computer and use it in GitHub Desktop.
"""
test data coming from a thread, to be processed by an asyncio process
"""
import asyncio
import threading
import time
# Hand-off points between the worker threads and the coroutines below.
# The threads do not call these objects directly; they schedule the calls
# on the event loop with loop.call_soon_threadsafe().
some_queue = asyncio.Queue()  # filled by thread2(), drained by run()
event = asyncio.Event()  # set by thread1(), awaited and cleared by run_event()
async def add_item_to_queue(queue, value):
    """Put ``value`` on ``queue`` immediately, without awaiting anything.

    The item is handed to ``queue.put_nowait``, so this coroutine never
    suspends. Presumably ``queue`` is an ``asyncio.Queue``; if it is a
    bounded one that is already full, ``put_nowait`` raises instead of
    waiting for a free slot.

    Args:
        queue: Object exposing ``put_nowait(item)``.
        value: Item to enqueue.
    """
    queue.put_nowait(value)
def thread1(loop, interval=2):
    """Periodically set the shared ``event`` from a worker thread.

    This function is meant to run in a plain thread, so it does not call
    ``event.set`` itself; it asks ``loop`` to run it through
    ``call_soon_threadsafe``.

    Args:
        loop: Event loop on which ``event.set`` is scheduled.
        interval: Seconds to sleep between two signals. Defaults to 2,
            the period that used to be hard-coded.

    Returns:
        None, once ``loop`` has been closed.
    """
    while not loop.is_closed():
        try:
            loop.call_soon_threadsafe(event.set)
        except RuntimeError:
            # The loop was closed between the check above and this call;
            # there is nobody left to signal, so stop quietly.
            return
        time.sleep(interval)
def thread2(loop, interval=1):
    """Periodically enqueue ``"t2"`` on ``some_queue`` from a worker thread.

    This function is meant to run in a plain thread, so it does not call
    ``some_queue.put_nowait`` itself; it asks ``loop`` to run it through
    ``call_soon_threadsafe``.

    Args:
        loop: Event loop on which the ``put_nowait`` call is scheduled.
        interval: Seconds to sleep between two items. Defaults to 1,
            the period that used to be hard-coded.

    Returns:
        None, once ``loop`` has been closed.
    """
    while not loop.is_closed():
        try:
            loop.call_soon_threadsafe(some_queue.put_nowait, "t2")
        except RuntimeError:
            # The loop was closed between the check above and this call;
            # nothing will consume further items, so stop quietly.
            return
        time.sleep(interval)
async def run():
    """Consume ``some_queue`` forever, printing each item as it arrives.

    The coroutine never returns on its own; it suspends in
    ``some_queue.get()`` whenever the queue is empty.
    """
    while True:
        print("got item: {}".format(await some_queue.get()))
async def run_event():
    """Wait for ``event`` over and over, reporting each time it fires.

    The coroutine never returns on its own.
    """
    while True:
        await event.wait()
        print("awaited event, could do something now")
        # Re-arm the event so the next wait() suspends until it is set again.
        event.clear()
if __name__ == "__main__":
    # Newer Python versions raise RuntimeError from get_event_loop() when
    # no loop has been set yet; create and install one in that case.
    # The original call is tried first so that older versions keep using
    # the loop the module-level queue and event were created against.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # Daemon thread: it must not keep the interpreter alive on exit.
    t1 = threading.Thread(target=thread1, args=(loop,), daemon=True)
    t1.start()

    # Uncomment to also feed some_queue from a second thread:
    # t2 = threading.Thread(target=thread2, args=(loop,), daemon=True)
    # t2.start()

    # Keep a reference to the task: the loop only holds weak references,
    # so an unreferenced task may be garbage-collected while pending.
    event_task = loop.create_task(run_event())

    # run() never returns, so this drives the loop (and event_task) forever.
    loop.run_until_complete(run())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment