Skip to content

Instantly share code, notes, and snippets.

@belano
Created November 26, 2015 16:07
Show Gist options
  • Save belano/63f7d1332829ddef082d to your computer and use it in GitHub Desktop.
Save belano/63f7d1332829ddef082d to your computer and use it in GitHub Desktop.
Elasticsearch reindex data with zero downtime
#!/usr/bin/env python
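'''
Helper script for Elasticsearch index templates, aliases and reindexing
behind an alias with zero downtime.

Largely based on http://codereview.stackexchange.com/q/56979
'''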
from argparse import ArgumentParser, FileType
import requests
import json
import os, sys
class ElasticSearch(object):
    '''
    Minimal client for the Elasticsearch REST API built on `requests`.

    Unless stated otherwise every helper returns the `requests` response
    object of the call it issued.
    '''

    def __init__(self, url):
        '''
        url -- "host:port" of the node, without a scheme (request() prepends
               "http://").
        '''
        self.url = url

    def request(self, method, path, data=None):
        '''
        Send `data` to `path` using HTTP `method` with a JSON content type.

        A requests-level failure is printed and ends the process with exit
        status 1; HTTP error statuses are returned to the caller untouched.
        '''
        url = 'http://%s/%s' % (self.url, path)
        headers = {'Content-type': 'application/json'}
        try:
            return requests.request(method, url, data=data, headers=headers)
        except requests.exceptions.RequestException as e:
            print(e)
            sys.exit(1)

    def post(self, path, data):
        '''Issue a POST request.'''
        return self.request('post', path, data)

    def get(self, path, data=None):
        '''Issue a GET request (optionally with a body).'''
        return self.request('get', path, data)

    def put(self, path, data=None):
        '''Issue a PUT request.'''
        return self.request('put', path, data)

    def head(self, path, data=None):
        '''Issue a HEAD request.'''
        return self.request('head', path, data)

    def delete(self, path, data=None):
        '''Issue a DELETE request.'''
        return self.request('delete', path, data)

    def drop_index(self, index):
        '''Delete `index`.'''
        return self.delete(index)

    def set_template(self, template_name, template):
        '''Store `template` (a JSON-serialisable object) as `template_name`.'''
        return self.post('_template/%s' % template_name,
                         data=json.dumps(template))

    def alias(self, index, to):
        '''Add the alias `to` to `index`.'''
        return self.put('%s/_alias/%s' % (index, to))

    def drop_alias(self, index, alias):
        '''Remove `alias` from `index`.'''
        return self.delete('%s/_alias/%s' % (index, alias))

    def refresh(self, index):
        '''Ask Elasticsearch to refresh `index`.'''
        return self.post('%s/_refresh' % index, None)

    def scan_and_scroll(self, index):
        '''
        Yield the documents of `index` one batch of hits at a time.

        Starts a match_all scan search with a one minute scroll window, then
        keeps fetching with the most recent scroll id until a page comes back
        without hits.
        NOTE(review): `search_type=scan` belongs to the Elasticsearch 1.x/2.x
        API -- confirm it is supported by the target cluster.
        '''
        query = json.dumps({"query": {"match_all": {}}, "size": 100})
        response = self.get('%s/_search?search_type=scan&scroll=1m' % index,
                            data=query).json()
        while True:
            response = self.get('_search/scroll?scroll=1m',
                                data=response['_scroll_id']).json()
            hits = response['hits']['hits']
            if not hits:
                return
            yield hits

    def bulk_insert(self, index, bulk):
        '''
        Create the given search hits in `index` through the _bulk endpoint.

        bulk -- iterable of hits; each one must carry `_type` and `_source`.

        A hit's `_id` is carried over when present so that documents keep
        their identity in the target index instead of receiving freshly
        generated ids.
        '''
        lines = []
        for hit in bulk:
            action = {'_index': index, '_type': hit['_type']}
            if '_id' in hit:
                action['_id'] = hit['_id']
            lines.append(json.dumps({'create': action}))
            lines.append(json.dumps(hit['_source']))
        # The bulk body is newline-delimited; every line, including the last
        # one, is terminated by "\n".
        return self.post('_bulk', data=''.join(line + "\n" for line in lines))

    def count(self, index):
        '''
        Return the number of documents in `index`, or 0 when the reply holds
        no `count` field (e.g. an error reply).
        '''
        query = json.dumps({"query": {"match_all": {}}})
        response = self.get('%s/_count' % index, data=query).json()
        return response.get('count', 0)
def add_alias(es, index, to):
    '''
    Attach the alias `to` to `index` and echo the body of the reply.
    '''
    print(es.alias(index, to).text)
def reindex(es, alias, old_index, new_index):
    '''
    Copy every document of old_index into new_index, then move alias over.

    es        -- client exposing put, post, scan_and_scroll, bulk_insert,
                 refresh, count and drop_index
    alias     -- alias currently pointing at old_index
    old_index -- index to copy from; dropped once the copy is verified
    new_index -- index to create and fill

    The alias switch and the removal of old_index happen only when both
    indices report the same number of documents; otherwise both are left
    untouched so no data is lost.

    Returns True when the alias was switched and old_index dropped, False
    when the document counts differed.
    '''
    # Create the new index (the caller chooses its name, e.g. with a version
    # or timestamp suffix).
    print('create new index')
    response = es.put(new_index)
    print(response.text)

    # Reindex data from old_index to new_index, one scroll page at a time.
    print('reindex data')
    for bulk in es.scan_and_scroll(old_index):
        response = es.bulk_insert(new_index, bulk)
        print(response.text)
    print ("\nDone")

    # Refresh before counting so the documents just written are included.
    es.refresh(new_index)
    old_index_count = es.count(old_index)
    new_index_count = es.count(new_index)
    print ("old_index_count is %d, new_index_count is %d" % (old_index_count, new_index_count))
    if new_index_count != old_index_count:
        # Never repoint the alias at, or delete data in favour of, an
        # incomplete copy.
        print ("Document counts differ, leaving alias and old index untouched.")
        return False
    print ("OK, same number of docs in both indices.")

    # Change the index alias to point to the new index; remove and add are
    # sent as a single _aliases request.
    print('update alias')
    payload = json.dumps({"actions": [
        {"remove": {"alias": alias, "index": old_index}},
        {"add": {"alias": alias, "index": new_index}},
    ]})
    response = es.post('_aliases', data=payload)
    print(response.text)

    # Finally, delete the old index.
    print('delete old index')
    response = es.drop_index(old_index)
    print(response.text)
    return True
def set_template(es, template_file, template_name):
    '''
    Install the index template stored in template_file as template_name.

    es            -- client exposing head, delete, get and set_template
    template_file -- open, readable file object holding the template as JSON;
                     it is closed once it has been read
    template_name -- name to register the template under

    The file is read and parsed before anything is changed on the server, so
    a missing or malformed template file cannot delete the current template
    and leave nothing in its place.

    Raises ValueError when no template file was given or when its content is
    not valid JSON.
    '''
    if template_file is None:
        raise ValueError('a template file is required to set a template')
    with template_file:
        template = json.loads(template_file.read())

    # Remove current template if exists
    print('checking if template already exists')
    response = es.head('_template/%s' % template_name)
    if response.status_code == requests.codes.ok:
        print('template exists, proceeding to delete')
        response = es.delete('_template/%s' % template_name)
        print(response.text)

    # Add the template
    print('adding template')
    response = es.set_template(template_name, template)
    print(response.text)

    # Verify template exists
    print('retrieving template')
    response = es.get('_template/%s?pretty' % template_name)
    print(response.text)
# Options (argparse destination names) each action cannot run without.
REQUIRED_OPTIONS = {
    'set_template': ('template', 'template_name'),
    'reindex': ('alias', 'index', 'new_index'),
    'add_alias': ('index', 'alias'),
}


def main(argv=None):
    '''
    Parse the command line and run the requested action.

    argv -- argument list to parse; None means sys.argv[1:].

    Exits through the parser's usage error (status 2) when an option the
    chosen action depends on is missing, instead of sending requests built
    from None values to the server.
    '''
    parser = ArgumentParser(description="ES configuration helper")
    parser.add_argument('--elasticsearch', required=True, help='ES host')
    parser.add_argument('--template',
                        help='Index template file, starts with {"template"...',
                        type=FileType('r'))
    parser.add_argument('--template_name', help='template name')
    parser.add_argument('--index', help='old index')
    parser.add_argument('--new_index', help='new index')
    parser.add_argument('--alias', help='alias')
    parser.add_argument('--action', required=True,
                        choices=['set_template', 'reindex', 'add_alias'])
    args = parser.parse_args(argv)

    missing = ['--%s' % name for name in REQUIRED_OPTIONS[args.action]
               if getattr(args, name) is None]
    if missing:
        parser.error('action %s requires %s'
                     % (args.action, ', '.join(missing)))

    es = ElasticSearch(args.elasticsearch)
    # `choices` already restricts --action to the three values handled here.
    if args.action == 'set_template':
        set_template(es, args.template, args.template_name)
    elif args.action == 'reindex':
        reindex(es, args.alias, args.index, args.new_index)
    else:
        add_alias(es, args.index, args.alias)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment