Skip to content

Instantly share code, notes, and snippets.

@AndreLouisCaron
Created June 12, 2019 18:54
Show Gist options
  • Select an option

  • Save AndreLouisCaron/782e09443588e3d9d5167623c8fc8a08 to your computer and use it in GitHub Desktop.

Select an option

Save AndreLouisCaron/782e09443588e3d9d5167623c8fc8a08 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
# When running in a local back-end, both ElasticSearch and ElastAlert run
# inside Docker containers that are launched side-by-side (e.g. via
# `docker-compose` or K8S). Unfortunately, ElastAlert does not account for
# this and requires that the `elastalert_status` index be created before
# ElastAlert starts. In addition, when running via `docker-compose`, there is
# no automatic restart when the `elastalert` command crashes on start.
#
# This script's purpose is to block the start of the ElastAlert command until
# we can confirm that the `elastalert-create-index` command has completed.
# This requires blocking until:
#
# - ElasticSearch is responsive;
# - the `elastalert_status` index exists; and
# - all type mappings have been configured on the `elastalert_status` index.
#
# Since `elastalert-create-index` creates the index and then configures the
# type mappings for each document type in separate transactions, we actually
# need to check the mappings for each document type has been set. Only then,
# can we "know" that `elastalert-create-index` has completed and be confident
# that the `elastalert` command will not crash on start.
from __future__ import print_function
import aws_requests_auth.aws_auth
import boto3.session
import os
import requests
import requests.exceptions
import time
import sys
from datetime import (
datetime,
timedelta,
)
from urlparse import urljoin
def elasticsearch_url():
"""Compute ES URL from environment variables for ElastAlert."""
if bool(os.environ.get('ES_USE_SSL', 'false').lower() == 'true'):
scheme = 'https'
else:
scheme = 'http'
return '%s://%s:%d' % (
scheme,
os.environ.get('ES_HOST', 'localhost'),
int(os.environ.get('ES_PORT', '9200')),
)
# All document types declared by `elastalert-create-index`.
DESIRED_DOCUMENT_TYPES = frozenset((
'elastalert',
'elastalert_error',
'elastalert_status',
'past_elastalert',
'silence',
))
class RefeshableAWSRequestsAuth(aws_requests_auth.aws_auth.AWSRequestsAuth):
def __init__(self,
refreshable_credential,
host,
region,
service):
self.refreshable_credential = refreshable_credential
self.aws_host = host
self.aws_region = region
self.service = service
@property
def aws_access_key(self):
return self.refreshable_credential.access_key
@property
def aws_secret_access_key(self):
return self.refreshable_credential.secret_key
@property
def aws_token(self):
return self.refreshable_credential.token
def elasticsearch_auth():
"""Get the Requests custom authentication callable for ElastAlert."""
region = os.environ.get('AWS_DEFAULT_REGION', '').strip()
if not region:
return None
session = boto3.session.Session(region_name=region)
return RefeshableAWSRequestsAuth(
refreshable_credential=session.get_credentials(),
host=os.environ['ES_HOST'],
region=region,
service='es',
)
def main():
"""Block until `elastalert-create-index` has completed."""
ref = datetime.utcnow()
deadline = ref + timedelta(minutes=5)
auth = elasticsearch_auth()
# Wait until the `elastalert_status` index exists (the script which
# provisions it is concurrently waiting for ElasticSearch to become
# available).
now = datetime.utcnow()
url = elasticsearch_url()
url = urljoin(url, 'elastalert_status')
print('[%s] url: %s' % (
now,
url,
))
with requests.Session() as session:
while True:
# We must have not exceeded the deadline.
now = datetime.utcnow()
if now >= deadline:
print('[%s] timed out while waiting for %s :-(' % (
now,
url,
))
sys.exit(1)
print('[%s] poll...' % (
now,
))
# The index must exist.
try:
r = session.get(url, auth=auth)
except requests.exceptions.ConnectionError as e:
print('[%s] ElasticSearch is not yet responsive :-(' % (
now,
))
time.sleep(1.0)
continue
if r.status_code == 404:
print('[%s] index `%s` does not yet exist :-(' % (
now,
'elastalert_status',
))
time.sleep(1.0)
continue
r.raise_for_status()
# Mappings must exist for all document types.
index = r.json()
document_types = frozenset(
index['elastalert_status'].get('mappings', {})
)
missing_document_types = DESIRED_DOCUMENT_TYPES - document_types
if missing_document_types:
for doc_type in missing_document_types:
print('[%s] document type `%s` does not have mappings yet :-(' % (
now,
doc_type,
))
time.sleep(1.0)
continue
# Done.
print('[%s] ready!' % (
now,
))
return
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment