Last active
May 10, 2019 15:02
-
-
Save phil-monroe/4477190 to your computer and use it in GitHub Desktop.
Parallel, multiple-file remote file copy. Fills up network bandwidth while copying many files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Installs the pmrcp script into ~/bin and makes it executable.
# Stops at the first failing step so a failed download is never chmod'ed.
set -euo pipefail

mkdir -p ~/bin/
# -f: treat HTTP errors as failures instead of saving the error page as the script.
# -sS: stay quiet but still print the reason when something goes wrong.
curl -fsSL -o ~/bin/pmrcp https://gist.github.com/raw/4477190/pmrcp.rb
chmod +x ~/bin/pmrcp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# Wall-clock time at which the script started; used for the final summary.
START = Time.now

# Reads a strictly positive integer setting from the environment.
#
# @param name [String] environment variable to read
# @param default [Integer] value returned when the variable is unset
# @return [Integer] the parsed value, or +default+ when unset
# @raise [ArgumentError] if the value is not a base-10 integer or is below 1
def positive_int_env(name, default)
  raw = ENV[name]
  return default if raw.nil?

  value = begin
    Integer(raw, 10)
  rescue ArgumentError
    nil
  end
  unless value && value > 0
    raise ArgumentError, "#{name} must be a positive integer, got #{raw.inspect}"
  end
  value
end

# Number of files handed to a single copy command (env B).
BATCH_SIZE = positive_int_env('B', 20)
# Number of worker threads running copy commands (env C).
CONCURRENCY = positive_int_env('C', 15)
# Remote shell and remote copy commands; overridable via env RSH / RCP.
RSH = ENV.fetch('RSH', 'rsh')
RCP = ENV.fetch('RCP', 'rcp')
require 'pp'
require 'shellwords'
require 'thread'
# A small resizable thread pool fed by a shared job queue.
#
# Jobs are [block, args] pairs pushed with #schedule. Each worker pops one,
# runs it, and logs (rather than propagates) any StandardError it raises.
# Workers use three thread-locals:
#   :id    - index the worker was spawned with
#   :state - :popping while waiting on the queue, :processing while running a job
#   :stop  - set by #dec / #shutdown to make the worker exit before its next job
class Pool
  attr_accessor :pool, :queue, :queue_limit

  # @param size [Integer] number of workers to start immediately
  # @param limit [Integer] queue length above which #schedule applies back-pressure
  def initialize(size, limit)
    @queue = Queue.new
    @queue_limit = limit
    @pool = Array.new(size) { |i| spawn(i) }
  end

  # @return [Integer] number of worker threads that are still alive
  def size
    pool.count(&:alive?)
  end

  # Starts +n+ additional workers.
  def inc(n = 1)
    n.times { pool << spawn(pool.size) }
  end

  # Asks one live worker to exit once it finishes its current job.
  # Workers that were already asked to stop are skipped, so repeated calls
  # retire distinct workers; with no eligible worker this is a no-op.
  def dec
    worker = pool.find { |t| t.alive? && !t[:stop] }
    worker[:stop] = true if worker
  end

  # Enqueues a job; +args+ are passed to +block+ when a worker runs it.
  #
  # @raise [ArgumentError] if no block is given (a nil job could never run)
  def schedule(*args, &block)
    raise ArgumentError, 'schedule requires a block' unless block

    # Back-pressure only makes sense while something can drain the queue;
    # with no live workers, waiting here would never end.
    sleep 1 while queue.size > queue_limit && size > 0
    queue << [block, args]
  end

  # Blocks until the queue is empty and no worker is mid-job, then shuts down.
  # Waiting for in-flight jobs lets a job that re-schedules work still have
  # that work picked up before the workers are stopped.
  def join
    sleep 1 until queue.empty? && pool.none? { |t| t.alive? && t[:state] == :processing }
    shutdown
  end

  # Stops every worker: busy workers finish their current job, idle workers
  # (blocked on the queue) are killed. Queued jobs are left in the queue.
  def shutdown
    pool.each { |t| t[:stop] = true }
    # Repeat the kill pass: a worker that checked :stop just before it was set
    # may only block on the queue after an earlier pass has looked at it.
    # NOTE(review): a worker killed after pop returned but before it recorded
    # :processing would drop that job; the window is a single assignment.
    until (alive = pool.select(&:alive?)).empty?
      alive.each { |t| t.kill if t[:state] == :popping }
      alive.each { |t| t.join(0.05) }
    end
    # Some Queue implementations keep an internal @waiting list of blocked
    # threads; clear it when present, skip it when the ivar does not exist.
    waiting = queue.instance_variable_get(:@waiting)
    waiting.clear if waiting.respond_to?(:clear)
    pool.clear
  end

  private

  # Starts one worker thread tagged with +index+.
  def spawn(index)
    Thread.new do
      t = Thread.current
      t[:id] = index
      # :stop is deliberately not initialised here: it reads as nil (falsy)
      # until someone sets it, and writing false on start-up could erase a
      # stop request issued before this thread got to run.
      until t[:stop]
        begin
          t[:state] = :popping
          job, args = queue.pop
          t[:state] = :processing
          job.call(*args)
        rescue => e
          STDERR.puts "ERROR - #{t[:id]} - #{e.inspect}"
        end
      end
    end
  end
end
puts "Starting with pid #{Process.pid}"
puts

# Created with no workers and a back-pressure limit of 1000 queued jobs.
pool = Pool.new 0, 1000

# Setup signals to alter pool size
Signal.trap("TTIN") do
  puts "Received signal TTIN: Adding another worker to the pool."
  pool.inc
end

Signal.trap("TTOU") do
  puts "Received signal TTOU: Removing a worker to the pool."
  pool.dec
end

# Dump the pool for inspection.
Signal.trap("USR1") do
  pp pool
end

# Pause: stop all workers, leaving queued jobs in place.
# SIGSTOP can never be caught by a process, so a handler registered for it
# would not run; TSTP is the catchable stop request (`kill -TSTP <pid>`, or
# Ctrl-Z in a terminal, which now pauses transfers instead of suspending).
Signal.trap("TSTP") do
  pool.shutdown
end

# Resume: bring the pool back up to CONCURRENCY workers. Only the missing
# workers are added, so a CONT that was not preceded by a pause does not
# grow the pool.
Signal.trap("CONT") do
  missing = CONCURRENCY - pool.size
  pool.inc missing if missing > 0
end
# Guards log output so only one thread writes a line at a time.
LOG_MUTEX = Mutex.new

# Writes +str+ to +io+ prefixed with the calling thread's :id thread-local.
#
# @param str [#to_s] message to log
# @param io [IO] destination stream (STDOUT by default)
def worker_log(str, io = STDOUT)
  line = "WORKER[#{Thread.current[:id]}] - #{str}"
  LOG_MUTEX.synchronize { io.puts line }
end
# Usage: pmrcp <source directory> <host>:<destination directory>
# Validate the raw arguments before touching them, so a missing argument
# produces the intended message rather than a TypeError / NoMethodError.
src_arg, dst_arg = ARGV
raise("Need a source directory") if src_arg.nil? || src_arg.empty?
src_dir = File.expand_path(src_arg)

# Split on the first ':' only, so a destination path may itself contain ':'.
dst_hst, dst_dir = dst_arg.to_s.split(':', 2)
raise("Need a destination host") if dst_hst.nil? || dst_hst.empty?
raise("Need a destination directory") if dst_dir.nil? || dst_dir.empty?
require 'shellwords'

puts "--- Copying Folder Structure ---"

# Maps a local path under src_dir to its counterpart under dst_dir. Only the
# leading src_dir is rewritten (a later path component that happens to equal
# src_dir is left alone); the block form keeps dst_dir literal even if it
# contains backslashes.
src_prefix = /\A#{Regexp.escape(src_dir)}/
to_remote = lambda { |path| path.sub(src_prefix) { dst_dir } }

src_folders = Dir["#{src_dir}/**/**"].select { |f| File.directory?(f) }.unshift(src_dir)
dst_folders = src_folders.map { |d| to_remote.call(d) }

# The mkdir command is escaped twice on purpose: shelljoin protects each path
# from the remote shell, shellescape protects the whole command from the
# local shell that runs RSH.
remote_mkdir = "mkdir -p #{dst_folders.shelljoin}"
cmd = "#{RSH} #{dst_hst.shellescape} #{remote_mkdir.shellescape}"
puts cmd
system(cmd) || raise("failed to create folder structure")
puts

# EXCLUDE is a regular expression; folders whose path matches it are skipped
# for file transfer (their remote directories were already created above).
if ENV['EXCLUDE']
  exclude = Regexp.new(ENV['EXCLUDE'])
  src_folders.reject! { |f| f =~ exclude }
end

puts "--- Scheduling File Transfers ---"

# A failed batch is re-queued, but only this many times in total, so a
# permanently failing batch cannot keep the queue busy forever.
max_attempts = 3

# Copies one batch of files into remote_folder. Run by pool workers.
transfer = lambda do |file_batch, remote_folder, attempt = 1|
  # Random delay before each batch; presumably to stagger worker start-up.
  sleep rand(CONCURRENCY)
  started = Time.now
  # Drop files that disappeared between scheduling and execution.
  file_batch.select! { |f| File.exist?(f) }
  worker_log "Starting Batch: Batch Size: #{file_batch.size} \t Time: #{started} \t Queue Size: #{pool.queue.size}"

  unless file_batch.empty?
    # NOTE(review): escaping here protects against the local shell only. Some
    # rcp/scp implementations pass the remote path through a remote shell as
    # well; confirm for the RCP command in use.
    target = "#{dst_hst}:#{remote_folder}"
    cmd = "#{RCP} #{file_batch.shelljoin} #{target.shellescape}"
    worker_log cmd

    unless system(cmd)
      worker_log "ERROR: #{cmd}"
      worker_log "ERROR: #{cmd}", STDERR
      if attempt < max_attempts
        # Re-queue with the same job body; scheduling without a block would
        # enqueue a job that can never run.
        pool.schedule(file_batch, remote_folder, attempt + 1, &transfer)
      else
        worker_log "GIVING UP after #{attempt} attempts: #{cmd}", STDERR
      end
    end
  end

  worker_log "Finished batch in: #{Time.now - started}"
end

src_folders.each do |src_folder|
  dst_folder = to_remote.call(src_folder)
  # Plain entries only: Dir.entries also lists '.', '..' and sub-directories.
  src_files = Dir.entries(src_folder)
                 .map { |name| File.join(src_folder, name) }
                 .reject { |path| File.directory?(path) }

  src_files.each_slice(BATCH_SIZE) do |batch|
    puts "Scheduling #{src_folder} => #{dst_folder} - #{batch.size}"
    pool.schedule(batch, dst_folder, &transfer)
  end
end
# Report how many copy jobs are queued, then start the workers and wait.
puts "", "Number of RCP jobs: #{pool.queue.size}", ""
puts "--- Starting file transfer... ---", ""

pool.inc(CONCURRENCY)
pool.join

puts "--- Finished file transfer ---"

# Summary: start/end timestamps and the elapsed time expressed in hours.
DONE = Time.now
elapsed_hours = (DONE - START) / 3600
puts "Started: #{START}", "Finished: #{DONE}", "It took: #{elapsed_hours}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment