Fetching and processing an unknown number of pages in a threaded manner
# using https://github.com/taganaka/easy_threadpool_rb
require "easy_threadpool"

Thread.abort_on_exception = true

class ThreadsafeCounter
  def initialize
    @counter = 0
    @mutex = Mutex.new
  end

  def increment = @mutex.synchronize { @counter += 1 }
  def value = @counter
end

$enqueued  = ThreadsafeCounter.new
$processed = ThreadsafeCounter.new

@page_batch_size = 10
@pool = Thread.pool(@page_batch_size)
@data = []
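
# NOTE: the counters are mutex-protected, but pushes into @data are not. On CRuby a
# plain Array#<< from multiple threads works in practice thanks to the GVL, yet it is
# not guaranteed by the language; a Mutex around the push (or a Queue) would make the
# thread-safety explicit.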

def get_and_parse_page_number(x)
  # dummy code to simulate page retrieval; returns `nil` on pages with no content
  sleep rand
  x <= 100 ? x.to_s : nil
end
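
# For reference only: a real fetcher might look roughly like the sketch below
# (hypothetical URL and JSON payload, not part of this gist), built on stdlib Net::HTTP:
#
#   require "net/http"
#   require "json"
#
#   def get_and_parse_page_number(x)
#     res = Net::HTTP.get_response(URI("https://example.com/items?page=#{x}"))
#     return nil unless res.is_a?(Net::HTTPSuccess)
#     items = JSON.parse(res.body)
#     items.empty? ? nil : items
#   end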

def process_page(x)
  # as soon as we've added the page to the queue, we increment the corresponding counter
  $enqueued.increment

  # `.process` adds the code block to the queue, which is then consumed by one of the threads
  # see https://github.com/taganaka/easy_threadpool_rb/blob/656bfabdc86a317b98bd5f28185eda98235abfcc/lib/easy_threadpool/pool.rb#L53
  @pool.process do
    puts("processing page ##{x}")
    retries = 0
    begin
      page_content = get_and_parse_page_number(x)
      if page_content
        @data << page_content
        process_page(x + @page_batch_size)
      end
      # as soon as we've finished processing the page, we increment the corresponding counter
      $processed.increment
    rescue Exception => e
      if retries < 10
        retries += 1
        sleep 10
        retry
      else # give up and move on
        process_page(x + @page_batch_size)
        # count the failed page as processed too, otherwise the wait loop below never terminates
        $processed.increment
      end
    end
  end
end
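
# Each of the first @page_batch_size pages seeds its own "lane": page x leads to
# x + @page_batch_size, then x + 2 * @page_batch_size, and so on, until a page comes
# back empty. That is how an unknown total number of pages gets covered without
# knowing the count up front.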

# enqueue the first N pages
(1..@page_batch_size).each do |page_n|
  process_page(page_n)
end

# wait for all the enqueued pages to finish processing
sleep 0.1 until $processed.value == $enqueued.value
@pool.shutdown

puts "Verifying that the threaded run produced the expected data:"
puts(("1".."100").to_a.sort == @data.sort)