spider use threadpool, by shiweifu (created June 2, 2012)
# -*- coding: UTF-8 -*-
import httplib
import urllib2
import socket
import traceback
import re
import BeautifulSoup
import threading
from Queue import Queue, Empty
import time
import sys
# exceptions
class NoResultsPending(Exception):
    """All work requests have been processed."""
    pass

class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
    pass

class TimeOut(Exception):
    """Raised when opening a URL with urllib fails with a timeout."""
    pass

class ParamsError(Exception):
    """Raised when required keyword arguments are missing."""
    pass

FILTER_EXTS = [".exe", ".rar", ".zip", ".apk", ".mp3"]
# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.
    """
    traceback.print_exception(*exc_info)
def urlopen_with_timeout(url, data=None, timeout=3):
    # Create these two helper classes fresh each time, since
    # timeout needs to be in the closure.
    class TimeoutHTTPConnection(httplib.HTTPConnection):
        def connect(self):
            """Connect to the host and port specified in __init__."""
            msg = "getaddrinfo returns an empty list"
            for res in socket.getaddrinfo(self.host, self.port, 0,
                                          socket.SOCK_STREAM):
                af, socktype, proto, canonname, sa = res
                try:
                    self.sock = socket.socket(af, socktype, proto)
                    if timeout is not None:
                        self.sock.settimeout(timeout)
                    if self.debuglevel > 0:
                        print "connect: (%s, %s)" % (self.host, self.port)
                    self.sock.connect(sa)
                except socket.error, msg:
                    if self.debuglevel > 0:
                        print 'connect fail:', (self.host, self.port)
                    if self.sock:
                        self.sock.close()
                    self.sock = None
                    continue
                break
            if not self.sock:
                raise socket.error, msg

    class TimeoutHTTPHandler(urllib2.HTTPHandler):
        http_request = urllib2.AbstractHTTPHandler.do_request_
        def http_open(self, req):
            return self.do_open(TimeoutHTTPConnection, req)

    opener = urllib2.build_opener(TimeoutHTTPHandler)
    try:
        body = opener.open(url, data)
    except UnicodeEncodeError:
        body = opener.open(url.encode("utf8"), data)
    except urllib2.HTTPError, he:
        # HTTPError is a subclass of URLError, so it must be caught first;
        # re-raise it unchanged (``raise urllib2.HTTPError`` would fail,
        # because HTTPError cannot be constructed without arguments)
        print he
        raise
    except urllib2.URLError, ue:
        print ue
        raise TimeOut
    return body
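
# A minimal usage sketch for urlopen_with_timeout (the URL below is just a
# placeholder, not part of the original gist); it is not called by default.
def _example_fetch():
    try:
        body = urlopen_with_timeout("http://example.com/", timeout=3)
        print len(body.read()), "bytes"
    except TimeOut:
        print "request timed out"
    except urllib2.HTTPError, he:
        print "server returned HTTP error", he.code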
def _get_page(_task, _log_queue):
    """Download the page for a task; on failure log (url, error) and return None."""
    url = _task[0]
    try:
        page = urlopen_with_timeout(url).read()
    except TimeOut, to:
        page = None
        _log_queue.put((url, to))
    except urllib2.HTTPError, he:
        _log_queue.put((url, he))
        page = None
    return page
def _parse_page(_page, _task, _url_list, _deeph, _filter_exts, _root):
    """Extract links from a page and build follow-up tasks for the pool."""
    task_list = []
    try:
        soup = BeautifulSoup.BeautifulSoup(_page)
    except Exception, ex:
        print _task
        print ex
        return []
    urlall = soup.findAll('a', onclick=None,
                          href=re.compile('^http:|^/'))
    url = _task[0]
    if url.endswith('/'):
        url = url[:-1]
    for i in urlall:
        # skip links that just point back to the root
        if i['href'] == "/":
            continue
        # resolve relative addresses against the root
        if i['href'].startswith('/'):
            i['href'] = "".join((_root, i['href']))
        if _task[1] >= _deeph:
            continue
        if i["href"] in _url_list:
            continue
        # reset the flag for every link; if it were set only once before the
        # loop, one filtered URL would cause all following URLs to be skipped
        is_filter = False
        for ext in _filter_exts:
            if i["href"].endswith(ext):
                print i["href"], "filter"
                is_filter = True
                break
        if is_filter:
            continue
        _url_list.append(i['href'])
        task_list.append((i['href'], _task[1] + 1, _task[0]))
    return task_list
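
# A minimal sketch of what _parse_page produces, assuming BeautifulSoup 3 is
# installed; the markup and root URL are made-up examples, not from the gist.
def _example_parse():
    html = '<a href="/about">about</a> <a href="/file.zip">zip</a>'
    root = "http://example.com"
    # a task tuple is (url, depth, referrer)
    tasks = _parse_page(html, (root, 0, ""), [root], 2, FILTER_EXTS, root)
    print tasks  # the .zip link is filtered; /about becomes a depth-1 task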
def down_page_callable(**kw):
    if "filter_exts" not in kw:
        kw["filter_exts"] = FILTER_EXTS
    try:
        _task = kw["task"]
        _put_request = kw["put_request"]
        _url_list = kw["url_list"]
        _deeph = kw["deeph"]
        _root = kw["root"]
        _filter_exts = kw["filter_exts"]
    except KeyError:
        raise ParamsError
    _log_queue = Queue(0)
    page = _get_page(_task, _log_queue)
    if page is None:
        return
    task_list = _parse_page(page, _task, _url_list, _deeph, _filter_exts, _root)
    for _task in task_list:
        kwds = {
            "task": _task,
            "put_request": _put_request,
            "url_list": _url_list,
            "deeph": _deeph,
            "root": _root,
            "filter_exts": _filter_exts
        }
        req = WorkRequest(down_page_callable, callback=write_log_callable(), kwds=kwds)
        _put_request(req)
# classes
class WorkerThread(threading.Thread):
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.
    """
    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
        new worker thread.
        """
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self._requests_queue = requests_queue
        self._results_queue = results_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.isSet():
                # we are dismissed, break out of loop
                break
            # Get the next work request. If we don't get a new request from
            # the queue after self._poll_timeout seconds, we jump to the start
            # of the while loop again, to give the thread a chance to exit.
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Empty:
                continue
            else:
                if self._dismissed.isSet():
                    # we are dismissed, put back request in queue and exit loop
                    self._requests_queue.put(request)
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()
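
# A minimal sketch of driving one WorkerThread directly, without a ThreadPool,
# to show the queue contract (the lambda and names are made up for the example):
def _example_worker():
    requests, results = Queue(0), Queue(0)
    worker = WorkerThread(requests, results, poll_timeout=1)
    requests.put(WorkRequest(lambda x: x * 2, args=[21]))
    request, result = results.get()
    print result  # 42
    worker.dismiss()
    worker.join()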
class WorkRequest:
    """A request to execute a callable for putting in the request queue later.

    (The original ``threadpool`` module also provides a helper,
    ``makeRequests``, for building several ``WorkRequest`` objects for the
    same callable with different arguments; that helper is not included in
    this file.)
    """
    def __init__(self, callable_, args=None, kwds=None, requestID=None,
                 callback=None, exc_callback=_handle_thread_exception):
        if requestID is None:
            self.requestID = id(self)
        else:
            try:
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)
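
# A minimal sketch of building a WorkRequest by hand (the callable and
# callback here are made up for the example):
def _example_request():
    def add(a, b):
        return a + b
    def on_done(request, result):
        print "request %s -> %r" % (request.requestID, result)
    return WorkRequest(add, args=[1, 2], callback=on_done)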
class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.
    """
    def __init__(self, num_workers, request_queue=None, result_queue=None,
                 q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.

        .. warning::
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
        """
        self._requests_queue = request_queue or Queue(q_size)
        self._results_queue = result_queue or Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)
    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting
        for requests.
        """
        for i in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task."""
        dismiss_list = []
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)
        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been
        dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later."""
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request
    def poll(self, block=False):
        """Process any new results in the queue."""
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
                # has an exception occurred?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any
                if request.callback and not \
                        (request.exception and request.exc_callback):
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Empty:
                break

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while 1:
            try:
                self.poll(True)
            except NoResultsPending:
                break
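
# A minimal sketch of the pattern the ThreadPool docstring warns about: with
# bounded queues, put requests with a timeout and drain results on Queue.Full
# so the producer and the workers cannot deadlock (the pool and request list
# are assumed to exist; this helper is made up for the example):
def _example_bounded_put(pool, requests):
    import Queue as _queue_mod
    for req in requests:
        while True:
            try:
                pool.putRequest(req, block=True, timeout=1)
                break
            except _queue_mod.Full:
                # request queue full: pull finished results to make room
                try:
                    pool.poll()
                except NoResultsPending:
                    pass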
class PageResult(object):
    """Holds a downloaded page together with its log queue."""
    def __init__(self, _page, _log_queue):
        super(PageResult, self).__init__()
        self.page = _page
        self.log_queue = _log_queue
def initlog(fileName="logfile"):
    import logging
    logger = logging.getLogger()
    hdlr = logging.FileHandler(fileName)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(logging.NOTSET)
    return logger

logger = initlog()

def write_log_callable():
    """Return a result callback that logs the crawled URL."""
    def wrapper(request, result):
        global logger
        fname = request.kwds["task"][0]
        logger.debug(fname)
    return wrapper
def write_callable(request, result):
    """Write the page to disk and write the log."""
    # reject results that are not PageResult instances
    if not isinstance(result, PageResult):
        raise TypeError
    # write the log
    global logger
    fname = request.kwds["task"][0]
    logger.debug(fname)
    # write the page (left unimplemented in this gist)
def test():
    url_list = []
    deeph = 2
    root = "http://m.baidu.com"
    url_list.append(root)
    task = (root, 0, "")
    pool = ThreadPool(10)
    kwds = {
        "task": task,
        "put_request": pool.putRequest,
        "url_list": url_list,
        "deeph": deeph,
        "root": root
    }
    callback = write_log_callable()
    req = WorkRequest(down_page_callable, callback=callback, kwds=kwds)
    pool.putRequest(req)
    # block until every queued request has completed; without this the
    # daemonic workers die as soon as the main thread exits
    pool.wait()
    print "done"
    print len(url_list)

if __name__ == '__main__':
    test()