Created
December 23, 2016 05:52
-
-
Save limboinf/0afc01a5a01470372a0e3399322d233d to your computer and use it in GitHub Desktop.
Python线程池
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
""" | |
线程池. | |
:copyright: (c) 2015 by fangpeng. | |
:license: MIT, see LICENSE for more details. | |
""" | |
import sys | |
import Queue | |
import threading | |
class Worker(threading.Thread): | |
def __init__(self, in_queue, out_queue, err_queue): | |
"""初始化并启动一个工作线程(work thread), | |
:param in_queue: 等待执行的队列 | |
:param out_queue: 任务执行结果的队列 | |
:param err_queue: 任务错误信息的队列 | |
""" | |
threading.Thread.__init__(self) | |
self.setDaemon(True) | |
self.in_queue = in_queue | |
self.out_queue = out_queue | |
self.err_queue = err_queue | |
self.start() | |
def run(self): | |
while 1: | |
# 在in_queue队列中处理任务直到"command"为"stop"状态. | |
command, callback, args, kwargs = self.in_queue.get() | |
if command == 'stop': | |
break | |
try: | |
if command != 'process': | |
raise ValueError('Unknown command %r' % command) | |
except: | |
self.report_error() | |
else: | |
self.out_queue.put(callback(*args, **kwargs)) | |
def dismiss(self): | |
command = 'stop' | |
self.in_queue.put((command, None, None, None)) | |
def report_error(self): | |
"""通过添加错误信息到err_queue报告错误""" | |
self.err_queue.put(sys.exc_info()[:2]) | |
class ThreadPool(object): | |
"""Manager thread pool.""" | |
max_threads = 32 | |
def __init__(self, num_threads, pool_size=0): | |
"""在线程池中生成num_threads个线程并初始化上述三个队列. | |
:param num_threads: 线程个数 | |
:param pool_size: 线程池缓冲区大小,0表示缓冲区是无限。 | |
""" | |
num_threads = ThreadPool.max_threads if num_threads > ThreadPool.max_threads else num_threads | |
self.in_queue = Queue.Queue(pool_size) | |
self.out_queue = Queue.Queue(pool_size) | |
self.err_queue = Queue.Queue(pool_size) | |
self.workers = {} | |
for i in range(num_threads): | |
worker = Worker(self.in_queue, self.out_queue, self.err_queue) | |
self.workers[i] = worker | |
def add_task(self, callback, *args, **kwargs): | |
command = "process" | |
self.in_queue.put((command, callback, args, kwargs)) | |
def _get_results(self, queue): | |
"""Generator to yield one after the others all items currently | |
in the queue, without any waiting | |
""" | |
try: | |
while True: | |
# Equivalent to get(False) | |
yield queue.get_nowait() # Remove and return an item from the queue without blocking. | |
except Queue.Empty: | |
raise StopIteration | |
def get_task(self): | |
return self.out_queue.get() | |
def show_results(self): | |
for result in self._get_results(self.out_queue): | |
print 'Result:', result | |
def show_errors(self): | |
for etyp, err in self._get_results(self.err_queue): | |
print 'Error:', etyp, err | |
def destroy(self): | |
# 顺序很重要,第一,要停止所有的线程...: | |
for i in self.workers: | |
self.workers[i].dismiss() | |
# ...然后, 等待每个线程的终止: | |
for i in self.workers: | |
self.workers[i].join() | |
# clean up the workers from now-unused thread objects | |
del self.workers | |
if __name__ == '__main__': | |
import time | |
def stuff(arg): | |
time.sleep(90) | |
print 'thread:', arg | |
pool = ThreadPool(3) | |
pool.add_task(stuff, 'ts') | |
# Join and destroy all threads | |
pool.destroy() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment