Created
March 13, 2024 21:16
-
-
Save leonid-shevtsov/c99c05149a7fa03b54afcb1419e3d68c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'json'
require 'strscan'

# Path of the measurements file to aggregate; taken from the first CLI argument.
FILENAME = ARGV[0]
# Number of worker processes forked to parse chunks.
POOLSIZE = 6
# Bytes read from the input per chunk (128 MiB) before the chunk is extended
# to the end of the current line.
BUFSIZE = 128 * 1024 * 1024
# Fork POOLSIZE worker processes. Each worker is driven through three pipes:
#   cmdpipe - parent -> worker: one decimal byte count per line announcing a chunk
#   inpipe  - parent -> worker: the raw bytes of that chunk
#   outpipe - worker -> reader: one JSON line of per-city stats per chunk
# Every pool entry holds the worker pid and the parent-side ends of its pipes.
#
# Stats are stored per city as [min, max, sum, count]. City keys keep the
# trailing ';' that scan_until includes in its match.
pool = Array.new(POOLSIZE) do |_idx| # rubocop:disable Metrics/BlockLength
  outpipe_out, outpipe_in = IO.pipe(encoding: Encoding::BINARY)
  cmdpipe_out, cmdpipe_in = IO.pipe(encoding: Encoding::BINARY)
  inpipe_out, inpipe_in = IO.pipe(encoding: Encoding::BINARY)

  pid = fork do
    # The worker only reads commands/data and only writes results.
    inpipe_in.close
    cmdpipe_in.close
    outpipe_out.close

    # gets returns nil once every writer of the command pipe has closed it.
    while (command = cmdpipe_out.gets)
      part = inpipe_out.read(command.to_i)
      # read returns nil if the data pipe hit EOF before the announced chunk arrived.
      break if part.nil?

      stats = Hash.new { |h, k| h[k] = [10_000, -10_000, 0, 0] }
      scanner = StringScanner.new(part)
      # Checking eos? up front means an empty chunk yields no bogus entry.
      until scanner.eos?
        city = scanner.scan_until(/;/)
        # Trailing bytes without a ';' cannot form a measurement.
        break if city.nil?

        temp_s = scanner.scan_until(/\n/)
        if temp_s.nil?
          # Last line has no terminating newline: consume the rest so the
          # loop ends instead of spinning on a scanner that never advances.
          temp_s = scanner.rest
          scanner.terminate
        end
        temp = temp_s.to_f

        entry = stats[city]
        entry[0] = temp if temp < entry[0]
        entry[1] = temp if temp > entry[1]
        entry[2] += temp
        entry[3] += 1
      end
      outpipe_in.puts(JSON.generate(stats))
    end
    outpipe_in.close
  end

  # The parent keeps only the ends it uses to talk to the worker.
  outpipe_in.close
  inpipe_out.close
  cmdpipe_out.close
  { pid:, cmdpipe_in:, inpipe_in:, outpipe_out: }
end
# Collector process: reads one JSON stats line per processed chunk from every
# worker's output pipe, merges them per city, and writes the merged result as
# a single JSON line to collectpipe for the main process.
collectpipe_out, collectpipe_in = IO.pipe(encoding: Encoding::BINARY)
fork do
  collectpipe_out.close
  # Drop the inherited write ends of the workers' command/data pipes so the
  # collector does not keep them open after the main process closes its own.
  pool.each do |w|
    w[:inpipe_in].close
    w[:cmdpipe_in].close
  end

  statparts = []
  output_pipes = pool.map { |w| w[:outpipe_out] }
  until output_pipes.empty?
    ready_io = IO.select(output_pipes).first.first
    line = ready_io.gets
    if line.nil?
      # EOF: this worker has closed its output and will send nothing more.
      output_pipes.delete(ready_io)
    else
      statparts << JSON.parse(line)
    end
  end

  # Each value is [min, max, sum, count]; combine entries of the same city.
  stats = {}.merge(*statparts) do |_city, a, b|
    [
      [a[0], b[0]].min,
      [a[1], b[1]].max,
      a[2] + b[2],
      a[3] + b[3]
    ]
  end
  collectpipe_in.puts JSON.generate(stats)
end
# Only the collector writes to collectpipe.
collectpipe_in.close
# Read the input in BUFSIZE-byte chunks, extend each chunk to the end of its
# last line, and hand the chunks to the workers round-robin. For every chunk
# the byte count is announced on the command pipe, then the bytes are written
# to the data pipe.
workeridx = 0
File.open(FILENAME, encoding: Encoding::BINARY) do |f|
  loop do
    buf = String.new('', encoding: Encoding::BINARY, capacity: BUFSIZE + 1000)
    # read returns nil once the file is exhausted.
    break if f.read(BUFSIZE, buf).nil?

    # The chunk most likely ends mid-line; append the rest of that line.
    # gets returns nil at EOF, in which case the chunk is already complete.
    remainder = f.gets("\n")
    buf << remainder if remainder

    workeridx = (workeridx + 1) % pool.length
    worker = pool[workeridx]
    worker[:cmdpipe_in].puts(buf.bytesize.to_s)
    worker[:inpipe_in].write(buf)
  end
end

# Closing the write ends signals the workers that no more chunks will come.
# Do not wait on a worker inside this loop: workers forked later inherited the
# write ends of earlier workers' pipes, so an earlier worker only sees EOF
# after all of these pipes are closed and the later workers have exited.
pool.each do |worker|
  worker[:cmdpipe_in].close
  worker[:inpipe_in].close
end
# Receive the merged stats (city => [min, max, sum, count]) from the collector
# and print one "city=min/mean/max" entry per city, sorted by city name.
result_line = collectpipe_out.gets
# gets returns nil when the collector closed the pipe without writing anything.
abort('collector process exited without reporting results') if result_line.nil?
stats = JSON.parse(result_line)
# Setting NOOUT skips printing.
exit if ENV['NOOUT']

# Keys arrive with the trailing ';' separator. Strip it before sorting so the
# order is that of the bare names ("San" before "San Jose").
rows = stats.map { |city, cstats| [city.delete_suffix(';'), cstats] }.sort_by(&:first)
lines = rows.map do |city, (min, max, sum, count)|
  "#{city}=#{min}/#{format('%0.1f', sum / count)}/#{max}"
end
puts JSON.pretty_generate(lines)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment