Created
November 26, 2015 16:07
-
-
Save belano/63f7d1332829ddef082d to your computer and use it in GitHub Desktop.
Elasticsearch reindex data with zero downtime
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
Lots taken from http://codereview.stackexchange.com/q/56979 | |
''' | |
from argparse import ArgumentParser, FileType | |
import requests | |
import json | |
import os, sys | |
class ElasticSearch():
    '''Small helper around the Elasticsearch REST API, built on ``requests``.

    Every call goes to ``http://<url>/<path>`` with a JSON content type.
    Transport-level failures (``requests`` exceptions) are printed and end the
    process with exit status 1; HTTP error statuses are returned to the caller.
    ``scan_and_scroll`` relies on ``search_type=scan``, which newer
    Elasticsearch releases no longer accept.
    '''

    def __init__(self, url):
        # Host (and optional port) of the cluster, without the "http://"
        # scheme -- request() prepends the scheme itself.
        self.url = url

    def request(self, method, path, data=None):
        '''Send an HTTP *method* request to *path*; return the response.

        :param method: HTTP verb, e.g. 'get' or 'post'
        :param path: URL path relative to the cluster root (no leading '/')
        :param data: optional request body (already-serialized string)
        '''
        url = 'http://%s/%s' % (self.url, path)
        headers = {'Content-type': 'application/json'}
        try:
            return requests.request(method, url, data=data, headers=headers)
        except requests.exceptions.RequestException as e:
            print(e)
            sys.exit(1)

    def post(self, path, data):
        return self.request('post', path, data)

    def get(self, path, data=None):
        return self.request('get', path, data)

    def put(self, path, data=None):
        return self.request('put', path, data)

    def head(self, path, data=None):
        return self.request('head', path, data)

    def delete(self, path, data=None):
        return self.request('delete', path, data)

    def drop_index(self, index):
        '''Delete *index*.'''
        return self.delete(index)

    def set_template(self, template_name, template):
        '''Store the *template* dict as index template *template_name*.'''
        return self.post('_template/%s' % template_name, data=json.dumps(template))

    def alias(self, index, to):
        '''Add alias *to* to *index*.'''
        return self.put('%s/_alias/%s' % (index, to))

    def drop_alias(self, index, alias):
        '''Remove *alias* from *index*.'''
        return self.delete('%s/_alias/%s' % (index, alias))

    def refresh(self, index):
        '''Refresh *index* so recently written documents become searchable.'''
        return self.post('%s/_refresh' % index, None)

    def scan_and_scroll(self, index):
        '''Yield all documents of *index* as successive lists of search hits.

        Opens a scan-type scroll (1 minute keep-alive, "size" 100) and keeps
        scrolling until a page with no hits comes back.
        '''
        response = self.get('%s/_search?search_type=scan&scroll=1m' % index,
                            data=json.dumps({"query": {"match_all": {}},
                                             "size": 100})).json()
        while True:
            # Always continue from the scroll id of the latest response.
            response = self.get('_search/scroll?scroll=1m',
                                data=response['_scroll_id']).json()
            hits = response['hits']['hits']
            if not hits:
                return
            yield hits

    def bulk_insert(self, index, bulk):
        '''Bulk-"create" the search hits in *bulk* inside *index*.

        Each hit's ``_type`` and, when present, ``_id`` are copied into the
        action metadata so documents keep their identifiers in the target
        index (without ``_id`` Elasticsearch would generate new ones).
        '''
        lines = []
        for hit in bulk:
            meta = {'_index': index, '_type': hit['_type']}
            if '_id' in hit:
                meta['_id'] = hit['_id']
            lines.append(json.dumps({'create': meta}))
            lines.append(json.dumps(hit['_source']))
        # The bulk body is newline-delimited JSON and must end with a newline.
        return self.post('_bulk', data=''.join(line + '\n' for line in lines))

    def count(self, index):
        '''Return the number of documents in *index*.

        A response without a "count" key (an error reply) counts as 0.
        '''
        response = self.get('%s/_count' % index,
                            data=json.dumps({"query": {"match_all": {}}})).json()
        return response.get('count', 0)
def add_alias(es, index, to):
    '''Attach alias *to* to *index* and print the cluster's raw reply.'''
    print(es.alias(index, to).text)
def reindex(es, alias, old_index, new_index):
    '''Copy *old_index* into *new_index*, then move *alias* over to it.

    Steps:
      1. create *new_index* (sent with no body, so any settings/mappings must
         come from a matching index template);
      2. copy every document with scan/scroll + bulk inserts;
      3. refresh *new_index* and compare document counts;
      4. only if the counts match, repoint *alias* from *old_index* to
         *new_index* in one atomic ``_aliases`` call;
      5. only if that alias update succeeded, delete *old_index*.

    When the counts differ or the alias update fails, nothing is deleted, so
    the old index stays in place.

    :param es: ElasticSearch client
    :param alias: alias to move; presumably it currently points at old_index
    :param old_index: source index name
    :param new_index: destination index name (created here)
    '''
    print('create new index')
    response = es.put(new_index)
    print(response.text)

    print('reindex data')
    for bulk in es.scan_and_scroll(old_index):
        response = es.bulk_insert(new_index, bulk)
        print(response.text)
    print("\nDone")

    # Make the freshly bulk-loaded documents visible to _count.
    es.refresh(new_index)
    old_index_count = es.count(old_index)
    new_index_count = es.count(new_index)
    print("old_index_count is %d, new_index_count is %d"
          % (old_index_count, new_index_count))
    if new_index_count != old_index_count:
        print("Document counts differ; leaving alias and old index untouched.")
        return
    print("OK, same number of docs in both indices.")

    # Remove + add in a single _aliases request so the switch is atomic.
    print('update alias')
    payload = json.dumps({"actions": [
        {"remove": {"alias": alias, "index": old_index}},
        {"add": {"alias": alias, "index": new_index}},
    ]})
    response = es.post('_aliases', data=payload)
    print(response.text)
    if not response.ok:
        # Deleting the old index now would also drop the alias still on it.
        print("Alias update failed; not deleting old index.")
        return

    print('delete old index')
    response = es.drop_index(old_index)
    print(response.text)
def set_template(es, template_file, template_name):
    '''Install the index template read from *template_file* as *template_name*.

    The file is parsed before the cluster is touched, so a malformed file
    leaves any existing template in place. Otherwise: an existing template of
    the same name is deleted, the new one is added, and the stored template is
    fetched back and printed.

    :param es: ElasticSearch client
    :param template_file: readable text file containing the template JSON
    :param template_name: name to store the template under
    :raises ValueError: if the file does not contain valid JSON
    '''
    # Parse first: failing here must not cost us the current template.
    template = json.loads(template_file.read())

    # Remove the current template, if one exists.
    print('checking if template already exists')
    response = es.head('_template/%s' % template_name)
    if response.ok:
        print('template exists, proceeding to delete')
        response = es.delete('_template/%s' % template_name)
        print(response.text)

    # Add the new template.
    print('adding template')
    response = es.set_template(template_name, template)
    print(response.text)

    # Read it back to verify it was stored.
    print('retrieving template')
    response = es.get('_template/%s?pretty' % template_name)
    print(response.text)
if __name__ == '__main__':
    parser = ArgumentParser(description="ES configuration helper")
    parser.add_argument('--elasticsearch', required=True, help='ES host')
    parser.add_argument('--template',
                        help='Index template file, starts with {"template"...',
                        type=FileType('r'))
    parser.add_argument('--template_name', help='template name')
    parser.add_argument('--index', help='old index')
    parser.add_argument('--new_index', help='new index')
    parser.add_argument('--alias', help='alias')
    parser.add_argument('--action', required=True, choices=['set_template', 'reindex', 'add_alias'])
    args = parser.parse_args()

    # The optional flags are only optional per action; check the ones the
    # chosen action needs so we never build requests out of None values.
    required_by_action = {
        'set_template': ('template', 'template_name'),
        'reindex': ('index', 'new_index', 'alias'),
        'add_alias': ('index', 'alias'),
    }
    missing = ['--' + name for name in required_by_action[args.action]
               if getattr(args, name) is None]
    if missing:
        parser.error('action %s requires %s' % (args.action, ', '.join(missing)))

    es = ElasticSearch(args.elasticsearch)
    # `choices` on --action guarantees one of these branches runs.
    if args.action == 'set_template':
        set_template(es, args.template, args.template_name)
    elif args.action == 'reindex':
        reindex(es, args.alias, args.index, args.new_index)
    elif args.action == 'add_alias':
        add_alias(es, args.index, args.alias)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment