Parse Aurora MySQL logs using Lambda and send to Honeycomb
import base64
import gzip
import json
import re


def parse_json_with_regex(event, context):
    """Kinesis Firehose transform: decode CloudWatch Logs records containing
    Aurora MySQL slow query log entries, extract fields, and re-emit as JSON."""
    output = []
    for record in event['records']:
        print(f'Processing record {record}')
        # Firehose delivers the CloudWatch Logs payload base64-encoded and gzipped
        cw_data = record['data']
        compressed_payload = base64.b64decode(cw_data)
        uncompressed_payload = gzip.decompress(compressed_payload).decode("utf-8")
        event_body = json.loads(uncompressed_payload)

        # The first log event carries the raw slow query log message
        log_event = event_body['logEvents'][0]
        message = log_event['message']

        # Regular expressions for the slow query log header lines
        timestamp_regex = r"# Time: (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)"
        user_host_regex = r"# User@Host: (\w+)\[(\w+)\] @ \[([\d\.]+)\] Id: (\d+)"
        query_time_regex = r"# Query_time: (\d+\.\d+) Lock_time: (\d+\.\d+) Rows_sent: (\d+) Rows_examined: (\d+)"
        set_timestamp_regex = r"SET timestamp=(\d+);"

        # Extract fields; any of these may be absent in a malformed entry
        timestamp_match = re.search(timestamp_regex, message)
        user_host_match = re.search(user_host_regex, message)
        query_time_match = re.search(query_time_regex, message)
        set_timestamp_match = re.search(set_timestamp_regex, message)

        # The query statement is everything after "SET timestamp=...;", with
        # that line itself stripped out. Guard against a missing match rather
        # than calling .start() on None.
        if set_timestamp_match:
            query_stmt = re.sub(
                set_timestamp_regex, "", message[set_timestamp_match.start():]
            ).strip()
        else:
            query_stmt = None

        data = {
            'timestamp': timestamp_match.group(1) if timestamp_match else None,
            'user': user_host_match.group(1) if user_host_match else None,
            'host': user_host_match.group(3) if user_host_match else None,
            'id': user_host_match.group(4) if user_host_match else None,
            'query_time': float(query_time_match.group(1)) if query_time_match else None,
            'lock_time': float(query_time_match.group(2)) if query_time_match else None,
            'rows_sent': int(query_time_match.group(3)) if query_time_match else None,
            'rows_examined': int(query_time_match.group(4)) if query_time_match else None,
            'timestamp_set': int(set_timestamp_match.group(1)) if set_timestamp_match else None,
            'query': query_stmt if query_stmt else None,
        }

        # Re-encode the parsed event in the shape Firehose expects back
        parsed_fields = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(data).encode('utf-8')).decode('utf-8')
        }
        output.append(parsed_fields)
    return {'records': output}
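To sanity-check the parser locally, you can hand it a synthetic Firehose event. This is a minimal sketch: the slow-query message is an illustrative example shaped to match the regexes above (real Aurora entries may differ, e.g. in spacing), and the record structure mirrors what Kinesis Data Firehose passes to a transform Lambda.

# Local test sketch: build a fake CloudWatch Logs -> Firehose record and
# invoke the handler directly. The message below is illustrative, not
# output captured from a real cluster.
import base64
import gzip
import json

sample_message = (
    "# Time: 2023-05-31T08:12:00.123456Z\n"
    "# User@Host: admin[admin] @ [10.0.0.12] Id: 42\n"
    "# Query_time: 2.000054 Lock_time: 0.000100 Rows_sent: 1 Rows_examined: 5000\n"
    "SET timestamp=1685520720;\n"
    "SELECT * FROM orders WHERE customer_id = 7;"
)

cloudwatch_payload = {
    "logEvents": [{"id": "0", "timestamp": 1685520720000, "message": sample_message}]
}

fake_event = {
    "records": [
        {
            "recordId": "1",
            # Firehose hands the Lambda base64-encoded, gzipped payloads
            "data": base64.b64encode(
                gzip.compress(json.dumps(cloudwatch_payload).encode("utf-8"))
            ).decode("utf-8"),
        }
    ]
}

result = parse_json_with_regex(fake_event, None)
print(json.loads(base64.b64decode(result["records"][0]["data"])))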
module "mysql-logs" {
source = "honeycombio/integrations/aws//modules/cloudwatch-logs"
name = "rds-logs"
cloudwatch_log_groups = ["/aws/rds/cluster/database-1/slowquery"]
honeycomb_api_key = var.honeycomb_api_key
enable_lambda_transform = true
lambda_transform_arn = "arn:aws:lambda:us-east-1:<acct-id>:function:kfh-lambda-transform-dev-main"
honeycomb_dataset_name = "rds-mysql-logs"
s3_failure_bucket_arn = "arn:aws:s3:::demo-kinesis-integration-failures"
}
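Note that lambda_transform_arn must reference the transform function deployed by the Serverless config below: with service kfh-lambda-transform, the default dev stage, and function main, the Serverless Framework names the function kfh-lambda-transform-dev-main, matching the ARN above. The <acct-id> placeholder stands in for your AWS account ID.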
Parse Aurora MySQL logs using CloudWatch, Kinesis Firehose, and Lambda
terraform {
  required_version = "~> 1.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.9"
    }
    honeycombio = {
      source  = "honeycombio/honeycombio"
      version = "~> 0.13.0"
    }
  }
}
service: kfh-lambda-transform
frameworkVersion: '3'

provider:
  name: aws
  runtime: python3.9

functions:
  main:
    handler: handler.parse_json_with_regex

package:
  exclude:
    - venv/**
    - terraform/**
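With the Serverless Framework v3 installed, running serverless deploy packages the handler and creates the kfh-lambda-transform-dev-main function referenced in the Terraform module above (dev is the default stage). The package exclude list keeps the local virtualenv and Terraform files out of the deployment artifact.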
variable "honeycomb_api_key" {
  type        = string
  description = "Honeycomb API key"
  default     = ""
}
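The empty default means the module receives an empty API key unless you supply one, e.g. via a terraform.tfvars entry or the TF_VAR_honeycomb_api_key environment variable; marking the variable sensitive = true also keeps the key out of plan output.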