require "bundler/inline" | |
gemfile do | |
source "https://rubygems.org" | |
gem "async" | |
gem "ferrum" | |
gem "state_machines" | |
gem "breaker_machines" | |
gem "concurrent-ruby" | |
gem "nokogiri" | |
gem "pandoc-ruby" | |
end | |
require "async" | |
require "async/queue" | |
require "async/semaphore" | |
require "ferrum" | |
require "uri" | |
require "json" | |
require "timeout" | |
require "state_machines" | |
require "breaker_machines" | |
# Use concurrent-ruby for thread-safe data structures and atomic operations | |
# Even though Async uses cooperative concurrency, this provides defensive programming | |
# and future-proofing in case threading is added later | |
require "concurrent" | |
require "fileutils" | |
require "digest" | |
require "nokogiri" | |
require "pandoc-ruby" | |
class ScrapedPage | |
attr_accessor :url, :html, :scraped_at, :error_message, :retry_count, :last_error_at | |
def initialize(url) | |
@url = url | |
@html = nil | |
@scraped_at = nil | |
@error_message = nil | |
@retry_count = 0 | |
@last_error_at = nil | |
# Initialize state machine manually | |
self.state = "pending" | |
end | |
state_machine :state, initial: :pending do | |
event :start_processing do | |
transition pending: :processing, retrying: :processing | |
end | |
event :complete do | |
transition processing: :completed | |
end | |
event :mark_failed do | |
transition processing: :failed | |
end | |
event :retry_page do | |
transition failed: :retrying | |
end | |
event :give_up do | |
transition [:failed, :retrying] => :permanently_failed | |
end | |
state :pending do | |
def ready_for_processing? | |
true | |
end | |
end | |
state :processing do | |
def ready_for_processing? | |
false | |
end | |
end | |
state :completed do | |
def ready_for_processing? | |
false | |
end | |
def success? | |
true | |
end | |
end | |
state :failed do | |
def ready_for_processing? | |
can_retry? | |
end | |
def can_retry? | |
retry_count < 3 && (last_error_at.nil? || Time.now - last_error_at > backoff_delay) | |
end | |
def backoff_delay | |
# Exponential backoff: 30s, 60s, 120s | |
30 * (2**retry_count) | |
end | |
end | |
state :retrying do | |
def ready_for_processing? | |
true | |
end | |
end | |
state :permanently_failed do | |
def ready_for_processing? | |
false | |
end | |
def success? | |
false | |
end | |
end | |
end | |
def record_error(error) | |
@error_message = error.message | |
@last_error_at = Time.now | |
@retry_count += 1 | |
end | |
def record_success(html) | |
@html = html | |
@scraped_at = Time.now | |
@error_message = nil | |
end | |
def to_h | |
{ | |
url: @url, | |
html: @html, | |
scraped_at: @scraped_at, | |
state: state, | |
error_message: @error_message, | |
retry_count: @retry_count, | |
last_error_at: @last_error_at | |
} | |
end | |
def to_state_h | |
{ | |
url: @url, | |
scraped_at: @scraped_at, | |
state: state, | |
error_message: @error_message, | |
retry_count: @retry_count, | |
last_error_at: @last_error_at | |
} | |
end | |
def self.from_h(data) | |
page = new(data["url"]) | |
page.html = data["html"] | |
page.scraped_at = data["scraped_at"] | |
page.error_message = data["error_message"] | |
page.retry_count = data["retry_count"] || 0 | |
page.last_error_at = data["last_error_at"] | |
# Restore state | |
target_state = data["state"]&.to_sym || :pending | |
case target_state | |
when :processing then page.start_processing | |
when :completed then page.start_processing && page.complete | |
when :failed then page.start_processing && page.mark_failed | |
when :retrying then page.start_processing && page.mark_failed && page.retry_page | |
when :permanently_failed then page.start_processing && page.mark_failed && page.give_up | |
end | |
page | |
end | |
end | |
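# A minimal sketch of how a worker is expected to drive a ScrapedPage through
# its lifecycle (illustrative only; `fetch_html` is a stand-in for whatever
# downloads the page - the real flow lives in AsyncScraper#scrape_page):
#
#   page = ScrapedPage.new("https://example.com")
#   page.start_processing
#   begin
#     page.record_success(fetch_html(page.url))
#     page.complete
#   rescue => e
#     page.record_error(e)
#     page.mark_failed
#     page.can_retry? ? page.retry_page : page.give_up
#   end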
class AsyncScraper | |
include BreakerMachines::DSL | |
attr_reader :browser, :claimed_urls, :completed_urls, :url_queue, :results, :pages | |
# Content size limit - 10MB default | |
MAX_CONTENT_SIZE = 10_000_000 | |
def initialize(*urls, mode: :scrape, max_concurrent: 5, timeout: 10, state_container: nil, max_pages: nil, max_content_size: MAX_CONTENT_SIZE) | |
raise ArgumentError, "urls must be provided" if !urls || urls.empty? | |
raise ArgumentError, "mode must be :scrape or :spider" unless [:scrape, :spider].include?(mode) | |
@initial_urls = urls | |
@mode = mode | |
@max_concurrent = max_concurrent | |
@timeout = timeout | |
@state_container = state_container | |
@max_pages = max_pages | |
@max_content_size = max_content_size | |
@claimed_urls = Concurrent::Set.new # URLs currently being processed | |
@completed_urls = Concurrent::Set.new # URLs that have been processed (success or permanent failure) | |
@discovered_urls = Concurrent::Set.new # All URLs discovered during crawling | |
@results = Concurrent::Array.new | |
@pages = Concurrent::Hash.new # URL -> ScrapedPage mapping | |
@semaphore = Async::Semaphore.new(max_concurrent) | |
@pages_scraped = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for scraped pages | |
@active_jobs = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for active jobs | |
@work_available = true # Flag to signal workers when to stop | |
# Initialize state machine manually | |
self.state = "idle" | |
load_state if @state_container | |
end | |
state_machine :state, initial: :idle do | |
event :start do | |
transition idle: :running, stopped: :running, error: :running | |
end | |
event :pause do | |
transition running: :paused | |
end | |
event :resume do | |
transition paused: :running | |
end | |
event :stop do | |
transition [:running, :paused, :error] => :stopped | |
end | |
event :error_occurred do | |
transition [:running, :paused] => :error | |
end | |
event :reset do | |
transition any => :idle | |
end | |
state :idle do | |
def can_start? | |
true | |
end | |
end | |
state :running do | |
def active? | |
true | |
end | |
def can_process_pages? | |
true | |
end | |
end | |
state :paused do | |
def active? | |
true | |
end | |
def can_process_pages? | |
false | |
end | |
end | |
state :stopped do | |
def active? | |
false | |
end | |
def can_process_pages? | |
false | |
end | |
end | |
state :error do | |
def active? | |
false | |
end | |
def can_process_pages? | |
false | |
end | |
end | |
end | |
# Define circuit breaker for page scraping | |
circuit :page_scraping do | |
threshold failures: 5, within: 1.minute # Trip after 5 failures in 1 minute | |
reset_after 10.seconds # Wait 10 seconds before trying again | |
fallback { nil } # Return nil when circuit is open | |
# Additional configuration for robustness | |
on_open { |circuit| puts "Circuit breaker opened for page scraping - too many failures" } | |
on_close { |circuit| puts "Circuit breaker closed for page scraping - service recovered" } | |
on_half_open { |circuit| puts "Circuit breaker half-open for page scraping - testing recovery" } | |
end | |
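# As configured above, circuit(:page_scraping).wrap { ... } returns the block's
# value on success and falls back to nil when the breaker is open or the call
# fails, which is why scrape_page below treats a nil result as a failure.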
def run(&block) | |
start # Transition to running state | |
Sync(annotation: "AsyncScraper#run") do |task| | |
# Start browser | |
@browser = Ferrum::Browser.new( | |
headless: true, | |
timeout: @timeout, | |
pending_connection_errors: false | |
) | |
begin | |
@url_queue = Async::Queue.new | |
# Add initial URLs to queue if not resuming | |
if @discovered_urls.empty? | |
@initial_urls.each { |url| @discovered_urls.add(url) } | |
end | |
# Count URLs to process | |
urls_to_process = @discovered_urls.reject { |url| @completed_urls.include?(url) } | |
retryable_pages = @pages.values.select { |page| page.ready_for_processing? } | |
# Push URLs to queue | |
urls_to_process.each { |url| @url_queue.push(url) } | |
retryable_pages.each do |page| | |
@url_queue.push(page.url) | |
@discovered_urls.add(page.url) | |
end | |
# If no work to do, complete immediately | |
total_queued = urls_to_process.size + retryable_pages.size | |
if total_queued == 0 | |
puts "No URLs to process - all work completed" | |
return @results | |
end | |
# Start orchestrator in background | |
orchestrator = Async do | |
orchestrate_work(&block) | |
end | |
# Start worker pool using simplified worker loop | |
workers = @max_concurrent.times.map do |worker_id| | |
Async do | |
worker_loop(worker_id, &block) | |
end | |
end | |
# Wait for orchestrator to signal completion with timeout | |
begin | |
orchestrator.wait | |
rescue => e | |
puts "Orchestrator error: #{e.message}" | |
end | |
# Ensure work is stopped | |
@work_available = false | |
# Send stop signals to all workers | |
@max_concurrent.times { | |
begin | |
@url_queue.push(nil) | |
rescue | |
# Queue might be closed | |
end | |
} | |
# Wait for all workers to complete with individual timeouts | |
workers.each_with_index do |worker, idx| | |
Timeout.timeout(2) do # cap each worker's shutdown wait at 2 seconds
worker.wait | |
end | |
rescue Timeout::Error | |
puts "Worker #{idx} timed out, force stopping" | |
begin | |
worker.stop | |
rescue | |
nil | |
end | |
end | |
# Ensure all async tasks are completed | |
if @active_jobs.value > 0 | |
puts "Waiting for #{@active_jobs.value} active jobs to complete..." | |
start_time = Time.now | |
while @active_jobs.value > 0 && (Time.now - start_time) < 2 # wait at most 2 seconds
sleep(0.1) | |
end | |
if @active_jobs.value > 0 | |
puts "Forcibly stopping #{@active_jobs.value} remaining jobs" | |
# Note: we can't force-reset the atomic counter; the remaining jobs are expected to clean up on their own
end | |
end | |
ensure | |
save_state if @state_container | |
# Force close the queue and drain any remaining items | |
begin | |
@url_queue&.close | |
while @url_queue&.pop(timeout: 0.1); end | |
rescue | |
# Ignore errors during cleanup | |
end | |
@browser&.quit | |
stop # Transition to stopped state | |
end | |
@results | |
end | |
end | |
private | |
# Orchestrator - monitors work and decides when to stop | |
def orchestrate_work(&block) | |
loop do | |
sleep(0.1) # Check every 100ms for faster response | |
# Check if we should stop | |
if should_stop_all_work? | |
puts "Orchestrator: Stopping all work" | |
@work_available = false | |
# Clear the queue to prevent workers from picking up more work | |
begin | |
while @url_queue.any? | |
@url_queue.pop | |
end | |
rescue | |
# Queue might be closed | |
end | |
break | |
end | |
end | |
end | |
# Check if all work is complete | |
def should_stop_all_work? | |
# Stop if we've hit page limit and no active jobs | |
if max_pages_reached? | |
return @active_jobs.value == 0 | |
end | |
# Stop if all pages are processed and no active jobs | |
return true if @active_jobs.value == 0 && @url_queue.empty? && should_close_queue? | |
# Otherwise keep going | |
false | |
rescue | |
true # Stop on any error | |
end | |
# Simplified worker loop | |
def worker_loop(worker_id, &block) | |
while @work_available | |
# Check page limit before getting URL | |
if max_pages_reached? | |
puts "Worker #{worker_id}: Page limit reached, stopping" | |
break | |
end | |
url = @url_queue.pop | |
break if url.nil? # nil is the stop signal pushed by #run
# Skip if already completed | |
if @completed_urls.include?(url) | |
puts "Worker #{worker_id}: Skipping #{url} (already completed)" | |
next | |
end | |
# Try to claim this URL for processing - atomic operation | |
unless @claimed_urls.add?(url) | |
puts "Worker #{worker_id}: Another worker claimed #{url}" | |
next | |
end | |
# Double-check page limit before processing | |
if max_pages_reached? | |
puts "Worker #{worker_id}: Page limit reached after claiming URL" | |
break | |
end | |
# Get or create page object | |
page = @pages[url] ||= ScrapedPage.new(url) | |
unless page.ready_for_processing?
@claimed_urls.delete(url) # release the claim so the page can be retried later
next
end
# Process the page with semaphore | |
@semaphore.async do | |
@active_jobs.increment | |
begin | |
# Final check inside semaphore | |
if !max_pages_reached? | |
scrape_page(page, worker_id, &block) | |
else | |
puts "Worker #{worker_id}: Skipping page due to limit" | |
end | |
ensure | |
@active_jobs.decrement | |
end | |
end | |
end | |
rescue => e | |
puts "Worker #{worker_id} error: #{e.message}" | |
end | |
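# URL ownership protocol used by the workers above and scrape_page below:
# - @claimed_urls.add?(url) is the atomic claim; only the claiming worker may
#   process the URL.
# - On success or permanent failure the URL moves from @claimed_urls to
#   @completed_urls.
# - On a retryable failure the claim is released and the URL is re-queued so
#   any worker can pick it up again.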
def scrape_page(page_obj, worker_id, &block) | |
url = page_obj.url | |
# Transition page to processing state | |
page_obj.start_processing | |
# Note: the URL was already claimed via @claimed_urls in worker_loop
# Use circuit breaker for page scraping | |
result = circuit(:page_scraping).wrap do | |
browser_page = @browser.create_page | |
browser_page.go_to(url) | |
# Wait for page to load | |
browser_page.network.wait_for_idle | |
# Scroll to bottom to trigger lazy loading | |
browser_page.execute("window.scrollTo(0, document.body.scrollHeight)") | |
sleep(0.5) # Brief pause for content to load | |
# Scroll back to top for consistency | |
browser_page.execute("window.scrollTo(0, 0)") | |
sleep(0.2) # Brief pause | |
# Get page HTML | |
html = browser_page.body | |
# Check content size limit | |
if html.bytesize > @max_content_size | |
raise StandardError.new("Content size exceeds limit: #{html.bytesize} bytes > #{@max_content_size} bytes") | |
end | |
# Record success and transition state | |
page_obj.record_success(html) | |
page_obj.complete | |
# Move from claimed to completed | |
@claimed_urls.delete(url) | |
@completed_urls.add(url) | |
# Increment pages scraped counter | |
@pages_scraped.increment | |
# Create result | |
result_data = { | |
url: url, | |
html: html, | |
scraped_at: page_obj.scraped_at | |
} | |
# Yield to block if provided (for Rails integration) | |
yield(result_data) if block_given? | |
@results << result_data | |
puts "Worker #{worker_id}: Scraped #{url} (#{@pages_scraped.value}#{@max_pages ? "/#{@max_pages}" : ""}) [#{page_obj.state}]" | |
# If spider mode and haven't hit limit, find and queue new URLs | |
if @mode == :spider && !max_pages_reached? | |
extract_and_queue_links(html, url) | |
end | |
result_data | |
ensure | |
browser_page&.close | |
end | |
# Handle circuit breaker result | |
if result.nil? | |
# Circuit breaker is open or call failed | |
page_obj.record_error(StandardError.new("Circuit breaker open or request failed")) | |
page_obj.mark_failed | |
puts "Failed to scrape #{url}: Circuit breaker open or error occurred [#{page_obj.state}]" | |
# Check if page can be retried | |
if page_obj.failed? && page_obj.can_retry? | |
page_obj.retry_page | |
# Release the claim so it can be retried | |
@claimed_urls.delete(url) | |
@url_queue.push(url) # Re-queue for retry | |
puts "Re-queued #{url} for retry (attempt #{page_obj.retry_count + 1}) [#{page_obj.state}]" | |
elsif page_obj.retry_count >= 3 | |
page_obj.give_up | |
# Move from claimed to completed (permanently failed) | |
@claimed_urls.delete(url) | |
@completed_urls.add(url) | |
puts "Giving up on #{url} after #{page_obj.retry_count} attempts [#{page_obj.state}]" | |
end | |
end | |
# Save state periodically | |
save_state if @state_container && @completed_urls.size % 10 == 0 | |
rescue => e | |
# Handle unexpected errors | |
page_obj.record_error(e) | |
page_obj.mark_failed | |
puts "Unexpected error scraping #{url}: #{e.message} [#{page_obj.state}]" | |
# Retry logic for unexpected errors | |
if page_obj.can_retry? | |
page_obj.retry_page | |
# Release the claim so it can be retried | |
@claimed_urls.delete(url) | |
@url_queue.push(url) | |
puts "Re-queued #{url} for retry after error [#{page_obj.state}]" | |
else | |
page_obj.give_up | |
# Move from claimed to completed (permanently failed) | |
@claimed_urls.delete(url) | |
@completed_urls.add(url) | |
puts "Giving up on #{url} after errors [#{page_obj.state}]" | |
end | |
end | |
def extract_and_queue_links(html, current_url) | |
current_uri = URI.parse(current_url) | |
# Use Nokogiri for better performance and accuracy | |
doc = Nokogiri::HTML(html) | |
links = doc.css("a[href]").map { |anchor| anchor["href"] }.compact | |
links.each do |href| | |
next if href.empty? || href.start_with?("#", "javascript:", "mailto:", "tel:", "ftp:") | |
# Convert to absolute URL | |
absolute_url = URI.join(current_url, href).to_s | |
# Remove fragment identifier (#comment-XX, #section, etc.) - treat as same page | |
clean_url = absolute_url.split("#").first | |
# Check if URL is within the same host as current page | |
uri = URI.parse(clean_url) | |
next unless uri.host == current_uri.host | |
# Skip asset files (CSS, JS, images, fonts, etc.) | |
path = uri.path.downcase | |
next if path.match?(/\.(css|js|png|jpg|jpeg|gif|svg|ico|pdf|zip|woff|woff2|ttf|eot|mp4|mp3|avi|mov)(\?|$)/) | |
# Add to discovered set and queue only if it's a new URL | |
if @discovered_urls.add?(clean_url) # Only add to queue if newly discovered | |
@url_queue.push(clean_url) | |
end | |
end | |
rescue => e | |
puts "Error extracting links from #{current_url}: #{e.message}" | |
end | |
def max_pages_reached? | |
@max_pages && @pages_scraped.value >= @max_pages | |
end | |
def should_close_queue? | |
# Check if there are any pages that can still be retried | |
has_retryable_pages = @pages.values.any? { |page| page.ready_for_processing? } | |
if @mode == :spider
# Spider mode: close when no more URLs can be discovered | |
# This happens when all pages are either completed or permanently failed | |
# and no pages are ready for retry | |
all_pages_processed = @pages.values.all? { |page| page.completed? || page.permanently_failed? } | |
all_pages_processed && !has_retryable_pages | |
else | |
# Constrained mode: close when all initial URLs have been processed | |
all_initial_processed = @initial_urls.all? do |url| | |
page = @pages[url] | |
page && (page.completed? || page.permanently_failed?) | |
end | |
all_initial_processed && !has_retryable_pages | |
end | |
end | |
def save_state | |
return unless @state_container | |
state = { | |
mode: @mode,
initial_urls: @initial_urls, | |
completed_urls: @completed_urls.to_a, | |
discovered_urls: @discovered_urls.to_a, | |
max_concurrent: @max_concurrent, | |
timeout: @timeout, | |
max_pages: @max_pages, | |
pages_scraped: @pages_scraped.value, | |
scraper_state: self.state, | |
pages: @pages.transform_values(&:to_state_h) | |
} | |
@state_container.write_state(state) | |
end | |
def load_state | |
return unless @state_container | |
state = @state_container.read_state | |
return unless state | |
@completed_urls = Concurrent::Set.new(state["completed_urls"] || []) | |
# Backward compatibility: check for both new and old key names | |
discovered_urls_data = state["discovered_urls"] || state["pending_urls"] || [] | |
@discovered_urls = Concurrent::Set.new(discovered_urls_data) | |
@pages_scraped = Concurrent::AtomicFixnum.new(state["pages_scraped"] || 0) | |
# Restore page states | |
if state["pages"] | |
@pages = state["pages"].transform_values { |page_data| ScrapedPage.from_h(page_data) } | |
end | |
# Note: scraper state will be restored when run() is called | |
completed_pages = @pages.values.count { |p| p.completed? } | |
failed_pages = @pages.values.count { |p| p.permanently_failed? } | |
retry_pages = @pages.values.count { |p| p.retrying? } | |
puts "Resuming with #{@completed_urls.size} completed, #{@discovered_urls.size} discovered URLs, #{@pages_scraped.value} pages scraped" | |
puts "Page states: #{completed_pages} completed, #{failed_pages} failed, #{retry_pages} retrying" | |
end | |
end | |
class AsyncMarkdownScraper | |
attr_reader :scraper | |
def initialize(*urls, **scraper_options) | |
@scraper = AsyncScraper.new(*urls, **scraper_options) | |
end | |
def run(&block) | |
@scraper.run do |page_data| | |
# Convert to markdown with frontmatter | |
markdown_data = process_page(page_data) | |
# Yield the processed data to the block | |
yield(markdown_data) if block_given? | |
end | |
end | |
private | |
def process_page(page_data) | |
html = page_data[:html] | |
url = page_data[:url] | |
puts "Processing: #{url} (#{html.length} chars)" | |
# Extract frontmatter | |
frontmatter = extract_meta_frontmatter(html, url) | |
puts " → Extracted frontmatter" | |
# Convert HTML to clean markdown | |
markdown_content = html_to_clean_markdown(html) | |
puts " → Converted to markdown (#{markdown_content.length} chars)" | |
# Add markdown and frontmatter keys to the original page data | |
page_data.merge(markdown: markdown_content, frontmatter: frontmatter) | |
end | |
# Helper method to extract meta information as YAML front matter | |
def extract_meta_frontmatter(html, url) | |
doc = Nokogiri::HTML(html) | |
frontmatter = ["---"] | |
# Extract title | |
title = doc.at("title")&.text&.strip | |
frontmatter << "title: \"#{title.gsub('"', '\\"')}\"" if title && !title.empty? | |
# Extract meta description | |
description = doc.at('meta[name="description"]')&.[]("content")&.strip | |
frontmatter << "description: \"#{description.gsub('"', '\\"')}\"" if description && !description.empty? | |
# Extract meta keywords | |
keywords = doc.at('meta[name="keywords"]')&.[]("content")&.strip | |
if keywords && !keywords.empty? | |
keywords_array = keywords.split(",").map(&:strip).reject(&:empty?) | |
frontmatter << "keywords:" | |
keywords_array.each { |keyword| frontmatter << " - \"#{keyword.gsub('"', '\\"')}\"" } | |
end | |
# Extract author information | |
author = doc.at('meta[name="author"]')&.[]("content")&.strip | |
frontmatter << "author: \"#{author.gsub('"', '\\"')}\"" if author && !author.empty? | |
# Extract language | |
lang = doc.at("html")&.[]("lang") || doc.at('meta[http-equiv="content-language"]')&.[]("content") | |
frontmatter << "language: \"#{lang}\"" if lang && !lang.empty? | |
# Extract canonical URL | |
canonical = doc.at('link[rel="canonical"]')&.[]("href")&.strip | |
frontmatter << "canonical_url: \"#{canonical}\"" if canonical && !canonical.empty? | |
# Extract robots meta | |
robots = doc.at('meta[name="robots"]')&.[]("content")&.strip | |
frontmatter << "robots: \"#{robots}\"" if robots && !robots.empty? | |
# Add URL for reference | |
frontmatter << "url: \"#{url}\"" | |
# Add scraped timestamp | |
frontmatter << "scraped_at: \"#{Time.now.iso8601}\"" | |
frontmatter << "---" | |
frontmatter << "" | |
frontmatter.join("\n") | |
end | |
# Helper method to convert HTML to clean markdown | |
def html_to_clean_markdown(html) | |
doc = Nokogiri::HTML(html) | |
# Remove UI and navigation elements first | |
doc.css("script, style, svg, noscript, iframe, object, embed, applet, form, nav, footer").remove | |
doc.css("[data-testid*='nav'], [data-testid*='footer']").remove | |
doc.xpath("//comment()").remove | |
# Use pandoc with HTML reader options to flatten nested divs and spans | |
PandocRuby.convert( | |
doc.to_html, | |
:from => "html-native_divs-native_spans-empty_paragraphs", | |
:to => "markdown-raw_html-auto_identifiers", | |
"wrap" => "none" | |
) | |
end | |
end | |
# Example state containers | |
class FileStateContainer | |
def initialize(file_path) | |
@file_path = file_path | |
end | |
def read_state | |
return nil unless File.exist?(@file_path) | |
JSON.parse(File.read(@file_path)) | |
rescue JSON::ParserError | |
nil | |
end | |
def write_state(state) | |
File.write(@file_path, JSON.pretty_generate(state)) | |
puts "State saved to #{@file_path}" | |
end | |
end | |
class RedisStateContainer | |
def initialize(redis_client, key) | |
@redis = redis_client | |
@key = key | |
end | |
def read_state | |
json = @redis.get(@key) | |
json ? JSON.parse(json) : nil | |
rescue JSON::ParserError | |
nil | |
end | |
def write_state(state) | |
@redis.set(@key, JSON.generate(state)) | |
puts "State saved to Redis key: #{@key}" | |
end | |
end | |
class RailsModelStateContainer | |
def initialize(model_instance) | |
@model = model_instance | |
end | |
def read_state | |
return nil unless @model.scraper_state.present? | |
JSON.parse(@model.scraper_state) | |
rescue JSON::ParserError | |
nil | |
end | |
def write_state(state) | |
@model.update!(scraper_state: JSON.generate(state)) | |
puts "State saved to model ID: #{@model.id}" | |
end | |
end | |
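# Any object responding to #read_state and #write_state works as a state
# container. A minimal in-memory sketch (an illustrative addition, assuming the
# same contract as the containers above) can be handy for tests or one-off runs:
#
#   AsyncScraper.new(url, state_container: MemoryStateContainer.new)
class MemoryStateContainer
  def initialize
    @state = nil
  end

  def read_state
    @state
  end

  def write_state(state)
    # Round-trip through JSON so readers get string keys and JSON-safe values,
    # matching what the file- and Redis-backed containers return.
    @state = JSON.parse(JSON.generate(state))
  end
end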
# Example usage demonstrating proper resource management: | |
if __FILE__ == $0 | |
# Helper method that scopes resources properly (like an application would) | |
def run_scraping_example(name, &block) | |
puts "=== #{name} ===" | |
begin | |
block.call | |
puts "✓ #{name} completed successfully" | |
rescue => e | |
puts "✗ #{name} failed: #{e.message}" | |
ensure | |
# Force cleanup of any lingering resources | |
ObjectSpace.each_object(Ferrum::Browser) { |browser| | |
begin | |
browser.quit | |
rescue | |
nil | |
end | |
} | |
ObjectSpace.each_object(Async::Task) { |task| | |
begin | |
task.stop | |
rescue | |
nil | |
end | |
} | |
GC.start | |
sleep(0.1) # Brief pause for cleanup | |
end | |
puts | |
end | |
# Example 1: Basic scraping of specific URLs | |
run_scraping_example("Basic Scraping Example") do | |
results = [] | |
AsyncScraper.new( | |
"https://radioactive-labs.github.io/plutonium-core/", | |
"https://radioactive-labs.github.io/plutonium-core/guide/", | |
max_concurrent: 2, | |
max_pages: 2 | |
).run do |page_data| | |
results << page_data | |
puts "Scraped: #{page_data[:url]} (#{page_data[:html].length} chars)" | |
end | |
puts "Processed #{results.size} pages" | |
end | |
# Example 2: Spider mode with markdown conversion using AsyncMarkdownScraper | |
run_scraping_example("Spider Mode with AsyncMarkdownScraper Example") do | |
state_container = FileStateContainer.new("scraper_state.json") | |
output_folder = "scraped_pages" | |
# Create output folder | |
FileUtils.mkdir_p(output_folder) | |
# Helper method to generate filename from URL | |
def generate_filename(url) | |
uri = URI.parse(url) | |
# Build filename from URL components | |
parts = [] | |
parts << uri.host.gsub(/[^a-z0-9\-_]/i, "-") if uri.host | |
if uri.path && uri.path != "/" | |
path_part = uri.path.gsub(/[^a-z0-9\-_]/i, "-").squeeze("-") | |
parts << path_part unless path_part.empty? | |
else | |
parts << "index" | |
end | |
filename = parts.join("-").gsub(/^-|-$/, "") | |
filename = "page" if filename.empty? | |
"#{filename}.md" | |
end | |
AsyncMarkdownScraper.new( | |
"https://radioactive-labs.github.io/plutonium-core/", | |
mode: :spider, | |
max_concurrent: 3, | |
max_pages: 5, | |
state_container: state_container, | |
max_content_size: 5_000_000 | |
).run do |page_data| | |
# Combine frontmatter and markdown | |
content = page_data[:frontmatter] + page_data[:markdown] | |
# Generate filename and save | |
filename = generate_filename(page_data[:url]) | |
file_path = File.join(output_folder, filename) | |
File.write(file_path, content) | |
puts "Saved: #{page_data[:url]} → #{filename}" | |
end | |
puts "All pages saved to: #{output_folder}/" | |
end | |
# Example 3: Resumable scraping with state persistence | |
run_scraping_example("Resumable Scraping Example") do | |
state_container = FileStateContainer.new("resumable_scraper_state.json") | |
AsyncScraper.new( | |
"https://radioactive-labs.github.io/plutonium-core/", | |
mode: :spider, | |
max_concurrent: 2, | |
max_pages: 3, | |
state_container: state_container | |
).run do |page_data| | |
puts "Processed: #{page_data[:url]}" | |
end | |
puts "State saved - can resume later by running the same command" | |
end | |
puts "🎉 All examples completed!" | |
puts "💡 Notice: Each scraper was scoped to its own block" | |
puts "🔄 Resources automatically cleaned up between examples" | |
# Clean exit (like application shutdown) | |
exit(0) | |
end |
# -----------------------------------------------------------------------------
# Test suite (a separate file in the gist; it expects the scraper above to be
# saved as async_scraper.rb, per the require_relative below).
# -----------------------------------------------------------------------------
require "bundler/inline" | |
gemfile do | |
source "https://rubygems.org" | |
gem "minitest" | |
gem "async" | |
gem "ferrum" | |
gem "state_machines" | |
gem "breaker_machines" | |
gem "concurrent-ruby" | |
gem "nokogiri" | |
gem "pandoc-ruby" | |
gem "webmock" | |
gem "timecop" | |
end | |
require "minitest/autorun" | |
require "minitest/pride" | |
require "webmock/minitest" | |
require "timecop" | |
require_relative "async_scraper" | |
class ScrapedPageTest < Minitest::Test | |
def setup | |
@url = "https://example.com" | |
@page = ScrapedPage.new(@url) | |
end | |
def test_initialize | |
assert_equal @url, @page.url | |
assert_nil @page.html | |
assert_nil @page.scraped_at | |
assert_nil @page.error_message | |
assert_equal 0, @page.retry_count | |
assert_nil @page.last_error_at | |
assert_equal "pending", @page.state | |
end | |
def test_state_machine_starts_in_pending_state | |
assert @page.pending? | |
assert @page.ready_for_processing? | |
end | |
def test_transitions_from_pending_to_processing | |
@page.start_processing | |
assert @page.processing? | |
refute @page.ready_for_processing? | |
end | |
def test_transitions_from_processing_to_completed | |
@page.start_processing | |
@page.complete | |
assert @page.completed? | |
assert @page.success? | |
refute @page.ready_for_processing? | |
end | |
def test_transitions_from_processing_to_failed | |
@page.start_processing | |
@page.mark_failed | |
assert @page.failed? | |
assert @page.can_retry? | |
assert @page.ready_for_processing? | |
end | |
def test_transitions_from_failed_to_retrying | |
@page.start_processing | |
@page.mark_failed | |
@page.retry_page | |
assert @page.retrying? | |
assert @page.ready_for_processing? | |
end | |
def test_transitions_to_permanently_failed_after_giving_up | |
@page.start_processing | |
@page.mark_failed | |
@page.give_up | |
assert @page.permanently_failed? | |
refute @page.success? | |
refute @page.ready_for_processing? | |
end | |
def test_record_error | |
error = StandardError.new("Test error") | |
Timecop.freeze do | |
@page.record_error(error) | |
assert_equal "Test error", @page.error_message | |
assert_equal Time.now, @page.last_error_at | |
assert_equal 1, @page.retry_count | |
end | |
end | |
def test_record_success | |
html = "<html><body>Test</body></html>" | |
Timecop.freeze do | |
@page.record_success(html) | |
assert_equal html, @page.html | |
assert_equal Time.now, @page.scraped_at | |
assert_nil @page.error_message | |
end | |
end | |
def test_can_retry_when_count_less_than_3 | |
@page.start_processing | |
@page.mark_failed | |
assert @page.can_retry? | |
end | |
def test_prevents_retry_when_count_reaches_3 | |
@page.start_processing | |
@page.mark_failed | |
@page.instance_variable_set(:@retry_count, 3) | |
refute @page.can_retry? | |
end | |
def test_respects_backoff_delay | |
@page.start_processing | |
@page.mark_failed | |
Timecop.freeze do | |
@page.record_error(StandardError.new("Error")) | |
refute @page.can_retry? | |
# After first error, retry_count is 1, so backoff is 30 * (2**1) = 60 seconds | |
Timecop.travel(61) # Past the 60 second backoff | |
assert @page.can_retry? | |
end | |
end | |
def test_implements_exponential_backoff | |
@page.start_processing | |
@page.mark_failed | |
assert_equal 30, @page.send(:backoff_delay) | |
@page.instance_variable_set(:@retry_count, 1) | |
assert_equal 60, @page.send(:backoff_delay) | |
@page.instance_variable_set(:@retry_count, 2) | |
assert_equal 120, @page.send(:backoff_delay) | |
end | |
def test_recreates_page_from_hash_data | |
data = { | |
"url" => @url, | |
"html" => "<html>test</html>", | |
"scraped_at" => Time.now, | |
"state" => "completed", | |
"error_message" => nil, | |
"retry_count" => 0, | |
"last_error_at" => nil | |
} | |
restored_page = ScrapedPage.from_h(data) | |
assert_equal @url, restored_page.url | |
assert_equal "<html>test</html>", restored_page.html | |
assert restored_page.completed? | |
end | |
end | |
class AsyncScraperTest < Minitest::Test | |
def setup | |
@urls = ["https://example.com", "https://example.com/page2"] | |
@scraper = AsyncScraper.new(*@urls, max_concurrent: 2, timeout: 5) | |
end | |
def test_sets_configuration_correctly | |
assert_equal @urls, @scraper.instance_variable_get(:@initial_urls) | |
assert_equal 2, @scraper.instance_variable_get(:@max_concurrent) | |
assert_equal 5, @scraper.instance_variable_get(:@timeout) | |
assert_equal "idle", @scraper.state | |
end | |
def test_raises_error_for_empty_urls | |
assert_raises(ArgumentError) { AsyncScraper.new } | |
end | |
def test_raises_error_for_invalid_mode | |
assert_raises(ArgumentError) { AsyncScraper.new("https://example.com", mode: :invalid) } | |
end | |
def test_accepts_spider_mode | |
spider_scraper = AsyncScraper.new("https://example.com", mode: :spider) | |
assert_equal :spider, spider_scraper.instance_variable_get(:@mode) | |
end | |
def test_sets_default_values | |
default_scraper = AsyncScraper.new("https://example.com") | |
assert_equal :scrape, default_scraper.instance_variable_get(:@mode) | |
assert_equal 5, default_scraper.instance_variable_get(:@max_concurrent) | |
assert_equal 10, default_scraper.instance_variable_get(:@timeout) | |
assert_equal AsyncScraper::MAX_CONTENT_SIZE, default_scraper.instance_variable_get(:@max_content_size) | |
end | |
def test_starts_in_idle_state | |
assert @scraper.idle? | |
assert @scraper.can_start? | |
end | |
def test_transitions_to_running_when_started | |
@scraper.start | |
assert @scraper.running? | |
assert @scraper.active? | |
assert @scraper.can_process_pages? | |
end | |
def test_can_pause_and_resume | |
@scraper.start | |
@scraper.pause | |
assert @scraper.paused? | |
assert @scraper.active? | |
refute @scraper.can_process_pages? | |
@scraper.resume | |
assert @scraper.running? | |
assert @scraper.can_process_pages? | |
end | |
def test_can_stop_from_any_active_state | |
@scraper.start | |
@scraper.stop | |
assert @scraper.stopped? | |
refute @scraper.active? | |
refute @scraper.can_process_pages? | |
end | |
def test_handles_error_state | |
@scraper.start | |
@scraper.error_occurred | |
assert @scraper.error? | |
refute @scraper.active? | |
refute @scraper.can_process_pages? | |
end | |
def test_can_reset_to_idle | |
@scraper.start | |
@scraper.stop | |
@scraper.reset | |
assert @scraper.idle? | |
end | |
def test_returns_false_when_no_max_pages_set | |
refute @scraper.send(:max_pages_reached?) | |
end | |
def test_returns_true_when_max_pages_reached | |
limited_scraper = AsyncScraper.new("https://example.com", max_pages: 2) | |
limited_scraper.instance_variable_get(:@pages_scraped).increment | |
limited_scraper.instance_variable_get(:@pages_scraped).increment | |
assert limited_scraper.send(:max_pages_reached?) | |
end | |
def test_extracts_and_queues_same_host_links_only | |
html = <<~HTML | |
<html> | |
<body> | |
<a href="/page1">Page 1</a> | |
<a href="https://example.com/page2">Page 2</a> | |
<a href="https://other-site.com/page">Other Site</a> | |
</body> | |
</html> | |
HTML | |
current_url = "https://example.com" | |
@scraper.instance_variable_set(:@url_queue, Async::Queue.new) | |
@scraper.send(:extract_and_queue_links, html, current_url) | |
discovered_urls = @scraper.instance_variable_get(:@discovered_urls) | |
assert_includes discovered_urls, "https://example.com/page1" | |
refute_includes discovered_urls, "https://other-site.com/page" | |
end | |
def test_filters_out_non_content_urls | |
html = <<~HTML | |
<html> | |
<body> | |
<a href="/style.css">CSS File</a> | |
<a href="/image.jpg">Image</a> | |
</body> | |
</html> | |
HTML | |
current_url = "https://example.com" | |
@scraper.instance_variable_set(:@url_queue, Async::Queue.new) | |
@scraper.send(:extract_and_queue_links, html, current_url) | |
discovered_urls = @scraper.instance_variable_get(:@discovered_urls) | |
refute_includes discovered_urls, "https://example.com/style.css" | |
refute_includes discovered_urls, "https://example.com/image.jpg" | |
end | |
def test_ignores_special_link_types | |
html = <<~HTML | |
<html> | |
<body> | |
<a href="#section">Section Link</a> | |
<a href="javascript:void(0)">JS Link</a> | |
<a href="mailto:[email protected]">Email</a> | |
</body> | |
</html> | |
HTML | |
current_url = "https://example.com" | |
@scraper.instance_variable_set(:@url_queue, Async::Queue.new) | |
@scraper.send(:extract_and_queue_links, html, current_url) | |
discovered_urls = @scraper.instance_variable_get(:@discovered_urls) | |
refute_includes discovered_urls, "#section" | |
refute_includes discovered_urls, "javascript:void(0)" | |
refute_includes discovered_urls, "mailto:[email protected]" | |
end | |
end | |
class FileStateContainerTest < Minitest::Test | |
def setup | |
@file_path = "/tmp/test_scraper_state.json" | |
@container = FileStateContainer.new(@file_path) | |
@test_state = {"test" => "data", "number" => 42} | |
end | |
def teardown | |
File.delete(@file_path) if File.exist?(@file_path) | |
end | |
def test_writes_and_reads_state_correctly | |
@container.write_state(@test_state) | |
assert File.exist?(@file_path) | |
read_state = @container.read_state | |
assert_equal @test_state, read_state | |
end | |
def test_returns_nil_when_file_doesnt_exist | |
assert_nil @container.read_state | |
end | |
def test_handles_corrupted_json_gracefully | |
File.write(@file_path, "invalid json") | |
assert_nil @container.read_state | |
end | |
end | |
class AsyncMarkdownScraperTest < Minitest::Test | |
def setup | |
@urls = ["https://example.com"] | |
@markdown_scraper = AsyncMarkdownScraper.new(*@urls, max_concurrent: 1) | |
end | |
def test_creates_underlying_async_scraper | |
assert_instance_of AsyncScraper, @markdown_scraper.scraper | |
end | |
def test_passes_options_to_async_scraper | |
scraper = AsyncMarkdownScraper.new("https://example.com", mode: :spider, max_pages: 5) | |
underlying_scraper = scraper.scraper | |
assert_equal :spider, underlying_scraper.instance_variable_get(:@mode) | |
assert_equal 5, underlying_scraper.instance_variable_get(:@max_pages) | |
end | |
def test_extracts_frontmatter_correctly | |
html = <<~HTML | |
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<title>Test Page Title</title> | |
<meta name="description" content="Test page description"> | |
<meta name="keywords" content="test, page, example"> | |
<meta name="author" content="Test Author"> | |
<meta name="robots" content="index, follow"> | |
<link rel="canonical" href="https://example.com/canonical"> | |
</head> | |
<body> | |
<h1>Content</h1> | |
</body> | |
</html> | |
HTML | |
url = "https://example.com/test" | |
frontmatter = @markdown_scraper.send(:extract_meta_frontmatter, html, url) | |
assert_includes frontmatter, "title: \"Test Page Title\"" | |
assert_includes frontmatter, "description: \"Test page description\"" | |
assert_includes frontmatter, "- \"test\"" | |
assert_includes frontmatter, "- \"page\"" | |
assert_includes frontmatter, "- \"example\"" | |
assert_includes frontmatter, "author: \"Test Author\"" | |
assert_includes frontmatter, "language: \"en\"" | |
assert_includes frontmatter, "canonical_url: \"https://example.com/canonical\"" | |
assert_includes frontmatter, "robots: \"index, follow\"" | |
assert_includes frontmatter, "url: \"https://example.com/test\"" | |
assert_includes frontmatter, "scraped_at:" | |
assert frontmatter.start_with?("---\n") | |
assert_includes frontmatter, "\n---\n" | |
end | |
def test_handles_missing_meta_tags_gracefully | |
minimal_html = "<html><head><title>Minimal</title></head><body></body></html>" | |
url = "https://example.com/test" | |
frontmatter = @markdown_scraper.send(:extract_meta_frontmatter, minimal_html, url) | |
assert_includes frontmatter, "title: \"Minimal\"" | |
assert_includes frontmatter, "url: \"https://example.com/test\"" | |
refute_includes frontmatter, "description:" | |
refute_includes frontmatter, "keywords:" | |
end | |
def test_escapes_quotes_in_content | |
html_with_quotes = "<html><head><title>Title with \"quotes\"</title></head><body></body></html>" | |
url = "https://example.com/test" | |
frontmatter = @markdown_scraper.send(:extract_meta_frontmatter, html_with_quotes, url) | |
assert_includes frontmatter, "title: \"Title with \\\"quotes\\\"\"" | |
end | |
def test_removes_unwanted_elements_and_converts_to_markdown | |
html = <<~HTML | |
<html> | |
<head> | |
<title>Test</title> | |
<script>alert("remove me");</script> | |
<style>body { color: red; }</style> | |
</head> | |
<body> | |
<nav>Navigation</nav> | |
<h1>Main Title</h1> | |
<p>Paragraph content</p> | |
<footer>Footer content</footer> | |
</body> | |
</html> | |
HTML | |
markdown = @markdown_scraper.send(:html_to_clean_markdown, html) | |
assert_includes markdown, "# Main Title" | |
assert_includes markdown, "Paragraph content" | |
refute_includes markdown, "alert(\"remove me\")" | |
refute_includes markdown, "color: red" | |
refute_includes markdown, "Navigation" | |
refute_includes markdown, "Footer content" | |
end | |
end |