Last active
December 22, 2015 21:29
-
-
Save thinkerbot/6533127 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class NonFairQueue | |
attr_reader :nodes | |
attr_reader :edges | |
def initialize(options = {}) | |
@nodes = options.fetch(:nodes, {}) # {name => {:command, :stdin_type, :stdout_type}} | |
@edges = options.fetch(:edges, {}) # {parent => [child]} | |
end | |
def run | |
pipes = {} | |
nodes.each_pair do |name, node| | |
pipes[name] = IO.popen | |
end | |
tees, untees = {}, {} | |
nodes.each_pair do |parent, pnode| | |
stdout_type = nodes[pnode][:stdout_type] | |
edges[parent].each do |child| | |
r, w = IO.pipe | |
stdin_type = nodes[child][:stdin_type] | |
tees[parent] << outputter(stdout_type, stdin_type, w) | |
untees[child] << r | |
end | |
end | |
all = ([$stdin, $stdout] + [pipes, tees, untees].map(&:values)).flatten | |
pids = {} | |
nodes.each_pair do |name, node| | |
stdin, stdout = pipes[name] | |
pids[name] = [] | |
if node_pid = fork | |
pids[name] << node_pid | |
if tee_pid = fork | |
pids[name] << tee_pid | |
if untee_pid = fork | |
pids[name] << untee_pid | |
else # untee | |
inputs = untees[name] | |
close(all - [stdin] - inputs) | |
logger.name = "#{logger.name}::#{name} I" | |
untee(inputs, stdin) | |
end | |
else # tee | |
outputs = tees[name] | |
close(all - [stdout] - outputs) | |
logger.name = "#{logger.name}::#{name} O" | |
tee(stdout, outputs) | |
end | |
else # exec | |
close(all - [$stdin, stdin, $stdout, stdout]) | |
$stdin.reopen(stdin); $stdout.reopen(stdout) | |
exec *node[:command] | |
end | |
end | |
close(all - [$stdin, $stdout]) | |
status_codes = Process.waitall | |
status_codes = Hash[*status_codes] | |
results = {} | |
pids.each_pair do |name, (npid,tpid,upid)| | |
results[name] = { | |
'node' => status_codes[npid], | |
'tee' => status_codes[tpid], | |
'untee' => status_codes[upid] | |
} | |
end | |
results | |
end | |
class Outputter | |
attr_reader :output | |
attr_reader :type | |
attr_reader :prefix | |
attr_reader :prefix_length | |
def initialize(output, type = :*) | |
@output = output | |
@type = type | |
@prefix = type == :* ? "" : "#{type} " | |
@prefix_length = @prefix.length | |
end | |
def write(line) | |
output.write line | |
end | |
end | |
class Splitter < Outputter | |
def write(line) | |
if line[0, prefix_length] == prefix | |
super line[prefix_length, line.length - prefix_length] | |
end | |
end | |
end | |
class Joiner < Outputter | |
def write(line) | |
output.print prefix | |
super line | |
end | |
end | |
def outputter(stdout_type, stdin_type, output) | |
case | |
when stdout_type == stdin_type then Outputter.new(output, stdin_type) | |
when stdout_type == :* then Splitter.new(output, stdin_type) | |
when stdin_type == :* then Joiner.new(output, stdin_type) | |
else raise "invalid stdout/stdin pair: #{stdout_type.inspect} -> #{stdin_type.inspect}" | |
end | |
end | |
def close(ios) | |
ios.each {|io| io.close } | |
end | |
def tee(input, outputs) | |
while line = input.gets | |
logger.trace line | |
if line.length > pipe_buf | |
logger.error "line exceeds PIPE_BUF: #{line.length} (#{pipe_buf})" | |
break | |
else | |
outputs.each do |output| | |
output.write line | |
end | |
end | |
end | |
end | |
def untee(inputs, output) | |
if inputs.length == 1 | |
input = inputs.first | |
while line = input.gets | |
logger.trace line | |
output.write line | |
end | |
else | |
while ready = IO.select(inputs) | |
ready.first.each do |input| | |
line = input.gets | |
logger.trace line | |
output.write line | |
end | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment