Skip to content

Instantly share code, notes, and snippets.

@freerobby
Last active December 11, 2015 22:28
Show Gist options
  • Save freerobby/4669321 to your computer and use it in GitHub Desktop.
Save freerobby/4669321 to your computer and use it in GitHub Desktop.
# First split the keys into a bunch of files:
# ubuntu@rails01:/mnt/links$ split -l 100000 -d -a 4 links_dump.dat
current_file = 4
def get_filenames(base_path, num_files) # Remember, num_files= last file # + 1
filenames = []
num_files.times do |num|
num = num.to_s
while num.length < 4 do
num = "0#{num}"
end
filenames << "#{base_path}/x#{num}"
end
filenames
end
filepaths = get_filenames('/mnt/links', 1091)
def get_keys_from_file(path)
_keys = []
File.open(path, 'r').each_line do |key|
_keys << key.strip
end
_keys
end
keys = get_keys_from_file(filepaths[current_file]); nil
def run_mr_repair(keyset, start, stop)
mr = Riak::MapReduce.new(Ripple.client).timeout(1000 * 60 * 30)
# timeout = ((stop - start + 1) * 1000)
# puts "Setting timeout to #{timeout/1000} seconds for MR job."
# mr.timeout(timeout)
keyset[start..stop].each do |k|
mr.add('links', k)
end
mr.map(['rs_reindex2', 'map'], :language => 'erlang')
# puts mr.inspect
mr.run
end
def repair_keys_batch(keyset, num, start)
started_at = Time.now.to_i
stop = start + num - 1
puts "Repairing keys #{start} through #{stop}"
run_mr_repair(keyset, start, stop)
puts "Repaired #{num} keys in #{Time.now.to_i - started_at} seconds (#{num/(Time.now.to_i - started_at)}) keys/sec."
end
def queue_mr_repairs(keyset, batch_size = 100, start_at = 0)
failures = []
batches = keyset.count / batch_size
batches += 1 if keyset.count % batch_size > 0
puts "Running repair on #{keyset.count} keys over #{batches} batches, beginning at time=#{Time.now.to_i}..."
batches.times do |batch|
next if batch < start_at
start = batch * batch_size
stop = (batch + 1) * batch_size - 1
num = stop - start + 1
started_at = Time.now.to_i
puts "Beginning batch ##{batch} (start=#{start}, stop=#{stop}, current time=#{started_at})"
begin
run_mr_repair(keyset, start, stop)
rescue Riak::MapReduceError => e
puts "FAILED between #{start} and #{stop}"
puts "Read-Repairing Keys..."
keyset[start..stop].each_with_index do |k, i|
Link.find(k)
puts "Done with #{i + 1}" if (i + 1) % 100 == 0
end
puts "Retrying batch..."
begin
run_mr_repair(keyset, start, stop)
rescue Riak::MapReduceError => e2
failures << {:start => start, :stop => stop, :error => e}
puts "PERMANENTLY FAILED FROM #{start} to #{stop}"
end
end
puts "Repaired #{num} keys in #{Time.now.to_i - started_at} seconds (#{num/(Time.now.to_i - started_at)}) keys/sec."
end
puts "Repair completed at time=#{Time.now.to_i}"
failures
end
def perform_all_repairs(start_at_file = 0, max_files = 10000)
files = get_filenames('/mnt/links', 1091).dup
start_at_file.times do |t|
files.delete_at 0
end
failures = {}
done_files = 0
files.each do |filename|
done_files += 1
next if done_files > max_files
keys = get_keys_from_file(filename)
failures[filename] = queue_mr_repairs(keys, 1500)
end
failures
end
perform_all_repairs(7, 7)
count = 0
keys[5000..5999].each do |k|
count += 1
puts "Done with #{count}" if count % 100 == 0
Link.find(k)
nil
end
repair_keys_batch(keys, 1000, 1000)
repair_keys_batch(keys, 1000, 2000)
repair_keys_batch(keys, 1000, 3000)
repair_keys_batch(keys, 1000, 4000)
repair_keys_batch(keys, 1000, 5000)
repair_keys_batch(keys, 1000, 6000)
repair_keys_batch(keys, 1000, 7000)
repair_keys_batch(keys, 1000, 8000)
repair_keys_batch(keys, 1000, 9000)
repair_keys_batch(keys, 1000, 10000)
repair_keys_batch(keys, 1000, 11000)
repair_keys_batch(keys, 1000, 12000)
repair_keys_batch(keys, 1000, 13000)
repair_keys_batch(keys, 1000, 14000)
repair_keys_batch(keys, 1000, 15000)
repair_keys_batch(keys, 1000, 16000)
repair_keys_batch(keys, 1000, 17000)
repair_keys_batch(keys, 1000, 18000)
repair_keys_batch(keys, 1000, 19000)
repair_keys_batch(keys, 1000, 20000)
repair_keys_batch(keys, 1000, 21000)
repair_keys_batch(keys, 1000, 22000)
repair_keys_batch(keys, 1000, 23000)
repair_keys_batch(keys, 1000, 24000)
repair_keys_batch(keys, 1000, 25000)
repair_keys_batch(keys, 1000, 26000)
repair_keys_batch(keys, 1000, 27000)
repair_keys_batch(keys, 1000, 28000)
repair_keys_batch(keys, 1000, 29000)
repair_keys_batch(keys, 1000, 30000)
repair_keys_batch(keys, 1000, 31000)
repair_keys_batch(keys, 1000, 32000)
repair_keys_batch(keys, 1000, 33000)
repair_keys_batch(keys, 1000, 34000)
repair_keys_batch(keys, 1000, 35000)
repair_keys_batch(keys, 1000, 36000)
repair_keys_batch(keys, 1000, 37000)
repair_keys_batch(keys, 1000, 38000)
repair_keys_batch(keys, 1000, 39000)
repair_keys_batch(keys, 1000, 40000)
repair_keys_batch(keys, 1000, 41000)
repair_keys_batch(keys, 1000, 42000)
repair_keys_batch(keys, 1000, 43000)
repair_keys_batch(keys, 1000, 44000)
repair_keys_batch(keys, 1000, 45000)
repair_keys_batch(keys, 1000, 46000)
repair_keys_batch(keys, 1000, 47000)
repair_keys_batch(keys, 1000, 48000)
repair_keys_batch(keys, 1000, 49000)
repair_keys_batch(keys, 1000, 50000)
repair_keys_batch(keys, 1000, 51000)
repair_keys_batch(keys, 1000, 52000)
repair_keys_batch(keys, 1000, 53000)
repair_keys_batch(keys, 1000, 54000)
repair_keys_batch(keys, 1000, 55000)
repair_keys_batch(keys, 1000, 56000)
repair_keys_batch(keys, 1000, 57000)
repair_keys_batch(keys, 1000, 58000)
repair_keys_batch(keys, 1000, 59000)
repair_keys_batch(keys, 1000, 60000)
repair_keys_batch(keys, 1000, 61000)
repair_keys_batch(keys, 1000, 62000)
repair_keys_batch(keys, 1000, 63000)
repair_keys_batch(keys, 1000, 64000)
repair_keys_batch(keys, 1000, 65000)
repair_keys_batch(keys, 1000, 66000)
repair_keys_batch(keys, 1000, 67000)
repair_keys_batch(keys, 1000, 68000)
repair_keys_batch(keys, 1000, 69000)
repair_keys_batch(keys, 1000, 70000)
repair_keys_batch(keys, 1000, 71000)
# STart here
repair_keys_batch(keys, 1000, 72000)
repair_keys_batch(keys, 1000, 73000)
repair_keys_batch(keys, 1000, 74000)
repair_keys_batch(keys, 1000, 75000)
repair_keys_batch(keys, 1000, 76000)
repair_keys_batch(keys, 1000, 77000)
repair_keys_batch(keys, 1000, 78000)
repair_keys_batch(keys, 1000, 79000)
repair_keys_batch(keys, 1000, 80000)
repair_keys_batch(keys, 1000, 81000)
repair_keys_batch(keys, 1000, 82000)
repair_keys_batch(keys, 1000, 83000)
repair_keys_batch(keys, 1000, 84000)
repair_keys_batch(keys, 1000, 85000)
repair_keys_batch(keys, 1000, 86000)
repair_keys_batch(keys, 1000, 87000)
repair_keys_batch(keys, 1000, 88000)
repair_keys_batch(keys, 1000, 89000)
repair_keys_batch(keys, 1000, 90000)
repair_keys_batch(keys, 1000, 91000)
repair_keys_batch(keys, 1000, 92000)
repair_keys_batch(keys, 1000, 93000)
repair_keys_batch(keys, 1000, 94000)
repair_keys_batch(keys, 1000, 95000)
repair_keys_batch(keys, 1000, 96000)
repair_keys_batch(keys, 1000, 97000)
repair_keys_batch(keys, 1000, 98000)
repair_keys_batch(keys, 1000, 99000)
repair_keys_batch(keys, 1000, 100000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment