Last active
March 2, 2016 19:56
-
-
Save haginara/941b7fd2f56493f9e399 to your computer and use it in GitHub Desktop.
Managing Elasticsearch indices using Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import datetime | |
import json | |
import copy | |
import argparse | |
import logging | |
try: | |
from elasticsearch import Elasticsearch | |
except ImportError: | |
print("Please install elasticsearch-py first") | |
sys.exit(1) | |
#from elasticsearch_dsl import Search, Q | |
# Root logger: write records at INFO and above to the file "delete-es.log".
logging.basicConfig(level=logging.INFO, filename="delete-es.log")
logger = logging.getLogger(__name__)

# In addition, attach a stream handler to this module's logger so the same
# records are also emitted on the handler's default stream, formatted as
# comma-separated "time,level,message".
formatter = logging.Formatter(fmt="%(asctime)-15s,%(levelname)s,%(message)s")
ch = logging.StreamHandler()
ch.setFormatter(formatter)
ch.setLevel(logging.INFO)
logger.addHandler(ch)
def create_arguments(argv=None):
    """Parse the command-line options of the index management script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) ``argparse`` falls back to ``sys.argv[1:]``, which
            is what the script entry point relies on.

    Returns:
        argparse.Namespace with the attributes ``command`` (list of one or
        more strings), ``host`` (str), ``index`` (str) and ``older_than``
        (int).
    """
    parser = argparse.ArgumentParser(
        description="Manage date-suffixed Elasticsearch indices.",
    )
    # nargs="+" because some commands carry an extra value after the command
    # name; the result is always a list, even for a single word.
    parser.add_argument(
        "command", type=str, nargs="+", metavar='Command',
        help="action to run, optionally followed by its argument",
    )
    parser.add_argument(
        "--host", default="127.0.0.1", required=False, dest='host',
        help="Elasticsearch host (default: %(default)s)",
    )
    parser.add_argument(
        "-i", "--index", default="logstash", required=False, dest='index',
        help="index name prefix (default: %(default)s)",
    )
    parser.add_argument(
        "-o", "--older-than", type=int, default=90, dest='older_than',
        required=False,
        help="age threshold in days (default: %(default)s)",
    )
    return parser.parse_args(argv)
def delete_indices(client, index, older_than):
    """Delete date-suffixed indices selected by a cut-off relative to now.

    Args:
        client: Elasticsearch client. ``client.cluster.state()`` is used to
            enumerate index names and ``client.indices.delete(index=...)``
            to remove them.
        index: Pattern of the form ``<prefix>-<strftime format>``, e.g.
            ``"logstash-%Y.%m.%d"``. The pattern is split on its LAST hyphen,
            so the prefix itself may contain hyphens (the date format may
            not).
        older_than: Offset in days added to the current time to obtain the
            cut-off. A negative value deletes indices dated before the
            cut-off, a positive value deletes indices dated after it, and
            zero deletes nothing.

    Raises:
        ValueError: if ``index`` contains no hyphen at all.
    """
    index_prefix, date_format = index.rsplit('-', 1)
    target_dt = datetime.datetime.now() + datetime.timedelta(days=older_than)
    target_index = index_prefix + "-" + target_dt.strftime(date_format)
    logger.info("Target Index: {}".format(target_index))

    for name in sorted(client.cluster.state()['metadata']['indices']):
        try:
            # Split on the last hyphen only, mirroring how the pattern was
            # split, so hyphenated prefixes are matched instead of skipped.
            prefix, date = name.rsplit('-', 1)
            if prefix != index_prefix:
                continue
            dt = datetime.datetime.strptime(date, date_format)
            if older_than < 0 and dt < target_dt:
                logger.info("Delete {} < {}".format(name, target_index))
                client.indices.delete(index=name)
            elif older_than > 0 and dt > target_dt:
                logger.info("Delete {} > {}".format(name, target_index))
                client.indices.delete(index=name)
        except ValueError:
            # No hyphen in the name, or the suffix is not a date in the
            # expected format: not one of ours, skip it.
            continue
        except Exception as e:
            # Keep going with the remaining indices; one failure should not
            # abort the whole run.
            logger.error(str(e), exc_info=True)
def close_indices(client, index, older_than):
    """Close date-suffixed indices selected by a cut-off relative to now.

    Args:
        client: Elasticsearch client. ``client.cluster.state()`` is used to
            enumerate index names and ``client.indices.close(index=...)``
            to close them.
        index: Pattern of the form ``<prefix>-<strftime format>``, e.g.
            ``"logstash-%Y.%m.%d"``. The pattern is split on its LAST hyphen,
            so the prefix itself may contain hyphens (the date format may
            not).
        older_than: Offset in days added to the current time to obtain the
            cut-off. A negative value closes indices dated before the
            cut-off, a positive value closes indices dated after it, and
            zero closes nothing.

    Raises:
        ValueError: if ``index`` contains no hyphen at all.
    """
    index_prefix, date_format = index.rsplit('-', 1)
    target_dt = datetime.datetime.now() + datetime.timedelta(days=older_than)
    target_index = index_prefix + "-" + target_dt.strftime(date_format)
    logger.info("Target Index: {}".format(target_index))

    for name in sorted(client.cluster.state()['metadata']['indices']):
        try:
            # Split on the last hyphen only, mirroring how the pattern was
            # split, so hyphenated prefixes are matched instead of skipped.
            prefix, date = name.rsplit('-', 1)
            if prefix != index_prefix:
                continue
            dt = datetime.datetime.strptime(date, date_format)
            if older_than < 0 and dt < target_dt:
                logger.info("Close {} < {}".format(name, target_index))
                client.indices.close(index=name)
            elif older_than > 0 and dt > target_dt:
                logger.info("Close {} > {}".format(name, target_index))
                client.indices.close(index=name)
        except ValueError:
            # No hyphen in the name, or the suffix is not a date in the
            # expected format: not one of ours, skip it.
            continue
        except Exception as e:
            # Keep going with the remaining indices; one failure should not
            # abort the whole run.
            logger.error(str(e), exc_info=True)
def config_indices(client, index, older_than, config_string): | |
""" | |
""" | |
indices = sorted(client.indices.stats()['indices'].keys()) | |
#indices = sorted(client.cluster.state()['metadata']['indices'].keys()) | |
target_dt = datetime.datetime.now() + datetime.timedelta(days=older_than) | |
(index_prefix, date_format) = index.split('-') | |
target_index = index_prefix + "-"+ target_dt.strftime(date_format) | |
logger.info("Target Index: {}".format(target_index)) | |
for index in indices: | |
try: | |
(prefix, date) = index.split('-') | |
if prefix == index_prefix: | |
dt = datetime.datetime.strptime(date, date_format) | |
if (dt < target_dt and older_than < 0) or (dt > target_dt and older_than > 0) : | |
setting = client.indices.get_settings(index=index)[index]['settings'] | |
updated = False | |
for parent in config_string: | |
for key in config_string[parent]: | |
if int(setting[parent][key]) != int(config_string[parent][key]): | |
updated= True | |
continue | |
if updated: | |
logger.info("Change the config {} < {}, {}".format(index, target_index, updated)) | |
client.indices.put_settings(index=index, body=config_string) | |
else: | |
logger.info("No changes, %s",index) | |
except ValueError as e: | |
logger.debug("%s: %s"%(str(e), index)) | |
continue | |
except Exception as e: | |
logger.error(str(e), exc_info=True) | |
def list_indices(client):
    """Log, in sorted order, every index name found under the ``'indices'``
    key of ``client.indices.stats()``.

    Each name is emitted as its own INFO record on the module logger;
    nothing is returned.
    """
    names = list(client.indices.stats()['indices'].keys())
    names.sort()
    for name in names:
        logger.info(name)
def bulk_indices(clinet, index, path):
    """Placeholder for bulk-indexing data read from the tail of ``path``.

    Not implemented yet: all three arguments are ignored and ``None`` is
    returned. The first parameter keeps its existing spelling (``clinet``)
    so keyword callers are unaffected.
    """
    return None
def main():
    """Run the command given on the command line against Elasticsearch.

    Supported commands are ``delete``, ``list``, ``close`` and ``config``;
    ``config`` additionally requires a JSON settings document as the second
    positional value. An unknown command, a missing ``config`` document or
    invalid JSON terminates the script with an error message on stderr and a
    non-zero exit status instead of silently doing nothing or dumping a
    traceback.
    """
    commands = ("delete", "list", "close", "config")

    args = create_arguments()
    command = args.command[0]
    if command not in commands:
        sys.exit("Unknown command: {} (expected one of: {})".format(
            command, ", ".join(commands)
        ))

    # Indices are addressed as "<prefix>-<date>", with the date rendered by
    # this strftime format.
    index = args.index + "-%Y.%m.%d"
    # The CLI takes a positive age in days; the *_indices helpers expect a
    # negative offset from now to act on indices older than the cut-off.
    older_than = -(args.older_than)
    client = Elasticsearch(host=args.host)

    if command == 'delete':
        delete_indices(client, index, older_than)
    elif command == 'list':
        list_indices(client)
    elif command == 'close':
        close_indices(client, index, older_than)
    elif command == 'config':
        if len(args.command) < 2:
            sys.exit("config requires a JSON settings document as its argument")
        config_string = args.command[1]
        print("index: {}, older_than: {}, config_string: {}".format(
            index, older_than, config_string
        ))
        try:
            settings = json.loads(config_string)
        except ValueError as err:
            sys.exit("Invalid JSON settings document: {}".format(err))
        config_indices(client, index, older_than, settings)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment