Skip to content

Instantly share code, notes, and snippets.

@aminin
Created July 25, 2013 13:02
Show Gist options
  • Save aminin/6079389 to your computer and use it in GitHub Desktop.
Save aminin/6079389 to your computer and use it in GitHub Desktop.
Concurrent (fiber-based) Amazon S3 bucket copier + EventMachine patch for Fog::Connection
require './s3-bucket-copier'
# Example driver: open an S3 connection through Fog and copy one bucket
# into another with BucketCopier.
s3_settings = {
  provider: "AWS",
  region: "eu-west-1",
  aws_access_key_id: "<KeyId>",
  aws_secret_access_key: "<KeySecret>"
}
storage = ::Fog::Storage.new(s3_settings)
copier = BucketCopier.new(storage, :source_bucket, :target_bucket)
# First argument is the key prefix (none here); the second is the marker
# the bucket listings start from.
copier.copy_bucket(nil, 'photos/2013')
# encoding: utf-8
require 'rubygems'
require 'fog'
require 'yaml'
require 'em-synchrony'
require 'em-synchrony/em-http'
module Fog
  # Patch of Fog::Connection: while the EventMachine reactor is running,
  # requests are issued through EM::HttpRequest and the calling fiber is
  # suspended until the response arrives; otherwise the original
  # implementation is used.
  class Connection
    # Sets a default User-Agent header on Excon (only if none is set yet),
    # creates the Excon connection and remembers the persistence flag.
    #
    # @param url [String] passed to Excon.new
    # @param persistent [Boolean] stored in @persistent
    # @param params [Hash] passed to Excon.new
    def initialize(url, persistent=false, params={})
      Excon.defaults[:headers]['User-Agent'] ||= "fog/#{Fog::VERSION}"
      @excon = Excon.new(url, params)
      @persistent = persistent
    end

    alias_method :original_request, :request

    # Behaves like original Fog::Connection#request
    # Being called from Fiber inside EventMachine.run performs non-blocking request
    #
    # Only params[:host], :method, :query, :headers, :path and :parser are
    # read on the non-blocking path.
    # NOTE(review): the URL is always built with the "http" scheme and
    # params[:body] is not forwarded -- confirm this is intended.
    #
    # @param params [Hash] Fog request parameters; not modified by the
    #   non-blocking path
    # @return [Excon::Response, nil] a response with only the body populated
    #   (parsed when params[:parser] is given, raw otherwise), or nil when
    #   the HTTP client reports an error
    def request(params, &block)
      return original_request(params, &block) unless Module.const_defined?(:EM) && EM.reactor_running?

      current_fiber = Fiber.current
      url = "http://#{params[:host]}"
      conn = EM::HttpRequest.new(url, connect_timeout: 10, inactivity_timeout: 30)
      options = {
        query: params[:query],
        head: params[:headers],
        path: params[:path]
      }
      http_client = conn.setup_request(params[:method].downcase.to_sym, options)

      http_client.callback do |client|
        # Read (rather than delete) the parser so the caller's hash is left intact.
        parser = params[:parser]
        response = Excon::Response.new(body: client.response)
        if parser
          begin
            # The whole parse runs inside the rescue: a failure while feeding
            # the document must not escape the callback, otherwise the
            # waiting fiber would never be resumed.
            push_parser = Nokogiri::XML::SAX::PushParser.new(parser)
            push_parser << client.response
            push_parser.finish
            response.body = parser.response
          rescue => e
            # On a parse failure the raw body set above is kept.
            $stderr.puts "Error: #{e.message}"
            $stderr.puts "Backtrace: #{e.backtrace}"
            $stderr.puts "EM::HttpClient#responce: #{client.response}"
          end
        end
        current_fiber.resume response
      end

      http_client.errback do |client|
        $stderr.puts "EM::HttpClient#error: #{client.error}"
        $stderr.puts "EM::HttpClient#responce: #{client.response}"
        current_fiber.resume nil
      end

      # Suspend until one of the callbacks above resumes this fiber; the
      # value passed to resume becomes the return value.
      Fiber.yield nil
    end
  end
end
# Copies the objects that are listed in one bucket but missing from another,
# walking both listings page by page and running the copy requests in
# concurrent fibers inside an EM.synchrony block.
class BucketCopier
  # @param connection [Fog::Storage::AWS::Real]
  # @param from_bucket [String]
  # @param to_bucket [String]
  #
  def initialize(connection, from_bucket, to_bucket)
    @connection, @from_bucket, @to_bucket = connection, from_bucket, to_bucket
    @fibers_limit = 50 # 50..500
    @fibers_count = 0
    @max_keys = 1000 # 1..1000
  end

  # Lists a page of both buckets starting at +marker+, copies every key found
  # in the source page but not in the target page, then moves on to the next
  # marker until the source listing is exhausted. Progress is printed to
  # standard output.
  #
  # @param prefix [String, nil] key prefix passed to the bucket listings
  # @param marker [String] key after which listing starts
  # @return [nil]
  def copy_bucket(prefix, marker = '')
    while marker
      puts "Marker: #{marker}"
      start_time = Time.now
      source_index = index_bucket @from_bucket, prefix, marker
      target_index = index_bucket @to_bucket, prefix, marker
      end_time = Time.now
      puts "Indexes got in #{end_time - start_time} s."

      # Drop the nil end-of-listing sentinel so it is neither counted
      # nor handed to the copier.
      copy_index = (source_index - target_index).compact

      start_time = Time.now
      cnt = copy_index.count
      copy_objects copy_index
      end_time = Time.now
      puts "Copied: #{cnt} objects in #{end_time - start_time} s."

      # A nil last element means the source listing was not truncated: done.
      marker = source_index.last ? next_marker(source_index, target_index) : nil
    end
  end

  # Picks the marker for the next page: the smaller of the two last keys,
  # or whichever one is present when the other is nil.
  #
  # @param index1 [Array<String, nil>]
  # @param index2 [Array<String, nil>]
  # @return [String, nil]
  def next_marker(index1, index2)
    [index1.last, index2.last].compact.min
  end

  # Fetches one page of keys from +bucket+.
  #
  # @param bucket [String]
  # @param prefix [String, nil]
  # @param marker [String]
  # @return [Array<String, nil>] the keys of the page; a trailing nil is
  #   appended when the response is not flagged as truncated
  def index_bucket(bucket, prefix = nil, marker = '')
    resp = @connection.get_bucket bucket, 'prefix' => prefix, 'marker' => marker, 'max-keys' => @max_keys
    index = resp.body['Contents'].map { |f| f['Key'] }
    index.push(nil) unless resp.body['IsTruncated']
    index
  end

  # Copies each key from the source bucket to the target bucket under the
  # same name, one fiber per key, keeping the number of unfinished fibers
  # below @fibers_limit. Processing stops at the first nil element; empty
  # keys are skipped. The array passed in is left unchanged.
  #
  # @param objects [Array<String, nil>]
  def copy_objects(objects)
    keys = objects.take_while { |key| !key.nil? }.reject(&:empty?)

    EM.synchrony do
      keys.each do |key|
        @fibers_count += 1
        Fiber.new do
          begin
            @connection.copy_object(@from_bucket, key, @to_bucket, key)
          ensure
            # Always release the slot, even when the copy raises, so the
            # waits below cannot spin forever.
            @fibers_count -= 1
          end
        end.resume

        # Keep waiting until a slot is actually free.
        EM::Synchrony.sleep(1) while @fibers_count >= @fibers_limit
      end

      while @fibers_count > 0
        puts "Fibers count #{@fibers_count}"
        EM::Synchrony.sleep(1)
      end
      EM.stop
    end
  end
end
@akostrikov
Copy link

Привет! Рубистом нет желания поработать?

@aminin
Copy link
Author

aminin commented Dec 14, 2013

@akostrikov , ну ты нашёл где спросить — я только щас прочитал. Четыре месяца спустя.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment