Skip to content

Instantly share code, notes, and snippets.

@alex-harvey-z3q
Created September 10, 2020 13:37
Show Gist options
  • Save alex-harvey-z3q/5bc93f952eaeebf3b81e4c4d36a1088d to your computer and use it in GitHub Desktop.
Save alex-harvey-z3q/5bc93f952eaeebf3b81e4c4d36a1088d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import json
import uuid
import os
import sys
from time import strftime, localtime
from typing import Generator
import boto3
from elasticsearch import Elasticsearch, helpers
def usage() -> None:
    """Print the expected invocation to stdout and exit with status 1."""
    script_name = os.path.basename(__file__)
    message = "Usage: GROUP_NAME=... ES_HOST=... {}".format(script_name)
    print(message)
    # Equivalent to sys.exit(1): terminates the interpreter with status 1.
    raise SystemExit(1)
class CWLogs:
    """Read every log event from every stream of one CloudWatch Logs group.

    The group name is taken from the ``GROUP_NAME`` environment variable.
    """

    # Read with .get() so a missing GROUP_NAME does not raise KeyError while
    # the class body is evaluated, which would happen before main() gets a
    # chance to print the usage message.
    group_name = os.environ.get("GROUP_NAME")

    def __init__(self) -> None:
        """Create the boto3 CloudWatch Logs client used for all requests."""
        self.client = boto3.client("logs")

    def events(self) -> Generator[dict, None, None]:
        """Yield each log event dict from all streams in the group."""
        yield from self._generate_events()

    def _generate_streams(self) -> Generator[dict, None, None]:
        """Yield every log stream description, following ``nextToken`` pages."""
        kwargs = {"logGroupName": self.group_name}
        while True:
            stream_batch = self.client.describe_log_streams(**kwargs)
            yield from stream_batch["logStreams"]
            next_token = stream_batch.get("nextToken")
            if not next_token:
                break
            kwargs["nextToken"] = next_token

    def _generate_events(self) -> Generator[dict, None, None]:
        """Yield every event of every stream, oldest first within a stream.

        ``get_log_events`` does not return ``nextToken``; it returns
        ``nextForwardToken``. The end of a stream is signalled by the service
        handing back the same token that was passed in, so pagination stops
        when the token no longer changes. An empty page alone is not treated
        as the end.
        """
        for stream_obj in self._generate_streams():
            kwargs = {
                "logGroupName": self.group_name,
                "logStreamName": stream_obj["logStreamName"],
                # Start at the oldest event so forward paging covers the
                # whole stream instead of only the most recent page.
                "startFromHead": True,
            }
            while True:
                logs_batch = self.client.get_log_events(**kwargs)
                yield from logs_batch["events"]
                forward_token = logs_batch.get("nextForwardToken")
                if not forward_token or forward_token == kwargs.get("nextToken"):
                    break
                kwargs["nextToken"] = forward_token
class ESWriter:
    """Bulk-index CloudWatch log events into Elasticsearch.

    The Elasticsearch host is taken from the ``ES_HOST`` environment variable.
    """

    # Read with .get() so a missing ES_HOST does not raise KeyError while the
    # class body is evaluated, which would happen before main() gets a chance
    # to print the usage message.
    es_host = os.environ.get("ES_HOST")

    def __init__(self) -> None:
        """Create the Elasticsearch client, pointed at ``es_host`` when set."""
        if self.es_host:
            self.elastic = Elasticsearch(hosts=[self.es_host])
        else:
            # No host configured: keep the client's own default.
            self.elastic = Elasticsearch()

    def post(self, events: Generator[dict, None, None]) -> None:
        """Send all events through the bulk helper and print the outcome.

        Failures are reported on stdout rather than raised, so the call is
        best-effort.
        """
        try:
            response = helpers.bulk(
                self.elastic, self._transformer(events))
            print("\nRESPONSE:", response)
        except Exception as e:
            print("\nERROR:", e)

    @staticmethod
    def _index_name(timestamp: str) -> str:
        """Return the weekly index name for an event timestamp.

        ``timestamp`` is the CloudWatch event timestamp, in milliseconds
        since the epoch (int or numeric string). It is converted to seconds
        before formatting. The name uses local time, formatted as year and
        week of year (``%Y.%U``).
        """
        seconds = float(timestamp) / 1000.0
        return "eventbridge-auth0-{}".format(
            strftime("%Y.%U", localtime(seconds)))

    @staticmethod
    def _normalize(message: str) -> str:
        """Return the message unchanged (normalization is not implemented)."""
        return message  # TODO.

    def _transformer(self, events: Generator[dict, None, None]) \
            -> Generator[dict, None, None]:
        """Yield one bulk action dict per incoming event."""
        for event in events:
            yield self._transform(event)

    def _transform(self, event: dict) -> dict:
        """Build the bulk-helper action for a single CloudWatch event.

        The bulk helper expects each action as a dict whose metadata keys are
        underscore-prefixed and whose document body is under ``_source``.
        A pre-serialized two-line string would be treated as a raw document
        and produce a malformed bulk request.
        """
        timestamp = event["timestamp"]
        return {
            "_op_type": "index",
            "_id": str(uuid.uuid4()),  # TODO. Check
            "_index": self._index_name(timestamp),
            # NOTE(review): mapping types are deprecated in Elasticsearch 7
            # and removed in 8 - confirm the target cluster still accepts it.
            "_type": "_doc",
            "_source": {
                "@source": "auto-populate script",
                "@timestamp": timestamp,
                "@message": self._normalize(event["message"]),
            },
        }
def main() -> None:
    """Check the required environment, then copy the log events to Elasticsearch."""
    required_vars = ("GROUP_NAME", "ES_HOST")
    # usage() exits the process, so one call covers any missing variable.
    if any(name not in os.environ for name in required_vars):
        usage()
    source = CWLogs()
    sink = ESWriter()
    sink.post(source.events())
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
# vim: set ft=python:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment