Instantly share code, notes, and snippets.
Last active
December 11, 2015 22:28
-
Star
(0)
0
You must be signed in to star a gist -
Fork
(0)
0
You must be signed in to fork a gist
-
Save freerobby/4669321 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# First split the key dump into chunk files of 100,000 lines each:
#   ubuntu@rails01:/mnt/links$ split -l 100000 -d -a 4 links_dump.dat
# (-d -a 4 gives four-digit numeric suffixes, i.e. x0000, x0001, ...)

# Index into the chunk-file list of the file loaded for the manual repairs below.
current_file = 4
# Builds the paths of the chunk files written by `split -d -a 4`.
#
# base_path - directory holding the chunk files (no trailing slash).
# num_files - how many paths to generate, i.e. the last file's number + 1.
#
# Returns an Array of Strings: "<base_path>/x0000", "<base_path>/x0001", ...
def get_filenames(base_path, num_files)
  (0...num_files).map do |index|
    # rjust pads to at least four digits and leaves longer numbers untouched.
    "#{base_path}/x#{index.to_s.rjust(4, '0')}"
  end
end
# Paths of all 1091 chunk files (x0000 .. x1090) under /mnt/links.
filepaths = get_filenames('/mnt/links', 1091)
# Reads one key per line from the file at +path+.
#
# path - path of a chunk file containing one key per line.
#
# Returns an Array of Strings, one per line, with surrounding whitespace
# (including the line terminator) stripped. Blank lines yield empty strings.
def get_keys_from_file(path)
  # Block form of File.open closes the handle when the block exits; the
  # previous bare File.open(...).each_line left one open per file read.
  File.open(path, 'r') do |file|
    file.each_line.map(&:strip)
  end
end
# Load the keys of the currently selected chunk file. The trailing `; nil`
# only keeps an interactive console from echoing the whole array.
keys = get_keys_from_file(filepaths[current_file]); nil
# Runs one map-reduce job over keyset[start..stop] in the 'links' bucket,
# applying the Erlang map function rs_reindex2:map to every input.
#
# keyset - Array of key Strings.
# start  - index of the first key to include.
# stop   - index of the last key to include (inclusive).
#
# Returns whatever Riak::MapReduce#run returns; errors raised by the job
# propagate to the caller.
def run_mr_repair(keyset, start, stop)
  # Fixed timeout of 1000 * 60 * 30 (30 minutes if the unit is milliseconds,
  # as the disabled per-batch variant below suggests).
  job = Riak::MapReduce.new(Ripple.client).timeout(1000 * 60 * 30)
  # Disabled alternative: scale the timeout with the batch size.
  # timeout = ((stop - start + 1) * 1000)
  # puts "Setting timeout to #{timeout/1000} seconds for MR job."
  # job.timeout(timeout)
  keyset[start..stop].each { |key| job.add('links', key) }
  job.map(['rs_reindex2', 'map'], :language => 'erlang')
  # puts job.inspect
  job.run
end
# Repairs +num+ consecutive keys of +keyset+ starting at index +start+ with a
# single map-reduce job, printing the key range and a timing summary.
#
# keyset - Array of key Strings.
# num    - number of keys in the batch.
# start  - index of the first key of the batch.
#
# Returns nil (the result of the final puts). Errors raised by
# run_mr_repair propagate to the caller.
def repair_keys_batch(keyset, num, start)
  started_at = Time.now.to_i
  stop = start + num - 1
  puts "Repairing keys #{start} through #{stop}"
  run_mr_repair(keyset, start, stop)
  elapsed = Time.now.to_i - started_at
  # Whole-second timestamps make elapsed 0 for fast batches; treat that as
  # one second for the rate so the division cannot raise ZeroDivisionError.
  rate = num / [elapsed, 1].max
  puts "Repaired #{num} keys in #{elapsed} seconds (#{rate}) keys/sec."
end
# Repairs every key in +keyset+ by running map-reduce jobs over consecutive
# batches of +batch_size+ keys.
#
# When a batch's job raises Riak::MapReduceError, every key of that batch is
# loaded individually with Link.find (the log calls this read-repairing) and
# the job is retried once. A second failure is recorded and the run continues
# with the next batch.
#
# keyset     - Array of key Strings.
# batch_size - number of keys per job (default 100).
# start_at   - index of the first batch to run; lower-numbered batches are
#              skipped (default 0).
#
# Returns an Array with one Hash per batch that failed twice:
#   :start, :stop - inclusive key indexes of the batch
#   :error        - the exception raised by the first attempt
#   :retry_error  - the exception raised by the retry
def queue_mr_repairs(keyset, batch_size = 100, start_at = 0)
  failures = []
  total = keyset.count
  batches = total / batch_size
  batches += 1 if total % batch_size > 0
  puts "Running repair on #{total} keys over #{batches} batches, beginning at time=#{Time.now.to_i}..."
  batches.times do |batch|
    next if batch < start_at
    start = batch * batch_size
    # Clamp to the last real index so a short final batch is not reported
    # (or recorded in failures) as a full one.
    stop = [start + batch_size, total].min - 1
    num = stop - start + 1
    started_at = Time.now.to_i
    puts "Beginning batch ##{batch} (start=#{start}, stop=#{stop}, current time=#{started_at})"
    begin
      run_mr_repair(keyset, start, stop)
    rescue Riak::MapReduceError => e
      puts "FAILED between #{start} and #{stop}"
      puts "Read-Repairing Keys..."
      keyset[start..stop].each_with_index do |k, i|
        Link.find(k)
        puts "Done with #{i + 1}" if (i + 1) % 100 == 0
      end
      puts "Retrying batch..."
      begin
        run_mr_repair(keyset, start, stop)
      rescue Riak::MapReduceError => e2
        # Keep both exceptions: :error stays the first failure as before,
        # :retry_error is the one that made the failure permanent.
        failures << {:start => start, :stop => stop, :error => e, :retry_error => e2}
        puts "PERMANENTLY FAILED FROM #{start} to #{stop}"
      end
    end
    elapsed = Time.now.to_i - started_at
    # Whole-second timestamps make elapsed 0 for fast batches; treat that as
    # one second for the rate so the division cannot raise ZeroDivisionError.
    rate = num / [elapsed, 1].max
    puts "Repaired #{num} keys in #{elapsed} seconds (#{rate}) keys/sec."
  end
  puts "Repair completed at time=#{Time.now.to_i}"
  failures
end
# Runs queue_mr_repairs over a contiguous range of chunk files.
#
# start_at_file - number of leading chunk files to skip (default 0).
# max_files     - maximum number of chunk files to process (default 10000).
# base_path     - directory holding the chunk files (default '/mnt/links').
# num_files     - total number of chunk files (default 1091).
# batch_size    - keys per map-reduce job (default 1500).
#
# Returns a Hash mapping each processed file path to the failures Array
# returned by queue_mr_repairs for that file.
def perform_all_repairs(start_at_file = 0, max_files = 10000,
                        base_path = '/mnt/links', num_files = 1091, batch_size = 1500)
  # Negative arguments are clamped to 0 so they select nothing instead of
  # raising, matching how the earlier loop-based version treated them.
  selected = get_filenames(base_path, num_files)
             .drop([start_at_file, 0].max)
             .first([max_files, 0].max)
  failures = {}
  selected.each do |filename|
    keys = get_keys_from_file(filename)
    failures[filename] = queue_mr_repairs(keys, batch_size)
  end
  failures
end
# Skip the first 7 chunk files and repair the next 7 (x0007 .. x0013).
perform_all_repairs(7, 7)
# Load keys 5000..5999 of the current file one at a time with Link.find,
# printing a progress line at every 100th key (before that key is loaded).
keys[5000..5999].each_with_index do |key, index|
  done = index + 1
  puts "Done with #{done}" if done % 100 == 0
  Link.find(key)
end
# Repair the current file's keys in consecutive 1000-key batches, beginning at
# offset 1000 and ending with the batch that starts at offset 100000.
# NOTE: the hand-unrolled version of this list carried a "Start here" marker
# immediately before offset 72000; to resume from there, begin the range at 72.
(1..100).each do |batch|
  repair_keys_batch(keys, 1000, batch * 1000)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment