Created
July 23, 2014 21:28
-
-
Save mholubowski/466d6aea4467cf68c3b5 to your computer and use it in GitHub Desktop.
KMR KeywordQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Redis-backed queue of Keyword IDs awaiting scraping, plus helpers that
# drive batched scrapes, pause/resume them, and track per-keyword failures.
module KeywordQueue
  extend self

  QUEUE_KEY = 'keyword-queue'.freeze
  LOCK_KEY = 'keyword-queue:lock'.freeze
  DELAY_TIME = 1 # Time in seconds between scrape batches
  KW_FAILURES_KEY = 'keyword_queue:keyword_failures'.freeze
  KW_FAILED_QUEUE_KEY = 'keyword_queue:failed_keywords'.freeze
  KW_FAILURE_THRESHOLD = 5
  # Lifetime (seconds) of each per-day failure-count hash. Without a TTL a new
  # hash is left behind in Redis every day, forever.
  KW_FAILURES_TTL = 7 * 24 * 60 * 60

  # Add a single Keyword to the queue for scraping.
  #
  # keyword  - Keyword (anything responding to #id)
  # priority - :high (front of the queue) or :low (back of the queue)
  #
  # Returns nothing.
  # Raises ArgumentError for an unknown priority.
  def add(keyword, priority: :low)
    add_many([keyword], priority: priority)
  end

  # Queue keywords for scraping.
  #
  # keywords - Array[Keyword]
  # priority - Symbol indicating scrape priority (:high or :low). :high
  #            places the whole batch at the head of the queue, preserving
  #            the batch's order; :low appends it to the tail.
  #
  # Returns nothing.
  # Raises ArgumentError for an unknown priority (previously such keywords
  # were silently dropped).
  def add_many(keywords, priority: :low)
    case priority
    when :high
      # Push in reverse so the batch ends up at the head in its original order.
      keywords.reverse_each { |kw| add_keyword_to_front(kw) }
    when :low
      keywords.each { |kw| add_keyword_to_back(kw) }
    else
      raise ArgumentError, "unknown priority: #{priority.inspect} (expected :high or :low)"
    end
    nil
  end

  # All Keyword IDs currently queued for scraping, head first.
  #
  # Returns Array[Integer]
  def ids
    $redis.lrange(QUEUE_KEY, 0, -1).map(&:to_i)
  end

  # All Keywords currently queued for scraping.
  def keywords
    Keyword.find(ids)
  end

  # Number of keyword IDs currently queued.
  #
  # Returns Integer
  def queue_length
    $redis.llen(QUEUE_KEY)
  end

  # Call out to our proxy pool and scrape as many keywords as there are
  # available proxies (capped at `max`). Failed scrapes are requeued by the
  # RubyScraper.
  #
  # max - Integer upper bound on proxies reserved for this batch
  #
  # Returns nothing.
  def scrape_next_batch(max = 100)
    # Read the length once: each call is a Redis round trip and the value can
    # change between calls.
    num_ips = [queue_length, max].min

    # Nothing queued: don't reserve proxies we cannot use.
    if num_ips > 0
      # NOTE(review): meaning of the `false` flag depends on ProxyPool — confirm.
      ips = ProxyPool.reserve(num_ips, false)[:ips]
      ips.each do |ip|
        keyword_id = $redis.lpop(QUEUE_KEY)
        # Another consumer may have drained the queue since we measured it.
        next unless keyword_id
        KeywordQueueWorker.perform_async(keyword_id, ip)
      end
    end

    schedule_next_batch unless locked?
  end

  # Enqueue the next scrape_next_batch run DELAY_TIME seconds from now.
  # Assumes a Sidekiq-style `delay_for` extension is available — TODO confirm.
  def schedule_next_batch
    delay_for(DELAY_TIME.seconds, retry: false).scrape_next_batch
  end

  # Drop every queued keyword ID.
  def abort_all!
    $redis.del(QUEUE_KEY)
  end

  # Mark the KeywordQueue as locked. The batch loop stops re-scheduling itself
  # once it sees the lock. A batch that is already running still completes.
  #
  # Returns nothing.
  def lock!
    $redis.set(LOCK_KEY, 1)
  end

  # Whether the lock flag is set.
  def locked?
    $redis.get(LOCK_KEY).present?
  end

  # Clear the lock and restart the batch loop.
  #
  # NOTE(review): this schedules a batch even when the queue was not locked,
  # so calling it while the loop is already running starts a second,
  # parallel scheduling chain.
  def unlock!
    $redis.del(LOCK_KEY)
    schedule_next_batch
  end

  # Retrieve the IDs of keywords moved to the failed (dead) queue.
  #
  # Returns Array[Integer]
  def failed_keywords
    $redis.lrange(KW_FAILED_QUEUE_KEY, 0, -1).map(&:to_i)
  end

  # A keyword's failures exceeded the threshold: atomically remove every copy
  # of it from the scrape queue and push it onto the dead queue for inspection.
  #
  # keyword - Keyword instance
  #
  # Returns nothing.
  def mark_keyword_failed!(keyword)
    # Issue commands on the yielded transaction object. redis-rb 5 sends
    # commands issued on $redis here outside the MULTI.
    $redis.multi do |tx|
      tx.lpush(KW_FAILED_QUEUE_KEY, keyword.id)
      tx.lrem(QUEUE_KEY, 0, keyword.id)
    end
    nil
  end

  # Track a failure (exception) associated with a keyword and take action:
  # re-queue it at high priority, or move it to the dead queue once today's
  # failure count exceeds KW_FAILURE_THRESHOLD.
  #
  # keyword - Keyword instance
  #
  # Returns nothing.
  def handle_keyword_failure(keyword)
    # Counts are bucketed per calendar day, so they reset daily.
    failure_key = "#{KW_FAILURES_KEY}:#{Time.now.strftime('%Y-%m-%d')}"
    failure_count = $redis.hincrby(failure_key, keyword.id.to_s, 1)
    $redis.expire(failure_key, KW_FAILURES_TTL)

    if failure_count > KW_FAILURE_THRESHOLD
      mark_keyword_failed!(keyword)
    else
      add(keyword, priority: :high)
    end
  end

  private

  # Push one keyword ID onto the head of the queue.
  def add_keyword_to_front(keyword)
    $redis.lpush(QUEUE_KEY, keyword.id)
  end

  # Push one keyword ID onto the tail of the queue.
  def add_keyword_to_back(keyword)
    $redis.rpush(QUEUE_KEY, keyword.id)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment