Created
December 10, 2018 09:56
-
-
Save koshigoe/b8940245b9146817a0761580bae4a182 to your computer and use it in GitHub Desktop.
分割並行 HTTP ダウンロードのプロトタイプ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'open-uri' | |
require 'uri' | |
require 'net/http' | |
require 'thread' | |
require 'tmpdir' | |
require 'benchmark' | |
# Prototype of a split, concurrent HTTP(S) downloader.
#
# When the server advertises byte-range support and the resource is at least
# THRESHOLD bytes long, the body is fetched as several byte ranges by parallel
# threads into temporary part files, which are then concatenated into the
# destination. Otherwise the resource is fetched with a single request.
class PGet
  # Raised when a transfer fails with an open/read timeout or a socket error.
  class ConnectionError < StandardError; end

  # Resources shorter than this many bytes are never split.
  THRESHOLD = 100 * 1024 * 1024

  # Not referenced by this class; kept so the constant stays available.
  BUFFER_SIZE = 8 * 1024

  attr_reader :thread_count, :min_chunk_size

  # @param thread_count [Integer] maximum number of concurrent range requests;
  #   values of 1 or less always use a single request
  # @param min_chunk_size [Integer, nil] when positive, the number of ranges is
  #   reduced so that no range is shorter than this many bytes; nil disables
  #   the limit
  def initialize(thread_count: 10, min_chunk_size: nil)
    @thread_count = thread_count
    @min_chunk_size = min_chunk_size
  end

  # Downloads +url+ into the file at +destination+.
  #
  # @param url [String] http or https URL
  # @param destination [String] path of the file to write
  # @raise [ConnectionError] on a timeout or socket error during the transfer
  #   (errors from the initial HEAD probe propagate unchanged)
  def download(url, destination)
    uri = URI.parse(url)
    ranges = compute_ranges(uri) if thread_count > 1

    if ranges
      multithread_download(uri, destination, ranges)
    else
      singlethread_download(uri, destination)
    end
  end

  private

  # Issues a HEAD request and returns the Content-Length as an Integer, or nil
  # when the request did not succeed, the server does not answer with
  # "Accept-Ranges: bytes", or no Content-Length header is present.
  def get_rangeable_length(uri)
    response = Net::HTTP.start(uri.host, uri.port, use_ssl: uri.is_a?(URI::HTTPS)) do |http|
      # request_uri keeps the query string and is never empty, unlike path.
      http.head(uri.request_uri)
    end
    return nil unless response.is_a?(Net::HTTPSuccess)
    return nil unless response['Accept-Ranges'] == 'bytes'

    response['Content-Length']&.to_i
  end

  # Splits the resource into contiguous, inclusive [first_byte, last_byte]
  # pairs, or returns nil when the download should not be split.
  def compute_ranges(uri)
    length = get_rangeable_length(uri)
    return nil if length.nil? || length < THRESHOLD

    chunks = chunk_count(length)
    return nil if chunks < 2

    chunk_size = length / chunks
    ranges = Array.new(chunks) do |index|
      offset = chunk_size * index
      [offset, offset + chunk_size - 1]
    end
    # Integer division may leave a remainder; the last range absorbs it.
    ranges.last[1] = length - 1
    ranges
  end

  # Number of ranges to request for a body of +length+ bytes: thread_count,
  # lowered if needed so every range is at least min_chunk_size bytes long.
  def chunk_count(length)
    return thread_count unless min_chunk_size&.positive?

    [thread_count, length / min_chunk_size].min
  end

  # Fetches every range in its own thread into a temporary directory, then
  # concatenates the part files, in range order, into +destination+.
  def multithread_download(uri, destination, ranges)
    Dir.mktmpdir do |dir|
      parts = ranges.each_index.map { |index| File.join(dir, index.to_s) }
      workers = parts.zip(ranges).map { |part, range| make_thread(uri, part, range) }

      begin
        # join re-raises the first exception raised inside a worker.
        workers.each(&:join)
      rescue StandardError
        # Stop the remaining workers before the temporary directory goes away.
        workers.each(&:kill)
        raise
      end

      File.open(destination, 'wb') do |output|
        parts.each { |part| IO.copy_stream(part, output) }
      end
    end
  rescue Net::OpenTimeout, Net::ReadTimeout, SocketError => e
    raise ConnectionError, e.message
  end

  # Starts a thread that requests the inclusive byte +range+ of +uri+ and
  # writes the response body to the file at +destination+.
  def make_thread(uri, destination, range)
    headers = { 'Range' => "bytes=#{range.join('-')}" }
    Thread.new do
      # URI#open (from open-uri) replaces Kernel#open, which no longer
      # accepts URIs since Ruby 3.0.
      uri.open('rb', headers) { |input| IO.copy_stream(input, destination) }
    end
  end

  # Fetches the whole resource with one request and writes it to +destination+.
  def singlethread_download(uri, destination)
    uri.open('rb') { |input| IO.copy_stream(input, destination) }
  rescue Net::OpenTimeout, Net::ReadTimeout, SocketError => e
    raise ConnectionError, e.message
  end
end
# Times the same download with a single thread and with 2, 3 and 4 threads,
# writing each result to its own file in the current directory.
source_url = 'https://s3-ap-northeast-1.amazonaws.com/df-monkey-preview/testdata/base.csv.zip'

Benchmark.bm(20) do |bench|
  bench.report('singlethread') do
    PGet.new(thread_count: 1).download(source_url, './single-base.csv.zip')
  end

  (2..4).each do |workers|
    bench.report("multithread[#{workers}]") do
      PGet.new(thread_count: workers).download(source_url, "./multi-#{workers}-base.csv.zip")
    end
  end
end
__END__
                          user     system      total        real
singlethread          2.587592   2.136217   4.723809 ( 17.326337)
multithread[2]        2.919772   3.159515   6.079287 ( 18.070033)
multithread[3]        2.988703   3.112600   6.101303 ( 17.859645)
multithread[4]        3.147903   3.286234   6.434137 ( 17.968444)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment