# Originally published as GitHub gist spraints/8e538ff016833f25bd5ce5b6a7f71ffd
# by @spraints (created October 3, 2024).
#/ Usage: ruby run.rb runlist.txt outdir
#/
#/ --resume (picking up after a gracefully shut down previous attempt) is not
#/ implemented yet; do not pass it.
#/
#/ outdir must not exist yet, but must be a path where we can create a directory.
#/
#/ runlist.txt must list the inputs to process, one per line.
require "fileutils"
# Core count used to size the concurrency limit (the number of
# process-input.rb children allowed at once). It is hard-coded, not detected;
# adjust it for the host (Etc.nprocessors would report the real count).
NUM_CORE = 128
# Define some bounds for the concurrency limit. I'm experimenting and don't
# want things to run too far off the rails.
MIN_CAPACITY = 1
MAX_CAPACITY = NUM_CORE * 5
# Start at roughly 10% of the cores, but never outside the bounds above:
# NUM_CORE / 10 alone would be 0 on a machine with fewer than 10 cores.
INIT_CAPACITY = (NUM_CORE / 10).clamp(MIN_CAPACITY, MAX_CAPACITY)
# Drive the whole run: read the runlist, create outdir, fan the inputs out to
# `ruby process-input.rb` children (see run_inputs), then print a summary.
#
# input  - path to the runlist file; one input per line, blank lines skipped.
# outdir - output directory; must not exist yet. It is created here and the
#          children's stdout/stderr go to outdir/run.log.
#
# Exits with status 1 if outdir already exists.
def main(input:, outdir:)
  if File.exist?(outdir)
    $stderr.puts "error: #{outdir} already exists"
    exit 1
  end

  puts "reading #{input}..."
  # Drop blank lines (e.g. a trailing empty line) so they are not spawned as
  # empty inputs.
  inputs = File.readlines(input).map(&:strip).reject(&:empty?)
  puts "found #{inputs.size} inputs"

  FileUtils.mkdir_p(outdir)
  # Block form so run.log is closed even when we leave via an exception
  # (SigHandler calls `exit 1` on a second interrupt, raising SystemExit).
  finished = File.open(File.join(outdir, "run.log"), "w") do |logfile|
    run_inputs(inputs, outdir: outdir, logfile: logfile)
  end

  print_summary(finished)
end

# Spawn one `ruby process-input.rb --out-dir OUTDIR --input ITEM` per entry
# of +inputs+ (consumed from the front), keeping at most `capacity` children
# running; PressureMonitor re-tunes capacity about once a second. After a
# shutdown signal no new children are started, but running ones are still
# waited for. process-input.rb is resolved relative to the current working
# directory.
#
# Returns one hash per reaped child: :stop, :status (Process::Status),
# :start and :input. :start/:stop are monotonic-clock seconds.
def run_inputs(inputs, outdir:, logfile:)
  sigs = SigHandler.new
  mon = PressureMonitor.new
  capacity = INIT_CAPACITY
  running = {} # pid => {start:, input:}
  finished = []
  done = 0
  # NOTE(review): nothing increments this; it is only displayed. Presumably
  # reserved for the unimplemented --resume mode — confirm.
  skipped = 0

  until (sigs.shutdown? || inputs.empty?) && running.empty?
    # Reap every child that has already exited, without blocking.
    until running.empty?
      pid, status = Process.waitpid2(-1, Process::WNOHANG)
      break if pid.nil?

      info = running.delete(pid)
      next if info.nil? # not a child we spawned; nothing to record

      stop = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      finished.push({stop: stop, status: status}.merge(info))
      done += 1
    end

    capacity = mon.adjust_capacity(capacity)

    while !sigs.shutdown? && running.size < capacity && !inputs.empty?
      item = inputs.shift
      pid = spawn "ruby", "process-input.rb", "--out-dir", outdir, "--input", item,
                  out: logfile, err: logfile
      running[pid] = {start: Process.clock_gettime(Process::CLOCK_MONOTONIC), input: item}
    end

    printf "\r[%s] %3d/%3d running / %6d skipped / %6d done / %6d remaining%s",
           mon.to_s,
           running.size,
           capacity,
           skipped,
           done,
           inputs.size,
           sigs.shutdown? ? " SHUTTING DOWN" : ""

    # Sleep until a signal handler writes to the self-pipe or a second passes
    # (the next chance to adjust capacity). Drain every pending wake-up byte
    # at once instead of one per loop pass.
    if IO.select([sigs.r], nil, nil, 1.0)
      sigs.r.read_nonblock(4096, exception: false)
    end
  end

  finished
end

# Print the ok/failed counts and the average per-input duration for the
# results returned by run_inputs.
def print_summary(finished)
  puts
  ok, failed = finished.partition { |x| x[:status].success? }
  puts "#{ok.size} ok, #{failed.size} failed"
  # Nothing ran (empty runlist or immediate shutdown): there is no average,
  # and avg_time would divide by zero.
  return if finished.empty?

  puts "avg time to process each input: #{avg_time(finished)}"
  puts "avg time of ok: #{avg_time(ok)}" unless ok.empty?
end
# Format the mean duration (stop - start) of +results+ as milliseconds,
# e.g. "    500.0 ms".
#
# results - array of hashes whose :start and :stop values respond to #to_f
#           in seconds (Time or Float).
#
# Returns "n/a" for an empty list instead of raising ZeroDivisionError.
def avg_time(results)
  return "n/a" if results.empty?

  seconds = results.sum { |x| x[:stop].to_f - x[:start].to_f } / results.size
  format("%9.1f ms", 1000.0 * seconds)
end
# Samples Linux pressure stall information from /proc/pressure/{cpu,io,memory}
# and uses it to steer a concurrency limit: add INCREASE after a sample where
# every resource is healthy, multiply by DECREASE after a sample where any
# resource is sick, otherwise leave it alone. Adjustments never go below
# MIN_CAPACITY or above MAX_CAPACITY (top-level constants).
class PressureMonitor
  INTERVAL = 1.0 # minimum seconds between two samples
  INCREASE = 1   # capacity added after an all-healthy sample
  DECREASE = 0.9 # capacity multiplier after a sample with any sick resource

  # Takes the first sample. Raises if /proc/pressure cannot be read or parsed.
  def initialize
    @init = get_pressure
    @prev = @cur = @init
    # Equal timestamps make the first interval 0, so Pressure#pct uses the
    # kernel's avg10 figure until a second sample exists.
    @prev_time = @last = monotonic_now
  end

  # One status character per resource, in the order cpu, memory, io:
  # "^" healthy, "v" sick, "=" in between.
  def to_s
    gauges.map(&:to_c).join
  end

  # Return the capacity to use next, given the current +cap+.
  #
  # Samples at most once per INTERVAL; between samples +cap+ is returned
  # unchanged. Sampling is best effort: on any StandardError (e.g. an
  # unreadable or unparsable /proc/pressure file) +cap+ is returned and the
  # previous sample is kept intact.
  def adjust_capacity(cap)
    sampled_at = monotonic_now
    return cap if sampled_at - @last < INTERVAL

    # Read before touching any state, so a failed read cannot leave @prev and
    # @cur pointing at the same sample.
    sample = get_pressure
    @prev, @prev_time = @cur, @last
    @cur, @last = sample, sampled_at

    current = gauges
    if current.any?(&:sick?)
      [MIN_CAPACITY, (cap * DECREASE).to_i].max
    elsif current.all?(&:healthy?)
      [MAX_CAPACITY, cap + INCREASE].min
    else
      cap
    end
  rescue StandardError
    # Deliberately best effort: keep the current capacity.
    cap
  end

  def cpu
    mkp(:cpu, healthy: 10, sick: 40)
  end

  # healthy and sick share the 1.0 threshold, so memory reads "=" only at
  # exactly 1.0%.
  def memory
    mkp(:memory, healthy: 1.0, sick: 1.0)
  end

  def io
    mkp(:io, healthy: 1.0, sick: 5.0)
  end

  # Build a Pressure gauge for resource +cat+ from the last two samples.
  def mkp(cat, healthy:, sick:)
    Pressure.new(@cur.fetch(cat), prev: @prev.fetch(cat), interval: @last - @prev_time, healthy: healthy, sick: sick)
  end

  # Read and parse /proc/pressure/{cpu,io,memory}; returns {cat => parsed}.
  def get_pressure
    [:cpu, :io, :memory].map do |cat|
      [cat, Pressure.parse(File.read("/proc/pressure/#{cat}"))]
    end.to_h
  end

  private

  # Gauges in display order: cpu, memory, io.
  def gauges
    [cpu, memory, io]
  end

  # Monotonic seconds, so wall-clock jumps cannot distort the sample interval.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
# One pressure reading for a single resource, judged against a "healthy"
# threshold (below it) and a "sick" threshold (above it), both in percent.
class Pressure
  # Parse the first line of a /proc/pressure/* file, which must be the
  # "some avg10=... total=..." line, into
  # {recent: avg10 as a Float, total: the total counter as an Integer}.
  # Raises RuntimeError when that line does not match.
  def self.parse(content)
    found = /some avg10=(?<recent>\d+\.\d+) .* total=(?<total>\d+)/.match(content.lines.first.to_s)
    raise "could not parse #{content.inspect}" if found.nil?

    { recent: found[:recent].to_f, total: found[:total].to_i }
  end

  # data/prev - parsed samples (see Pressure.parse), current and previous.
  # interval  - seconds between the two samples.
  def initialize(data, prev:, interval:, healthy:, sick:)
    @data, @prev, @interval = data, prev, interval
    @healthy, @sick = healthy, sick
  end

  # "^" when healthy, "v" when sick, "=" otherwise.
  def to_c
    return "^" if healthy?
    return "v" if sick?

    "="
  end

  def healthy?
    @healthy > pct
  end

  def sick?
    @sick < pct
  end

  # Stall percentage. With samples under 0.1 s apart (including the very
  # first reading, where the interval is 0) use the avg10 figure; otherwise
  # derive it from the growth of the total counter, treated as microseconds.
  def pct
    return @data[:recent] if @interval < 0.1

    stalled_seconds = (@data[:total] - @prev[:total]) / 1_000_000.0
    100.0 * stalled_seconds / @interval
  end
end
# Installs signal handlers and exposes the read end of a self-pipe that
# becomes readable whenever SIGCHLD arrives, so a loop can sleep in
# IO.select instead of polling.
#
# The first INT/TERM/QUIT sets the shutdown flag; a second one calls
# `exit 1`.
class SigHandler
  # Read end of the self-pipe; each SIGCHLD writes one "." to it.
  attr_reader :r

  def initialize
    @shutdown = false
    @r, w = IO.pipe
    %i[INT TERM QUIT].each do |sig|
      trap(sig) do
        exit 1 if @shutdown

        @shutdown = true
      end
    end
    # Never block inside a trap handler: the reader runs on the same thread,
    # so a blocking write into a full pipe would deadlock. A full pipe is
    # already readable, so dropping the byte loses no wake-up.
    trap(:CLD) { w.write_nonblock(".", exception: false) }
  end

  # True once INT, TERM or QUIT has been received.
  def shutdown?
    @shutdown
  end
end
# Entry point: expects exactly two positional arguments (runlist, outdir).
# Otherwise print the "#/" usage lines from the top of this file and exit 1.
input, outdir, should_be_nil, = ARGV
if input.nil? || outdir.nil? || !should_be_nil.nil?
  # Extract the usage in Ruby rather than via `system "cat #$0 | grep ... |
  # cut ..."`, which broke when the script path contained spaces or shell
  # metacharacters. `[3..]` drops the leading "#/ ", like `cut -c4-`.
  File.foreach(__FILE__) do |line|
    puts line.chomp[3..].to_s if line.start_with?("#/")
  end
  exit 1
end
main(input: input, outdir: outdir)
# End of script.