Last active
June 3, 2022 00:31
-
-
Save ekowcharles/a02abb2d807b3ee84930820c47b8d906 to your computer and use it in GitHub Desktop.
Process ALB Logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
setup:
	python3 -m pip install -r requirements.pip
run:
	rm -rf logs/*
	aws s3 sync s3://<bucket>/AWSLogs/<account-id>/elasticloadbalancing/<region>/<year>/<month>/<day>/ `pwd`/logs
	cd logs && gunzip *.log.gz
	python3 upload.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
psycopg2-binary | |
# csv is part of the Python standard library and does not need to be installed with pip
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import psycopg2 | |
import csv | |
# https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html | |
# Single-character separators used when splitting log fields.
SPACE, COLON = ' ', ':'

# Directory, relative to the working directory, that is scanned for log files.
DIRECTORY = 'logs'

# One PostgreSQL connection shared by every function in this script.
# NOTE(review): the credential values look like placeholders -- replace them
# with real connection settings before running.
CONNECTION = psycopg2.connect(host='host', database='database', user='username', password='password')
def setup_database():
    """Drop and recreate the ``logs`` table on the shared connection.

    Every column is stored as text. ``request_url``, ``redirect_url`` and
    ``user_agent`` use the unbounded ``text`` type because real URLs and
    user agents routinely exceed a few hundred characters, and a
    length-limited column would reject those rows at insert time.

    Raises:
        Exception: whatever the database driver raises; the transaction is
            rolled back first so the connection stays usable.
    """
    print('Recreating table ...')
    sql = """
    DROP TABLE IF EXISTS logs;
    CREATE TABLE logs
    (
        ttype varchar(225),
        tdatetime varchar(225),
        tdate varchar(225),
        ttime varchar(225),
        tminute varchar(225),
        elb varchar(225),
        client_ip varchar(225),
        client_port varchar(225),
        target_ip varchar(225),
        target_port varchar(225),
        request_processing_time varchar(225),
        target_processing_time varchar(225),
        response_processing_time varchar(225),
        elb_status_code varchar(225),
        target_status_code varchar(225),
        received_bytes varchar(225),
        sent_bytes varchar(225),
        request_verb varchar(225),
        request_url text,
        request_proto varchar(225),
        user_agent text,
        ssl_cipher varchar(225),
        ssl_protocol varchar(225),
        target_group_arn varchar(225),
        trace_id varchar(225),
        domain_name varchar(225),
        chosen_cert_arn varchar(225),
        matched_rule_priority varchar(225),
        request_creation_time varchar(225),
        actions_executed varchar(225),
        redirect_url text,
        lambda_error_reason varchar(225),
        target_port_list varchar(225),
        target_status_code_list varchar(225),
        classification varchar(225),
        classification_reason varchar(225)
    );
    """
    try:
        # The context manager closes the cursor even when execute() fails.
        with CONNECTION.cursor() as cur:
            cur.execute(sql)
        CONNECTION.commit()
    except Exception:
        # Leave the connection in a clean state before reporting the failure.
        CONNECTION.rollback()
        raise
# Column order of the ``logs`` table; the parsed values follow the same order.
_LOG_COLUMNS = (
    'ttype', 'tdatetime', 'tdate', 'ttime', 'tminute', 'elb',
    'client_ip', 'client_port', 'target_ip', 'target_port',
    'request_processing_time', 'target_processing_time',
    'response_processing_time', 'elb_status_code', 'target_status_code',
    'received_bytes', 'sent_bytes',
    'request_verb', 'request_url', 'request_proto',
    'user_agent', 'ssl_cipher', 'ssl_protocol', 'target_group_arn',
    'trace_id', 'domain_name', 'chosen_cert_arn', 'matched_rule_priority',
    'request_creation_time', 'actions_executed', 'redirect_url',
    'lambda_error_reason', 'target_port_list', 'target_status_code_list',
    'classification', 'classification_reason',
)

# Number of raw fields of a log record that this script maps to columns.
_RAW_FIELD_COUNT = 29


def _split_endpoint(endpoint):
    """Split ``address:port`` on the LAST colon; port is '' when absent."""
    address, separator, port = endpoint.rpartition(COLON)
    if not separator:
        return endpoint, ''
    return address, port


def _split_request(request):
    """Split the quoted request field into (verb, url, protocol)."""
    parts = request.strip().split(SPACE)
    if len(parts) < 3:
        raise ValueError('malformed request field: {!r}'.format(request))
    # Anything between the verb and the protocol is the URL, so a URL that
    # itself contains spaces is kept intact.
    return parts[0], SPACE.join(parts[1:-1]), parts[-1]


def _parse_log_line(line):
    """Turn one split log record into the values for ``_LOG_COLUMNS``.

    Raises:
        ValueError: if the record has fewer than 29 fields, the request
            field has fewer than three parts, or the timestamp has no 'T'.
    """
    # Trailing fields beyond the ones mapped here are ignored so records
    # written in a newer, longer log format still load.
    (
        ttype, tdatetime, elb, client, target,
        request_processing_time, target_processing_time,
        response_processing_time, elb_status_code, target_status_code,
        received_bytes, sent_bytes, request, user_agent, ssl_cipher,
        ssl_protocol, target_group_arn, trace_id, domain_name,
        chosen_cert_arn, matched_rule_priority, request_creation_time,
        actions_executed, redirect_url, lambda_error_reason,
        target_port_list, target_status_code_list, classification,
        classification_reason,
    ) = line[:_RAW_FIELD_COUNT]

    client_ip, client_port = _split_endpoint(client)
    target_ip, target_port = _split_endpoint(target)
    request_verb, request_url, request_proto = _split_request(request)

    # Keep second precision only (YYYY-MM-DDTHH:MM:SS), then derive the
    # date, the time and the HH:MM minute bucket from it.
    tdatetime = tdatetime[:19]
    tdate, ttime = tdatetime.split('T')
    tminute = ttime[:5]

    return (
        ttype, tdatetime, tdate, ttime, tminute, elb,
        client_ip, client_port, target_ip, target_port,
        request_processing_time, target_processing_time,
        response_processing_time, elb_status_code, target_status_code,
        received_bytes, sent_bytes,
        request_verb, request_url, request_proto,
        user_agent, ssl_cipher, ssl_protocol, target_group_arn,
        trace_id, domain_name, chosen_cert_arn, matched_rule_priority,
        request_creation_time, actions_executed, redirect_url,
        lambda_error_reason, target_port_list, target_status_code_list,
        classification, classification_reason,
    )


def insert_row(line):
    """Parse one log record and insert it into ``public.logs``.

    Args:
        line: sequence of string fields for a single log record, as produced
            by ``csv.reader`` with a space delimiter.

    Failures are best-effort: the error and the offending record are printed
    and the function returns normally. The transaction is rolled back on
    failure, because otherwise PostgreSQL keeps the shared connection in an
    aborted state and rejects every later insert.
    """
    sql = 'INSERT INTO public.logs({}) VALUES({})'.format(
        ', '.join(_LOG_COLUMNS),
        ', '.join(['%s'] * len(_LOG_COLUMNS)),
    )
    try:
        values = _parse_log_line(line)
        # The context manager closes the cursor even when execute() fails.
        with CONNECTION.cursor() as cur:
            cur.execute(sql, values)
        CONNECTION.commit()
    except Exception as e:
        print(e, line)
        try:
            CONNECTION.rollback()
        except Exception as rollback_error:
            # Keep the never-raise contract even if the connection is gone.
            print(rollback_error)
def parse_files():
    """Load every entry in ``DIRECTORY`` as a space-delimited log file.

    Each record is handed to ``insert_row``. A failure while opening or
    reading one entry (for example a sub-directory or an unreadable file) is
    printed and the remaining entries are still processed.
    """
    for filename in os.listdir(DIRECTORY):
        path = os.path.join(DIRECTORY, filename)
        try:
            # newline='' lets the csv module do its own line-ending handling,
            # as the csv documentation requires for file objects.
            with open(path, newline='') as csvfile:
                print('Processing {} ...'.format(path))
                reader = csv.reader(csvfile, delimiter=SPACE, quotechar='"')
                for line in reader:
                    insert_row(line)
        except Exception as e:
            print(e)
# Script entry: rebuild the table, then load every log file. Any error that
# escapes a step is printed, and the connection is closed in all cases.
if CONNECTION is not None:
    try:
        for step in (setup_database, parse_files):
            step()
    except Exception as error:
        print(error)
    finally:
        CONNECTION.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment