Last active
January 15, 2024 19:45
Exploring the 1BRC in Ruby with Ractors
#!/usr/bin/env ruby --yjit

WORKER_THREADS = 4
CHUNK_SIZE = 2**16

# BUG: my city struct is being corrupted when moving from the worker to the main ractor.
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=1164>` becomes
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=nil>`, note that each `n` attribute becomes `nil`.
# https://bugs.ruby-lang.org/issues/20165
# I tried changing the Struct to a simple array and still using `move: true`,
# but I ran into similar data corruption issues! So I'm going to switch to `move: false`,
# it's not a huge deal in this case since it's just one hash per worker copied at the end.
City = Struct.new(:min, :tot, :max, :n)
def city_hash = Hash.new { |h,k| h[k] = City[Float::INFINITY,0,-Float::INFINITY,0] }

workers = (1..WORKER_THREADS).map do
  Ractor.new do
    cities = city_hash
    while chunk = Ractor.receive
      chunk.each_line do |line|
        name,val = line.split(";")
        city = cities[name]
        city.n += 1
        val = val.to_f
        city.min = val if val < city.min
        city.max = val if val > city.max
        city.tot += val
      end
    end
    cities
  end
end

# map step
nchunks = 0
infile = File.open('measurements.txt', 'rb')
round = workers.cycle
until infile.eof?
  chunk = infile.read CHUNK_SIZE
  # ensure we're on a line boundary
  chunk += infile.readline unless infile.eof?
  ractor = round.next
  ractor << chunk.freeze # needs backpressure
  nchunks += 1
  # break if nchunks > 5_000
end

# reduce step
results = city_hash
workers.each do |ractor|
  ractor << nil # signal done
  cities = ractor.take
  cities.each do |name,o|
    city = results[name]
    city.min = o.min if o.min < city.min
    city.max = o.max if o.max > city.max
    city.tot += o.tot
    city.n += o.n
  end
end

results.sort_by(&:first).each{|n,c|
  puts "%s=%.1f/%.1f/%.1f" % [n,c.min,c.tot/c.n,c.max]
}
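The map step above reads a fixed-size block and then calls `readline` so that every chunk ends on a line boundary. The following is a minimal sketch of that one idea in isolation, run against an in-memory `StringIO` instead of `measurements.txt`. The method name `each_line_aligned_chunk` and the sample data are made up for illustration and are not part of the gist.

```ruby
require 'stringio'

# Hypothetical helper: yields chunks of roughly chunk_size bytes,
# each of which ends on a line boundary.
def each_line_aligned_chunk(io, chunk_size)
  return enum_for(__method__, io, chunk_size) unless block_given?

  until io.eof?
    chunk = io.read(chunk_size)
    # A fixed-size read usually stops mid-line, so read up to and including
    # the next newline. If the read happened to stop exactly on a boundary,
    # this just appends one more complete line, which is still aligned.
    chunk << io.readline unless io.eof?
    yield chunk
  end
end

# Made-up sample data in the 1BRC "name;value" format.
data = "Oslo;1.5\nLima;20.1\nCairo;30.0\nQuito;14.2\n"
chunks = each_line_aligned_chunk(StringIO.new(data), 12).to_a
chunks.each { |c| p c }
```

Because no line is ever split across two chunks, each worker can run `each_line` on its chunk without coordinating with the others.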
#!/usr/bin/env ruby --yjit

require 'async'
require 'async/queue'

WORKER_THREADS = 4
CHUNK_SIZE = 2**16
Q_SIZE = 10_000

City = Struct.new(:min, :tot, :max, :n)
def city_hash = Hash.new { |h,k| h[k] = City[Float::INFINITY,0,-Float::INFINITY,0] }

workers = (1..WORKER_THREADS).map do
  Ractor.new(Ractor.current) do |main_ractor|
    cities = city_hash
    while chunk = main_ractor.take
      chunk.each_line do |line|
        name,val = line.split(";")
        city = cities[name]
        city.n += 1
        val = val.to_f
        city.min = val if val < city.min
        city.max = val if val > city.max
        city.tot += val
      end
    end
    cities
  end
end

# map step
Async do
  chunks = Async::LimitedQueue.new(Q_SIZE)
  reader = Async do
    nchunks = 0
    infile = File.open('measurements.txt', 'rb')
    until infile.eof?
      chunk = infile.read CHUNK_SIZE
      # ensure we're on a line boundary
      chunk += infile.readline unless infile.eof?
      chunks << chunk.freeze
      nchunks += 1
      # break if nchunks > 5_000
    end
    chunks << nil
  end
  chunks.each { |c| Ractor.yield c }
end

# reduce step
results = city_hash
workers.size.times { Ractor.yield nil } # signal done
workers.each do |ractor|
  cities = ractor.take
  cities.each do |name,o|
    city = results[name]
    city.min = o.min if o.min < city.min
    city.max = o.max if o.max > city.max
    city.tot += o.tot
    city.n += o.n
  end
end

results.sort_by(&:first).each{|n,c|
  puts "%s=%.1f/%.1f/%.1f" % [n,c.min,c.tot/c.n,c.max]
}
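The version above bounds the number of in-flight chunks with a limited queue, so the reader cannot run arbitrarily far ahead of the workers. As a rough illustration of that backpressure idea only, here is a sketch that swaps in Ruby's built-in `SizedQueue` and plain threads in place of `Async::LimitedQueue` and ractors. The constant `QUEUE_LIMIT`, the fake chunk strings, and the `sleep` that stands in for slow processing are all assumptions made for the demo.

```ruby
# SizedQueue is part of Ruby core, so no require is needed.
QUEUE_LIMIT = 2 # hypothetical; kept tiny so the blocking is easy to observe

queue = SizedQueue.new(QUEUE_LIMIT)
max_depth = 0

producer = Thread.new do
  10.times do |i|
    # Blocks while the queue already holds QUEUE_LIMIT items,
    # which is what throttles the producer.
    queue << "chunk #{i}"
    max_depth = [max_depth, queue.size].max
  end
  queue << nil # signal done, as in the scripts above
end

consumed = []
while (chunk = queue.pop)
  sleep 0.001 # stand-in for a slow consumer
  consumed << chunk
end
producer.join

puts "consumed #{consumed.size} chunks, max queue depth #{max_depth}"
```

The producer never gets more than `QUEUE_LIMIT` items ahead of the consumer, so memory use stays bounded no matter how fast the file can be read.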
#!/usr/bin/env ruby --yjit

City = Struct.new(:min, :tot, :max, :n)
cities = Hash.new { |h,k| h[k] = City[Float::INFINITY,0,-Float::INFINITY,0] }

File.open('measurements.txt', 'rb').each_line do |line|
  name,val = line.split(";")
  city = cities[name]
  city.n += 1
  val = Float(val)
  city.min = val if val < city.min
  city.max = val if val > city.max
  city.tot += val
end

cities.sort_by(&:first).each{|n,c|
  puts "%s=%.1f/%.1f/%.1f" % [n,c.min,c.tot/c.n,c.max]
}
Nice! Current ractors seem to have surprising overhead. There's some discussion on this reddit thread comparing ractor performance against a couple of forking solutions, and yes, Ruby 3.3 ractors are much slower than just forking. The slowdown doesn't seem related to getting the data into the ractors. I haven't looked any further, but it feels like a locking or scheduling issue. Hopefully it's nothing fundamental, just a sign that ractors need further development.
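For readers wondering what "just forking" might look like, here is a minimal sketch. It is not the code from the reddit thread or from the ZMQ gist. It assumes a Unix-like system with `fork`, splits the file into line-aligned byte ranges, and has each child send its partial result back over a pipe with `Marshal`. All names (`Stats`, `line_aligned_ranges`, `process_range`, `fork_and_merge`) and the sample data are hypothetical.

```ruby
require 'tempfile'

Stats = Struct.new(:min, :tot, :max, :n)

def new_stats_hash
  Hash.new { |h, k| h[k] = Stats[Float::INFINITY, 0.0, -Float::INFINITY, 0] }
end

# Split the file into up to `parts` byte ranges that each begin at a line start.
def line_aligned_ranges(path, parts)
  size = File.size(path)
  File.open(path, 'rb') do |f|
    starts = (0...parts).map do |i|
      offset = size * i / parts
      next 0 if offset.zero?
      f.seek(offset - 1)
      f.gets # advance to the start of the next line
      f.pos
    end.uniq
    starts.each_with_index.map { |s, i| [s, starts[i + 1] || size] }
  end
end

# Aggregate every line that starts inside [from, to).
def process_range(path, from, to)
  stats = new_stats_hash
  File.open(path, 'rb') do |f|
    f.seek(from)
    while f.pos < to && (line = f.gets)
      name, val = line.split(';')
      val = val.to_f
      s = stats[name]
      s.n += 1
      s.min = val if val < s.min
      s.max = val if val > s.max
      s.tot += val
    end
  end
  stats.default_proc = nil # Marshal cannot dump a hash that has a default proc
  stats
end

def fork_and_merge(path, parts)
  children = line_aligned_ranges(path, parts).map do |from, to|
    reader, writer = IO.pipe
    reader.binmode
    writer.binmode
    pid = fork do
      reader.close
      Marshal.dump(process_range(path, from, to), writer)
      writer.close
      exit!(0) # skip at_exit handlers inherited from the parent
    end
    writer.close
    [pid, reader]
  end

  results = new_stats_hash
  children.each do |pid, reader|
    partial = Marshal.load(reader)
    reader.close
    Process.wait(pid)
    partial.each do |name, o|
      s = results[name]
      s.min = o.min if o.min < s.min
      s.max = o.max if o.max > s.max
      s.tot += o.tot
      s.n += o.n
    end
  end
  results
end

# Tiny made-up input so the sketch runs without measurements.txt.
sample = Tempfile.create('measurements')
sample.write("Oslo;1.5\nLima;20.0\nOslo;-3.5\nLima;22.0\nOslo;5.0\n")
sample.close
results = fork_and_merge(sample.path, 2)
File.unlink(sample.path)

results.sort_by(&:first).each do |n, c|
  puts "%s=%.1f/%.1f/%.1f" % [n, c.min, c.tot / c.n, c.max]
end
```

Each child has its own heap and its own interpreter lock, so the workers share nothing while they run. The only cross-process traffic is one marshalled hash per worker at the end.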
Hey! Cool gist. Ractors are exciting but unfortunately still experimental. I shamelessly took this as a base and updated it to use ZMQ. It forks workers and communicates with them over ZMQ sockets.
https://gist.github.com/paddor/471586839f69fe2d74f219731b861be4#file-1brc_zmq-rb
On my old 2019 MBP it completes in 162.58 sec: