Skip to content

Instantly share code, notes, and snippets.

@haginara
Last active March 2, 2016 19:56
Show Gist options
  • Save haginara/941b7fd2f56493f9e399 to your computer and use it in GitHub Desktop.
Save haginara/941b7fd2f56493f9e399 to your computer and use it in GitHub Desktop.
Managing elasticsaerch index using python
import sys
import datetime
import json
import copy
import argparse
import logging
try:
from elasticsearch import Elasticsearch
except ImportError:
print("Please install elasticsearch-py first")
sys.exit(1)
#from elasticsearch_dsl import Search, Q
logging.basicConfig(filename="delete-es.log", level=logging.INFO)
logger = logging.getLogger(__name__)
formatter = logging.Formatter("%(asctime)-15s,%(levelname)s,%(message)s")
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
def create_arguments():
p = argparse.ArgumentParser()
p.add_argument("command", type=str, nargs="+", metavar='Command')
p.add_argument("--host", default="127.0.0.1", required=False, dest='host')
p.add_argument("-i", "--index", default="logstash", required=False, dest='index')
p.add_argument("-o", "--older-than", type=int, default=90, dest='older_than', required=False)
return p.parse_args()
def delete_indices(client, index, older_than):
#url = "http://10.8.248.25:9200/"
#indices = sorted(client.indices.stats()['indices'].keys())
indices = sorted(client.cluster.state()['metadata']['indices'].keys())
target_dt = datetime.datetime.now() + datetime.timedelta(days=older_than)
(index_prefix, date_format) = index.split('-')
target_index = index_prefix + "-"+ target_dt.strftime(date_format)
logger.info("Target Index: {}".format(target_index))
for index in indices:
try:
(prefix, date) = index.split('-')
if prefix == index_prefix:
dt = datetime.datetime.strptime(date, date_format)
if dt < target_dt and older_than < 0 :
logger.info("Delete {} < {}".format(index, target_index))
client.indices.delete(index=index)
elif dt > target_dt and older_than > 0 :
logger.info("Delete {} > {}".format(index, target_index))
client.indices.delete(index=index)
except ValueError, e:
continue
except Exception as e:
logger.error(str(e), exc_info=True)
def close_indices(client, index, older_than):
#indices = sorted(client.indices.stats()['indices'].keys())
indices = sorted(client.cluster.state()['metadata']['indices'].keys())
target_dt = datetime.datetime.now() + datetime.timedelta(days=older_than)
(index_prefix, date_format) = index.split('-')
target_index = index_prefix + "-"+ target_dt.strftime(date_format)
logger.info("Target Index: {}".format(target_index))
for index in indices:
try:
(prefix, date) = index.split('-')
if prefix == index_prefix:
dt = datetime.datetime.strptime(date, date_format)
if dt < target_dt and older_than < 0 :
logger.info("Close {} < {}".format(index, target_index))
client.indices.close(index=index)
elif dt > target_dt and older_than > 0 :
logger.info("Close {} > {}".format(index, target_index))
client.indices.close(index=index)
except ValueError as e:
continue
except Exception as e:
logger.error(str(e), exc_info=True)
def config_indices(client, index, older_than, config_string):
"""
"""
indices = sorted(client.indices.stats()['indices'].keys())
#indices = sorted(client.cluster.state()['metadata']['indices'].keys())
target_dt = datetime.datetime.now() + datetime.timedelta(days=older_than)
(index_prefix, date_format) = index.split('-')
target_index = index_prefix + "-"+ target_dt.strftime(date_format)
logger.info("Target Index: {}".format(target_index))
for index in indices:
try:
(prefix, date) = index.split('-')
if prefix == index_prefix:
dt = datetime.datetime.strptime(date, date_format)
if (dt < target_dt and older_than < 0) or (dt > target_dt and older_than > 0) :
setting = client.indices.get_settings(index=index)[index]['settings']
updated = False
for parent in config_string:
for key in config_string[parent]:
if int(setting[parent][key]) != int(config_string[parent][key]):
updated= True
continue
if updated:
logger.info("Change the config {} < {}, {}".format(index, target_index, updated))
client.indices.put_settings(index=index, body=config_string)
else:
logger.info("No changes, %s",index)
except ValueError as e:
logger.debug("%s: %s"%(str(e), index))
continue
except Exception as e:
logger.error(str(e), exc_info=True)
def list_indices(client):
indices = sorted(client.indices.stats()['indices'].keys())
for index in indices:
logger.info(index)
def bulk_indices(clinet, index, path):
""" bulk indexing the data from the tail of path
"""
pass
if __name__ =='__main__':
import __main__
p = create_arguments()
command = p.command[0]
host = p.host
index = p.index + "-%Y.%m.%d"
older_than = -(p.older_than)
client = Elasticsearch(host=host)
commands = [ "delete", "list", "close", "config" ]
options = {}
options["client"] = client
options["older_tahn"] = older_than
options["index"] = index
#print getattr(__main__, command+"_indices")(**options)
if command in commands:
if command == 'delete':
delete_indices(client, index, older_than)
elif command == 'list':
list_indices(client)
elif command == 'close':
close_indices(client, index, older_than)
elif command == 'config':
config_string = p.command[1]
print( "index: {}, older_than: {}, config_string: {}".format(
index, older_than, config_string
))
config_indices(client, index, older_than, json.loads(config_string))
else:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment