Created
February 24, 2014 06:06
-
-
Save flyer103/9182713 to your computer and use it in GitHub Desktop.
Prove that a class is not thread-safe.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python2.7 | |
| # -*- coding: utf-8 -*- | |
| import threading | |
| import Queue | |
| import time | |
class MySingletonNTSafe(object):
    """Singleton whose creation is deliberately NOT thread-safe.

    The "is there an instance yet?" check and the assignment of the new
    instance are separated by a configurable sleep and are not guarded by
    any lock, so two threads constructing the class at the same time can
    each end up with a different object.
    """

    # The shared instance; None until the first construction completes.
    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it first if none exists.

        Keyword Args:
            stime: seconds to sleep between the existence check and the
                creation of the instance.  Defaults to 0 when omitted.
        """
        # Use .get() so that constructing without ``stime`` does not raise
        # KeyError; a missing value simply means "no artificial delay".
        stime = kwargs.get('stime', 0)
        print('Time to sleep: %f' % (stime,))
        if MySingletonNTSafe._instance is None:
            # The sleep sits between the check and the assignment on
            # purpose: it gives another thread time to pass the same check.
            time.sleep(stime)
            MySingletonNTSafe._instance = super(MySingletonNTSafe, cls).__new__(cls)
        return MySingletonNTSafe._instance

    def __init__(self, *args, **kwargs):
        """Record the object being initialised as the shared instance."""
        MySingletonNTSafe._instance = self
class Test(object):
    """Build MySingletonNTSafe from several threads and collect the results."""

    def __init__(self, tnum=3):
        """Store ``tnum`` and create the two result queues.

        ``tnum`` and ``queue_st`` are only stored here; the visible methods
        of this class never read them.
        """
        self.tnum = tnum
        self.queue_st = Queue.Queue()
        self.queue_nst = Queue.Queue()

    def run(self):
        """Start two worker threads (sleep times 2 and 0) and wait for both."""
        workers = [
            threading.Thread(target=self._run_nst, kwargs={'stime': delay})
            for delay in (2, 0)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

    def _run_nst(self, stime=0):
        """Construct the not-thread-safe singleton and queue the object obtained."""
        self.queue_nst.put(MySingletonNTSafe(stime=stime))
if __name__ == '__main__':
    # Repeat the threaded experiment until one round yields two objects
    # that do not compare equal, then stop.
    attempts = 0
    while True:
        harness = Test()
        harness.run()

        # Drain everything the worker threads put on the queue.
        collected = []
        while not harness.queue_nst.empty():
            collected.append(harness.queue_nst.get())

        # Compare the last object against every other one from this round.
        reference = collected.pop()
        differs = any(not reference == other for other in collected)
        if differs:
            print('NOT IDENTICAL')

        attempts += 1
        print('Number of try: #%d' % (attempts,))
        if differs:
            break

        print('\n')
        time.sleep(0.5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment