Created
October 3, 2024 16:34
-
-
Save spraints/8e538ff016833f25bd5ce5b6a7f71ffd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#/ Usage: ruby run.rb [--resume] runlist.txt outdir
#/
#/ --resume picks up after a gracefully shut down previous attempt.
#/
#/ outdir must not exist yet, but must be a path where we can create a directory.
#/
#/ runlist.txt must be a list of the inputs we're going to process, one per line.
require "fileutils" | |
# Presumably the core count of the machine this runs on; it is hard-coded
# rather than detected.
NUM_CORE = 128

# Define some bounds for the concurrency limit. I'm experimenting and don't
# want things to run too far off the rails.
MIN_CAPACITY = 1
MAX_CAPACITY = NUM_CORE * 5

# Start at a tenth of the cores, but never outside the bounds above (a
# NUM_CORE below 10 would otherwise give a starting capacity of 0).
INIT_CAPACITY = (NUM_CORE / 10).clamp(MIN_CAPACITY, MAX_CAPACITY)
# Runs process-input.rb once for every non-blank line of the runlist, keeping
# the number of concurrent child processes at the limit suggested by
# PressureMonitor, then prints a summary.
#
# input  - path of the runlist file (one input per line).
# outdir - directory to create. It receives run.log (stdout and stderr of
#          every child) and is passed to each child via --out-dir. The script
#          exits with status 1 if it already exists.
def main(input:, outdir:)
  if File.exist?(outdir)
    $stderr.puts "error: #{outdir} already exists"
    exit 1
  end

  puts "reading #{input}..."
  # Blank lines would otherwise be handed to a child as an empty --input.
  inputs = File.readlines(input).map(&:strip).reject(&:empty?)
  puts "found #{inputs.size} inputs"

  FileUtils.mkdir_p(outdir)
  # The block form closes the log even if the run loop raises or exits.
  finished = File.open(File.join(outdir, "run.log"), "w") do |logfile|
    run_inputs(inputs, outdir, logfile)
  end

  puts
  ok, failed = finished.partition { |x| x[:status].success? }
  puts "#{ok.size} ok, #{failed.size} failed"
  # Averages are undefined for an empty list (e.g. shutdown before any child
  # finished), so only print them when there is something to average.
  puts "avg time to process each input: #{avg_time(finished)}" unless finished.empty?
  puts "avg time of ok: #{avg_time(ok)}" unless ok.empty?
end

# Spawns one child per entry of inputs (consuming the array) until the list is
# exhausted or a shutdown was requested, and waits for all started children.
#
# Returns an Array of Hashes with :start, :stop, :status and :input, one per
# child that exited.
def run_inputs(inputs, outdir, logfile)
  sigs = SigHandler.new
  mon = PressureMonitor.new
  capacity = INIT_CAPACITY
  running = {}
  finished = []
  skipped = 0 # nothing is skipped at present; kept so the status line layout is unchanged

  until (sigs.shutdown? || inputs.empty?) && running.empty?
    # Reap every child that has already exited, without blocking.
    until running.empty?
      pid, status = Process.waitpid2(-1, Process::WNOHANG)
      break if pid.nil?

      info = running.delete(pid)
      next if info.nil? # a child that was not started by this loop

      finished.push({stop: Time.now, status: status}.merge(info))
    end

    capacity = mon.adjust_capacity(capacity)

    # Top up to the current capacity unless we are shutting down.
    while !sigs.shutdown? && running.size < capacity && !inputs.empty?
      item = inputs.shift
      pid = spawn "ruby", "process-input.rb", "--out-dir", outdir, "--input", item,
                  out: logfile, err: logfile
      running[pid] = {start: Time.now, input: item}
    end

    printf "\r[%s] %3d/%3d running / %6d skipped / %6d done / %6d remaining%s",
           mon.to_s,
           running.size,
           capacity,
           skipped,
           finished.size,
           inputs.size,
           sigs.shutdown? ? " SHUTTING DOWN" : ""
    # The status line has no newline, so it must be flushed explicitly.
    $stdout.flush

    # Wait until there's a signal or until it's time to maybe adjust capacity
    # (every second). Drain all pending wake-up bytes in one go so a burst of
    # exits does not cause one loop iteration per byte.
    if IO.select([sigs.r], nil, nil, 1.0)
      sigs.r.read_nonblock(4096, exception: false)
    end
  end

  finished
end
# Formats the mean duration of the given results in milliseconds.
#
# results - Array of Hashes with :start and :stop values that respond to
#           #to_f (seconds), e.g. Time objects.
#
# Returns a String such as "   1500.0 ms", or "n/a" when results is empty
# (the mean of nothing is undefined and used to raise ZeroDivisionError).
def avg_time(results)
  return "n/a" if results.empty?

  seconds = results.sum { |x| x[:stop].to_f - x[:start].to_f } / results.size
  "%9.1f ms" % (1000.0 * seconds)
end
# Samples /proc/pressure/{cpu,io,memory} and turns the readings into a
# suggested concurrency limit: grow slowly while everything is healthy, shrink
# quickly as soon as any resource is sick.
class PressureMonitor
  INTERVAL = 1.0 # minimum seconds between two samples
  INCREASE = 1   # added to the capacity when every resource is healthy
  DECREASE = 0.9 # capacity is multiplied by this when any resource is sick

  # Takes the initial sample. Raises if a pressure file cannot be read or
  # parsed.
  def initialize
    @init = get_pressure
    @prev = @cur = @init
    @prev_time = @last = monotonic_now
  end

  # Returns one character per resource (cpu, memory, io), as produced by
  # Pressure#to_c.
  def to_s
    [cpu, memory, io].map(&:to_c).join
  end

  # Returns the new capacity derived from cap.
  #
  # At most one sample is taken per INTERVAL; in between, cap is returned
  # unchanged. The result is kept within MIN_CAPACITY..MAX_CAPACITY.
  #
  # Deliberately best-effort: if sampling fails, cap is returned unchanged and
  # the previous readings are kept, so the next call retries from a consistent
  # baseline.
  def adjust_capacity(cap)
    now = monotonic_now
    return cap if now - @last < INTERVAL

    # Read first and only then shift the baseline, so a failed read cannot
    # leave @prev/@prev_time updated without a matching @cur/@last.
    sample = get_pressure
    @prev = @cur
    @prev_time = @last
    @cur = sample
    @last = now

    readings = [cpu, memory, io]
    if readings.any?(&:sick?)
      [MIN_CAPACITY, (cap * DECREASE).to_i].max
    elsif readings.all?(&:healthy?)
      [MAX_CAPACITY, cap + INCREASE].min
    else
      cap
    end
  rescue StandardError
    cap
  end

  # Current CPU pressure with its healthy/sick thresholds.
  def cpu
    mkp(:cpu, healthy: 10, sick: 40)
  end

  # Current memory pressure with its healthy/sick thresholds.
  def memory
    mkp(:memory, healthy: 1.0, sick: 1.0)
  end

  # Current IO pressure with its healthy/sick thresholds.
  def io
    mkp(:io, healthy: 1.0, sick: 5.0)
  end

  # Builds a Pressure for category cat from the two most recent samples and
  # the time that passed between them.
  def mkp(cat, healthy:, sick:)
    Pressure.new(@cur.fetch(cat),
                 prev: @prev.fetch(cat),
                 interval: @last - @prev_time,
                 healthy: healthy,
                 sick: sick)
  end

  # Reads and parses all three pressure files.
  #
  # Returns a Hash mapping :cpu, :io and :memory to the parsed reading.
  def get_pressure
    [:cpu, :io, :memory].map do |cat|
      [cat, Pressure.parse(File.read("/proc/pressure/#{cat}"))]
    end.to_h
  end

  private

  # Seconds on a clock that never jumps, so intervals stay correct even if
  # the wall clock is adjusted.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
# One pressure reading for a single resource, compared against the previous
# reading and classified against a healthy and a sick threshold.
class Pressure
  # Matches the first line of a pressure file, e.g.
  #   some avg10=0.00 avg60=0.00 avg300=0.00 total=12345
  SOME_LINE = /some avg10=(?<recent>\d+\.\d+) .* total=(?<total>\d+)/

  # Parses the first line of content.
  #
  # Returns a Hash with :recent (the avg10 value as Float) and :total (the
  # total counter as Integer).
  # Raises RuntimeError if the first line does not have the expected shape.
  def self.parse(content)
    found = SOME_LINE.match(content.lines.first.to_s)
    raise "could not parse #{content.inspect}" unless found

    {recent: found[:recent].to_f, total: found[:total].to_i}
  end

  # data     - current reading, as returned by Pressure.parse.
  # prev     - previous reading of the same resource.
  # interval - seconds between prev and data.
  # healthy  - readings with pct below this are healthy.
  # sick     - readings with pct above this are sick.
  def initialize(data, prev:, interval:, healthy:, sick:)
    @data = data
    @prev = prev
    @interval = interval
    @healthy = healthy
    @sick = sick
  end

  # One-character summary: "^" when healthy, "v" when sick, "=" otherwise.
  # Healthy wins if both thresholds are satisfied at once.
  def to_c
    if healthy?
      "^"
    elsif sick?
      "v"
    else
      "="
    end
  end

  def healthy?
    pct < @healthy
  end

  def sick?
    pct > @sick
  end

  # Stall time accumulated since the previous reading, as a percentage of the
  # interval. The total counter is treated as microseconds.
  #
  # When the interval is too short to divide by meaningfully (under 0.1 s,
  # e.g. right after start-up), the avg10 value is used instead.
  def pct
    return @data[:recent] if @interval < 0.1

    seconds_delay = (@data[:total] - @prev[:total]) / 1000000.0
    100.0 * seconds_delay / @interval
  end
end
# Installs the process's signal handlers and exposes a pipe whose read end
# becomes readable whenever a handled signal arrives, so a select loop can
# sleep until something happens.
class SigHandler
  # Read end of the wake-up pipe; one "." is written per handled signal.
  attr_reader :r

  def initialize
    @shutdown = false
    @r, @w = IO.pipe

    # First INT/TERM/QUIT requests a graceful shutdown; a second one exits
    # immediately with status 1.
    [:INT, :TERM, :QUIT].each do |sig|
      trap(sig) do
        exit 1 if @shutdown
        @shutdown = true
        # Wake the select loop so the shutdown is noticed right away.
        wake
      end
    end

    # A child exited: wake the select loop so it can be reaped.
    trap(:CHLD) { wake }
  end

  # True once a shutdown signal has been received.
  def shutdown?
    @shutdown
  end

  private

  # Writes one byte to the pipe without ever blocking: a blocking write inside
  # a signal handler would hang the process if the pipe were full. A full pipe
  # already guarantees the reader will wake up, so the byte can be dropped.
  def wake
    @w.write_nonblock(".", exception: false)
  end
end
# Command-line entry point: expects exactly two positional arguments, the
# runlist and the output directory.
# NOTE(review): the usage header advertises --resume, but nothing in this file
# parses it; a third argument only triggers the usage text. Confirm intent.
input, outdir, should_be_nil, = ARGV
if input.nil? || outdir.nil? || !should_be_nil.nil?
  # Print this script's "#/" header lines without their 3-character prefix.
  # Done in Ruby rather than via a shell pipeline so a script path containing
  # spaces or shell metacharacters cannot break it.
  File.foreach($0) do |line|
    puts line.chomp[3..-1].to_s if line.start_with?("#/")
  end
  exit 1
end
main(input: input, outdir: outdir)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment