Skip to content

Instantly share code, notes, and snippets.

@yaoyi
Created February 27, 2014 17:29
Show Gist options
  • Save yaoyi/9254844 to your computer and use it in GitHub Desktop.
from datetime import datetime
from hashlib import md5
from scrapy import log
from scrapy.exceptions import DropItem
from twisted.enterprise import adbapi
class FilterWordsPipeline(object):
    """A pipeline for filtering out items which contain certain words in
    their description.
    """

    # put all words in lowercase
    words_to_filter = ['politics', 'religion']

    def process_item(self, item, spider):
        """Drop the item if its description contains a forbidden word.

        Raises DropItem when a forbidden word is found; otherwise returns
        the item unchanged for the next pipeline stage.
        """
        # A missing/None description is treated as empty (nothing to filter).
        # Hoisted out of the loop: the description does not change per word.
        desc = (item.get('description') or '').lower()
        for word in self.words_to_filter:
            if word in desc:
                raise DropItem("Contains forbidden word: %s" % word)
        return item
class RequiredFieldsPipeline(object):
    """A pipeline that rejects items lacking any of the required fields."""

    required_fields = ('name', 'description', 'url')

    def process_item(self, item, spider):
        """Return the item if complete, else drop it naming the first gap."""
        # Find the first required field that is absent or falsy, if any.
        missing = next(
            (name for name in self.required_fields if not item.get(name)),
            None,
        )
        if missing is not None:
            raise DropItem("Field '%s' missing: %r" % (missing, item))
        return item
class MySQLStorePipeline(object):
    """A pipeline to store the item in a MySQL database.

    This implementation uses Twisted's asynchronous database API, so the
    blocking MySQL calls run in adbapi's thread pool rather than in the
    reactor thread.
    """

    def __init__(self, dbpool):
        # dbpool: a twisted.enterprise.adbapi.ConnectionPool
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """Alternate constructor: build the connection pool from the
        project settings (MYSQL_HOST, MYSQL_DBNAME, MYSQL_USER,
        MYSQL_PASSWD)."""
        dbargs = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DBNAME'],
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWD'],
            charset='utf8',
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool('MySQLdb', **dbargs)
        return cls(dbpool)

    def process_item(self, item, spider):
        """Schedule the upsert in the thread pool; return its deferred."""
        # run db query in the thread pool
        d = self.dbpool.runInteraction(self._do_upsert, item, spider)
        d.addErrback(self._handle_error, item, spider)
        # at the end return the item in case of success or failure
        d.addBoth(lambda _: item)
        # return the deferred instead the item. This makes the engine to
        # process next item (according to CONCURRENT_ITEMS setting) after this
        # operation (deferred) has finished.
        return d

    def _do_upsert(self, conn, item, spider):
        """Perform an insert or update (runs in a pool thread; `conn` is
        the transaction cursor supplied by runInteraction)."""
        guid = self._get_guid(item)
        now = datetime.utcnow().replace(microsecond=0).isoformat(' ')
        # Parameterized queries throughout -- item values are never
        # interpolated into the SQL string.
        conn.execute("""SELECT EXISTS(
            SELECT 1 FROM website WHERE guid = %s
        )""", (guid, ))
        ret = conn.fetchone()[0]
        if ret:
            conn.execute("""
                UPDATE website
                SET name=%s, description=%s, url=%s, updated=%s
                WHERE guid=%s
            """, (item['name'], item['description'], item['url'], now, guid))
            spider.log("Item updated in db: %s %r" % (guid, item))
        else:
            conn.execute("""
                INSERT INTO website (guid, name, description, url, updated)
                VALUES (%s, %s, %s, %s, %s)
            """, (guid, item['name'], item['description'], item['url'], now))
            spider.log("Item stored in db: %s %r" % (guid, item))

    def _handle_error(self, failure, item, spider):
        """Log a failure from the db interaction without re-raising it."""
        # do nothing, just log
        log.err(failure)

    def _get_guid(self, item):
        """Generate a unique identifier for a given item.

        The guid is the md5 hex digest of the url field alone.
        """
        url = item['url']
        # md5() requires bytes: encode text urls so this works on Python 3
        # (on Python 2, str urls are already bytes and pass through).
        if not isinstance(url, bytes):
            url = url.encode('utf-8')
        return md5(url).hexdigest()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment