Created
September 10, 2020 13:37
-
-
Save alex-harvey-z3q/5bc93f952eaeebf3b81e4c4d36a1088d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import json | |
import uuid | |
import os | |
import sys | |
from time import strftime, localtime | |
from typing import Generator | |
import boto3 | |
from elasticsearch import Elasticsearch, helpers | |
def usage() -> None:
    """Print a one-line usage summary and terminate with exit status 1.

    The summary names the two environment variables the script reads
    (GROUP_NAME and ES_HOST) followed by this script's file name.
    """
    script_name = os.path.basename(__file__)
    template = "Usage: GROUP_NAME=... ES_HOST=... {}"
    print(template.format(script_name))
    sys.exit(1)
class CWLogs:
    """Iterate over every event of every stream in one CloudWatch Logs group.

    The log group is named by the ``GROUP_NAME`` environment variable.
    """

    # Read with .get() so that defining/importing the class never raises when
    # the variable is unset; __init__ performs the strict lookup instead.
    group_name = os.environ.get("GROUP_NAME")

    def __init__(self) -> None:
        """Create the boto3 CloudWatch Logs client.

        Raises:
            KeyError: if GROUP_NAME is not set in the environment.
        """
        if self.group_name is None:
            # Strict lookup: raises KeyError("GROUP_NAME") when missing.
            self.group_name = os.environ["GROUP_NAME"]
        self.client = boto3.client("logs")

    def events(self) -> Generator[dict, None, None]:
        """Yield each log event dict from all streams in the group."""
        yield from self._generate_events()

    def _generate_streams(self) -> Generator[dict, None, None]:
        """Yield each log stream description, following ``nextToken`` pages."""
        request = {"logGroupName": self.group_name}
        while True:
            page = self.client.describe_log_streams(**request)
            yield from page["logStreams"]
            token = page.get("nextToken")
            if token is None:
                # No token in the response means this was the last page.
                break
            request["nextToken"] = token

    def _generate_events(self) -> Generator[dict, None, None]:
        """Yield every event of every stream, oldest first within a stream.

        ``get_log_events`` does not return a ``nextToken`` key; it returns
        ``nextForwardToken``, and signals the end of the stream by handing
        back the same token that was passed in.
        """
        for stream in self._generate_streams():
            request = {
                "logGroupName": self.group_name,
                "logStreamName": stream["logStreamName"],
                # Start at the oldest event so that forward paging covers the
                # whole stream rather than only the most recent page.
                "startFromHead": True,
            }
            while True:
                page = self.client.get_log_events(**request)
                yield from page["events"]
                token = page.get("nextForwardToken")
                if token is None or token == request.get("nextToken"):
                    # Missing or unchanged token: nothing further to read.
                    break
                request["nextToken"] = token
class ESWriter:
    """Bulk-index CloudWatch log events into Elasticsearch.

    The Elasticsearch node is named by the ``ES_HOST`` environment variable.
    """

    # Read with .get() so that defining/importing the class never raises when
    # the variable is unset; __init__ performs the strict lookup instead.
    es_host = os.environ.get("ES_HOST")

    def __init__(self) -> None:
        """Create the Elasticsearch client for the configured host.

        Raises:
            KeyError: if ES_HOST is not set in the environment.
        """
        if self.es_host is None:
            # Strict lookup: raises KeyError("ES_HOST") when missing.
            self.es_host = os.environ["ES_HOST"]
        # Connect to the requested host instead of the client's default.
        self.elastic = Elasticsearch(hosts=[self.es_host])

    def post(self, events: Generator[dict, None, None]) -> None:
        """Send all *events* through the bulk helper and print the outcome.

        Any exception raised while indexing is printed rather than
        propagated, so the call itself never raises.
        """
        try:
            response = helpers.bulk(
                self.elastic, self._transformer(events))
            print("\nRESPONSE:", response)
        except Exception as e:
            print("\nERROR:", e)

    @staticmethod
    def _index_name(timestamp: float) -> str:
        """Return the weekly index name for an event timestamp.

        Args:
            timestamp: milliseconds since the Unix epoch, as found in the
                ``timestamp`` field of CloudWatch log events. Anything
                ``float()`` accepts (int, float, numeric string) works.

        Returns:
            ``eventbridge-auth0-<year>.<week>`` where the week number is
            ``%U`` (Sunday-based) evaluated in local time.
        """
        # time.localtime() expects seconds, so scale the milliseconds down.
        seconds = float(timestamp) / 1000.0
        return "eventbridge-auth0-{}".format(
            strftime("%Y.%U", localtime(seconds)))

    @staticmethod
    def _normalize(message: str) -> str:
        """Return *message* unchanged (placeholder for future clean-up)."""
        return message  # TODO.

    def _transformer(self, events: Generator[dict, None, None]) \
            -> Generator[dict, None, None]:
        """Lazily convert each event into a bulk-helper action."""
        for event in events:
            yield self._transform(event)

    def _transform(self, event: dict) -> dict:
        """Build one bulk-helper action dict for a CloudWatch log event.

        ``helpers.bulk`` consumes action dicts whose metadata keys are
        underscore-prefixed and whose document lives under ``_source``; it
        does not accept hand-assembled two-line NDJSON strings.

        Args:
            event: dict with at least ``timestamp`` and ``message`` keys.

        Returns:
            An ``index`` action carrying the document to store.
        """
        timestamp = event["timestamp"]
        return {
            "_op_type": "index",
            # TODO: random IDs mean a re-run indexes duplicate documents.
            "_id": str(uuid.uuid4()),
            "_index": self._index_name(timestamp),
            # NOTE(review): mapping types are deprecated in Elasticsearch 7
            # and removed in 8 - drop this key when targeting 8.x.
            "_type": "_doc",
            "_source": {
                "@source": "auto-populate script",
                "@timestamp": timestamp,
                "@message": self._normalize(event["message"]),
            },
        }
def main() -> None:
    """Validate the environment, then copy the log group into Elasticsearch.

    usage() is invoked when GROUP_NAME or ES_HOST is absent from the
    environment. Otherwise the events produced by CWLogs are handed to
    ESWriter.post().
    """
    for required in ("GROUP_NAME", "ES_HOST"):
        if required in os.environ:
            continue
        usage()

    # The writer is constructed before the reader, as in the one-line form.
    sink = ESWriter()
    source = CWLogs()
    sink.post(source.events())


if __name__ == "__main__":
    main()
# vim: set ft=python: |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment