Skip to content

Instantly share code, notes, and snippets.

@jimbaker
Created September 9, 2016 16:29
Show Gist options
  • Save jimbaker/ef6449e99cd092376d9acbc380301196 to your computer and use it in GitHub Desktop.
Save jimbaker/ef6449e99cd092376d9acbc380301196 to your computer and use it in GitHub Desktop.
Test to demonstrate that pure Python OrderedDict in Jython 2.7 is not threadsafe
from collections import OrderedDict
from test import test_support
import threading
import time
import unittest
class ThreadSafetyTestCase(unittest.TestCase):
def run_threads(self, f, num=10):
threads = []
for i in xrange(num):
t = threading.Thread(target=f)
t.start()
threads.append(t)
timeout = 10. # be especially generous
for t in threads:
t.join(timeout)
timeout = 0.
for t in threads:
self.assertFalse(t.isAlive())
def test_ordereddict(self):
from collections import OrderedDict
d = OrderedDict()
def tester():
ct = threading.currentThread()
for i in range(1000):
for j in range(10):
d[(ct, j)] = ct.ident
time.sleep(0.0001)
d.items()
for j in range(10):
del d[(ct, j)]
self.run_threads(tester, num=100)
self.assertEqual(d, {})
def test_main():
test_support.run_unittest(ThreadSafetyTestCase)
if __name__ == "__main__":
test_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment