Skip to content

Instantly share code, notes, and snippets.

@haginara
Created July 4, 2017 01:46
Show Gist options
  • Save haginara/9f59143ac90f568c9c690c42561d9dcf to your computer and use it in GitHub Desktop.
Save haginara/9f59143ac90f568c9c690c42561d9dcf to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import sys
import datetime
import json
import argparse
import logging
from collections import namedtuple
try:
from elasticsearch import Elasticsearch
from elasticsearch import RequestsHttpConnection
except ImportError as e:
print("Please install elasticsearch-py first, %s" % e)
sys.exit(1)
#from elasticsearch_dsl import Search, Q
__VERSION__ = (1, 0, 0)
logging.basicConfig(filename="delete-es.log", level=logging.INFO)
logger = logging.getLogger(__name__)
formatter = logging.Formatter("%(asctime)-15s,%(levelname)s,%(message)s")
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
choices = ('delete', 'list', 'close', 'config', 'info')
def create_arguments():
p = argparse.ArgumentParser()
p.add_argument("command", type=str, choices=choices, help="Choose one of the action")
p.add_argument("--host", default="127.0.0.1", required=False, dest='host')
p.add_argument("-i", "--index", default="logstash", required=False)
p.add_argument("-t", "--time_format", default="%Y.%m.%d", required=False)
p.add_argument("-o", "--older-than", type=int, default=90, dest='older_than', required=False)
#p.add_argument("-v", "--version", type='version')
return p.parse_args()
def delete_indices(client, index, older_than):
indices = sorted(client.cluster.state()['metadata']['indices'].keys())
#logger.info("Indices: %", indices)
target_dt = datetime.datetime.now() + datetime.timedelta(days=older_than)
(index_prefix, date_format) = index.split('-')
target_index = index_prefix + "-" + target_dt.strftime(date_format)
logger.info("Target Index: {}".format(target_index))
for index in indices:
try:
(prefix, date) = index.split('-')
if prefix == index_prefix:
dt = datetime.datetime.strptime(date, date_format)
if dt < target_dt and older_than < 0:
logger.info("Delete {} < {}".format(index, target_index))
client.indices.delete(index=index)
elif dt > target_dt and older_than > 0:
logger.info("Delete {} > {}".format(index, target_index))
client.indices.delete(index=index)
except ValueError, e:
continue
except Exception as e:
logger.error(str(e), exc_info=True)
def close_indices(client, index, older_than):
#indices = sorted(client.indices.stats()['indices'].keys())
indices = sorted(client.cluster.state()['metadata']['indices'].keys())
target_dt = datetime.datetime.now() + datetime.timedelta(days=older_than)
(index_prefix, date_format) = index.split('-')
target_index = index_prefix + "-" + target_dt.strftime(date_format)
logger.info("Target Index: {}".format(target_index))
for index in indices:
try:
(prefix, date) = index.split('-')
if prefix == index_prefix:
dt = datetime.datetime.strptime(date, date_format)
if dt < target_dt and older_than < 0:
logger.info("Close {} < {}".format(index, target_index))
client.indices.close(index=index)
elif dt > target_dt and older_than > 0:
logger.info("Close {} > {}".format(index, target_index))
client.indices.close(index=index)
except ValueError as e:
continue
except Exception as e:
logger.error(str(e), exc_info=True)
def config_indices(client, index, older_than, config_string):
"""
"""
logger.info("indices: %s", client.indices.stats()['indices'].keys())
indices = sorted(client.indices.stats()['indices'].keys())
#indices = sorted(client.cluster.state()['metadata']['indices'].keys())
target_dt = datetime.datetime.now() + datetime.timedelta(days=older_than)
(index_prefix, date_format) = index.split('-')
target_index = index_prefix + "-" + target_dt.strftime(date_format)
logger.info("Target Index: {}".format(target_index))
for index in indices:
try:
(prefix, date) = index.split('-')
if prefix == index_prefix:
dt = datetime.datetime.strptime(date, date_format)
if (dt < target_dt and older_than < 0) or (dt > target_dt and older_than > 0):
setting = client.indices.get_settings(index=index)[index]['settings']
updated = False
for parent in config_string:
for key in config_string[parent]:
if int(setting[parent][key]) != int(config_string[parent][key]):
updated = True
continue
if updated:
logger.info("Change the config {} < {}, {}".format(index, target_index, updated))
client.indices.put_settings(index=index, body=config_string)
else:
logger.info("No changes, %s", index)
except ValueError as e:
logger.debug("%s: %s" % (str(e), index))
continue
except Exception as e:
logger.error(str(e), exc_info=True)
def list_indices(client):
indices = sorted(client.indices.stats()['indices'].keys())
for index in indices:
logger.info(index)
def bulk_indices(clinet, index, path):
""" bulk indexing the data from the tail of path
"""
pass
if __name__ == '__main__':
p = create_arguments()
host = p.host
index = '%s-%s' %(p.index, p.time_format)
older_than = -(p.older_than)
client = Elasticsearch(host=host, port=9200, timeout=30, connection_class=RequestsHttpConnection, send_get_body_as='POST', retry_on_timeout=True)
logger.info("ES Health: %s", client.cluster.health())
options = {}
options["client"] = client
options["older_tahn"] = older_than
options["index"] = index
#print getattr(__main__, command+"_indices")(**options)
if p.command == 'delete':
logger.info("Deleting the indices: :%s, %d", index, older_than)
delete_indices(client, index, older_than)
elif p.command == 'list':
list_indices(client)
elif p.command == 'close':
close_indices(client, index, older_than)
elif p.command == 'config':
config_string = p.command[1]
print("index: {}, older_than: {}, config_string: {}".format(
index, older_than, config_string
))
config_indices(client, index, older_than, json.loads(config_string))
else:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment