Skip to content

Instantly share code, notes, and snippets.

@bml1g12
Last active January 1, 2021 09:08
Show Gist options
  • Save bml1g12/69e92e04c235f7670d796b5b9c47ed95 to your computer and use it in GitHub Desktop.
Save bml1g12/69e92e04c235f7670d796b5b9c47ed95 to your computer and use it in GitHub Desktop.
Example consumer
def prepare_frame(np_arr_shape, frames_written):
"""Emulate an I/O limited step that returns a frame,
e.g. reading from a quick-to-decode file. Note that we could instead simulate
a CPU intensive task here and the resulting benchmark runs much faster
when using multiprocessing than multithreading.
:param Tuple[int, int] np_arr_shape: Dimension of the numpy array to be produced.
:param int frames_written:
"""
frame = np.ones(np_arr_shape) * frames_written
time.sleep(0.001)
return frame
def worker_producer(np_arr_shape, queue, n_frames):
"""A frame producer function, e.g. for a worker thread or process
:param Tuple[int, int] np_arr_shape: The shape of the numpy array to generate
:param int n_frames: the number of frames to produce
"""
for _ in range(n_frames):
np_arr = prepare_random_frame(np_arr_shape)
queue.put(np_arr)
def consumer(n_frames, queue):
"""A frame consumer function, which draws frames from the worker thread/process via a queue
and does a dummy calculation on the result."""
for _ in range(n_frames):
np_arr = queue.get()
# example of some processing done on the array:
_ = np_arr.astype("uint8").copy() * 2
import threading
import queue
queue = queue.Queue()
thread = threading.Thread(target=worker_producer,
args=(np_arr_shape, queue, n_frames))
thread.start()
# will consume n_frames from producer
consumer(n_frames, queue)
import multiprocessing
import queue
queue = multiprocessing.Queue()
process = multiprocessing.Process(target=worker_producer,
args=(np_arr_shape, queue, n_frames))
process.start()
# will consume n_frames from producer
consumer(n_frames, queue)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment