-
-
Save codekitchen/44fc6bd733ea10430db56da10908a69e to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby --yjit
# frozen_string_literal: true

# 1BRC solution using Ractors: the main ractor reads fixed-size chunks of
# measurements.txt, round-robins them to worker ractors, then merges each
# worker's per-city aggregates and prints "name=min/mean/max" sorted by name.

WORKER_THREADS = 4
CHUNK_SIZE = 2**16

# BUG: my city struct is being corrupted when moving from the worker to the main ractor.
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=1164>` becomes
# `#<struct City min=-11.3, tot=24088.30000000004, max=64.6, n=nil>`, note that each `n` attribute becomes `nil`.
# https://bugs.ruby-lang.org/issues/20165
# I tried changing the Struct to a simple array and still using `move: true`,
# but I ran into similar data corruption issues! So I'm going to switch to `move: false`,
# it's not a huge deal in this case since it's just one hash per worker copied at the end.

# Per-city running aggregate: minimum, running total, maximum, sample count.
City = Struct.new(:min, :tot, :max, :n)

# Fresh hash that lazily creates a neutral City aggregate per unseen key.
def city_hash = Hash.new { |h,k| h[k] = City[Float::INFINITY,0,-Float::INFINITY,0] }

workers = (1..WORKER_THREADS).map do
  Ractor.new do
    cities = city_hash
    # nil is the "done" signal; any other message is a frozen chunk of lines.
    while chunk = Ractor.receive
      chunk.each_line do |line|
        name,val = line.split(";")
        city = cities[name]
        city.n += 1
        val = val.to_f
        city.min = val if val < city.min
        city.max = val if val > city.max
        city.tot += val
      end
    end
    cities # ractor's final value, collected via #take in the reduce step
  end
end

# map step: fan chunks out to the workers round-robin.
# FIX: use the block form of File.open so the descriptor is always closed
# (the original opened the file and never closed it).
nchunks = 0
round = workers.cycle
File.open('measurements.txt', 'rb') do |infile|
  until infile.eof?
    chunk = infile.read CHUNK_SIZE
    # ensure we're on a line boundary
    chunk += infile.readline unless infile.eof?
    ractor = round.next
    ractor << chunk.freeze # needs backpressure
    nchunks += 1
    # break if nchunks > 5_000
  end
end

# reduce step: signal each worker done, then merge its aggregates into one hash.
results = city_hash
workers.each do |ractor|
  ractor << nil # signal done
  cities = ractor.take
  cities.each do |name,o|
    city = results[name]
    city.min = o.min if o.min < city.min
    city.max = o.max if o.max > city.max
    city.tot += o.tot
    city.n += o.n
  end
end

results.sort_by(&:first).each{|n,c|
  puts "%s=%.1f/%.1f/%.1f" % [n,c.min,c.tot/c.n,c.max]
}
#!/usr/bin/env ruby --yjit
# frozen_string_literal: true

# 1BRC solution combining the async gem with Ractors: a reader fiber fills a
# bounded queue (providing the backpressure the plain-Ractor version lacked),
# the main ractor yields chunks to whichever worker is ready, then the
# per-worker aggregates are merged and printed sorted by city name.

require 'async'
require 'async/queue'

WORKER_THREADS = 4
CHUNK_SIZE = 2**16
Q_SIZE = 10_000

# Per-city running aggregate: minimum, running total, maximum, sample count.
City = Struct.new(:min, :tot, :max, :n)

# Fresh hash that lazily creates a neutral City aggregate per unseen key.
def city_hash = Hash.new { |h,k| h[k] = City[Float::INFINITY,0,-Float::INFINITY,0] }

workers = (1..WORKER_THREADS).map do
  Ractor.new(Ractor.current) do |main_ractor|
    cities = city_hash
    # Pull chunks from the main ractor's yield port; nil means "done".
    while chunk = main_ractor.take
      chunk.each_line do |line|
        name,val = line.split(";")
        city = cities[name]
        city.n += 1
        val = val.to_f
        city.min = val if val < city.min
        city.max = val if val > city.max
        city.tot += val
      end
    end
    cities # ractor's final value, collected via #take in the reduce step
  end
end

# map step: reader fiber -> bounded queue -> Ractor.yield to a ready worker.
Async do
  chunks = Async::LimitedQueue.new(Q_SIZE)
  reader = Async do
    nchunks = 0
    # FIX: block form closes the file even on error (the original leaked the handle).
    File.open('measurements.txt', 'rb') do |infile|
      until infile.eof?
        chunk = infile.read CHUNK_SIZE
        # ensure we're on a line boundary
        chunk += infile.readline unless infile.eof?
        chunks << chunk.freeze
        nchunks += 1
        # break if nchunks > 5_000
      end
    end
    chunks << nil # sentinel terminates the chunks.each below
  end
  chunks.each { |c| Ractor.yield c }
end

# reduce step: signal each worker done, then merge its aggregates into one hash.
results = city_hash
workers.size.times { Ractor.yield nil } # signal done
workers.each do |ractor|
  cities = ractor.take
  cities.each do |name,o|
    city = results[name]
    city.min = o.min if o.min < city.min
    city.max = o.max if o.max > city.max
    city.tot += o.tot
    city.n += o.n
  end
end

results.sort_by(&:first).each{|n,c|
  puts "%s=%.1f/%.1f/%.1f" % [n,c.min,c.tot/c.n,c.max]
}
#!/usr/bin/env ruby --yjit
# frozen_string_literal: true

# Baseline single-threaded 1BRC solution: stream measurements.txt line by
# line, keep min/total/max/count per city, print "name=min/mean/max" sorted.

# Per-city running aggregate: minimum, running total, maximum, sample count.
City = Struct.new(:min, :tot, :max, :n)
cities = Hash.new { |h,k| h[k] = City[Float::INFINITY,0,-Float::INFINITY,0] }

# FIX: File.foreach streams lines and closes the handle when iteration ends;
# the original File.open(...).each_line left the descriptor open forever.
File.foreach('measurements.txt', mode: 'rb') do |line|
  name,val = line.split(";")
  city = cities[name]
  city.n += 1
  val = Float(val) # raises on malformed input instead of silently yielding 0.0
  city.min = val if val < city.min
  city.max = val if val > city.max
  city.tot += val
end

cities.sort_by(&:first).each{|n,c|
  puts "%s=%.1f/%.1f/%.1f" % [n,c.min,c.tot/c.n,c.max]
}
Hey! Cool gist. Ractors are exciting but unfortunately still experimental. I shamelessly took this as a base and updated it to use ZMQ. It forks workers and communicates with them over ZMQ sockets.
https://gist.github.com/paddor/471586839f69fe2d74f219731b861be4#file-1brc_zmq-rb
On my old MBP 2019 it completes within 162.58 sec:
$ time pv --rate --average-rate --progress ../measurements.txt | ./1brc_zmq.rb > agg.txt
main: 61827
worker 0: 61828
worker 1: 61829
worker 2: 61830
worker 3: 61831
worker 4: 61832
worker 5: 61833
worker 6: 61834
worker 7: 61835
worker 8: 61836
worker 9: 61837
worker 10: 61838
worker 11: 61839
worker 12: 61840
worker 13: 61841
worker 14: 61842
worker 15: 61843
[ 250MiB/s] [ 250MiB/s] [======================================================================>] 100%
main: all chunks sent to workers
main: sorting and printing ...
main: terminating workers: [61828, 61829, 61830, 61831, 61832, 61833, 61834, 61835, 61836, 61837, 61838, 61839, 61840, 61841, 61842, 61843]
________________________________________________________
Executed in 162.58 secs fish external
usr time 29.64 mins 0.37 millis 29.64 mins
sys time 1.46 mins 1.41 millis 1.46 mins
Nice! Current Ractors seem to have surprising overhead. There's some discussion on this Reddit thread comparing Ractor performance against a couple of other forking solutions, and indeed, current Ruby 3.3 Ractors are much slower than simply forking. It doesn't seem related to getting the data into the Ractors; I haven't looked any further, but it feels like a locking or scheduling issue. Hopefully it's nothing fundamental and is just a sign that Ractors need further development.
Do you also get better results with fewer Ractor workers? For me, 8 Ractors are almost twice as slow as 4.
https://github.com/simpl1g/1brc/blob/main/README.md?plain=1#L59-L69