Created
December 8, 2015 22:29
-
-
Save atemate/c3afc210befef15633d2 to your computer and use it in GitHub Desktop.
threads_testing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Thread | |
from time import sleep | |
class Test:
    """Experiment: start a looping worker thread from the constructor.

    Constructing an instance starts a thread named ``'infinite_loop'`` that
    runs :meth:`do_loop`, then raises ``SystemExit`` in the constructing
    thread, so ``Test()`` never hands an instance back to its caller.
    """

    def __init__(self):
        """Start the worker thread, then raise ``SystemExit``.

        Raises:
            SystemExit: always, right after the worker has been started.
        """
        self.my_thread = Thread(target=self.do_loop, name='infinite_loop')
        self.my_thread.start()
        # ``raise SystemExit`` stands in for the ``exit()`` helper, which is
        # injected by the ``site`` module for interactive use and is not
        # guaranteed to exist (e.g. under ``python -S``).
        # NOTE(review): raising here means the caller never receives the
        # instance -- confirm that this is the intended experiment.
        raise SystemExit

    def do_loop(self):
        """Print ``'a'`` once per second until a ``KeyboardInterrupt``.

        On ``KeyboardInterrupt`` the handler prints ``'loop'`` and the
        exception, then raises ``SystemExit`` to leave the loop.
        """
        while True:
            try:
                print('a')
                sleep(1)
            except KeyboardInterrupt as exc:
                # NOTE(review): CPython runs signal handlers in the main
                # thread, so Ctrl-C is not expected to reach this handler
                # while the loop runs in a worker thread -- confirm.
                print('loop')
                print(exc)
                raise SystemExit

    def pprint(self, msg):
        """Print ``msg`` to standard output."""
        print(msg)
# Script entry point: build a Test (which starts its worker thread) and print
# a message; a Ctrl-C seen by the main thread is reported before exiting.
if __name__ == '__main__':
    try:
        test = Test()
        test.pprint('blablabla')
    except KeyboardInterrupt as exc:
        print('main')
        print(exc)
        # ``raise SystemExit`` instead of the interactive-only ``exit()``
        # helper that the ``site`` module injects.
        raise SystemExit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Thread | |
from time import sleep | |
class ReadingLoopThread(Thread):
    """Thread whose ``run`` prints ``'a'`` every half second in a loop."""

    def __init__(self):
        """Initialise the underlying ``Thread`` with default settings."""
        super().__init__()

    def run(self):
        """Print ``'a'`` and sleep 0.5 s repeatedly.

        Raises:
            KeyboardInterrupt: re-raised after printing
                ``'in loop: interrupt'`` if one is raised inside the loop.
        """
        while True:
            try:
                print('a')
                sleep(0.5)
            except KeyboardInterrupt:
                # NOTE(review): CPython runs signal handlers in the main
                # thread, so Ctrl-C is not expected to surface here -- confirm.
                print('in loop: interrupt')
                # A bare ``raise`` re-raises the active exception unchanged.
                raise
class Test:
    """Owns a ``ReadingLoopThread`` that is started on construction."""

    def __init__(self):
        """Create the reading-loop thread and start it immediately."""
        self.my_thread = ReadingLoopThread()
        self.my_thread.start()

    def while_loop_function(self, msg):
        """Print ``msg`` to standard output."""
        print(msg)
# Script entry point: start the looping thread via Test, print a message, and
# report any Exception raised while doing so.
if __name__ == '__main__':
    try:
        test = Test()
        test.while_loop_function('blablabla')
    except Exception as exc:
        # NOTE(review): KeyboardInterrupt derives from BaseException, not
        # Exception, so a Ctrl-C is not handled by this clause -- confirm
        # whether that is intended.
        print('in main')
        print(exc)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import threading | |
import queue | |
from time import sleep | |
class ExcThread(threading.Thread):
    """Looping thread that reports a ``KeyboardInterrupt`` through a queue."""

    def __init__(self, bucket):
        """Store the queue used to hand exception info to another thread.

        Args:
            bucket: object with a ``put`` method (``main`` passes a
                ``queue.Queue``) that receives ``sys.exc_info()`` tuples.
        """
        super().__init__()
        self.bucket = bucket

    def run(self):
        """Print ``'bla'`` every half second until a ``KeyboardInterrupt``.

        When a ``KeyboardInterrupt`` is raised inside the loop, print
        ``'loop'``, put ``sys.exc_info()`` into ``self.bucket`` and stop.
        """
        while True:
            try:
                print('bla')
                sleep(0.5)
            except KeyboardInterrupt:
                # Only KeyboardInterrupt is forwarded; any other exception
                # raised in the loop body propagates out of run().
                # NOTE(review): CPython runs signal handlers in the main
                # thread, so Ctrl-C is not expected to surface here -- confirm.
                print('loop')
                self.bucket.put(sys.exc_info())
                break
def main():
    """Run an ``ExcThread`` and print any exception info it reports.

    Starts the worker, then polls the shared queue. Each reported
    ``(type, value, traceback)`` triple is printed. The function returns
    once the worker has finished and the queue has been drained.
    """
    bucket = queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        # Sample liveness *before* polling: if the worker was already dead,
        # everything it ever put is in the queue, so an empty queue then
        # really means there is nothing left to report.
        # ``is_alive()`` replaces ``isAlive()``, which was removed in 3.9.
        worker_alive = thread_obj.is_alive()
        try:
            exc = bucket.get(block=False)
        except queue.Empty:
            if not worker_alive:
                # Worker finished and nothing is pending: stop instead of
                # spinning forever on a dead thread.
                break
            # Wait briefly for the worker rather than busy-polling.
            thread_obj.join(0.1)
        else:
            exc_type, exc_obj, exc_trace = exc
            # Deal with the exception reported by the worker.
            print(exc_type, exc_obj)
            print(exc_trace)
# Script entry point: run the polling loop only when executed directly.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment