Skip to content

Instantly share code, notes, and snippets.

@Makman2
Created October 9, 2015 21:54
Show Gist options
  • Save Makman2/be362ec3ab04adb2e1d3 to your computer and use it in GitHub Desktop.
Save Makman2/be362ec3ab04adb2e1d3 to your computer and use it in GitHub Desktop.
Test for InterruptableProcess
import unittest
from multiprocessing import Event
from InterruptableProcess import InterruptableProcess
def _wait_set_wait(e1, e2, e3):
    """Child-process worker: wait for ``e1``, set ``e2``, then wait for ``e3``.

    Defined at module level (not nested inside the test method) so that it
    can be pickled when the process is launched with the ``spawn`` or
    ``forkserver`` start method; nested functions cannot be pickled.

    :param e1: Event the parent sets to let the worker proceed.
    :param e2: Event the worker sets to acknowledge that it is running.
    :param e3: Event the parent sets to let the worker finish.
    """
    e1.wait()
    e2.set()
    e3.wait()


class InterruptableProcessTest(unittest.TestCase):
    """Tests for ``InterruptableProcess``."""

    # Upper bound (seconds) for the worker to acknowledge via ``e2``. Without
    # a bound, a worker that dies early would hang the whole test run.
    HANDSHAKE_TIMEOUT = 10

    def test_normal_run(self):
        """Run a worker to completion without interrupting it.

        The parent and the worker hand over control through three events.
        ``interrupt`` is expected to raise ``RuntimeError`` both before the
        process has been started and after it has been joined.
        """
        e1, e2, e3 = (Event() for _ in range(3))
        proc = InterruptableProcess(target=_wait_set_wait,
                                    args=(e1, e2, e3))

        # Not started yet: interrupting must be rejected.
        self.assertRaises(RuntimeError, proc.interrupt)

        proc.start()
        try:
            # The worker is blocked on e1, so nothing can be set yet.
            self.assertFalse(e1.is_set())
            self.assertFalse(e2.is_set())
            self.assertFalse(e3.is_set())

            e1.set()
            # wait() returns False on timeout, which fails the test instead
            # of blocking forever.
            self.assertTrue(e2.wait(self.HANDSHAKE_TIMEOUT),
                            "worker did not set e2 in time")

            self.assertTrue(e1.is_set())
            self.assertTrue(e2.is_set())
            self.assertFalse(e3.is_set())

            e3.set()

            self.assertTrue(e1.is_set())
            self.assertTrue(e2.is_set())
            self.assertTrue(e3.is_set())
        finally:
            # Whatever happened above, release the worker (it only ever
            # blocks on e1 and e3) and reap it so that a failed assertion
            # does not leave a blocked child process behind. Setting an
            # already-set event is a no-op.
            e1.set()
            e3.set()
            proc.join()

        # Finished and joined: interrupting must be rejected again.
        self.assertRaises(RuntimeError, proc.interrupt)

    # TODO: Test bubbled up exception with normal run.
    # TODO: Test interrupt with normal run.
    # TODO: Test interrupt fail?
    # TODO: Test already delegating interrupt?
if __name__ == '__main__':
    # Only run the suite when executed as a script, not on import.
    # verbosity=2 asks unittest for its more verbose per-test output.
    unittest.main(verbosity=2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment