Skip to content

Instantly share code, notes, and snippets.

@thinkerbot
Last active December 22, 2015 21:29
Show Gist options
  • Save thinkerbot/6533127 to your computer and use it in GitHub Desktop.
Save thinkerbot/6533127 to your computer and use it in GitHub Desktop.
# Runs a graph of external commands as separate processes and connects them
# with pipes. Every node is served by three child processes:
#
# * the node itself (its command, exec'd with stdin/stdout bound to pipes),
# * a "tee" that copies the node's stdout to every child node in the graph,
# * an "untee" that merges the output of every parent node into its stdin.
#
# NOTE(review): +logger+ (responding to +name+, +name=+, +trace+, +error+)
# and +pipe_buf+ are called below but are not defined in this class; they are
# presumably supplied elsewhere (mixin or reopened class) -- confirm.
class NonFairQueue
  # @return [Hash] node definitions, {name => {:command, :stdin_type, :stdout_type}}
  attr_reader :nodes

  # @return [Hash] graph edges, {parent => [child]}
  attr_reader :edges

  # @param options [Hash] may carry :nodes and :edges; both default to {}
  def initialize(options = {})
    @nodes = options.fetch(:nodes, {}) # {name => {:command, :stdin_type, :stdout_type}}
    @edges = options.fetch(:edges, {}) # {parent => [child]}
  end

  # Forks the node, tee and untee processes for every node, waits for all of
  # them and reports their exit statuses.
  #
  # Nodes without parents get an untee with no inputs, so their stdin reaches
  # EOF immediately; nodes without children get a tee that only traces.
  #
  # @return [Hash] {name => {'node' => status, 'tee' => status, 'untee' => status}}
  #   where each status is the Process::Status collected by Process.waitall
  # @raise [KeyError] if an edge points at a child that is not in +nodes+
  def run
    # Two pipes per node: one feeding its stdin, one draining its stdout.
    pipes = {}
    nodes.each_key do |name|
      stdin_r, stdin_w = IO.pipe
      stdout_r, stdout_w = IO.pipe
      pipes[name] = [stdin_r, stdin_w, stdout_r, stdout_w]
    end

    # One extra pipe per edge; the write end (wrapped in an outputter) belongs
    # to the parent's tee, the read end to the child's untee.
    tees = Hash.new { |hash, key| hash[key] = [] }
    untees = Hash.new { |hash, key| hash[key] = [] }
    nodes.each_pair do |parent, pnode|
      stdout_type = pnode[:stdout_type]
      Array(edges[parent]).each do |child|
        r, w = IO.pipe
        stdin_type = nodes.fetch(child)[:stdin_type]
        tees[parent] << outputter(stdout_type, stdin_type, w)
        untees[child] << r
      end
    end

    # Everything a child might have to close so that EOF propagates properly.
    all = ([$stdin, $stdout] + [pipes, tees, untees].map(&:values)).flatten

    # Avoid children re-emitting output that is still buffered in the parent.
    $stdout.flush

    pids = {}
    nodes.each_pair do |name, node|
      stdin_r, stdin_w, stdout_r, stdout_w = pipes[name]
      inputs = untees[name]
      outputs = tees[name]

      # fork with a block guarantees the child exits when its work is done
      # instead of falling through into the parent's loop.
      node_pid = fork do
        close(all - [$stdin, stdin_r, $stdout, stdout_w])
        $stdin.reopen(stdin_r)
        $stdout.reopen(stdout_w)
        close([stdin_r, stdout_w])
        exec(*node[:command])
      end

      tee_pid = fork do
        close(all - [stdout_r] - outputs)
        logger.name = "#{logger.name}::#{name} O"
        tee(stdout_r, outputs)
      end

      untee_pid = fork do
        close(all - [stdin_w] - inputs)
        logger.name = "#{logger.name}::#{name} I"
        untee(inputs, stdin_w)
      end

      pids[name] = [node_pid, tee_pid, untee_pid]
    end

    # The parent must drop its copies or no reader would ever see EOF.
    close(all - [$stdin, $stdout])

    # Process.waitall returns [[pid, status], ...]
    status_codes = Process.waitall.to_h
    pids.each_with_object({}) do |(name, (npid, tpid, upid)), results|
      results[name] = {
        'node' => status_codes[npid],
        'tee' => status_codes[tpid],
        'untee' => status_codes[upid]
      }
    end
  end

  # Pass-through writer around an IO. +type+ determines +prefix+: empty for
  # the :* type, otherwise the type name followed by a single space.
  class Outputter
    attr_reader :output
    attr_reader :type
    attr_reader :prefix
    attr_reader :prefix_length

    # @param output [#write] destination of the lines
    # @param type [Symbol, String] line type; :* yields an empty prefix
    def initialize(output, type = :*)
      @output = output
      @type = type
      @prefix = type == :* ? "" : "#{type} "
      @prefix_length = @prefix.length
    end

    # Writes +line+ unchanged.
    def write(line)
      output.write(line)
    end

    # Closes the wrapped output, so an outputter can be closed like an IO.
    def close
      output.close
    end

    # @return [Boolean] whether the wrapped output is closed
    def closed?
      output.closed?
    end
  end

  # Forwards only the lines that start with the prefix, with the prefix
  # removed; all other lines are dropped.
  class Splitter < Outputter
    def write(line)
      return unless line.start_with?(prefix)

      super(line[prefix_length..-1])
    end
  end

  # Forwards every line with the prefix prepended.
  class Joiner < Outputter
    def write(line)
      # A single write keeps prefix and line together on the destination.
      super("#{prefix}#{line}")
    end
  end

  # Picks the writer that adapts a parent's stdout type to a child's stdin
  # type.
  #
  # @param stdout_type [Symbol, String] type produced by the parent
  # @param stdin_type [Symbol, String] type expected by the child
  # @param output [#write] write end leading to the child
  # @return [Outputter] plain Outputter for equal types, Splitter when the
  #   parent is :* (keep and strip the child's type), Joiner when the child
  #   is :* (tag lines with the parent's type)
  # @raise [RuntimeError] when the two types differ and neither is :*
  def outputter(stdout_type, stdin_type, output)
    case
    when stdout_type == stdin_type then Outputter.new(output, stdin_type)
    when stdout_type == :* then Splitter.new(output, stdin_type)
    # The prefix must come from the typed side; stdin_type is :* here and
    # would produce an empty prefix.
    when stdin_type == :* then Joiner.new(output, stdout_type)
    else raise "invalid stdout/stdin pair: #{stdout_type.inspect} -> #{stdin_type.inspect}"
    end
  end

  # Closes every object in +ios+ (IOs or outputters), skipping those that
  # are already closed.
  def close(ios)
    ios.each { |io| io.close unless io.closed? }
  end

  # Copies +input+ line by line to every output. Stops early, after logging
  # an error, at the first line larger than +pipe_buf+.
  #
  # @param input [#gets] source of lines
  # @param outputs [Array<#write>] destinations
  def tee(input, outputs)
    while (line = input.gets)
      logger.trace line
      # PIPE_BUF is a byte limit, so measure bytes rather than characters.
      if line.bytesize > pipe_buf
        logger.error "line exceeds PIPE_BUF: #{line.bytesize} (#{pipe_buf})"
        break
      end
      outputs.each { |output| output.write line }
    end
  end

  # Merges the lines of all +inputs+ into +output+ until every input has
  # reached EOF. Returns immediately when there are no inputs.
  #
  # @param inputs [Array<IO>] sources; must be selectable when more than one
  # @param output [#write] destination
  def untee(inputs, output)
    if inputs.length == 1
      input = inputs.first
      while (line = input.gets)
        logger.trace line
        output.write line
      end
    else
      pending = inputs.dup
      until pending.empty?
        ready, = IO.select(pending)
        ready.each do |input|
          if (line = input.gets)
            logger.trace line
            output.write line
          else
            # EOF: a drained input stays "readable" forever, so drop it.
            pending.delete(input)
          end
        end
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment