Convert Nginx access logs to a Parquet file
import argparse
import re
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime
#import dateutil
#from dateutil import parser
# Set up the argument parser and parse the command-line arguments
arg_parser = argparse.ArgumentParser(description='Process Nginx log file and convert to Parquet.')
arg_parser.add_argument('--log_file', type=str, required=True, help='Path to the Nginx log file')
arg_parser.add_argument('--parquet_file', type=str, required=True, help='Path to the output Parquet file')
args = arg_parser.parse_args()
# Get the log file path from the command-line arguments
log_file = args.log_file
parquet_file = args.parquet_file
# Function to parse a log line
# 0.0.0.0 - - [11/Sep/2022:00:01:01 +0000] "GET /w/ HTTP/1.1" 301 185 "http://eg.com/w/test" "Mozilla/5.0 (Linux; Android 7.0;)"
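# Splitting the line on single spaces yields (for the sample above):
#   parts[0]=remote_addr, parts[1]/parts[2]=ident/remote_user,
#   parts[3]+parts[4]=[time_local +offset], parts[5]=method, parts[6]=request,
#   parts[7]=version, parts[8]=status, parts[9]=body_bytes_sent,
#   parts[10]=referer, parts[11:]=user agent (which may itself contain spaces)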
def parse_nginx_log(line):
    # strip() removes the trailing newline, so the closing quote of the
    # user agent is stripped correctly below
    parts = line.strip().split(" ")
    if len(parts) < 12:
        raise ValueError(f"Unexpected log format: {line}")
    return {
        "remote_addr": parts[0],
        "remote_user": parts[2],
        # The UTC offset (parts[4]) is dropped: timestamps are stored naive
        "time_local": datetime.strptime(parts[3][1:], "%d/%b/%Y:%H:%M:%S"),
        #"time_local": parser.parse(parts[3][1:]),
        #"time_local_str": parts[3][1:],
        "method": parts[5].strip('"'),
        "request": parts[6],
        "version": parts[7].strip('"'),
        "status": parts[8],
        "body_bytes_sent": int(parts[9]),
        "http_referer": parts[10].strip('"'),
        "http_user_agent": " ".join(parts[11:]).strip('"')
    }
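# For instance, the sample line above parses to:
#   {'remote_addr': '0.0.0.0', 'remote_user': '-',
#    'time_local': datetime(2022, 9, 11, 0, 1, 1),
#    'method': 'GET', 'request': '/w/', 'version': 'HTTP/1.1',
#    'status': '301', 'body_bytes_sent': 185,
#    'http_referer': 'http://eg.com/w/test',
#    'http_user_agent': 'Mozilla/5.0 (Linux; Android 7.0;)'}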
# Read and write in batches
batch_size = 100000
batch = []
# Initialise the Parquet writer lazily (the schema is taken from the first batch)
writer = None
bad_lines = 0
counter = 0  # total number of lines read
with open(log_file, "r") as f:
    for line in f:
        counter += 1
        try:
            batch.append(parse_nginx_log(line))
        except Exception as e:
            #print(f"Error while parsing line: {line} - {e}")
            bad_lines += 1
        if len(batch) >= batch_size:
            # Convert the batch into a DataFrame, then an Arrow table
            df = pd.DataFrame(batch)
            table = pa.Table.from_pandas(df)
            # Incremental write to Parquet
            if writer is None:
                writer = pq.ParquetWriter(parquet_file, table.schema, compression='gzip')
            writer.write_table(table)
            # Reinitialize the batch
            batch = []

# Write the last, partially filled batch
if batch:
    df = pd.DataFrame(batch)
    table = pa.Table.from_pandas(df)
    if writer is None:
        writer = pq.ParquetWriter(parquet_file, table.schema, compression='gzip')
    writer.write_table(table)

# Close the writer
if writer:
    writer.close()
print(f"Number of lines: {counter}")
print(f"Number of errors: {bad_lines}")
CharlesNepote commented Dec 23, 2024

nginx_logs2parquet.py --log_file logs.log --parquet_file output.parquet

Ingesting a 10-million-plus-line / 2.8 GB file produces a 155 MB Parquet file in ~160 s on my old computer (~65K lines/second).
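
To sanity-check the result, a minimal sketch for reading the file back (assuming pandas with the pyarrow engine is installed, and the output file is named output.parquet as above):

import pandas as pd

df = pd.read_parquet("output.parquet")
print(len(df))                  # should equal lines read minus parse errors
print(df.dtypes)                # time_local should come back as a datetime column
print(df["status"].value_counts().head())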
