Migrating Elasticsearch indices from one cluster to another.
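# Walks every index on the old cluster, copies its mappings to the new cluster,
# then pages through its documents and re-indexes them into the new cluster
# with the bulk API. Fill in the two endpoint constants below before running.
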
require 'json'
require 'tempfile'

OLD_CLUSTER_ENDPOINT = '' # root endpoint of the source cluster
NEW_CLUSTER_ENDPOINT = '' # root endpoint of the destination cluster
PER_PAGE = 3_000          # documents fetched and bulk-indexed per batch

# GET a URL and parse the JSON response.
def get_json(url)
  JSON.parse(`curl -s '#{url}'`)
end

# PUT to a URL, optionally with a JSON body, and parse the response.
def put_json(url, body=nil)
  cmd = "curl -XPUT -s '#{url}'"
  cmd += " -d '#{body.to_json}'" if body
  JSON.parse(`#{cmd}`)
end

# POST a newline-delimited JSON file to a bulk endpoint.
def post_bulk(url, filepath)
  cmd = "curl -XPOST -s '#{url}' --data-binary @#{filepath}"
  JSON.parse(`#{cmd}`)
end
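
# For example, put_json("#{NEW_CLUSTER_ENDPOINT}/some_index", {number_of_shards: 1})
# shells out to roughly:
#   curl -XPUT -s '<endpoint>/some_index' -d '{"number_of_shards":1}'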

# Every index on the old cluster, plus the total document count for progress reporting.
indices = get_json("#{OLD_CLUSTER_ENDPOINT}/_aliases").keys
total_docs = get_json("#{OLD_CLUSTER_ENDPOINT}/_count")['count']

# Size new indices to the new cluster: one shard per data node, and enough
# replicas for every data node to hold a copy of each shard.
new_num_nodes = get_json("#{NEW_CLUSTER_ENDPOINT}/_cluster/health")['number_of_data_nodes']
new_num_shards = new_num_nodes
new_num_replicas = new_num_nodes - 1

total_reindexed = 0

indices.each do |index|
  mapping = get_json("#{OLD_CLUSTER_ENDPOINT}/#{index}/_mapping")
  types = mapping[index].keys

  # Create the new index with refresh disabled during the bulk load;
  # replicas stay at 1 until the data is in (raised to new_num_replicas below).
  index_settings = {refresh_interval: '-1', number_of_shards: new_num_shards, number_of_replicas: 1}
  put_json("#{NEW_CLUSTER_ENDPOINT}/#{index}", index_settings)

  # Copy the mapping for each document type.
  types.each do |type|
    put_json("#{NEW_CLUSTER_ENDPOINT}/#{index}/#{type}/_mapping", {type => mapping[index][type]})
  end

  # Page through the index on the old cluster, PER_PAGE documents at a time.
  scroll_id = nil
  page = 1
  total_pages = nil

  while !total_pages || page <= total_pages
    start_time = Time.now

    print 'Fetching...'
    from = (page - 1) * PER_PAGE
    search_url = "#{OLD_CLUSTER_ENDPOINT}/#{index}/_search?scroll=5m&size=#{PER_PAGE}&from=#{from}"
    search_url += "&scroll_id=#{scroll_id}" if scroll_id
    scan_resp = get_json(search_url)
    scroll_id = scan_resp['_scroll_id']
    total_pages ||= (scan_resp['hits']['total'] / PER_PAGE.to_f).ceil

    print 'Formatting...'
    # Build the bulk request body for this page: an action line plus the source for each hit.
    bulk = []
    scan_resp['hits']['hits'].each do |doc|
      bulk << {index: {_index: doc['_index'], _type: doc['_type'], _id: doc['_id']}}
      bulk << doc['_source']
    end
    bulk = bulk.map(&:to_json).join("\n") + "\n"
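
    # The joined string is the newline-delimited body the bulk endpoint expects,
    # two lines per document, e.g. (illustrative values):
    #   {"index":{"_index":"users","_type":"user","_id":"42"}}
    #   {"name":"Ada","email":"ada@example.com"}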

    # Drop the scan response to relieve memory pressure before writing the bulk file.
    scan_resp = nil

    print 'Writing locally...'
    bulk_tmp_file = Tempfile.new('bulk_tmp_file')
    File.open(bulk_tmp_file.path, 'w') { |f| f.write(bulk) }

    print 'Bulking...'
    post_bulk("#{NEW_CLUSTER_ENDPOINT}/#{index}/_bulk", bulk_tmp_file.path)
    puts 'Done.'

    # Report progress against the old cluster's total document count.
    end_time = Time.now
    docs_per_sec = (PER_PAGE / (end_time - start_time)).round(2)
    indexed = get_json("#{NEW_CLUSTER_ENDPOINT}/_count")['count']
    puts "Indexed #{indexed} / #{total_docs}, #{(indexed * 100 / total_docs.to_f).round(2)}%, #{docs_per_sec} docs per second"

    page += 1
  end
print "Updating replicas..."
put_json("#{NEW_CLUSTER_ENDPOINT}/#{index}/_settings", {number_of_replicas: new_num_replicas, refresh_interval: '10s'})
puts "Done."
end
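
# Optional sanity check (minimal sketch): compare per-index document counts
# between the two clusters once refresh has been re-enabled above.
indices.each do |index|
  old_count = get_json("#{OLD_CLUSTER_ENDPOINT}/#{index}/_count")['count']
  new_count = get_json("#{NEW_CLUSTER_ENDPOINT}/#{index}/_count")['count']
  puts "#{index}: #{new_count}/#{old_count} docs"
end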