Last active
September 9, 2016 14:14
-
-
Save rklemme/8537604 to your computer and use it in GitHub Desktop.
Execute multiple commands connected through pipes either via threads or processes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/ruby -w | |
# Diagnostic logging helper.
#
# Which implementation is installed is decided once, when this file is loaded:
# if the DEBUG environment variable is set, messages are written to $stderr,
# otherwise every call is a no-op.
if ENV.key?('DEBUG')
  # Write +message+ to the standard error stream.
  def debug(message)
    $stderr.puts(message)
  end
else
  # Debugging is switched off: accept +message+ and discard it.
  def debug(message)
    # nop
  end
end
# Execute pipelines of commands in threads or processes.
# Commands are captured in lambdas which receive IO for reading and writing.
#
# A pipeline is built by appending callables with #add (aliased as << and |)
# and is started with #execute_processes or #execute_threads. Neighbouring
# commands are connected through IO.pipe.
class Pipe
  # Executor base class.
  #
  # head     - stream handed to the first command for reading
  # tail     - stream handed to the last command for writing
  # commands - Array of callables, each invoked as cmd[read, write]
  BaseExecutor = Struct.new :head, :tail, :commands do
    # True if +io+ is the very same object as +head+ or +tail+. Those streams
    # were supplied from outside and are never closed by the executor.
    def ignore?(io)
      head.equal?(io) || tail.equal?(io)
    end

    # Close +io+ when it is a pipe that is neither +head+ nor +tail+.
    # This is best effort: ordinary errors (for example an already closed
    # stream, or an object File.pipe? cannot handle) are swallowed on purpose.
    # Only StandardError is rescued so that signals and exit requests still
    # propagate.
    def close(io)
      io.close if !ignore?(io) && File.pipe?(io)
    rescue StandardError
      # ignore
    end

    private

    # Invoke +cmd+ with the given streams and afterwards close both of them
    # (subject to the rules of #close), even if the command raised.
    def exec_lambda(cmd, read, write)
      cmd[read, write]
    ensure
      close(write)
      close(read)
    end
  end

  # Executes pipeline in processes.
  class ProcessExecutor < BaseExecutor
    # Run the pipeline. The first command runs in the calling process, every
    # other command in its own forked child. Children are created starting at
    # the end of the pipeline, so the first pid collected is the tail.
    #
    # Returns the pid of the tail process, or nil if nothing was forked.
    def run
      out = tail
      pids = []

      commands.drop(1).reverse_each do |cmd|
        io_read, io_write = IO.pipe
        pids << fork do
          debug "running #$$"
          # The child only reads from this pipe; its upstream keeps the write end.
          close(io_write)
          exec_lambda(cmd, io_read, out)
          debug "terminated #$$"
        end
        # The parent hands both ends over to the child just created.
        close(io_read)
        close(out)
        out = io_write
      end

      exec_lambda(commands.first, head, out)
      debug "Waiting for #{pids.first}"
      # Wait for every child, not only the tail, so that no terminated child
      # is left behind unreaped.
      pids.each { |pid| Process.waitpid(pid) }
      pids.first
    end
  end

  # Executes pipeline in threads.
  class ThreadExecutor < BaseExecutor
    # Run the pipeline. The first command runs in the calling thread, every
    # other command in a thread of its own. Only the thread running the last
    # command of the pipeline is joined.
    #
    # Returns that thread, or nil if no thread was started.
    def run
      debug $$
      out = tail
      to_join = nil

      commands.drop(1).reverse_each do |command|
        io_read, io_write = IO.pipe
        # Pass the current values as thread arguments because +out+ is
        # reassigned right below.
        th = Thread.new(command, io_read, out) do |cmd, iin, iout|
          exec_lambda(cmd, iin, iout)
        end
        to_join ||= th
        out = io_write
      end

      exec_lambda(commands.first, head, out)
      to_join.join if to_join
    end
  end

  # Create an empty pipeline.
  def initialize
    @cmds = []
  end

  # Append to this pipeline.
  #
  # cmd - a Proc, or another pipe of the same class whose commands are copied
  #
  # Returns self so calls can be chained.
  # Raises ArgumentError for any other kind of argument.
  def add(cmd)
    case cmd
    when self.class
      @cmds.concat(cmd.cmds)
    when Proc
      @cmds << cmd
    else
      raise ArgumentError, "Invalid: #{cmd.inspect}"
    end
    self
  end

  alias << add
  alias | add

  # Build a new pipeline holding the commands of self followed by those of
  # +pipe+. Neither operand is modified.
  #
  # Raises ArgumentError unless +pipe+ is an instance of this class.
  def +(pipe)
    raise ArgumentError, "Not a pipe: #{pipe.inspect}" unless pipe.is_a?(self.class)

    self.class.new.tap do |copy|
      copy.add(self).add(pipe)
    end
  end

  # Run the pipeline with one process per command (see ProcessExecutor#run).
  #
  # read  - input of the first command (default: $stdin)
  # write - output of the last command (default: $stdout)
  def execute_processes(read = $stdin, write = $stdout)
    executor = ProcessExecutor.new read, write, @cmds.dup
    debug executor.inspect
    executor.run
  end

  # Run the pipeline with one thread per command (see ThreadExecutor#run).
  #
  # read  - input of the first command (default: $stdin)
  # write - output of the last command (default: $stdout)
  def execute_threads(read = $stdin, write = $stdout)
    executor = ThreadExecutor.new read, write, @cmds.dup
    debug executor.inspect
    executor.run
  end

  protected

  # The command list, readable by other pipes so #add can copy it.
  attr_reader :cmds
end
# Demo 1: a producer emitting the numbers 0..4, followed by a stage that
# decorates every line it reads.
producer = lambda do |_io_in, io_out|
  debug "1 running #$$"
  5.times { |i| io_out.puts i }
end

decorator = lambda do |io_in, io_out|
  debug "2 running #$$"
  io_in.each_line do |line|
    io_out.puts ">> #{line.chomp} <<"
  end
end

pipe = Pipe.new
pipe << producer << decorator
pipe.execute_processes
# pipe.execute_threads

# Demo 2: append a stage that prints how many lines it received.
p2 = Pipe.new
p2 << lambda { |r, w| w.puts r.each.count }
# p2 << lambda {|r,w| r.each {|x| w.puts x}}
combined = pipe + p2
combined.execute_processes
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment