Skip to content

Instantly share code, notes, and snippets.

@mrk21
Last active October 1, 2024 13:24
Show Gist options
  • Save mrk21/8179c0252547cbd2dd6924885073b8a2 to your computer and use it in GitHub Desktop.
Save mrk21/8179c0252547cbd2dd6924885073b8a2 to your computer and use it in GitHub Desktop.
Simple thread pool for Ruby
# A minimal fixed-size thread pool built on Thread::SizedQueue.
#
# Workers are started lazily: each #post boots one more worker until `n`
# workers exist. The pending-task queue is bounded to `n` entries, so #post
# blocks while the queue is full.
#
# When a task raises a StandardError, the worker that ran it discards every
# task still queued, closes the queue and re-raises. The error then surfaces
# from #shutdown, which joins all workers.
class ThreadPool
  # @param n [Integer] maximum number of worker threads; also the capacity
  #   of the pending-task queue
  def initialize(n)
    @n = n
    @queue = Thread::SizedQueue.new(@n)
    @workers = []
  end

  # @return [Integer] number of tasks currently waiting in the queue
  def size
    @queue.size
  end

  # Closes the queue, waits for every worker to finish and re-raises the
  # first error that terminated a worker, if any.
  #
  # @return [nil]
  # @raise [StandardError] the first exception collected while joining workers
  def shutdown
    @queue.close
    errors = []
    @workers.each do |worker|
      # Thread#join re-raises the exception that killed the worker.
      worker.join
    rescue StandardError => e
      errors << e
    end
    # Array#empty? is core Ruby; #blank? would require ActiveSupport.
    raise errors.first unless errors.empty?
  end

  # Enqueues a task, blocking while the queue is full.
  #
  # @yield the work to run on a worker thread
  # @return [Boolean] true when the task was enqueued, false when the pool
  #   is (or becomes) closed
  # @raise [ArgumentError] when called without a block
  def post(&task)
    # A nil task would end a worker's pop loop silently, so reject it here.
    raise ArgumentError, 'no block given' unless task
    return false if @queue.closed?

    boot_worker
    @queue.push(task)
    true
  rescue ClosedQueueError
    # The queue was closed between the check above and the push, or while
    # this thread was blocked waiting for free space.
    false
  end

  private

  # Starts one more worker unless the pool already has `n` of them.
  def boot_worker
    return if @workers.size >= @n

    @workers << Thread.new { run_worker }
  end

  # Worker loop: pops and runs tasks until the queue is closed and drained
  # (pop then returns nil).
  def run_worker
    while (task = @queue.pop)
      begin
        task.call
      rescue StandardError
        # Drop pending work and refuse new work before propagating the error.
        @queue.clear
        @queue.close
        raise
      end
    end
  end
end
require_relative './01_thread_pool'
require 'logger'
# Demo: feed 30 tasks through a pool created with ThreadPool.new(5), logging
# the value of thread_pool.size right before and right after each post.
# NOTE(review): this script calls ThreadPool#size; make sure the loaded class
# provides it.
logger = Logger.new($stdout)
logger.info '> starting...'

thread_pool = ThreadPool.new(5)

(0...30).each do |i|
  logger.info "> queue size: #{thread_pool.size}: `#{i}` pushing..."

  # Each task logs its index between two one-second sleeps.
  task = proc do
    sleep 1
    logger.info i
    sleep 1
  end
  thread_pool.post(&task)

  logger.info "> queue size: #{thread_pool.size}: `#{i}` pushed"
end

logger.info '> stopping...'
# shutdown returns only after the workers have been joined.
thread_pool.shutdown
logger.info '> stopped!'
=begin
I, [2024-04-19T09:23:00.742595 #23681] INFO -- : > starting...
I, [2024-04-19T09:23:00.742966 #23681] INFO -- : > queue size: 0: `0` pushing...
I, [2024-04-19T09:23:00.743175 #23681] INFO -- : > queue size: 1: `0` pushed
I, [2024-04-19T09:23:00.743225 #23681] INFO -- : > queue size: 0: `1` pushing...
I, [2024-04-19T09:23:00.743243 #23681] INFO -- : > queue size: 1: `1` pushed
I, [2024-04-19T09:23:00.743249 #23681] INFO -- : > queue size: 1: `2` pushing...
I, [2024-04-19T09:23:00.743261 #23681] INFO -- : > queue size: 2: `2` pushed
I, [2024-04-19T09:23:00.743273 #23681] INFO -- : > queue size: 2: `3` pushing...
I, [2024-04-19T09:23:00.743309 #23681] INFO -- : > queue size: 1: `3` pushed
I, [2024-04-19T09:23:00.743341 #23681] INFO -- : > queue size: 0: `4` pushing...
I, [2024-04-19T09:23:00.743353 #23681] INFO -- : > queue size: 1: `4` pushed
I, [2024-04-19T09:23:00.743361 #23681] INFO -- : > queue size: 1: `5` pushing...
I, [2024-04-19T09:23:00.743366 #23681] INFO -- : > queue size: 2: `5` pushed
I, [2024-04-19T09:23:00.743381 #23681] INFO -- : > queue size: 2: `6` pushing...
I, [2024-04-19T09:23:00.743385 #23681] INFO -- : > queue size: 3: `6` pushed
I, [2024-04-19T09:23:00.743388 #23681] INFO -- : > queue size: 3: `7` pushing...
I, [2024-04-19T09:23:00.743404 #23681] INFO -- : > queue size: 3: `7` pushed
I, [2024-04-19T09:23:00.743414 #23681] INFO -- : > queue size: 3: `8` pushing...
I, [2024-04-19T09:23:00.743417 #23681] INFO -- : > queue size: 4: `8` pushed
I, [2024-04-19T09:23:00.743420 #23681] INFO -- : > queue size: 4: `9` pushing...
I, [2024-04-19T09:23:00.743423 #23681] INFO -- : > queue size: 5: `9` pushed
I, [2024-04-19T09:23:00.743433 #23681] INFO -- : > queue size: 5: `10` pushing...
I, [2024-04-19T09:23:01.744520 #23681] INFO -- : 1
I, [2024-04-19T09:23:01.744674 #23681] INFO -- : 3
I, [2024-04-19T09:23:01.746086 #23681] INFO -- : 4
I, [2024-04-19T09:23:01.747007 #23681] INFO -- : 2
I, [2024-04-19T09:23:01.747056 #23681] INFO -- : 0
I, [2024-04-19T09:23:02.745153 #23681] INFO -- : > queue size: 5: `10` pushed
I, [2024-04-19T09:23:02.745261 #23681] INFO -- : > queue size: 5: `11` pushing...
I, [2024-04-19T09:23:02.746270 #23681] INFO -- : > queue size: 5: `11` pushed
I, [2024-04-19T09:23:02.746299 #23681] INFO -- : > queue size: 5: `12` pushing...
I, [2024-04-19T09:23:02.748462 #23681] INFO -- : > queue size: 5: `12` pushed
I, [2024-04-19T09:23:02.748575 #23681] INFO -- : > queue size: 4: `13` pushing...
I, [2024-04-19T09:23:02.748590 #23681] INFO -- : > queue size: 5: `13` pushed
I, [2024-04-19T09:23:02.748610 #23681] INFO -- : > queue size: 5: `14` pushing...
I, [2024-04-19T09:23:02.749090 #23681] INFO -- : > queue size: 5: `14` pushed
I, [2024-04-19T09:23:02.749123 #23681] INFO -- : > queue size: 5: `15` pushing...
I, [2024-04-19T09:23:03.746487 #23681] INFO -- : 5
I, [2024-04-19T09:23:03.747447 #23681] INFO -- : 6
I, [2024-04-19T09:23:03.750669 #23681] INFO -- : 7
I, [2024-04-19T09:23:03.750797 #23681] INFO -- : 8
I, [2024-04-19T09:23:03.751055 #23681] INFO -- : 9
I, [2024-04-19T09:23:04.747847 #23681] INFO -- : > queue size: 5: `15` pushed
I, [2024-04-19T09:23:04.747930 #23681] INFO -- : > queue size: 5: `16` pushing...
I, [2024-04-19T09:23:04.748059 #23681] INFO -- : > queue size: 5: `16` pushed
I, [2024-04-19T09:23:04.748099 #23681] INFO -- : > queue size: 5: `17` pushing...
I, [2024-04-19T09:23:04.752887 #23681] INFO -- : > queue size: 5: `17` pushed
I, [2024-04-19T09:23:04.752908 #23681] INFO -- : > queue size: 5: `18` pushing...
I, [2024-04-19T09:23:04.753177 #23681] INFO -- : > queue size: 5: `18` pushed
I, [2024-04-19T09:23:04.753252 #23681] INFO -- : > queue size: 5: `19` pushing...
I, [2024-04-19T09:23:04.755807 #23681] INFO -- : > queue size: 5: `19` pushed
I, [2024-04-19T09:23:04.755842 #23681] INFO -- : > queue size: 5: `20` pushing...
I, [2024-04-19T09:23:05.748653 #23681] INFO -- : 10
I, [2024-04-19T09:23:05.748913 #23681] INFO -- : 11
I, [2024-04-19T09:23:05.754761 #23681] INFO -- : 12
I, [2024-04-19T09:23:05.755085 #23681] INFO -- : 13
I, [2024-04-19T09:23:05.756269 #23681] INFO -- : 14
I, [2024-04-19T09:23:06.750502 #23681] INFO -- : > queue size: 5: `20` pushed
I, [2024-04-19T09:23:06.750711 #23681] INFO -- : > queue size: 5: `21` pushing...
I, [2024-04-19T09:23:06.750970 #23681] INFO -- : > queue size: 5: `21` pushed
I, [2024-04-19T09:23:06.751057 #23681] INFO -- : > queue size: 5: `22` pushing...
I, [2024-04-19T09:23:06.756180 #23681] INFO -- : > queue size: 5: `22` pushed
I, [2024-04-19T09:23:06.756282 #23681] INFO -- : > queue size: 5: `23` pushing...
I, [2024-04-19T09:23:06.757186 #23681] INFO -- : > queue size: 5: `23` pushed
I, [2024-04-19T09:23:06.757250 #23681] INFO -- : > queue size: 5: `24` pushing...
I, [2024-04-19T09:23:06.757537 #23681] INFO -- : > queue size: 5: `24` pushed
I, [2024-04-19T09:23:06.757590 #23681] INFO -- : > queue size: 5: `25` pushing...
I, [2024-04-19T09:23:07.751239 #23681] INFO -- : 15
I, [2024-04-19T09:23:07.751441 #23681] INFO -- : 16
I, [2024-04-19T09:23:07.758272 #23681] INFO -- : 17
I, [2024-04-19T09:23:07.759168 #23681] INFO -- : 18
I, [2024-04-19T09:23:07.760531 #23681] INFO -- : 19
I, [2024-04-19T09:23:08.754048 #23681] INFO -- : > queue size: 5: `25` pushed
I, [2024-04-19T09:23:08.754155 #23681] INFO -- : > queue size: 5: `26` pushing...
I, [2024-04-19T09:23:08.755400 #23681] INFO -- : > queue size: 5: `26` pushed
I, [2024-04-19T09:23:08.755455 #23681] INFO -- : > queue size: 5: `27` pushing...
I, [2024-04-19T09:23:08.761295 #23681] INFO -- : > queue size: 4: `27` pushed
I, [2024-04-19T09:23:08.761383 #23681] INFO -- : > queue size: 4: `28` pushing...
I, [2024-04-19T09:23:08.761414 #23681] INFO -- : > queue size: 5: `28` pushed
I, [2024-04-19T09:23:08.761437 #23681] INFO -- : > queue size: 5: `29` pushing...
I, [2024-04-19T09:23:08.763427 #23681] INFO -- : > queue size: 5: `29` pushed
I, [2024-04-19T09:23:08.763463 #23681] INFO -- : > stopping...
I, [2024-04-19T09:23:09.755312 #23681] INFO -- : 20
I, [2024-04-19T09:23:09.759050 #23681] INFO -- : 21
I, [2024-04-19T09:23:09.762835 #23681] INFO -- : 22
I, [2024-04-19T09:23:09.763110 #23681] INFO -- : 23
I, [2024-04-19T09:23:09.764942 #23681] INFO -- : 24
I, [2024-04-19T09:23:11.759153 #23681] INFO -- : 25
I, [2024-04-19T09:23:11.765669 #23681] INFO -- : 27
I, [2024-04-19T09:23:11.766846 #23681] INFO -- : 28
I, [2024-04-19T09:23:11.768197 #23681] INFO -- : 26
I, [2024-04-19T09:23:11.768303 #23681] INFO -- : 29
I, [2024-04-19T09:23:12.772832 #23681] INFO -- : > stopped!
=end
# Specs for ThreadPool: parallel execution and error propagation on shutdown.
describe ThreadPool do
  it 'runs tasks in parallel with the specified parallelism' do
    mutex = Mutex.new
    values = []
    # Monotonic clock: immune to wall-clock adjustments and needs no
    # ActiveSupport time zone.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    pool = ThreadPool.new(5)
    10.times do |i|
      pool.post do
        sleep 1
        mutex.synchronize { values << i }
      end
    end
    pool.shutdown
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    expect(values).to match_array(10.times.to_a)
    # Ten one-second tasks on five workers are expected to take two rounds.
    expect(elapsed.floor).to eq(2)
  end

  context 'when it raised exceptions on worker threads' do
    it 'stops to run tasks and it raises the exception on shutdown' do
      mutex = Mutex.new
      values = []
      pool = ThreadPool.new(5)
      10.times do |i|
        posted = pool.post do
          if i == 4
            # Fails after 0.5s, before the other tasks finish their 1s sleep.
            sleep 0.5
            raise StandardError, "error"
          else
            sleep 1
            mutex.synchronize { values << i }
          end
        end
        # post returns false once the pool has been closed.
        break unless posted
      end
      expect { pool.shutdown }.to raise_error(StandardError, "error")
      # Only the tasks already running alongside the failing one complete.
      expect(values).to match_array([0, 1, 2, 3])
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment