require "bundler/inline"
gemfile do
source "https://rubygems.org"
gem "async"
gem "ferrum"
gem "state_machines"
gem "breaker_machines"
gem "concurrent-ruby"
gem "nokogiri"
gem "pandoc-ruby"
end
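# System prerequisites: ferrum drives a local Chrome/Chromium install, and
# pandoc-ruby shells out to the `pandoc` binary, so both need to be present
# on the host before running this script.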
require "async"
require "async/queue"
require "async/semaphore"
require "ferrum"
require "uri"
require "json"
require "timeout"
require "state_machines"
require "breaker_machines"
# Use concurrent-ruby for thread-safe data structures and atomic operations
# Even though Async uses cooperative concurrency, this provides defensive programming
# and future-proofing in case threading is added later
require "concurrent"
require "fileutils"
require "digest"
require "nokogiri"
require "pandoc-ruby"
class ScrapedPage
attr_accessor :url, :html, :scraped_at, :error_message, :retry_count, :last_error_at
def initialize(url)
@url = url
@html = nil
@scraped_at = nil
@error_message = nil
@retry_count = 0
@last_error_at = nil
# Initialize state machine manually
self.state = "pending"
end
state_machine :state, initial: :pending do
event :start_processing do
transition pending: :processing, retrying: :processing
end
event :complete do
transition processing: :completed
end
event :mark_failed do
transition processing: :failed
end
event :retry_page do
transition failed: :retrying
end
event :give_up do
transition [:failed, :retrying] => :permanently_failed
end
state :pending do
def ready_for_processing?
true
end
end
state :processing do
def ready_for_processing?
false
end
end
state :completed do
def ready_for_processing?
false
end
def success?
true
end
end
state :failed do
def ready_for_processing?
can_retry?
end
def can_retry?
retry_count < 3 && (last_error_at.nil? || Time.now - last_error_at > backoff_delay)
end
def backoff_delay
# Exponential backoff: 30s, 60s, 120s
30 * (2**retry_count)
end
end
state :retrying do
def ready_for_processing?
true
end
end
state :permanently_failed do
def ready_for_processing?
false
end
def success?
false
end
end
end
def record_error(error)
@error_message = error.message
@last_error_at = Time.now
@retry_count += 1
end
def record_success(html)
@html = html
@scraped_at = Time.now
@error_message = nil
end
def to_h
{
url: @url,
html: @html,
scraped_at: @scraped_at,
state: state,
error_message: @error_message,
retry_count: @retry_count,
last_error_at: @last_error_at
}
end
def to_state_h
{
url: @url,
scraped_at: @scraped_at,
state: state,
error_message: @error_message,
retry_count: @retry_count,
last_error_at: @last_error_at
}
end
def self.from_h(data)
page = new(data["url"])
page.html = data["html"]
page.scraped_at = data["scraped_at"]
page.error_message = data["error_message"]
page.retry_count = data["retry_count"] || 0
page.last_error_at = data["last_error_at"]
# Restore state
target_state = data["state"]&.to_sym || :pending
case target_state
when :processing then page.start_processing
when :completed then page.start_processing && page.complete
when :failed then page.start_processing && page.mark_failed
when :retrying then page.start_processing && page.mark_failed && page.retry_page
when :permanently_failed then page.start_processing && page.mark_failed && page.give_up
end
page
end
end
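# Illustrative lifecycle of a ScrapedPage (a sketch of the state machine above,
# not part of the scraper's runtime flow). Whether a failed page is actually
# re-queued is governed separately by record_error, can_retry? and backoff_delay.
#
#   page = ScrapedPage.new("https://example.com")
#   page.start_processing                   # pending    -> processing
#   page.mark_failed                        # processing -> failed
#   page.retry_page                         # failed     -> retrying
#   page.start_processing                   # retrying   -> processing
#   page.record_success("<html>...</html>")
#   page.complete                           # processing -> completed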
class AsyncScraper
include BreakerMachines::DSL
attr_reader :browser, :claimed_urls, :completed_urls, :url_queue, :results, :pages
# Content size limit - 10MB default
MAX_CONTENT_SIZE = 10_000_000
def initialize(*urls, mode: :scrape, max_concurrent: 5, timeout: 10, state_container: nil, max_pages: nil, max_content_size: MAX_CONTENT_SIZE)
raise ArgumentError, "urls must be provided" if !urls || urls.empty?
raise ArgumentError, "mode must be :scrape or :spider" unless [:scrape, :spider].include?(mode)
@initial_urls = urls
@mode = mode
@max_concurrent = max_concurrent
@timeout = timeout
@state_container = state_container
@max_pages = max_pages
@max_content_size = max_content_size
@claimed_urls = Concurrent::Set.new # URLs currently being processed
@completed_urls = Concurrent::Set.new # URLs that have been processed (success or permanent failure)
@discovered_urls = Concurrent::Set.new # All URLs discovered during crawling
@results = Concurrent::Array.new
@pages = Concurrent::Hash.new # URL -> ScrapedPage mapping
@semaphore = Async::Semaphore.new(max_concurrent)
@pages_scraped = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for scraped pages
@active_jobs = Concurrent::AtomicFixnum.new(0) # Thread-safe counter for active jobs
@work_available = true # Flag to signal workers when to stop
# Initialize state machine manually
self.state = "idle"
load_state if @state_container
end
state_machine :state, initial: :idle do
event :start do
transition idle: :running, stopped: :running, error: :running
end
event :pause do
transition running: :paused
end
event :resume do
transition paused: :running
end
event :stop do
transition [:running, :paused, :error] => :stopped
end
event :error_occurred do
transition [:running, :paused] => :error
end
event :reset do
transition any => :idle
end
state :idle do
def can_start?
true
end
end
state :running do
def active?
true
end
def can_process_pages?
true
end
end
state :paused do
def active?
true
end
def can_process_pages?
false
end
end
state :stopped do
def active?
false
end
def can_process_pages?
false
end
end
state :error do
def active?
false
end
def can_process_pages?
false
end
end
end
# Define circuit breaker for page scraping
circuit :page_scraping do
threshold failures: 5, within: 1.minute # Trip after 5 failures in 1 minute
reset_after 10.seconds # Wait 10 seconds before trying again
fallback { nil } # Return nil when circuit is open
# Additional configuration for robustness
on_open { |circuit| puts "Circuit breaker opened for page scraping - too many failures" }
on_close { |circuit| puts "Circuit breaker closed for page scraping - service recovered" }
on_half_open { |circuit| puts "Circuit breaker half-open for page scraping - testing recovery" }
end
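# How the circuit is consumed below (a sketch mirroring scrape_page): the block
# passed to `circuit(:page_scraping).wrap` runs normally while the circuit is
# closed; once 5 failures accumulate within a minute the circuit opens and wrap
# returns the fallback (nil) without invoking the block, which scrape_page then
# treats as a failed attempt eligible for retry with backoff.
#
#   result = circuit(:page_scraping).wrap { fetch_page(url) } # fetch_page is a placeholder
#   handle_failure(url) if result.nil?                        # open circuit or raised error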
def run(&block)
start # Transition to running state
Sync(annotation: "AsyncScraper#run") do |task|
# Start browser
@browser = Ferrum::Browser.new(
headless: true,
timeout: @timeout,
pending_connection_errors: false
)
begin
@url_queue = Async::Queue.new
# Add initial URLs to queue if not resuming
if @discovered_urls.empty?
@initial_urls.each { |url| @discovered_urls.add(url) }
end
# Count URLs to process
urls_to_process = @discovered_urls.reject { |url| @completed_urls.include?(url) }
retryable_pages = @pages.values.select { |page| page.ready_for_processing? }
# Push URLs to queue
urls_to_process.each { |url| @url_queue.push(url) }
retryable_pages.each do |page|
@url_queue.push(page.url)
@discovered_urls.add(page.url)
end
# If no work to do, complete immediately
total_queued = urls_to_process.size + retryable_pages.size
if total_queued == 0
puts "No URLs to process - all work completed"
return @results
end
# Start orchestrator in background
orchestrator = Async do
orchestrate_work(&block)
end
# Start worker pool using simplified worker loop
workers = @max_concurrent.times.map do |worker_id|
Async do
worker_loop(worker_id, &block)
end
end
# Wait for orchestrator to signal completion with timeout
begin
orchestrator.wait
rescue => e
puts "Orchestrator error: #{e.message}"
end
# Ensure work is stopped
@work_available = false
# Send stop signals to all workers
@max_concurrent.times {
begin
@url_queue.push(nil)
rescue
# Queue might be closed
end
}
# Wait for all workers to complete with individual timeouts
workers.each_with_index do |worker, idx|
Timeout.timeout(2) do # Reduced to 2 seconds
worker.wait
end
rescue Timeout::Error
puts "Worker #{idx} timed out, force stopping"
begin
worker.stop
rescue
nil
end
end
# Ensure all async tasks are completed
if @active_jobs.value > 0
puts "Waiting for #{@active_jobs.value} active jobs to complete..."
start_time = Time.now
while @active_jobs.value > 0 && (Time.now - start_time) < 2 # Reduced to 2 seconds
sleep(0.1)
end
if @active_jobs.value > 0
puts "Forcibly stopping #{@active_jobs.value} remaining jobs"
# Note: We can't force reset atomic counter, jobs should clean up
end
end
ensure
save_state if @state_container
# Force close the queue and drain any remaining items
begin
@url_queue&.close
while @url_queue&.pop(timeout: 0.1); end
rescue
# Ignore errors during cleanup
end
@browser&.quit
stop # Transition to stopped state
end
@results
end
end
private
# Orchestrator - monitors work and decides when to stop
def orchestrate_work(&block)
loop do
sleep(0.1) # Check every 100ms for faster response
# Check if we should stop
if should_stop_all_work?
puts "Orchestrator: Stopping all work"
@work_available = false
# Clear the queue to prevent workers from picking up more work
begin
while @url_queue.any?
@url_queue.pop
end
rescue
# Queue might be closed
end
break
end
end
end
# Check if all work is complete
def should_stop_all_work?
# Stop if we've hit page limit and no active jobs
if max_pages_reached?
return @active_jobs.value == 0
end
# Stop if all pages are processed and no active jobs
return true if @active_jobs.value == 0 && @url_queue.empty? && should_close_queue?
# Otherwise keep going
false
rescue
true # Stop on any error
end
# Simplified worker loop
def worker_loop(worker_id, &block)
while @work_available
# Check page limit before getting URL
if max_pages_reached?
puts "Worker #{worker_id}: Page limit reached, stopping"
break
end
url = @url_queue.pop
break if url.nil? # nil is the stop signal pushed during shutdown
# Skip if already completed
if @completed_urls.include?(url)
puts "Worker #{worker_id}: Skipping #{url} (already completed)"
next
end
# Try to claim this URL for processing - atomic operation
unless @claimed_urls.add?(url)
puts "Worker #{worker_id}: Another worker claimed #{url}"
next
end
# Double-check page limit before processing
if max_pages_reached?
puts "Worker #{worker_id}: Page limit reached after claiming URL"
break
end
# Get or create page object
page = @pages[url] ||= ScrapedPage.new(url)
next unless page.ready_for_processing?
# Process the page with semaphore
@semaphore.async do
@active_jobs.increment
begin
# Final check inside semaphore
if !max_pages_reached?
scrape_page(page, worker_id, &block)
else
puts "Worker #{worker_id}: Skipping page due to limit"
end
ensure
@active_jobs.decrement
end
end
end
rescue => e
puts "Worker #{worker_id} error: #{e.message}"
end
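# Coordination notes: a URL is "owned" by exactly one worker because
# Concurrent::Set#add? is atomic — it returns truthy only for the caller that
# actually inserted the value and nil for everyone else. Finished URLs move to
# @completed_urls, @active_jobs counts in-flight semaphore tasks, and a nil
# pushed onto @url_queue acts as the per-worker stop sentinel.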
def scrape_page(page_obj, worker_id, &block)
url = page_obj.url
# Transition page to processing state
page_obj.start_processing
# Note: URL was already claimed via @claimed_urls in worker_loop
# Use circuit breaker for page scraping
result = circuit(:page_scraping).wrap do
browser_page = @browser.create_page
browser_page.go_to(url)
# Wait for page to load
browser_page.network.wait_for_idle
# Scroll to bottom to trigger lazy loading
browser_page.execute("window.scrollTo(0, document.body.scrollHeight)")
sleep(0.5) # Brief pause for content to load
# Scroll back to top for consistency
browser_page.execute("window.scrollTo(0, 0)")
sleep(0.2) # Brief pause
# Get page HTML
html = browser_page.body
# Check content size limit
if html.bytesize > @max_content_size
raise StandardError.new("Content size exceeds limit: #{html.bytesize} bytes > #{@max_content_size} bytes")
end
# Record success and transition state
page_obj.record_success(html)
page_obj.complete
# Move from claimed to completed
@claimed_urls.delete(url)
@completed_urls.add(url)
# Increment pages scraped counter
@pages_scraped.increment
# Create result
result_data = {
url: url,
html: html,
scraped_at: page_obj.scraped_at
}
# Yield to block if provided (for Rails integration)
yield(result_data) if block_given?
@results << result_data
puts "Worker #{worker_id}: Scraped #{url} (#{@pages_scraped.value}#{@max_pages ? "/#{@max_pages}" : ""}) [#{page_obj.state}]"
# If spider mode and haven't hit limit, find and queue new URLs
if @mode == :spider && !max_pages_reached?
extract_and_queue_links(html, url)
end
result_data
ensure
browser_page&.close
end
# Handle circuit breaker result
if result.nil?
# Circuit breaker is open or call failed
page_obj.record_error(StandardError.new("Circuit breaker open or request failed"))
page_obj.mark_failed
puts "Failed to scrape #{url}: Circuit breaker open or error occurred [#{page_obj.state}]"
# Check if page can be retried
if page_obj.failed? && page_obj.can_retry?
page_obj.retry_page
# Release the claim so it can be retried
@claimed_urls.delete(url)
@url_queue.push(url) # Re-queue for retry
puts "Re-queued #{url} for retry (attempt #{page_obj.retry_count + 1}) [#{page_obj.state}]"
elsif page_obj.retry_count >= 3
page_obj.give_up
# Move from claimed to completed (permanently failed)
@claimed_urls.delete(url)
@completed_urls.add(url)
puts "Giving up on #{url} after #{page_obj.retry_count} attempts [#{page_obj.state}]"
end
end
# Save state periodically
save_state if @state_container && @completed_urls.size % 10 == 0
rescue => e
# Handle unexpected errors
page_obj.record_error(e)
page_obj.mark_failed
puts "Unexpected error scraping #{url}: #{e.message} [#{page_obj.state}]"
# Retry logic for unexpected errors
if page_obj.can_retry?
page_obj.retry_page
# Release the claim so it can be retried
@claimed_urls.delete(url)
@url_queue.push(url)
puts "Re-queued #{url} for retry after error [#{page_obj.state}]"
else
page_obj.give_up
# Move from claimed to completed (permanently failed)
@claimed_urls.delete(url)
@completed_urls.add(url)
puts "Giving up on #{url} after errors [#{page_obj.state}]"
end
end
def extract_and_queue_links(html, current_url)
current_uri = URI.parse(current_url)
# Use Nokogiri for better performance and accuracy
doc = Nokogiri::HTML(html)
links = doc.css("a[href]").map { |anchor| anchor["href"] }.compact
links.each do |href|
next if href.empty? || href.start_with?("#", "javascript:", "mailto:", "tel:", "ftp:")
# Convert to absolute URL
absolute_url = URI.join(current_url, href).to_s
# Remove fragment identifier (#comment-XX, #section, etc.) - treat as same page
clean_url = absolute_url.split("#").first
# Check if URL is within the same host as current page
uri = URI.parse(clean_url)
next unless uri.host == current_uri.host
# Skip asset files (CSS, JS, images, fonts, etc.)
path = uri.path.downcase
next if path.match?(/\.(css|js|png|jpg|jpeg|gif|svg|ico|pdf|zip|woff|woff2|ttf|eot|mp4|mp3|avi|mov)(\?|$)/)
# Add to discovered set and queue only if it's a new URL
if @discovered_urls.add?(clean_url) # Only add to queue if newly discovered
@url_queue.push(clean_url)
end
end
rescue => e
puts "Error extracting links from #{current_url}: #{e.message}"
end
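# Example of the normalisation above (illustrative): on a page at
# https://example.com/guide/, an href of "intro#setup" becomes
# "https://example.com/guide/intro" (fragment stripped), "/assets/app.css" is
# dropped by the asset-extension filter, and "https://other.com/page" is
# dropped because it sits on a different host.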
def max_pages_reached?
@max_pages && @pages_scraped.value >= @max_pages
end
def should_close_queue?
# Check if there are any pages that can still be retried
has_retryable_pages = @pages.values.any? { |page| page.ready_for_processing? }
if @mode == :spider
# Spider mode: close when no more URLs can be discovered
# This happens when all pages are either completed or permanently failed
# and no pages are ready for retry
all_pages_processed = @pages.values.all? { |page| page.completed? || page.permanently_failed? }
all_pages_processed && !has_retryable_pages
else
# Scrape mode: close when all initial URLs have been processed
all_initial_processed = @initial_urls.all? do |url|
page = @pages[url]
page && (page.completed? || page.permanently_failed?)
end
all_initial_processed && !has_retryable_pages
end
end
def save_state
return unless @state_container
state = {
mode: @mode,
initial_urls: @initial_urls,
completed_urls: @completed_urls.to_a,
discovered_urls: @discovered_urls.to_a,
max_concurrent: @max_concurrent,
timeout: @timeout,
max_pages: @max_pages,
pages_scraped: @pages_scraped.value,
scraper_state: self.state,
pages: @pages.transform_values(&:to_state_h)
}
@state_container.write_state(state)
end
def load_state
return unless @state_container
state = @state_container.read_state
return unless state
@completed_urls = Concurrent::Set.new(state["completed_urls"] || [])
# Backward compatibility: check for both new and old key names
discovered_urls_data = state["discovered_urls"] || state["pending_urls"] || []
@discovered_urls = Concurrent::Set.new(discovered_urls_data)
@pages_scraped = Concurrent::AtomicFixnum.new(state["pages_scraped"] || 0)
# Restore page states
if state["pages"]
@pages = state["pages"].transform_values { |page_data| ScrapedPage.from_h(page_data) }
end
# Note: scraper state will be restored when run() is called
completed_pages = @pages.values.count { |p| p.completed? }
failed_pages = @pages.values.count { |p| p.permanently_failed? }
retry_pages = @pages.values.count { |p| p.retrying? }
puts "Resuming with #{@completed_urls.size} completed, #{@discovered_urls.size} discovered URLs, #{@pages_scraped.value} pages scraped"
puts "Page states: #{completed_pages} completed, #{failed_pages} failed, #{retry_pages} retrying"
end
end
class AsyncMarkdownScraper
attr_reader :scraper
def initialize(*urls, **scraper_options)
@scraper = AsyncScraper.new(*urls, **scraper_options)
end
def run(&block)
@scraper.run do |page_data|
# Convert to markdown with frontmatter
markdown_data = process_page(page_data)
# Yield the processed data to the block
yield(markdown_data) if block_given?
end
end
private
def process_page(page_data)
html = page_data[:html]
url = page_data[:url]
puts "Processing: #{url} (#{html.length} chars)"
# Extract frontmatter
frontmatter = extract_meta_frontmatter(html, url)
puts " → Extracted frontmatter"
# Convert HTML to clean markdown
markdown_content = html_to_clean_markdown(html)
puts " → Converted to markdown (#{markdown_content.length} chars)"
# Add markdown and frontmatter keys to the original page data
page_data.merge(markdown: markdown_content, frontmatter: frontmatter)
end
# Helper method to extract meta information as YAML front matter
def extract_meta_frontmatter(html, url)
doc = Nokogiri::HTML(html)
frontmatter = ["---"]
# Extract title
title = doc.at("title")&.text&.strip
frontmatter << "title: \"#{title.gsub('"', '\\"')}\"" if title && !title.empty?
# Extract meta description
description = doc.at('meta[name="description"]')&.[]("content")&.strip
frontmatter << "description: \"#{description.gsub('"', '\\"')}\"" if description && !description.empty?
# Extract meta keywords
keywords = doc.at('meta[name="keywords"]')&.[]("content")&.strip
if keywords && !keywords.empty?
keywords_array = keywords.split(",").map(&:strip).reject(&:empty?)
frontmatter << "keywords:"
keywords_array.each { |keyword| frontmatter << " - \"#{keyword.gsub('"', '\\"')}\"" }
end
# Extract author information
author = doc.at('meta[name="author"]')&.[]("content")&.strip
frontmatter << "author: \"#{author.gsub('"', '\\"')}\"" if author && !author.empty?
# Extract language
lang = doc.at("html")&.[]("lang") || doc.at('meta[http-equiv="content-language"]')&.[]("content")
frontmatter << "language: \"#{lang}\"" if lang && !lang.empty?
# Extract canonical URL
canonical = doc.at('link[rel="canonical"]')&.[]("href")&.strip
frontmatter << "canonical_url: \"#{canonical}\"" if canonical && !canonical.empty?
# Extract robots meta
robots = doc.at('meta[name="robots"]')&.[]("content")&.strip
frontmatter << "robots: \"#{robots}\"" if robots && !robots.empty?
# Add URL for reference
frontmatter << "url: \"#{url}\""
# Add scraped timestamp
frontmatter << "scraped_at: \"#{Time.now.iso8601}\""
frontmatter << "---"
frontmatter << ""
frontmatter.join("\n")
end
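# Illustrative output for a page that only declares a title, a description and
# two keywords (every value is taken straight from the page's <head>):
#
#   ---
#   title: "Getting Started"
#   description: "A short intro"
#   keywords:
#     - "ruby"
#     - "scraping"
#   url: "https://example.com/start"
#   scraped_at: "2025-08-01T11:13:00+00:00"
#   ---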
# Helper method to convert HTML to clean markdown
def html_to_clean_markdown(html)
doc = Nokogiri::HTML(html)
# Remove UI and navigation elements first
doc.css("script, style, svg, noscript, iframe, object, embed, applet, form, nav, footer").remove
doc.css("[data-testid*='nav'], [data-testid*='footer']").remove
doc.xpath("//comment()").remove
# Use pandoc with HTML reader options to flatten nested divs and spans
PandocRuby.convert(
doc.to_html,
:from => "html-native_divs-native_spans-empty_paragraphs",
:to => "markdown-raw_html-auto_identifiers",
"wrap" => "none"
)
end
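# For reference, the PandocRuby call above is roughly equivalent to piping the
# cleaned HTML through the pandoc CLI (pandoc-ruby shells out to that binary):
#
#   pandoc --from=html-native_divs-native_spans-empty_paragraphs \
#          --to=markdown-raw_html-auto_identifiers --wrap=none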
end
# Example state containers
class FileStateContainer
def initialize(file_path)
@file_path = file_path
end
def read_state
return nil unless File.exist?(@file_path)
JSON.parse(File.read(@file_path))
rescue JSON::ParserError
nil
end
def write_state(state)
File.write(@file_path, JSON.pretty_generate(state))
puts "State saved to #{@file_path}"
end
end
class RedisStateContainer
def initialize(redis_client, key)
@redis = redis_client
@key = key
end
def read_state
json = @redis.get(@key)
json ? JSON.parse(json) : nil
rescue JSON::ParserError
nil
end
def write_state(state)
@redis.set(@key, JSON.generate(state))
puts "State saved to Redis key: #{@key}"
end
end
class RailsModelStateContainer
def initialize(model_instance)
@model = model_instance
end
def read_state
return nil unless @model.scraper_state.present?
JSON.parse(@model.scraper_state)
rescue JSON::ParserError
nil
end
def write_state(state)
@model.update!(scraper_state: JSON.generate(state))
puts "State saved to model ID: #{@model.id}"
end
end
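# A minimal in-memory container (an addition for quick experiments and tests;
# it follows the same read_state/write_state duck type as the containers above
# but keeps everything in the process, so nothing survives a restart):
class InMemoryStateContainer
  def initialize
    @state = nil
  end

  def read_state
    # Round-trip through JSON so keys come back as strings,
    # matching what AsyncScraper#load_state expects
    @state ? JSON.parse(JSON.generate(@state)) : nil
  end

  def write_state(state)
    @state = state
  end
end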
# Example usage demonstrating proper resource management:
if __FILE__ == $0
# Helper method that scopes resources properly (like an application would)
def run_scraping_example(name, &block)
puts "=== #{name} ==="
begin
block.call
puts "✓ #{name} completed successfully"
rescue => e
puts "✗ #{name} failed: #{e.message}"
ensure
# Force cleanup of any lingering resources
ObjectSpace.each_object(Ferrum::Browser) { |browser|
begin
browser.quit
rescue
nil
end
}
ObjectSpace.each_object(Async::Task) { |task|
begin
task.stop
rescue
nil
end
}
GC.start
sleep(0.1) # Brief pause for cleanup
end
puts
end
# Example 1: Basic scraping of specific URLs
run_scraping_example("Basic Scraping Example") do
results = []
AsyncScraper.new(
"https://radioactive-labs.github.io/plutonium-core/",
"https://radioactive-labs.github.io/plutonium-core/guide/",
max_concurrent: 2,
max_pages: 2
).run do |page_data|
results << page_data
puts "Scraped: #{page_data[:url]} (#{page_data[:html].length} chars)"
end
puts "Processed #{results.size} pages"
end
# Example 2: Spider mode with markdown conversion using AsyncMarkdownScraper
run_scraping_example("Spider Mode with AsyncMarkdownScraper Example") do
state_container = FileStateContainer.new("scraper_state.json")
output_folder = "scraped_pages"
# Create output folder
FileUtils.mkdir_p(output_folder)
# Helper method to generate filename from URL
def generate_filename(url)
uri = URI.parse(url)
# Build filename from URL components
parts = []
parts << uri.host.gsub(/[^a-z0-9\-_]/i, "-") if uri.host
if uri.path && uri.path != "/"
path_part = uri.path.gsub(/[^a-z0-9\-_]/i, "-").squeeze("-")
parts << path_part unless path_part.empty?
else
parts << "index"
end
filename = parts.join("-").gsub(/^-|-$/, "")
filename = "page" if filename.empty?
"#{filename}.md"
end
AsyncMarkdownScraper.new(
"https://radioactive-labs.github.io/plutonium-core/",
mode: :spider,
max_concurrent: 3,
max_pages: 5,
state_container: state_container,
max_content_size: 5_000_000
).run do |page_data|
# Combine frontmatter and markdown
content = page_data[:frontmatter] + page_data[:markdown]
# Generate filename and save
filename = generate_filename(page_data[:url])
file_path = File.join(output_folder, filename)
File.write(file_path, content)
puts "Saved: #{page_data[:url]}#{filename}"
end
puts "All pages saved to: #{output_folder}/"
end
# Example 3: Resumable scraping with state persistence
run_scraping_example("Resumable Scraping Example") do
state_container = FileStateContainer.new("resumable_scraper_state.json")
AsyncScraper.new(
"https://radioactive-labs.github.io/plutonium-core/",
mode: :spider,
max_concurrent: 2,
max_pages: 3,
state_container: state_container
).run do |page_data|
puts "Processed: #{page_data[:url]}"
end
puts "State saved - can resume later by running the same command"
end
puts "🎉 All examples completed!"
puts "💡 Notice: Each scraper was scoped to its own block"
puts "🔄 Resources automatically cleaned up between examples"
# Clean exit (like application shutdown)
exit(0)
end
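# ---------------------------------------------------------------------------
# The remainder of this gist is the companion test suite, kept as a separate
# file alongside the scraper (the require_relative below assumes the scraper
# is saved as async_scraper.rb).
# ---------------------------------------------------------------------------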
require "bundler/inline"
gemfile do
source "https://rubygems.org"
gem "minitest"
gem "async"
gem "ferrum"
gem "state_machines"
gem "breaker_machines"
gem "concurrent-ruby"
gem "nokogiri"
gem "pandoc-ruby"
gem "webmock"
gem "timecop"
end
require "minitest/autorun"
require "minitest/pride"
require "webmock/minitest"
require "timecop"
require_relative "async_scraper"
class ScrapedPageTest < Minitest::Test
def setup
@url = "https://example.com"
@page = ScrapedPage.new(@url)
end
def test_initialize
assert_equal @url, @page.url
assert_nil @page.html
assert_nil @page.scraped_at
assert_nil @page.error_message
assert_equal 0, @page.retry_count
assert_nil @page.last_error_at
assert_equal "pending", @page.state
end
def test_state_machine_starts_in_pending_state
assert @page.pending?
assert @page.ready_for_processing?
end
def test_transitions_from_pending_to_processing
@page.start_processing
assert @page.processing?
refute @page.ready_for_processing?
end
def test_transitions_from_processing_to_completed
@page.start_processing
@page.complete
assert @page.completed?
assert @page.success?
refute @page.ready_for_processing?
end
def test_transitions_from_processing_to_failed
@page.start_processing
@page.mark_failed
assert @page.failed?
assert @page.can_retry?
assert @page.ready_for_processing?
end
def test_transitions_from_failed_to_retrying
@page.start_processing
@page.mark_failed
@page.retry_page
assert @page.retrying?
assert @page.ready_for_processing?
end
def test_transitions_to_permanently_failed_after_giving_up
@page.start_processing
@page.mark_failed
@page.give_up
assert @page.permanently_failed?
refute @page.success?
refute @page.ready_for_processing?
end
def test_record_error
error = StandardError.new("Test error")
Timecop.freeze do
@page.record_error(error)
assert_equal "Test error", @page.error_message
assert_equal Time.now, @page.last_error_at
assert_equal 1, @page.retry_count
end
end
def test_record_success
html = "<html><body>Test</body></html>"
Timecop.freeze do
@page.record_success(html)
assert_equal html, @page.html
assert_equal Time.now, @page.scraped_at
assert_nil @page.error_message
end
end
def test_can_retry_when_count_less_than_3
@page.start_processing
@page.mark_failed
assert @page.can_retry?
end
def test_prevents_retry_when_count_reaches_3
@page.start_processing
@page.mark_failed
@page.instance_variable_set(:@retry_count, 3)
refute @page.can_retry?
end
def test_respects_backoff_delay
@page.start_processing
@page.mark_failed
Timecop.freeze do
@page.record_error(StandardError.new("Error"))
refute @page.can_retry?
# After first error, retry_count is 1, so backoff is 30 * (2**1) = 60 seconds
Timecop.travel(61) # Past the 60 second backoff
assert @page.can_retry?
end
end
def test_implements_exponential_backoff
@page.start_processing
@page.mark_failed
assert_equal 30, @page.send(:backoff_delay)
@page.instance_variable_set(:@retry_count, 1)
assert_equal 60, @page.send(:backoff_delay)
@page.instance_variable_set(:@retry_count, 2)
assert_equal 120, @page.send(:backoff_delay)
end
def test_recreates_page_from_hash_data
data = {
"url" => @url,
"html" => "<html>test</html>",
"scraped_at" => Time.now,
"state" => "completed",
"error_message" => nil,
"retry_count" => 0,
"last_error_at" => nil
}
restored_page = ScrapedPage.from_h(data)
assert_equal @url, restored_page.url
assert_equal "<html>test</html>", restored_page.html
assert restored_page.completed?
end
end
class AsyncScraperTest < Minitest::Test
def setup
@urls = ["https://example.com", "https://example.com/page2"]
@scraper = AsyncScraper.new(*@urls, max_concurrent: 2, timeout: 5)
end
def test_sets_configuration_correctly
assert_equal @urls, @scraper.instance_variable_get(:@initial_urls)
assert_equal 2, @scraper.instance_variable_get(:@max_concurrent)
assert_equal 5, @scraper.instance_variable_get(:@timeout)
assert_equal "idle", @scraper.state
end
def test_raises_error_for_empty_urls
assert_raises(ArgumentError) { AsyncScraper.new }
end
def test_raises_error_for_invalid_mode
assert_raises(ArgumentError) { AsyncScraper.new("https://example.com", mode: :invalid) }
end
def test_accepts_spider_mode
spider_scraper = AsyncScraper.new("https://example.com", mode: :spider)
assert_equal :spider, spider_scraper.instance_variable_get(:@mode)
end
def test_sets_default_values
default_scraper = AsyncScraper.new("https://example.com")
assert_equal :scrape, default_scraper.instance_variable_get(:@mode)
assert_equal 5, default_scraper.instance_variable_get(:@max_concurrent)
assert_equal 10, default_scraper.instance_variable_get(:@timeout)
assert_equal AsyncScraper::MAX_CONTENT_SIZE, default_scraper.instance_variable_get(:@max_content_size)
end
def test_starts_in_idle_state
assert @scraper.idle?
assert @scraper.can_start?
end
def test_transitions_to_running_when_started
@scraper.start
assert @scraper.running?
assert @scraper.active?
assert @scraper.can_process_pages?
end
def test_can_pause_and_resume
@scraper.start
@scraper.pause
assert @scraper.paused?
assert @scraper.active?
refute @scraper.can_process_pages?
@scraper.resume
assert @scraper.running?
assert @scraper.can_process_pages?
end
def test_can_stop_from_any_active_state
@scraper.start
@scraper.stop
assert @scraper.stopped?
refute @scraper.active?
refute @scraper.can_process_pages?
end
def test_handles_error_state
@scraper.start
@scraper.error_occurred
assert @scraper.error?
refute @scraper.active?
refute @scraper.can_process_pages?
end
def test_can_reset_to_idle
@scraper.start
@scraper.stop
@scraper.reset
assert @scraper.idle?
end
def test_returns_false_when_no_max_pages_set
refute @scraper.send(:max_pages_reached?)
end
def test_returns_true_when_max_pages_reached
limited_scraper = AsyncScraper.new("https://example.com", max_pages: 2)
limited_scraper.instance_variable_get(:@pages_scraped).increment
limited_scraper.instance_variable_get(:@pages_scraped).increment
assert limited_scraper.send(:max_pages_reached?)
end
def test_extracts_and_queues_same_host_links_only
html = <<~HTML
<html>
<body>
<a href="/page1">Page 1</a>
<a href="https://example.com/page2">Page 2</a>
<a href="https://other-site.com/page">Other Site</a>
</body>
</html>
HTML
current_url = "https://example.com"
@scraper.instance_variable_set(:@url_queue, Async::Queue.new)
@scraper.send(:extract_and_queue_links, html, current_url)
discovered_urls = @scraper.instance_variable_get(:@discovered_urls)
assert_includes discovered_urls, "https://example.com/page1"
refute_includes discovered_urls, "https://other-site.com/page"
end
def test_filters_out_non_content_urls
html = <<~HTML
<html>
<body>
<a href="/style.css">CSS File</a>
<a href="/image.jpg">Image</a>
</body>
</html>
HTML
current_url = "https://example.com"
@scraper.instance_variable_set(:@url_queue, Async::Queue.new)
@scraper.send(:extract_and_queue_links, html, current_url)
discovered_urls = @scraper.instance_variable_get(:@discovered_urls)
refute_includes discovered_urls, "https://example.com/style.css"
refute_includes discovered_urls, "https://example.com/image.jpg"
end
def test_ignores_special_link_types
html = <<~HTML
<html>
<body>
<a href="#section">Section Link</a>
<a href="javascript:void(0)">JS Link</a>
<a href="mailto:[email protected]">Email</a>
</body>
</html>
HTML
current_url = "https://example.com"
@scraper.instance_variable_set(:@url_queue, Async::Queue.new)
@scraper.send(:extract_and_queue_links, html, current_url)
discovered_urls = @scraper.instance_variable_get(:@discovered_urls)
refute_includes discovered_urls, "#section"
refute_includes discovered_urls, "javascript:void(0)"
refute_includes discovered_urls, "mailto:[email protected]"
end
end
class FileStateContainerTest < Minitest::Test
def setup
@file_path = "/tmp/test_scraper_state.json"
@container = FileStateContainer.new(@file_path)
@test_state = {"test" => "data", "number" => 42}
end
def teardown
File.delete(@file_path) if File.exist?(@file_path)
end
def test_writes_and_reads_state_correctly
@container.write_state(@test_state)
assert File.exist?(@file_path)
read_state = @container.read_state
assert_equal @test_state, read_state
end
def test_returns_nil_when_file_doesnt_exist
assert_nil @container.read_state
end
def test_handles_corrupted_json_gracefully
File.write(@file_path, "invalid json")
assert_nil @container.read_state
end
end
class AsyncMarkdownScraperTest < Minitest::Test
def setup
@urls = ["https://example.com"]
@markdown_scraper = AsyncMarkdownScraper.new(*@urls, max_concurrent: 1)
end
def test_creates_underlying_async_scraper
assert_instance_of AsyncScraper, @markdown_scraper.scraper
end
def test_passes_options_to_async_scraper
scraper = AsyncMarkdownScraper.new("https://example.com", mode: :spider, max_pages: 5)
underlying_scraper = scraper.scraper
assert_equal :spider, underlying_scraper.instance_variable_get(:@mode)
assert_equal 5, underlying_scraper.instance_variable_get(:@max_pages)
end
def test_extracts_frontmatter_correctly
html = <<~HTML
<!DOCTYPE html>
<html lang="en">
<head>
<title>Test Page Title</title>
<meta name="description" content="Test page description">
<meta name="keywords" content="test, page, example">
<meta name="author" content="Test Author">
<meta name="robots" content="index, follow">
<link rel="canonical" href="https://example.com/canonical">
</head>
<body>
<h1>Content</h1>
</body>
</html>
HTML
url = "https://example.com/test"
frontmatter = @markdown_scraper.send(:extract_meta_frontmatter, html, url)
assert_includes frontmatter, "title: \"Test Page Title\""
assert_includes frontmatter, "description: \"Test page description\""
assert_includes frontmatter, "- \"test\""
assert_includes frontmatter, "- \"page\""
assert_includes frontmatter, "- \"example\""
assert_includes frontmatter, "author: \"Test Author\""
assert_includes frontmatter, "language: \"en\""
assert_includes frontmatter, "canonical_url: \"https://example.com/canonical\""
assert_includes frontmatter, "robots: \"index, follow\""
assert_includes frontmatter, "url: \"https://example.com/test\""
assert_includes frontmatter, "scraped_at:"
assert frontmatter.start_with?("---\n")
assert_includes frontmatter, "\n---\n"
end
def test_handles_missing_meta_tags_gracefully
minimal_html = "<html><head><title>Minimal</title></head><body></body></html>"
url = "https://example.com/test"
frontmatter = @markdown_scraper.send(:extract_meta_frontmatter, minimal_html, url)
assert_includes frontmatter, "title: \"Minimal\""
assert_includes frontmatter, "url: \"https://example.com/test\""
refute_includes frontmatter, "description:"
refute_includes frontmatter, "keywords:"
end
def test_escapes_quotes_in_content
html_with_quotes = "<html><head><title>Title with \"quotes\"</title></head><body></body></html>"
url = "https://example.com/test"
frontmatter = @markdown_scraper.send(:extract_meta_frontmatter, html_with_quotes, url)
assert_includes frontmatter, "title: \"Title with \\\"quotes\\\"\""
end
def test_removes_unwanted_elements_and_converts_to_markdown
html = <<~HTML
<html>
<head>
<title>Test</title>
<script>alert("remove me");</script>
<style>body { color: red; }</style>
</head>
<body>
<nav>Navigation</nav>
<h1>Main Title</h1>
<p>Paragraph content</p>
<footer>Footer content</footer>
</body>
</html>
HTML
markdown = @markdown_scraper.send(:html_to_clean_markdown, html)
assert_includes markdown, "# Main Title"
assert_includes markdown, "Paragraph content"
refute_includes markdown, "alert(\"remove me\")"
refute_includes markdown, "color: red"
refute_includes markdown, "Navigation"
refute_includes markdown, "Footer content"
end
end