Create a base class RedisPipeline: any pipeline that inherits from it gets a redis connection, accessible as self.redis_server.

Dependency: scrapy_redis.
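connection.from_settings builds the client from the project settings, so the redis endpoint must be configured there. A minimal sketch (the URL and the myproject.pipelines.MyPipeline path are placeholders for your own values):

# settings.py
REDIS_URL = "redis://localhost:6379/0"  # read by scrapy_redis's connection.from_settings
ITEM_PIPELINES = {
    "myproject.pipelines.MyPipeline": 300,  # a subclass of RedisPipeline, see Usage below
}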
# defaults.py
REDIS_ENCODING = "utf-8"
REDIS_FAILED_URLS_KEY = "%(spidername)s:failed_urls"
REDIS_URLS_AS_SET = True
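The key template is %-interpolated with the spider's name when the spider opens, so each spider gets its own failure bucket:

# e.g. for a spider named "quotes":
"%(spidername)s:failed_urls" % {"spidername": "quotes"}  # -> "quotes:failed_urls"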
# pipelines.py
from itemadapter import ItemAdapter
from scrapy_redis import connection

from . import defaults
class BasePipeline:
    """No-op pipeline implementing Scrapy's item pipeline interface.

    Subclasses override only the hooks they need.
    """

    def __init__(self, *args, **kwargs):
        pass

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        return cls(*args, **kwargs)

    def open_spider(self, spider):
        pass

    def close_spider(self, spider):
        pass

    def process_item(self, item, spider):
        # Pipelines must return the item (or raise DropItem) so that
        # later pipelines keep receiving it.
        return item
class RedisMixin:
    """Mixin class to save failed URLs to a redis key.

    Attributes
    ----------
    redis_key : str (default: REDIS_FAILED_URLS_KEY)
        Redis key where failed URLs are stored.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from the redis queue.

    Settings
    --------
    REDIS_FAILED_URLS_KEY : str (default: "<spider.name>:failed_urls")
        Default redis key where failed URLs are stored.
    REDIS_URLS_AS_SET : bool (default: True)
        Store URLs in a redis set via SADD, which deduplicates them.
        If False, URLs are appended to a redis list via RPUSH.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from the redis queue.
    """

    redis_key = None
    redis_encoding = None
    redis_urls_as_set = None
    # Redis client placeholder.
    redis_server = None
    def setup_redis(self, crawler=None):
        """Set up the redis connection.

        This should be called after the spider has set its crawler object.
        """
        if self.redis_server is not None:
            return

        if crawler is None:
            # We allow an optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, "crawler", None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                "REDIS_FAILED_URLS_KEY", defaults.REDIS_FAILED_URLS_KEY
            )

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get(
                "REDIS_ENCODING", defaults.REDIS_ENCODING
            )

        if self.redis_urls_as_set is None:
            # getbool coerces string values such as "False" correctly.
            self.redis_urls_as_set = settings.getbool(
                "REDIS_URLS_AS_SET", defaults.REDIS_URLS_AS_SET
            )

        self.redis_server = connection.from_settings(settings)
    def save_failed_urls(self, urls, spider):
        """Push one or more URLs onto the spider's failed-urls key."""
        if not isinstance(urls, list):
            urls = [urls]
        use_set = self.redis_urls_as_set
        add_to_redis = self.redis_server.sadd if use_set else self.redis_server.rpush
        key = self.redis_key % {"spidername": spider.name}
        # XXX: Do we need to use a timeout here?
        return add_to_redis(key, *urls)
class RedisPipeline(RedisMixin, BasePipeline):
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        obj = super().from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj

    def open_spider(self, spider):
        # super() must be called in case the class is subclassed further.
        super().open_spider(spider)
        self.redis_key = self.redis_key % {"spidername": spider.name}
        spider.logger.info(
            "Saving failed URLs into redis key '%(redis_key)s' "
            "(encoding: %(redis_encoding)s)",
            self.__dict__,
        )

    def close_spider(self, spider):
        super().close_spider(spider)
        self.redis_server.close()

    def process_item(self, item, spider):
        try:
            item = self._process_item(item, spider)
        except Exception:
            adapter = ItemAdapter(item)
            url = adapter.get("detail_url")
            if url:
                spider.logger.info(
                    "Process failed in %s, saving to redis: %s",
                    type(self).__name__,
                    url,
                )
                # TODO: save the exception into a redis key as well?
                self.save_failed_urls(url, spider)
            # Re-raise so Scrapy still sees the failure.
            raise
        return item

    def _process_item(self, item, spider):
        # Subclasses put their real work here, not in process_item.
        raise NotImplementedError
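A quick illustration of the REDIS_URLS_AS_SET trade-off, using a bare redis-py client against a local server (the demo:* keys are throwaway names):

import redis

r = redis.Redis()
r.sadd("demo:failed_set", "http://a", "http://a")    # set: deduplicated, 1 member
r.rpush("demo:failed_list", "http://a", "http://a")  # list: keeps both, length 2
r.smembers("demo:failed_set")                        # {b"http://a"}
r.lrange("demo:failed_list", 0, -1)                  # [b"http://a", b"http://a"]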
Usage: subclass RedisPipeline and override _process_item (not process_item, which wraps it with the failed-URL handling). The redis connection is available as self.redis_server.

class MyPipeline(RedisPipeline):
    def _process_item(self, item, spider):
        # self.redis_server is available here for raw redis access.
        ...
        return item
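To retry the failures later, read the key back. A minimal sketch with plain redis-py, assuming REDIS_URLS_AS_SET = True (so the key is a set) and a spider named "myspider":

import redis

r = redis.Redis(decode_responses=True)
for url in r.smembers("myspider:failed_urls"):
    print(url)  # e.g. feed these back in as start URLs for a retry run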