Created
April 19, 2022 16:17
-
-
Save asehmi/ba89a73966620f82ec1f6d8ca850d162 to your computer and use it in GitHub Desktop.
Asyncio Queue Producer-Consumer Pattern in Streamlit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # See this thread: https://discuss.streamlit.io/t/best-fastest-practice-to-display-live-2d-data/19895 | |
| # Code below is lifted from @andfanilo's comment | |
| # https://realpython.com/async-io-python/#using-a-queue | |
| import asyncio | |
| from datetime import datetime | |
| # pip install opencv-python-headless | |
| import cv2 | |
| import numpy as np | |
| import streamlit as st | |
| # Capacity passed to asyncio.Queue(...) for the queue shared by producer and consumer. | |
| QUEUE_SIZE = 100 | |
| # Side length of the square random images that are generated (SIZE_IMAGE x SIZE_IMAGE). | |
| SIZE_IMAGE = 512 | |
def get_or_create_eventloop():
    """Return the calling thread's event loop, creating one if none is set.

    ``asyncio.get_event_loop()`` raises ``RuntimeError`` when the current
    thread has no event loop. In that case a new loop is created, registered
    for the thread with ``asyncio.set_event_loop`` and returned.

    Returns:
        The event loop registered for the calling thread.

    Raises:
        RuntimeError: If ``asyncio.get_event_loop()`` fails for a reason other
            than a missing event loop in the current thread.
    """
    try:
        return asyncio.get_event_loop()
    except RuntimeError as ex:
        if "There is no current event loop in thread" not in str(ex):
            # Unrelated failure: propagate it instead of silently returning
            # None, which would only surface later as an AttributeError.
            raise
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop
async def produce_images(queue, delay):
    """Generate random frames forever and put them on ``queue``.

    Each iteration sleeps ``delay`` seconds, builds a SIZE_IMAGE x SIZE_IMAGE
    float32 noise image, draws one horizontal band of zeros and one vertical
    band of ones whose position is derived from
    ``st.session_state.produced_images``, converts the frame from grayscale
    to BGR and awaits ``queue.put``. The produced-images counter is
    incremented after each put.

    Args:
        queue: Queue object whose ``put`` coroutine receives the frames.
        delay: Seconds to sleep before producing each frame.
    """
    while True:
        await asyncio.sleep(delay)
        frame = np.random.random((SIZE_IMAGE, SIZE_IMAGE)).astype(np.float32)

        # Add bars depending on state count: the same offset positions both
        # the 10-row band and the 10-column band.
        offset = st.session_state.produced_images % SIZE_IMAGE
        band = slice(offset, offset + 10)
        frame[band] = 0
        frame[:, band] = 1

        bgr_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
        await queue.put(bgr_frame)
        st.session_state.produced_images += 1
async def consume_images(image_placeholder, queue_size_placeholder, queue, delay):
    """Take frames off ``queue`` forever and render them in the page.

    Each iteration sleeps ``delay`` seconds. If the queue holds at least one
    item, the next frame is fetched and shown in ``image_placeholder`` with a
    caption containing the consumed-images counter and the current time, the
    current queue size is shown in ``queue_size_placeholder``,
    ``st.session_state.consumed_images`` is incremented and the item is
    marked done on the queue.

    Args:
        image_placeholder: Object whose ``image(...)`` method displays a frame.
        queue_size_placeholder: Object whose ``metric(...)`` method displays
            the number of queued frames.
        queue: Queue to consume from; also the queue whose size is reported.
        delay: Seconds to sleep between polls of the queue.
    """
    while True:
        await asyncio.sleep(delay)
        if queue.empty():
            # Nothing to display yet; poll again after the next delay.
            continue

        image = await queue.get()
        image_placeholder.image(
            image,
            caption=f"Consumed images: {st.session_state.consumed_images}, {datetime.now()}",
        )
        # Report the size of the queue actually being consumed (the
        # parameter) rather than reaching into st.session_state for it.
        queue_size_placeholder.metric(
            f"In queue (queue size is {QUEUE_SIZE})", queue.qsize()
        )
        st.session_state.consumed_images += 1
        queue.task_done()
async def run_app(
    image_placeholder, queue_size_placeholder, queue, produce_delay, consume_delay
):
    """Run the image producer and the image consumer concurrently.

    Args:
        image_placeholder: Forwarded to ``consume_images`` to display frames.
        queue_size_placeholder: Forwarded to ``consume_images`` to display
            the queue size.
        queue: Queue shared by ``produce_images`` and ``consume_images``.
        produce_delay: Delay in seconds passed to ``produce_images``.
        consume_delay: Delay in seconds passed to ``consume_images``.
    """
    producer = produce_images(queue, produce_delay)
    consumer = consume_images(
        image_placeholder, queue_size_placeholder, queue, consume_delay
    )
    await asyncio.gather(producer, consumer)
| ##### ACTUAL APP | |
if __name__ == "__main__":
    st.set_page_config(
        layout="wide",
        initial_sidebar_state="auto",
        page_title="Asyncio test",
        page_icon=None,
    )

    # Create the session's event loop only once. The membership test has to
    # use the same key the loop is stored under ("loop"); testing a different
    # key is always true and creates a new, never-closed loop on every run.
    if "loop" not in st.session_state:
        st.session_state.loop = asyncio.new_event_loop()
    # The current event loop is a per-thread setting, so register it on every
    # run in case this run executes on a different thread than the one that
    # created the loop.
    asyncio.set_event_loop(st.session_state.loop)

    # The queue and both counters are re-created on every script run. Guard
    # each assignment with `if "<key>" not in st.session_state` to keep them
    # across reruns instead.
    st.session_state.queue = asyncio.Queue(QUEUE_SIZE)
    st.session_state.produced_images = 0
    st.session_state.consumed_images = 0

    st.title("Hello random image!")

    # Sliders give a frequency in images per second (minimum 1, so the
    # division is safe); the coroutines expect a delay in seconds.
    produce_delay = 1 / st.sidebar.slider(
        "Produce images Frequency (img / second)", 1, 1000, 100
    )
    consume_delay = 1 / st.sidebar.slider(
        "Display images Frequency (img / second)", 1, 1000, 100
    )

    c1, c2 = st.columns(2)
    image_placeholder = c1.empty()
    queue_size_placeholder = c2.empty()

    # NOTE: asyncio.run() drives the coroutine on its own freshly created
    # loop, not on st.session_state.loop.
    asyncio.run(
        run_app(
            image_placeholder,
            queue_size_placeholder,
            st.session_state.queue,
            produce_delay,
            consume_delay,
        )
    )
Author
asehmi
commented
Apr 19, 2022

Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment