Created
June 12, 2019 18:54
-
-
Save AndreLouisCaron/782e09443588e3d9d5167623c8fc8a08 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| # When running in a local back-end, both ElasticSearch and ElastAlert run | |
| # inside Docker containers that are launched side-by-side (e.g. via | |
| # `docker-compose` or K8S). Unfortunately, ElastAlert does not account for | |
| # this and requires that the `elastalert_status` index be created before | |
| # ElastAlert starts. In addition, when running via `docker-compose`, there is | |
| # no automatic restart when the `elastalert` command crashes on start. | |
| # | |
| # This script's purpose is to block the start of the ElastAlert command until | |
| # we can confirm that the `elastalert-create-index` command has completed. | |
| # This requires blocking until: | |
| # | |
| # - ElasticSearch is responsive; | |
| # - the `elastalert_status` index exists; and | |
| # - all type mappings have been configured on the `elastalert_status` index. | |
| # | |
| # Since `elastalert-create-index` creates the index and then configures the | |
| # type mappings for each document type in separate transactions, we actually | |
| # need to check that the mappings for each document type have been set. Only | |
| # then can we "know" that `elastalert-create-index` has completed and be | |
| # confident that the `elastalert` command will not crash on start. | |
| from __future__ import print_function | |
| import aws_requests_auth.aws_auth | |
| import boto3.session | |
| import os | |
| import requests | |
| import requests.exceptions | |
| import time | |
| import sys | |
| from datetime import ( | |
| datetime, | |
| timedelta, | |
| ) | |
| from urlparse import urljoin | |
def elasticsearch_url():
    """Build the base ElasticSearch URL from the `ES_*` environment variables.

    Reads three variables:

    - `ES_USE_SSL`: HTTPS is selected only when the value equals `true`
      (case-insensitive); anything else, or an unset variable, means HTTP.
    - `ES_HOST`: host name, `localhost` when unset.
    - `ES_PORT`: port number, `9200` when unset.  A value that is not an
      integer raises `ValueError`.

    Returns a string of the form `scheme://host:port`.
    """
    env = os.environ
    use_ssl = env.get('ES_USE_SSL', 'false').lower() == 'true'
    scheme = 'https' if use_ssl else 'http'
    host = env.get('ES_HOST', 'localhost')
    port = int(env.get('ES_PORT', '9200'))
    return '%s://%s:%d' % (scheme, host, port)
# Names of the document types that `elastalert-create-index` declares; each of
# them is expected to have a mapping on the `elastalert_status` index.
DESIRED_DOCUMENT_TYPES = frozenset([
    'elastalert',
    'elastalert_error',
    'elastalert_status',
    'past_elastalert',
    'silence',
])
class RefeshableAWSRequestsAuth(aws_requests_auth.aws_auth.AWSRequestsAuth):
    """AWS request signer whose credentials are looked up on every access.

    The access key, secret key and session token are exposed as read-only
    properties that delegate to `refreshable_credential` instead of being
    copied once at construction time.

    NOTE(review): presumably this lets refreshed boto3 credentials be picked
    up by long-running processes -- confirm against the credential object
    actually passed in.
    """

    def __init__(self, refreshable_credential, host, region, service):
        """Store the credential source and the signing parameters.

        The base-class initializer is not called; only the attributes below
        are set on the instance.
        """
        self.service = service
        self.aws_region = region
        self.aws_host = host
        self.refreshable_credential = refreshable_credential

    @property
    def aws_access_key(self):
        """Current access key, read from `refreshable_credential`."""
        return self.refreshable_credential.access_key

    @property
    def aws_secret_access_key(self):
        """Current secret key, read from `refreshable_credential`."""
        return self.refreshable_credential.secret_key

    @property
    def aws_token(self):
        """Current session token, read from `refreshable_credential`."""
        return self.refreshable_credential.token
def elasticsearch_auth():
    """Get the Requests custom authentication callable for ElastAlert.

    Returns `None` (no request signing) when `AWS_DEFAULT_REGION` is unset or
    blank.  Otherwise returns a `RefeshableAWSRequestsAuth` for the `es`
    service in that region, backed by the credentials of a boto3 session.
    """
    region = os.environ.get('AWS_DEFAULT_REGION', '').strip()
    if not region:
        return None

    session = boto3.session.Session(region_name=region)
    # NOTE(review): `get_credentials()` is documented to return None when
    # boto3 cannot find any credentials; the signer would then fail on its
    # first attribute access -- confirm whether that case needs handling.
    credentials = session.get_credentials()
    return RefeshableAWSRequestsAuth(
        refreshable_credential=credentials,
        # Same default as `elasticsearch_url()`, so the signed host and the
        # requested host agree (and a missing `ES_HOST` is not a `KeyError`).
        host=os.environ.get('ES_HOST', 'localhost'),
        region=region,
        service='es',
    )
def main():
    """Block until `elastalert-create-index` has completed.

    Polls the `elastalert_status` index roughly once per second until it
    exists and has a mapping for every entry of `DESIRED_DOCUMENT_TYPES`,
    then returns.

    Calls `sys.exit(1)` if the index is not ready within five minutes.
    Connection failures, request timeouts, HTTP 404 and HTTP 503 are treated
    as "not ready yet" and retried; any other HTTP error status raises
    `requests.exceptions.HTTPError`.
    """
    poll_interval = 1.0     # Seconds to sleep between two attempts.
    request_timeout = 10.0  # Upper bound, in seconds, for a single request.

    deadline = datetime.utcnow() + timedelta(minutes=5)
    auth = elasticsearch_auth()

    # Wait until the `elastalert_status` index exists (the script which
    # provisions it is concurrently waiting for ElasticSearch to become
    # available).
    url = urljoin(elasticsearch_url(), 'elastalert_status')
    print('[%s] url: %s' % (
        datetime.utcnow(),
        url,
    ))

    with requests.Session() as session:
        while True:
            # We must have not exceeded the deadline.
            now = datetime.utcnow()
            if now >= deadline:
                print('[%s] timed out while waiting for %s :-(' % (
                    now,
                    url,
                ))
                sys.exit(1)
            print('[%s] poll...' % (
                now,
            ))

            # ElasticSearch must answer.  Without a timeout, a connection
            # that hangs would block forever and the deadline above would
            # never be checked again, so never wait longer than the time
            # that is left.
            remaining = (deadline - now).total_seconds()
            try:
                r = session.get(
                    url,
                    auth=auth,
                    timeout=min(request_timeout, remaining),
                )
            except (requests.exceptions.ConnectionError,
                    requests.exceptions.Timeout):
                print('[%s] ElasticSearch is not yet responsive :-(' % (
                    now,
                ))
                time.sleep(poll_interval)
                continue

            # The index must exist.
            if r.status_code == 404:
                print('[%s] index `%s` does not yet exist :-(' % (
                    now,
                    'elastalert_status',
                ))
                time.sleep(poll_interval)
                continue

            # "Service Unavailable" is retried rather than raised.
            # NOTE(review): assumes a 503 here is transient (e.g. the cluster
            # is still starting); a persistent 503 ends at the deadline.
            if r.status_code == 503:
                print('[%s] ElasticSearch is not yet available (HTTP 503) :-(' % (
                    now,
                ))
                time.sleep(poll_interval)
                continue

            # Anything else that is not a success is unexpected: fail loudly.
            r.raise_for_status()

            # Mappings must exist for all document types.
            index = r.json()
            document_types = frozenset(
                index['elastalert_status'].get('mappings', {})
            )
            missing_document_types = DESIRED_DOCUMENT_TYPES - document_types
            if missing_document_types:
                # Sorted so that the log output is stable between polls.
                for doc_type in sorted(missing_document_types):
                    print('[%s] document type `%s` does not have mappings yet :-(' % (
                        now,
                        doc_type,
                    ))
                time.sleep(poll_interval)
                continue

            # Done.
            print('[%s] ready!' % (
                now,
            ))
            return
# Run the wait loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment