Elasticsearch optimization

A small script that right-sizes the shard count of an Elasticsearch index by reindexing: an index under 2 GiB is collapsed to a single shard, and an index averaging more than 45 GiB per shard is reindexed to roughly 30 GiB per shard.
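For example (figures illustrative, thresholds from the code below): a 150 GiB index spread over 3 shards averages 50 GiB per shard, which exceeds the 45 GiB threshold, so it would be reindexed into round(150 / 30) = 5 shards of roughly 30 GiB each.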
from argparse import ArgumentParser
from logbook import Logger, StreamHandler
import requests
import sys
import uuid

StreamHandler(sys.stdout).push_application()
log = Logger(__name__)


class Connection:
    """Creates a connection to Elasticsearch"""

    def __init__(self, baseurl):
        """Create a new connection to a master url

        :param baseurl: url to an Elasticsearch master (e.g. http://esmaster:9200)"""
        self._baseurl = baseurl

    def alias_update(self, index, alias, operation='add'):
        """Add or remove an alias for an index

        :param index: name of the index to create an alias for
        :param alias: name of the alias
        :param operation: either add or remove"""
        data = {'actions': [
            {operation: {'index': index, 'alias': alias}}
        ]}
        response = requests.post(self._baseurl + '/_aliases', json=data)
        log.debug('{} - {}'.format(response.status_code, response.text))

    def cluster_get_health(self, params):
        """Retrieves the health of a cluster"""
        response = requests.get(self._baseurl + '/_cluster/health', params=params)
        log.debug('{} - {}'.format(response.status_code, response.text))
        return response.json()

    def cluster_get_stats(self):
        """Retrieves the statistics of a cluster"""
        response = requests.get(self._baseurl + '/_cluster/stats')
        log.debug('{} - {}'.format(response.status_code, response.text))
        return response.json()

    def index_create(self, index, mappings=None, settings=None):
        """Creates a new index"""
        data = {}
        if mappings is not None:
            data['mappings'] = mappings
        if settings is not None:
            data['settings'] = settings
        # index creation is a PUT in the Elasticsearch API
        response = requests.put(self._baseurl + '/' + index, json=data)
        log.debug('{} - {}'.format(response.status_code, response.text))

    def index_delete(self, index):
        """Deletes an existing index"""
        response = requests.delete(self._baseurl + '/' + index)
        log.debug('{} - {}'.format(response.status_code, response.text))

    def index_forcemerge(self, index):
        """Merges the segments of an index down to 1"""
        params = {'max_num_segments': 1}
        response = requests.post(self._baseurl + '/' + index + '/_forcemerge', params=params)
        log.debug('{} - {}'.format(response.status_code, response.text))

    def index_get_config(self, index):
        """Retrieves the config of an index"""
        response = requests.get(self._baseurl + '/' + index)
        log.debug('{} - {}'.format(response.status_code, response.text))
        return response.json()

    def index_get_stats(self, index):
        """Retrieves the statistics of an index"""
        response = requests.get(self._baseurl + '/' + index + '/_stats')
        log.debug('{} - {}'.format(response.status_code, response.text))
        return response.json()

    def index_reindex(self, index, new_index, shards=1):
        """Reindexes an index

        Creates a new index with the same mappings and analysis settings as
        the old index, sets the requested number of shards, and reindexes the
        data from the old index into the new one.

        :param index: old index to reindex from
        :param new_index: new index
        :param shards: number of shards for the new index"""
        index_config = self.index_get_config(index)

        if requests.head(self._baseurl + '/' + new_index).status_code == 404:
            c_mapping = index_config[index]['mappings']
            c_setting = {'index': {
                'number_of_shards': shards,
                'analysis': index_config[index]['settings']['index'].get('analysis', {})}}
            self.index_create(new_index, c_mapping, c_setting)

        data = {'source': {'index': index},
                'dest': {'index': new_index}}

        log.info('Reindexing index {} to new_index {} with {} shard(s)'.format(index, new_index, shards))
        response = requests.post(self._baseurl + '/_reindex', json=data)
        log.debug('{} - {}'.format(response.status_code, response.text))


class ESOptimization:
    def __init__(self, baseurl):
        self._connection = Connection(baseurl)

    def index_replace(self, index, shards):
        """Creates a new index with another number of shards"""
        # TODO: include an automatic snapshot
        new_index = str(uuid.uuid4())
        self._connection.index_reindex(index, new_index, shards)
        # TODO: make the timeout dependent on the index size
        self._connection.cluster_get_health({'wait_for_status': 'green', 'timeout': '1h'})
        # Swap in the new index under the old name: delete the old index and
        # alias the new one to it, so consumers keep using the same name.
        self._connection.index_delete(index)
        self._connection.alias_update(new_index, index)
        self._connection.index_forcemerge(new_index)

    def index_optimize_shards(self, index):
        """Reshards an index when it falls outside the size thresholds"""
        index_config = self._connection.index_get_config(index)
        index_shards = int(index_config[index]['settings']['index']['number_of_shards'])
        index_stats = self._connection.index_get_stats(index)
        index_primary_size = int(index_stats['_all']['primaries']['store']['size_in_bytes'])

        size_2gb = 2147483648    # 2 GiB
        size_30gb = 32212254720  # 30 GiB
        size_45gb = 48318382080  # 45 GiB

        if index_primary_size < size_2gb and index_shards > 1:
            # small index spread over multiple shards: collapse to one
            log.info("Reducing index {} to 1 shard".format(index))
            self.index_replace(index, 1)
        elif index_primary_size / index_shards > size_45gb:
            # oversized shards: target roughly 30 GiB per shard
            number_of_shards = round(index_primary_size / size_30gb)
            if index_shards < number_of_shards:
                log.info("Increasing index {} to {} shards".format(index, number_of_shards))
                self.index_replace(index, number_of_shards)


if __name__ == '__main__':
    parser = ArgumentParser(description="Optimize the sharding of an Elasticsearch index")
    parser.add_argument('-i', '--index', dest='index', required=True,
                        help='name of the index to optimize')
    parser.add_argument('-u', '--url', dest='url', required=True,
                        help='url of an Elasticsearch master (e.g. http://esmaster:9200)')
    args = parser.parse_args()

    esop = ESOptimization(args.url)
    esop.index_optimize_shards(args.index)
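
For reference, a minimal usage sketch: the master url follows the docstring example above, while the index name is purely illustrative.

    esop = ESOptimization('http://esmaster:9200')
    esop.index_optimize_shards('logstash-2017.11.16')

The command-line equivalent passes the same values via -u/--url and -i/--index.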