@laggardkernel
Created July 9, 2020 05:05
RedisPipeline for scrapy #python #scrapy

Create a base class RedisPipeline; any pipeline that inherits from it gets a redis connection. Access the connection through self.redis_server.

Dependency: scrapy_redis.
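
scrapy_redis builds the shared connection from the project settings (REDIS_URL, or REDIS_HOST/REDIS_PORT), and the pipeline itself is wired in through ITEM_PIPELINES. A minimal sketch, assuming a project called myproject and the MyPipeline subclass shown at the bottom:

# settings.py (sketch; module path and priority are placeholders)
REDIS_URL = "redis://localhost:6379/0"

ITEM_PIPELINES = {
    "myproject.pipelines.MyPipeline": 300,
}

# Optional overrides picked up by RedisMixin.setup_redis() below
REDIS_FAILED_URLS_KEY = "%(spidername)s:failed_urls"
REDIS_URLS_AS_SET = True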

# defaults.py
REDIS_ENCODING = "utf-8"
REDIS_FAILED_URLS_KEY = "%(spidername)s:failed_urls"

REDIS_URLS_AS_SET = True
# pipelines.py
import logging
from itemadapter import ItemAdapter
from scrapy_redis import connection
from . import defaults


class BasePipeline:
    def __init__(self, *args, **kwargs):
        pass

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        obj = cls(*args, **kwargs)
        return obj

    def open_spider(self, spider):
        pass

    def close_spider(self, spider):
        pass

    def process_item(self, item, spider):
        # Pipelines must return the item (or raise DropItem); returning
        # None would pass None down the pipeline chain.
        return item


class RedisMixin:
    """Mixin class to implement reading urls from a redis queue.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_FAILED_URLS_KEY : str (default: "<spider.name>:failed_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_URLS_AS_SET : bool (default: True)
        Use SET operations to add messages from the redis queue. If False,
        the messages are added using the RPUSH command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.
    """

    redis_key = None
    redis_encoding = None
    redis_urls_as_set = None

    # Redis client placeholder.
    redis_server = None

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.redis_server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, "crawler", None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                "REDIS_FAILED_URLS_KEY", defaults.REDIS_FAILED_URLS_KEY
            )

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get(
                "REDIS_ENCODING", defaults.REDIS_ENCODING
            )
        self.redis_urls_as_set = settings.getbool(
            "REDIS_URLS_AS_SET", defaults.REDIS_URLS_AS_SET
        )

        self.redis_server = connection.from_settings(settings)

    def save_failed_urls(self, urls, spider):
        if not isinstance(urls, list):
            urls = [urls]
        use_set = self.redis_urls_as_set
        add_to_redis = self.redis_server.sadd if use_set else self.redis_server.rpush
        # Interpolation is a no-op if the key was already formatted once,
        # e.g. by RedisPipeline.open_spider().
        key = self.redis_key % {"spidername": spider.name}
        # XXX: Do we need to use a timeout here?
        return add_to_redis(key, *urls)


class RedisPipeline(RedisMixin, BasePipeline):
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        obj = super().from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj

    def open_spider(self, spider):
        # Note: always call super(), so that cooperating subclasses still
        # run their open_spider() hooks.
        super().open_spider(spider)
        self.redis_key = self.redis_key % {"spidername": spider.name}
        spider.logger.info(
            "Saving failed URLs into redis key '%(redis_key)s' "
            "(encoding: %(redis_encoding)s)",
            self.__dict__,
        )

    def close_spider(self, spider):
        super().close_spider(spider)
        self.redis_server.close()

    def process_item(self, item, spider):
        try:
            item = self._process_item(item, spider)
        except Exception:
            adapter = ItemAdapter(item)
            url = adapter.get("detail_url")
            if url:
                spider.logger.info(
                    f"Processing failed in {type(self).__name__}, "
                    f"saving to redis: {url}"
                )
                # TODO: save the exception into a redis key as well?
                self.save_failed_urls(url, spider)
            raise  # important: don't silently swallow the failure
        return item

    def _process_item(self, item, spider):
        raise NotImplementedError
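
The failure handler in process_item() above reads a detail_url field off the item, so items passing through the pipeline need to carry one. A minimal sketch (the item class and the extra field are just examples):

# items.py (sketch)
import scrapy


class DetailItem(scrapy.Item):
    # Read back by RedisPipeline when _process_item() raises.
    detail_url = scrapy.Field()
    title = scrapy.Field()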

Usage

class MyPipeline(RedisPipeline):
    def _process_item(self, item, spider):
        # The shared redis connection is available as self.redis_server.
        # (The counter key here is just an example.)
        self.redis_server.incr(f"{spider.name}:item_count")
        return item
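
Failed URLs accumulate under <spider.name>:failed_urls and can later be replayed, for example by pointing a scrapy_redis RedisSpider at that key. A sketch under assumed names (RetrySpider, myspider); note that scrapy_redis pops start URLs with LPOP unless REDIS_START_URLS_AS_SET is enabled, so that setting should match REDIS_URLS_AS_SET:

# retry_spider.py (sketch; spider and key names are placeholders)
from scrapy_redis.spiders import RedisSpider


class RetrySpider(RedisSpider):
    name = "myspider_retry"
    # Consume the failed URLs written by RedisPipeline.
    redis_key = "myspider:failed_urls"

    def parse(self, response):
        # Re-run the original extraction logic on the retried page.
        ...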