Skip to content

Instantly share code, notes, and snippets.

@huangsam
Last active January 7, 2023 08:09
Show Gist options
  • Save huangsam/6e24eecd0b232f548ec931706502ecf5 to your computer and use it in GitHub Desktop.
Save huangsam/6e24eecd0b232f548ec931706502ecf5 to your computer and use it in GitHub Desktop.
Try Thread class and Queue class together
from queue import Queue
from threading import Thread
from time import sleep
# https://docs.python.org/3/library/queue.html
# https://stackoverflow.com/questions/27200674/python-queue-join
# https://stackoverflow.com/questions/7445742/runtimeerror-thread-init-not-called-when-subclassing-threading-thread
_TASK_QUEUE = Queue()
# https://docs.python.org/3/library/threading.html#thread-objects
class SleepThread(Thread):
def __init__(self, *args, **kwargs):
"""Override default initialization ever so slightly."""
super().__init__(*args, **kwargs)
self.num = 0
def setup(self, num):
"""Setup the number that will be printed in run method."""
self.num = num
def run(self):
"""Run the logic that is implicitly invoked by start method."""
_ = _TASK_QUEUE.get() # we don't care which thread it is
sleep(self.num * 0.1)
print(f"Finish run by printing the number {self.num}")
_TASK_QUEUE.task_done() # we just care about communicating the end
def main():
for num in range(5):
th = SleepThread(name=f"sleep-thread-{num}")
th.setup(num)
_TASK_QUEUE.put(th) # let's put the thread in the queue
th.start()
_TASK_QUEUE.join() # block until all tasks are known to be done
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment