Skip to content

Instantly share code, notes, and snippets.

@kylemcdonald
Created December 8, 2023 09:33
Show Gist options
  • Save kylemcdonald/57ac35baa3e69e70f7ef28217d1a4684 to your computer and use it in GitHub Desktop.
Save kylemcdonald/57ac35baa3e69e70f7ef28217d1a4684 to your computer and use it in GitHub Desktop.
Example of a threaded pipeline in Python.
import threading
import queue
class ThreadedWorker():
def __init__(self):
self.input_queue = queue.Queue()
self.output_queue = queue.Queue()
self.should_exit = False
self.thread = threading.Thread(target=self.run)
def feed(self, feeder):
self.input_queue = feeder.output_queue
return self
def start(self):
self.thread.start()
return self
def process(self, value):
return value
def run(self):
while not self.should_exit:
input = self.input_queue.get()
if input is None:
break # setinel
self.output_queue.put(self.process(input))
def stop(self):
self.should_exit = True
self.input_queue.put(None) # setinel
self.thread.join()
class AddOne(ThreadedWorker):
def process(self, value):
return value + 1
add_one_0 = AddOne()
add_one_1 = AddOne().feed(add_one_0)
add_one_2 = AddOne().feed(add_one_1)
add_one_0.start()
add_one_1.start()
add_one_2.start()
add_one_0.input_queue.put(0)
print(add_one_2.output_queue.get())
add_one_0.stop()
add_one_1.stop()
add_one_2.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment