Created
June 9, 2021 10:07
-
-
Save kapcod/40d038bfe5227f692a9665dec5e4b036 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# SimpleThreadPool gives a simple implementation for producer/consumers pattern | |
# Example: | |
# | |
# pool = SimpleThreadPool.new(10, max_queue: 5) do |user_params| | |
# Net::HTTP.post(UPDATE_API, user_params) | |
# end | |
# users.each do |user| | |
# pool << user.update_api_params | |
# end | |
# pool.close | |
class SimpleThreadPool | |
# Initiates the pool and starts `num_threads` working threads. | |
# `max_queue` option allows blocking main thread from adding new items to the pool queue | |
# if the consumers process it slower than the producer generates. | |
# The main goal of this option is to limit the memory usage in case the number or size of items | |
# will get big. `max_queue` can be as low as 1, but recommended to keep it at least equal to `num_threads`. | |
def initialize(num_threads, max_queue: nil) | |
@num_threads = num_threads | |
@queue = max_queue ? SizedQueue.new(max_queue) : Queue.new | |
@threads = @num_threads.times.map do | |
Thread.new do | |
while (item = @queue.pop) | |
yield item | |
end | |
end | |
end | |
end | |
# Adds item to the processing queue, in case `max_queue` was passed to `new`, it will block if the queue already | |
# contains this number of items. | |
# nil is reserved for closed queue detection and is not allowed to be pushed to the queue manually | |
def <<(item) | |
raise ArgumentError, "Cannot process 'nil' item" if item.nil? | |
@queue << item | |
end | |
# Will close the queue and wait until all threads will finish processing current items in the queue | |
# Any further attempt to push to the queue with fail with exeption | |
def close | |
@queue.close | |
@threads.each(&:join) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment