Last active
February 4, 2022 12:08
-
-
Save cjuroz/d45f4d73e74f068892c5e4f3d1c7fa7c to your computer and use it in GitHub Desktop.
Serverless Elasticsearch Curator for AWS Lambda using requests-aws4auth to sign requests with AWS ES
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build the AWS Lambda deployment package. See:
# http://docs.aws.amazon.com/lambda/latest/dg/lambda-python-how-to-create-deployment-package.html
#
# `-t` installs each package into the project directory (replace the
# placeholder path) so the libraries are bundled alongside the handler code.
# Curator provides the index management API used by the handler.
pip install elasticsearch-curator -t /path/to/project-dir | |
# requests-aws4auth signs the Elasticsearch requests for AWS ES.
pip install requests-aws4auth -t /path/to/project-dir |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Run Elasticsearch Curator from AWS Lambda. | |
# | |
# Edit serverless-curator.yaml to define which indices should be purged. | |
from __future__ import print_function | |
import os | |
import certifi | |
import curator | |
import yaml | |
import time | |
from curator.exceptions import NoIndices | |
from elasticsearch import Elasticsearch, RequestsHttpConnection | |
from requests_aws4auth import AWS4Auth | |
# This is the entry point where Lambda will start execution. | |
def _wait_for_snapshot(es, repository, name, poll_seconds=5):
    """Block until no snapshot is reported as running in ``repository``."""
    # The status API (without a snapshot name) lists the snapshots that are
    # currently running; an empty list means the one we started has finished.
    # NOTE(review): there is no upper bound on this wait, so a very long
    # snapshot will run into the Lambda function timeout.
    while es.snapshot.status(repository=repository)['snapshots']:
        print('Snapshot %s is still running...' % name)
        time.sleep(poll_seconds)


def _run_snapshots(es, snapshots):
    """Create one timestamped snapshot per entry in ``snapshots``, in order."""
    for snapshot in snapshots:
        repository = snapshot['repository']
        name = snapshot['name'] + '-' + time.strftime('%Y%m%d%H%M%S')
        # Workaround for AWS ES 5.3 bug
        # (https://forums.aws.amazon.com/thread.jspa?threadID=257549):
        # call the snapshot API directly instead of curator.Snapshot.
        # Passing a dict lets the client serialize the body, so index
        # patterns containing quotes or backslashes cannot corrupt the JSON.
        es.snapshot.create(repository=repository, snapshot=name,
                           body={'indices': snapshot['prefix']})
        _wait_for_snapshot(es, repository, name)


def _prune_indices(es, cluster_name, indices):
    """Delete indices older than each rule's ``days``; return the deleted names."""
    deleted = []
    for index in indices:
        prefix = index['prefix']
        print('Checking "%s" indices on %s cluster.' %
              (prefix, cluster_name))
        # Fetch all the index names.
        index_list = curator.IndexList(es)
        try:
            # Reduce the list to those that match the prefix.
            index_list.filter_by_regex(kind='prefix', value=prefix)
            # Reduce again, by the date embedded in the index name.
            index_list.filter_by_age(source='name', direction='older',
                                     timestring='%Y.%m.%d', unit='days',
                                     unit_count=index['days'])
            curator.DeleteIndices(index_list).do_action()
        except NoIndices:
            # Nothing matched the filters, so there is nothing to delete.
            # That's OK.
            pass
        # Record the names of any indices we removed.
        deleted.extend(index_list.working_list())
    return deleted


# This is the entry point where Lambda will start execution.
def handler(event, context):
    """Snapshot and prune the clusters listed in serverless-curator.yaml.

    For every cluster in the config file, the configured snapshots are taken
    first (waiting for each one to finish), then indices matching each
    configured prefix and older than the configured number of days are
    deleted.

    Args:
        event: Lambda event payload; unused.
        context: Lambda context object; unused.

    Returns:
        dict: ``{'backup': {...}, 'deleted': {...}}`` where each value maps a
        cluster name to a list of index names. The ``backup`` lists are
        currently always empty because the snapshot workaround does not
        enumerate the indices it backs up.
    """
    with open('serverless-curator.yaml') as config_file:
        # safe_load only builds plain Python objects; yaml.load without an
        # explicit Loader is unsafe and is rejected by recent PyYAML releases.
        config = yaml.safe_load(config_file)

    # Track deleted and backed-up indices per cluster.
    deleted_indices = {}
    backup_indices = {}

    # We can define multiple Elasticsearch clusters to manage, so we'll have
    # an outer loop for working through them. An empty config file loads as
    # None, which is treated as "no clusters".
    for cluster_config in config or []:
        cluster_name = cluster_config['name']
        deleted_indices[cluster_name] = []
        backup_indices[cluster_name] = []

        # Sign every request for the AWS ES service with the credentials
        # found in the environment.
        awsauth = AWS4Auth(os.getenv('AWS_ACCESS_KEY_ID'),
                           os.getenv('AWS_SECRET_ACCESS_KEY'),
                           cluster_config['region'], 'es',
                           session_token=os.getenv('AWS_SESSION_TOKEN'))
        # Create a connection to the cluster. RequestsHttpConnection is
        # needed so the AWS4Auth object can be used as http_auth.
        es = Elasticsearch(cluster_config['endpoint'], http_auth=awsauth,
                           connection_class=RequestsHttpConnection)

        # Snapshots first. A cluster may omit the 'snapshots' section.
        _run_snapshots(es, cluster_config.get('snapshots') or [])

        # Now work through each set of time-series indices defined in our
        # config for this cluster. The 'indices' section is optional too.
        deleted_indices[cluster_name].extend(
            _prune_indices(es, cluster_name,
                           cluster_config.get('indices') or []))

    lambda_response = {'backup': backup_indices, 'deleted': deleted_indices}
    print(lambda_response)
    return lambda_response
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
# Define Elasticsearch clusters here to have them periodically snapshotted
# and pruned by Curator. The file is a list of clusters; each cluster has:
#   name      - label used in log messages and as the key in the Lambda response
#   endpoint  - URL passed to the Elasticsearch client
#   region    - AWS region used when signing requests
#   snapshots - list of snapshots to take before pruning; each entry has
#               'repository', 'prefix' (the indices to snapshot) and
#               'name' (a timestamp is appended to it)
#   indices   - list of pruning rules; indices whose name starts with 'prefix'
#               and whose date in the name (format %Y.%m.%d) is older than
#               'days' days are deleted
- name: example logging cluster | |
endpoint: https://curator:[email protected]:9243 | |
region: us-west-2 | |
snapshots: | |
- repository: index-backup-stage | |
prefix: logstash-stage-* | |
name: curator-stage | |
- repository: kibana-backup | |
prefix: .kibana | |
name: curator-kibana | |
indices: | |
- prefix: logstash- | |
days: 365 | |
# This cluster defines only index pruning rules.
- name: example metrics cluster | |
endpoint: https://curator:[email protected]:9243 | |
region: us-west-2 | |
indices: | |
- prefix: metricbeat- | |
days: 14 | |
- prefix: packetbeat- | |
days: 14 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment