Created
October 9, 2015 21:54
-
-
Save Makman2/be362ec3ab04adb2e1d3 to your computer and use it in GitHub Desktop.
Test for InterruptableProcess
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from multiprocessing import Event | |
from InterruptableProcess import InterruptableProcess | |
class InterruptableProcessTest(unittest.TestCase): | |
def test_normal_run(self): | |
def worker(e1, e2, e3): | |
e1.wait() | |
e2.set() | |
e3.wait() | |
e1, e2, e3 = (Event() for i in range(3)) | |
proc = InterruptableProcess(target=worker, args=(e1, e2, e3)) | |
self.assertRaises(RuntimeError, proc.interrupt) | |
proc.start() | |
self.assertFalse(e1.is_set()) | |
self.assertFalse(e2.is_set()) | |
self.assertFalse(e3.is_set()) | |
e1.set() | |
e2.wait() | |
self.assertTrue(e1.is_set()) | |
self.assertTrue(e2.is_set()) | |
self.assertFalse(e3.is_set()) | |
e3.set() | |
self.assertTrue(e1.is_set()) | |
self.assertTrue(e2.is_set()) | |
self.assertTrue(e3.is_set()) | |
proc.join() | |
self.assertRaises(RuntimeError, proc.interrupt) | |
# Test bubbled up exception with normal run. | |
# Test interrupt with normal run. | |
# Test interrupt fail? | |
# Test already delegating interrupt? | |
# Script entry point: run this module's tests with verbose per-test output.
if __name__ == '__main__':
    unittest.main(
        verbosity=2,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment