Skip to content

Instantly share code, notes, and snippets.

@asehmi
Created April 19, 2022 16:17
Show Gist options
  • Select an option

  • Save asehmi/ba89a73966620f82ec1f6d8ca850d162 to your computer and use it in GitHub Desktop.

Select an option

Save asehmi/ba89a73966620f82ec1f6d8ca850d162 to your computer and use it in GitHub Desktop.
Asyncio Queue Producer-Consumer Pattern in Streamlit
# See this thread: https://discuss.streamlit.io/t/best-fastest-practice-to-display-live-2d-data/19895
# Code below is lifted from @andfanilo's comment
# https://realpython.com/async-io-python/#using-a-queue
import asyncio
from datetime import datetime
# pip install opencv-python-headless
import cv2
import numpy as np
import streamlit as st
QUEUE_SIZE = 100
SIZE_IMAGE = 512
def get_or_create_eventloop():
try:
return asyncio.get_event_loop()
except RuntimeError as ex:
if "There is no current event loop in thread" in str(ex):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return asyncio.get_event_loop()
async def produce_images(queue, delay):
while True:
_ = await asyncio.sleep(delay)
image = np.random.random((SIZE_IMAGE, SIZE_IMAGE)).astype(np.float32)
# Add bars depending on state count
n = st.session_state.produced_images % SIZE_IMAGE
m = st.session_state.produced_images % SIZE_IMAGE
image[n : n + 10] = 0
image[:, m : m + 10] = 1
_ = await queue.put(cv2.cvtColor(image, cv2.COLOR_GRAY2BGR))
st.session_state.produced_images += 1
async def consume_images(image_placeholder, queue_size_placeholder, queue, delay):
while True:
_ = await asyncio.sleep(delay)
if queue.qsize() > 0:
image = await queue.get()
image_placeholder.image(
image,
caption=f"Consumed images: {st.session_state.consumed_images}, {str(datetime.now())}",
)
queue_size_placeholder.metric(
f"In queue (queue size is {QUEUE_SIZE})", st.session_state.queue.qsize()
)
st.session_state.consumed_images += 1
queue.task_done()
async def run_app(
image_placeholder, queue_size_placeholder, queue, produce_delay, consume_delay
):
_ = await asyncio.gather(
produce_images(queue, produce_delay),
consume_images(image_placeholder, queue_size_placeholder, queue, consume_delay),
)
##### ACTUAL APP
if __name__ == "__main__":
st.set_page_config(
layout="wide",
initial_sidebar_state="auto",
page_title="Asyncio test",
page_icon=None,
)
if "event_loop" not in st.session_state:
st.session_state.loop = asyncio.new_event_loop()
asyncio.set_event_loop(st.session_state.loop)
# if "queue" not in st.session_state:
# st.session_state.queue = asyncio.Queue(QUEUE_SIZE)
# if "produced_images" not in st.session_state:
# st.session_state.produced_images = 0
# if "consumed_images" not in st.session_state:
# st.session_state.consumed_images = 0
st.session_state.queue = asyncio.Queue(QUEUE_SIZE)
st.session_state.produced_images = 0
st.session_state.consumed_images = 0
st.title("Hello random image!")
produce_delay = 1 / st.sidebar.slider(
"Produce images Frequency (img / second)", 1, 1000, 100
)
consume_delay = 1 / st.sidebar.slider(
"Display images Frequency (img / second)", 1, 1000, 100
)
c1, c2 = st.columns(2)
image_placeholder = c1.empty()
queue_size_placeholder = c2.empty()
asyncio.run(
run_app(
image_placeholder,
queue_size_placeholder,
st.session_state.queue,
produce_delay,
consume_delay,
)
)
@asehmi
Copy link
Author

asehmi commented Apr 19, 2022

asyncio-queue-producer-consumer

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment