Spider using a thread pool, by shiweifu
# -*- coding: UTF-8 -*-
import httplib
import urllib2
import socket
import traceback
import re
import BeautifulSoup
import threading
from Queue import Queue, Empty
import sys

# exceptions
class NoResultsPending(Exception):
    """All work requests have been processed."""
    pass

class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
    pass

class TimeOut(Exception):
    """Opening a URL with urllib2 failed with a timeout."""
    pass

class ParamsError(Exception):
    """Not enough parameters were passed in."""
    pass

FILTER_EXTS = [".exe", ".rar", ".zip", ".apk", ".mp3"]
# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.
    """
    traceback.print_exception(*exc_info)

def urlopen_with_timeout(url, data=None, timeout=3):
    # Create these two helper classes fresh each time, since
    # timeout needs to be in the closure.
    class TimeoutHTTPConnection(httplib.HTTPConnection):
        def connect(self):
            """Connect to the host and port specified in __init__."""
            msg = "getaddrinfo returns an empty list"
            for res in socket.getaddrinfo(self.host, self.port, 0,
                                          socket.SOCK_STREAM):
                af, socktype, proto, canonname, sa = res
                try:
                    self.sock = socket.socket(af, socktype, proto)
                    if timeout is not None:
                        self.sock.settimeout(timeout)
                    if self.debuglevel > 0:
                        print "connect: (%s, %s)" % (self.host, self.port)
                    self.sock.connect(sa)
                except socket.error, msg:
                    if self.debuglevel > 0:
                        print 'connect fail:', (self.host, self.port)
                    if self.sock:
                        self.sock.close()
                    self.sock = None
                    continue
                break
            if not self.sock:
                raise socket.error, msg

    class TimeoutHTTPHandler(urllib2.HTTPHandler):
        http_request = urllib2.AbstractHTTPHandler.do_request_

        def http_open(self, req):
            return self.do_open(TimeoutHTTPConnection, req)

    opener = urllib2.build_opener(TimeoutHTTPHandler)
    try:
        body = opener.open(url, data)
    except UnicodeEncodeError:
        body = opener.open(url.encode("utf8"), data)
    # HTTPError is a subclass of URLError, so it must be caught first;
    # re-raise it bare (HTTPError cannot be constructed without arguments)
    except urllib2.HTTPError, he:
        print he
        raise
    except urllib2.URLError, ue:
        print ue
        raise TimeOut
    return body
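
# A minimal usage sketch for urlopen_with_timeout (hypothetical URL; kept as
# a comment so nothing runs at import time). The function returns a file-like
# response object, raises TimeOut on connection problems, and re-raises
# urllib2.HTTPError on HTTP-level errors:
#
#   try:
#       html = urlopen_with_timeout("http://example.com/", timeout=5).read()
#   except TimeOut:
#       html = None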
def _get_page(_task, _log_queue):
    url = _task[0]
    try:
        page = urlopen_with_timeout(url).read()
    except TimeOut, to:
        page = None
        _log_queue.put((url, to))
    except urllib2.HTTPError, he:
        _log_queue.put((url, he))
        page = None
    return page
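
# Sketch of how a caller might drain _get_page's error log (hypothetical URL,
# kept as a comment):
#
#   log_queue = Queue(0)
#   page = _get_page(("http://example.com/", 0, ""), log_queue)
#   while not log_queue.empty():
#       url, err = log_queue.get()
#       print url, err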
def _parse_page(_page, _task, _url_list, _deeph, _filter_exts, _root):
    task_list = []
    try:
        soup = BeautifulSoup.BeautifulSoup(_page)
    except Exception, ex:
        print _task
        print ex
        return []
    urlall = soup.findAll('a', onclick=None,
                          href=re.compile('^http:|^/'))
    url = _task[0]
    if url.endswith('/'):
        url = url[:-1]
    for i in urlall:
        # skip links back to the site root
        if i['href'] == "/":
            continue
        # turn relative addresses into absolute ones
        if i['href'].startswith('/'):
            i['href'] = "".join((_root, i['href']))
        if _task[1] >= _deeph:
            continue
        if i["href"] in _url_list:
            continue
        # reset the filter flag for each link
        is_filter = False
        for ext in _filter_exts:
            if i["href"].endswith(ext):
                print i["href"], "filter"
                is_filter = True
                break
        if is_filter:
            continue
        _url_list.append(i['href'])
        task_list.append((i['href'], _task[1] + 1, _task[0]))
    return task_list
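
# Each task is a (url, depth, parent_url) tuple, and _parse_page returns one
# new task per fresh link with the depth incremented. A hypothetical sketch:
#
#   seen = ["http://example.com"]
#   tasks = _parse_page(html, ("http://example.com", 0, ""), seen,
#                       2, FILTER_EXTS, "http://example.com")
#   # tasks might be [("http://example.com/about", 1, "http://example.com"), ...]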
def down_page_callable(**kw):
    if "filter_exts" not in kw:
        kw["filter_exts"] = FILTER_EXTS
    try:
        _task = kw["task"]
        _put_request = kw["put_request"]
        _url_list = kw["url_list"]
        _deeph = kw["deeph"]
        _root = kw["root"]
        _filter_exts = kw["filter_exts"]
    except KeyError:
        raise ParamsError
    _log_queue = Queue(0)
    page = _get_page(_task, _log_queue)
    if page is None:
        return
    task_list = _parse_page(page, _task, _url_list, _deeph, _filter_exts, _root)
    # enqueue a new work request for every fresh link found on this page
    for _task in task_list:
        kwds = {
            "task": _task,
            "put_request": _put_request,
            "url_list": _url_list,
            "deeph": _deeph,
            "root": _root,
            "filter_exts": _filter_exts
        }
        req = WorkRequest(down_page_callable, callback=write_log_callable(), kwds=kwds)
        _put_request(req)

# classes
class WorkerThread(threading.Thread):
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.
    """
    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
        new worker thread.
        """
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self._requests_queue = requests_queue
        self._results_queue = results_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.isSet():
                # we are dismissed, break out of loop
                break
            # Get the next work request. If we don't get a new request from
            # the queue after self._poll_timeout seconds, we jump to the
            # start of the while loop again, to give the thread a chance to
            # exit.
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Empty:
                continue
            else:
                if self._dismissed.isSet():
                    # we are dismissed, put back request in queue and exit loop
                    self._requests_queue.put(request)
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()
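
# Workers poll the request queue with a timeout so that dismiss() can take
# effect between jobs. A hypothetical sketch of stopping a single worker:
#
#   worker = WorkerThread(Queue(0), Queue(0), poll_timeout=1)
#   worker.dismiss()
#   worker.join()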
class WorkRequest:
    """A request to execute a callable for putting in the request queue later.

    See the ``makeRequests`` function in the upstream ``threadpool`` module
    (not included here) for the common case where you want to build several
    ``WorkRequest`` objects for the same callable but with different
    arguments for each call.
    """
    def __init__(self, callable_, args=None, kwds=None, requestID=None,
                 callback=None, exc_callback=_handle_thread_exception):
        if requestID is None:
            self.requestID = id(self)
        else:
            try:
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)
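
# A WorkRequest just bundles a callable with its arguments and callbacks.
# A hypothetical sketch with a plain function:
#
#   def add(a, b):
#       return a + b
#
#   def on_done(request, result):
#       print request.requestID, "->", result
#
#   req = WorkRequest(add, args=[1, 2], callback=on_done)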
class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.
    """
    def __init__(self, num_workers, request_queue=None, result_queue=None,
                 q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and
        the worker threads will block when the queue is full and they try to
        put new results in it.

        .. warning:
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not
            pulled regularly and too many jobs are put in the work requests
            queue. To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
        """
        self._requests_queue = request_queue or Queue(q_size)
        self._results_queue = result_queue or Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)
    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting
        for requests.
        """
        for i in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))
    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task."""
        dismiss_list = []
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)
        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been
        dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later."""
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request
    def poll(self, block=False):
        """Process any new results in the queue."""
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
                # has an exception occurred?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any
                if request.callback and not \
                        (request.exception and request.exc_callback):
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Empty:
                break

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while 1:
            try:
                self.poll(True)
            except NoResultsPending:
                break
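
# Standalone pool usage, independent of the spider (a hypothetical sketch):
#
#   pool = ThreadPool(4)
#   for n in range(10):
#       pool.putRequest(WorkRequest(lambda x=n: x * x,
#                                   callback=lambda req, res: sys.stdout.write("%d\n" % res)))
#   pool.wait()  # blocks until every request has been processed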
class PageResult(object):
    """Container pairing a downloaded page with its error-log queue."""
    def __init__(self, _page, _log_queue):
        super(PageResult, self).__init__()
        self.page = _page
        self.log_queue = _log_queue

def initlog(fileName="logfile"):
    import logging
    logger = logging.getLogger()
    hdlr = logging.FileHandler(fileName)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(logging.NOTSET)
    return logger
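
# initlog attaches a FileHandler to the root logger, so every logging call in
# the process ends up in that file. A hypothetical sketch:
#
#   log = initlog("spider.log")
#   log.debug("crawl started")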
logger = initlog()
def write_log_callable():
    def wrapper(request, result):
        global logger
        fname = request.kwds["task"][0]
        logger.debug(fname)
    return wrapper

def write_callable(request, result):
    """Write the downloaded page to disk and write the log."""
    # guard: the result must be a PageResult
    if not isinstance(result, PageResult):
        raise TypeError
    # write the log
    global logger
    fname = request.kwds["task"][0]
    # writing the page itself is left unimplemented
def test():
    url_list = []
    deeph = 2
    root = "http://m.baidu.com"
    url_list.append(root)
    task = (root, 0, "")
    pool = ThreadPool(10)
    kwds = {
        "task": task,
        "put_request": pool.putRequest,
        "url_list": url_list,
        "deeph": deeph,
        "root": root
    }
    callback = write_log_callable()
    req = WorkRequest(down_page_callable, callback=callback, kwds=kwds)
    pool.putRequest(req)
    # wait() is needed here: the workers are daemon threads, so without it
    # the main thread would exit before any page is downloaded
    pool.wait()
    print "done"
    print len(url_list)
if __name__ == '__main__':
    test()
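
# The crawl can be tuned through the kwds dict passed to down_page_callable,
# e.g. a hypothetical run that also skips image files:
#
#   kwds["filter_exts"] = FILTER_EXTS + [".jpg", ".png"]
#   pool.putRequest(WorkRequest(down_page_callable,
#                               callback=write_log_callable(), kwds=kwds))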