Skip to content

Instantly share code, notes, and snippets.

@theY4Kman
Created October 4, 2018 04:53
Show Gist options
  • Select an option

  • Save theY4Kman/03006b2142e8160fb77a7c7028f7e5c1 to your computer and use it in GitHub Desktop.

Select an option

Save theY4Kman/03006b2142e8160fb77a7c7028f7e5c1 to your computer and use it in GitHub Desktop.
import sys
import threading
import time
if sys.version_info < (3, 4):
    def is_main_thread(thread):
        """Return True if *thread* is the interpreter's main thread.

        ``threading.main_thread()`` is not available before Python 3.4, so
        this branch falls back to checking against the private
        ``threading._MainThread`` class.
        """
        return isinstance(thread, threading._MainThread)
else:
    def is_main_thread(thread):
        """Return True if *thread* is the interpreter's main thread.

        The check is made on the *thread* argument itself, not on whichever
        thread happens to be calling this function, so the answer is the
        same regardless of the caller.
        """
        return thread is threading.main_thread()
def filter_main_thread(threads):
    """Lazily yield each item of *threads* that ``is_main_thread`` rejects.

    Items are yielded in their original order; those for which
    ``is_main_thread`` returns a true value are dropped.
    """
    yield from (
        candidate for candidate in threads if not is_main_thread(candidate)
    )
def enumerate_threads(include_main=False):
    """Yield the threads reported by ``threading.enumerate()``.

    :param include_main: when false (the default) the listing is passed
        through ``filter_main_thread`` first; when true it is yielded as-is.
    """
    alive = threading.enumerate()
    selected = alive if include_main else filter_main_thread(alive)
    yield from selected
def join_all_threads(timeout=2):
    """Poll the threads from ``enumerate_threads()`` until all have finished.

    Every 0.05 seconds each listed thread is joined without blocking and its
    status is printed. The function returns as soon as a full pass finds no
    thread still alive, or once *timeout* seconds have elapsed, whichever
    comes first.

    :param timeout: maximum number of seconds to keep polling.
    :return: None in both the "all joined" and the "timed out" case.
    """
    timed_out = threading.Event()
    timeout_thread = threading.Timer(timeout, lambda: timed_out.set() or print('timed oot set'))
    timeout_thread.start()

    # Joining the calling thread raises RuntimeError, so it must be skipped.
    caller = threading.current_thread()

    try:
        while not timed_out.is_set():
            time.sleep(0.05)

            join_timed_out = False
            for thread in enumerate_threads():
                # The watchdog Timer is itself a live thread; waiting on it
                # would keep this loop busy until the timeout fires even when
                # every real worker has already finished.
                if thread is timeout_thread or thread is caller:
                    continue

                print('Joining', thread, '...', end=' ')
                thread.join(timeout=0)
                if thread.is_alive():
                    print('BUT IT WOULD NOT LISTEN')
                    join_timed_out = True
                else:
                    print('Done')

            if not join_timed_out:
                return
    finally:
        # Cancel on every exit path (success, timeout, or exception) so the
        # watchdog never lingers; cancelling an already-fired Timer is a no-op.
        timeout_thread.cancel()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment