Created
July 10, 2013 08:24
-
-
Save bnagy/5964423 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Main loop: for every S3 URL read from ARGF (one per line), stream the object
# through `gunzip -c` in ranged chunks and print, for each ARC record, the
# mimetype and the URL's file extension in fixed-width columns.
#
# Relies on names set up outside this section: `s3` (the S3 client),
# `CHUNKSIZE` (bytes per ranged download; assumed to be a positive Integer),
# `ArcFile` (wraps an IO and yields header/body pairs from #each) and `Open3`.
#
# A failure on one line is reported on stderr and the loop moves on to the
# next line.
ARGF.each_line do |line|
  begin
    url = line.chomp
    warn "Starting work on #{url}"
    # expect a line like this:
    # s3://commoncrawl-crawl-002/2010/09/24/9/1285380159663_9.arc.gz
    proto, _unused, bucket_name, *rest = url.split(File::SEPARATOR)
    raise ArgumentError, "#{__FILE__}: Unknown S3 Protocol #{proto}" unless proto =~ /^s3/
    object_name = File.join(rest)
    size = Integer(s3.buckets[bucket_name].objects[object_name].content_length)
    warn "Reading from #{bucket_name.inspect}, #{object_name.inspect}, size #{size}"

    # Inclusive byte ranges covering offsets 0..size-1. They are computed
    # arithmetically (one step per chunk, not one per byte) and the final
    # range stops at the last real byte, so no request starts at or beyond
    # the end of the object.
    ranges = (0...size).step(CHUNKSIZE).map do |first_byte|
      first_byte..([first_byte + CHUNKSIZE, size].min - 1)
    end

    # Ruby GzipReader is unable to unzip these files, so
    # use unix tools, written by people who know what they
    # are doing. Also means we don't need to eat much RAM,
    # because everything is streaming.
    Open3.popen3('gunzip -c') do |sin, sout, _serr, _thr|
      # Create an ArcFile instance which will receive gunzip's stdout
      arcfile = ArcFile.new(sout)

      # Download chunks in the background and pipe them into gunzip
      # as we receive them
      downloader = Thread.new do
        begin
          ranges.each do |target_range|
            retry_count = 5
            begin
              chunk = s3.buckets[bucket_name].objects[object_name].read(:range => target_range)
            rescue
              # Give up (re-raising the original error) once the retries are spent.
              raise if (retry_count -= 1) < 0
              warn "Error (#{$!}) downloading #{target_range}, retrying."
              sleep 1
              retry
            end
            sin.write chunk
            Thread.pass
          end
        ensure
          # Always send EOF to gunzip (and therefore to the ArcFile), even
          # when a download failed; otherwise the reader below would block
          # forever waiting for more data.
          sin.close unless sin.closed?
        end
      end

      # Now we have a lazy ArcFile that we can treat as an Enumerable.
      arcfile.each do |header, _body|
        fields = header.split
        # mimetype and URL extension (but don't keep ? params to php urls etc)
        mimetype = fields[3].to_s
        extension = File.extname(fields.first).split('?').first.to_s
        puts(mimetype.ljust(25) << extension.ljust(15))
      end

      # Thread#join re-raises an exception that killed the downloader, so a
      # failed download is reported by the rescue below instead of vanishing.
      downloader.join
    end
  rescue
    warn "Failed to process #{line.chomp}: #{$!}"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment