Created July 25, 2013 13:02
Multithreaded Amazon S3 bucket copier + EventMachine patch for Fog::Connection

The gist consists of a short usage script and s3-bucket-copier.rb, which monkey-patches Fog::Connection to issue non-blocking HTTP requests from fibers whenever an EventMachine reactor is running, plus a BucketCopier class that lists source and target buckets page by page and concurrently copies the objects missing from the target.
require './s3-bucket-copier'

credentials = {
  :provider              => "AWS",
  :region                => "eu-west-1",
  :aws_access_key_id     => "<KeyId>",
  :aws_secret_access_key => "<KeySecret>"
}

connection = ::Fog::Storage.new(credentials)
bc = BucketCopier.new connection, :source_bucket, :target_bucket
bc.copy_bucket nil, 'photos/2013'
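
The example above and s3-bucket-copier.rb below pull in a handful of gems through their require statements. A minimal Gemfile covering them might look like this sketch (the gem names follow from the requires; listing nokogiri explicitly is an assumption, since fog already depends on it):

# Gemfile (sketch)
source 'https://rubygems.org'

gem 'fog'             # Fog::Storage and the patched Fog::Connection
gem 'em-synchrony'    # EM.synchrony, EM::Synchrony.sleep, 'em-synchrony/em-http'
gem 'em-http-request' # EM::HttpRequest, used by the non-blocking request patch
gem 'nokogiri'        # SAX PushParser used to parse S3 XML responses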
# encoding: utf-8
require 'rubygems'
require 'fog'
require 'yaml'
require 'nokogiri' # Nokogiri::XML::SAX::PushParser is used in the Fog::Connection patch below
require 'em-synchrony'
require 'em-synchrony/em-http'
module Fog
  class Connection
    def initialize(url, persistent=false, params={})
      Excon.defaults[:headers]['User-Agent'] ||= "fog/#{Fog::VERSION}"
      @excon = Excon.new(url, params)
      @persistent = persistent
    end

    alias_method :original_request, :request

    # Behaves like the original Fog::Connection#request.
    # When called from a Fiber inside a running EventMachine reactor it performs
    # a non-blocking request via EM::HttpRequest instead of Excon.
    def request(params, &block)
      return original_request(params, &block) unless Module.const_defined?(:EM) && EM.reactor_running?

      current_fiber = Fiber.current
      url = "http://#{params[:host]}"
      conn = EM::HttpRequest.new(url, connect_timeout: 10, inactivity_timeout: 30)
      options = {
        query: params[:query],
        head: params[:headers],
        path: params[:path]
      }
      http_client = conn.setup_request(params[:method].downcase.to_sym, options)

      http_client.callback do |client|
        if (parser = params.delete(:parser))
          body = Nokogiri::XML::SAX::PushParser.new(parser)
          body << client.response
        end
        response = Excon::Response.new(body: client.response)
        if parser
          begin
            body.finish
            response.body = parser.response
          rescue => e
            $stderr.puts "Error: #{e.message}"
            $stderr.puts "Backtrace: #{e.backtrace}"
            $stderr.puts "EM::HttpClient#response: #{client.response}"
          end
        end
        # Hand the parsed response back to the fiber that issued the request
        current_fiber.resume response
      end

      http_client.errback do |client|
        $stderr.puts "EM::HttpClient#error: #{client.error}"
        $stderr.puts "EM::HttpClient#response: #{client.response}"
        current_fiber.resume nil
      end

      # Suspend the calling fiber; whatever value the callbacks pass to #resume
      # becomes this method's return value.
      Fiber.yield nil
    end
  end
end
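
# The BucketCopier below relies on the patch above: every Fog request made while
# the EM.synchrony reactor is running yields its fiber instead of blocking, so
# many copy_object calls can be in flight at once.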
class BucketCopier
  # @param connection  [Fog::Storage::AWS::Real]
  # @param from_bucket [String]
  # @param to_bucket   [String]
  #
  def initialize(connection, from_bucket, to_bucket)
    @connection, @from_bucket, @to_bucket = connection, from_bucket, to_bucket
    @fibers_limit = 50   # 50..500
    @fibers_count = 0
    @max_keys     = 1000 # 1..1000
  end

  # Copies every object under +prefix+, page by page, starting from +marker+.
  def copy_bucket(prefix, marker = '')
    while marker
      puts "Marker: #{marker}"
      start_time = Time.now
      source_index = index_bucket @from_bucket, prefix, marker
      target_index = index_bucket @to_bucket, prefix, marker
      end_time = Time.now
      puts "Indexes got in #{end_time - start_time} s."
      # Copy only the keys that are missing from the target bucket
      copy_index = source_index - target_index
      start_time = Time.now
      cnt = copy_index.count
      copy_objects copy_index
      end_time = Time.now
      puts "Copied: #{cnt} objects in #{end_time - start_time} s."
      marker = source_index.last ? next_marker(source_index, target_index) : nil
    end
  end

  # The smaller of the two last keys, so neither listing is skipped past.
  def next_marker(index1, index2)
    return index1.last unless index2.last
    return index2.last unless index1.last
    [index1.last, index2.last].min
  end

  # Lists up to @max_keys keys of +bucket+; a trailing nil marks the last page.
  def index_bucket(bucket, prefix = nil, marker = '')
    resp = @connection.get_bucket bucket, 'prefix' => prefix, 'marker' => marker, 'max-keys' => @max_keys
    index = resp.body['Contents'].map do |f| f['Key'] end
    index.push(nil) unless resp.body['IsTruncated']
    index
  end

  # Copies the given keys concurrently, keeping at most @fibers_limit fibers in flight.
  def copy_objects(objects)
    EM.synchrony do
      while element = objects.shift do
        next if element.empty?
        @fibers_count += 1
        ff = Fiber.new do
          my_element = element.dup
          resp = @connection.copy_object(@from_bucket, element, @to_bucket, element)
          @fibers_count -= 1
          #puts "#{my_element} #{resp.data[:body]}"
        end
        ff.resume nil
        EM::Synchrony.sleep(1) unless @fibers_count < @fibers_limit
      end
      # Wait for the remaining copies to finish before stopping the reactor
      while @fibers_count > 0 do
        puts "Fibers count #{@fibers_count}"
        EM::Synchrony.sleep(1)
      end
      EM.stop
    end
  end
end
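
Because copy_bucket lists both buckets from the supplied marker and keeps advancing it until the source listing is exhausted, an interrupted run can be resumed by passing the last marker the script printed. A small sketch, reusing the connection from the usage snippet above; the prefix and the marker value here are hypothetical:

bc = BucketCopier.new connection, :source_bucket, :target_bucket
# Copy only keys under 'photos/', starting after the marker printed by a previous run
bc.copy_bucket 'photos/', 'photos/2013/08/15/img_0042.jpg'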
Hi! Any interest in working as a Rubyist?