Avro Reader

This project provides a tool for processing Avro files stored on S3. It reads the files, counts the number of rows in each, and watches for new files added while it runs.

Features

  • Reads Avro files from a specified S3 bucket and prefix.
  • Counts the number of rows in each Avro file (a minimal counting sketch follows this list).
  • Watches for and processes new files that are added to the S3 bucket during execution.
  • Allows configuration of the number of threads for parallel processing.
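
For reference, counting rows with fastavro amounts to iterating the reader over a binary stream. The snippet below is a minimal local sketch; the file name is a placeholder and no S3 access is involved:

    from fastavro import reader

    # Count the records in a local Avro file (file name is a placeholder).
    with open("example.avro", "rb") as f:
        row_count = sum(1 for _ in reader(f))
    print(f"example.avro has {row_count} rows")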

Requirements

  • Docker
  • Kubernetes
  • AWS S3 or compatible object storage, such as Yandex Cloud Storage

Setup

Docker

  1. Build the Docker image:

    docker build -t your-username/your-image-name:latest .
  2. Push the Docker image to a container registry:

    docker push your-username/your-image-name:latest

Kubernetes

  1. Update deployment.yaml with your specific configuration, such as access keys, bucket name, and prefix.

  2. Deploy to your Kubernetes cluster:

    kubectl apply -f deployment.yaml

Configuration

The behavior of the Avro Reader is controlled via environment variables specified in the Kubernetes deployment.yaml; a sketch of how they reach the S3 client follows the list:

  • AWS_ACCESS_KEY_ID: Your AWS access key.
  • AWS_SECRET_ACCESS_KEY: Your AWS secret key.
  • AWS_DEFAULT_REGION: The AWS region where your S3 bucket is located.
  • AWS_ENDPOINT_URL: Endpoint URL for a non-AWS, S3-compatible service such as Yandex Cloud Storage.
  • S3_BUCKET_NAME: Name of the S3 bucket.
  • S3_PREFIX: Prefix to filter relevant Avro files in the bucket.
  • THREADS: Number of threads to use for processing files concurrently.
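
The AWS_* variables are picked up by boto3 itself, while the S3_* and THREADS values are passed to the script as command-line arguments by the deployment. The sketch below shows roughly how the client ends up configured; passing the region and endpoint explicitly is only for illustration, since recent boto3 releases also read AWS_ENDPOINT_URL from the environment:

    import os
    import boto3

    # Credentials come from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY in the
    # environment; region and endpoint are shown explicitly for illustration.
    s3_client = boto3.client(
        "s3",
        region_name=os.environ.get("AWS_DEFAULT_REGION"),
        endpoint_url=os.environ.get("AWS_ENDPOINT_URL"),
    )
    bucket_name = os.environ["S3_BUCKET_NAME"]
    prefix = os.environ["S3_PREFIX"]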

Usage

Upon deployment, the application will:

  1. Connect to the specified S3 bucket.
  2. List and process all existing Avro files under the specified prefix.
  3. Continue watching for and processing any new files added to the bucket (a polling sketch follows these steps).
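
One simple way to keep watching is to re-list the prefix periodically and process only keys that have not been seen before. The sketch below assumes plain polling; the 60-second interval and the seen-set bookkeeping are illustrative and not part of the script in this gist. get_all_avro_files and count_rows_in_avro_file refer to the functions in avro-reader.py below:

    import time

    # Illustrative polling loop; relies on get_all_avro_files,
    # count_rows_in_avro_file and stop_processing from avro-reader.py.
    def watch_for_new_files(s3_client, bucket_name, prefix, poll_seconds=60):
        seen = set(get_all_avro_files(bucket_name, prefix, s3_client))
        while not stop_processing:
            time.sleep(poll_seconds)
            current = set(get_all_avro_files(bucket_name, prefix, s3_client))
            for key in sorted(current - seen):
                rows = count_rows_in_avro_file(s3_client, bucket_name, key)
                print(f"New file {key}: {rows} rows")
            seen = current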

License

This project is licensed under the Apache License 2.0. See the LICENSE file for more details.

Author

  • Letenkov, Eugene, 2024

avro-reader.py

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Letenkov, Eugene
# Copyright 2024 Letenkov, Eugene. All rights reserved.
import os
import io
import argparse
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from fastavro import reader

# Set to True by the signal handler so that workers can stop early.
stop_processing = False


def signal_handler(sig, frame):
    """Handle SIGINT by requesting a graceful stop."""
    global stop_processing
    print('Signal received, stopping...')
    stop_processing = True


def get_all_avro_files(bucket_name, prefix, s3_client):
    """List every .avro key under the given prefix, following pagination."""
    print(f"Fetching list of Avro files from bucket: {bucket_name} with prefix: {prefix}")
    paginator = s3_client.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
    avro_files = []
    for page in page_iterator:
        file_info = page.get('Contents', [])
        avro_files.extend([file['Key'] for file in file_info if file['Key'].endswith('.avro')])
    print(f"Found {len(avro_files)} Avro files.")
    return avro_files


def count_rows_in_avro_file(s3_client, bucket_name, file_key):
    """Download one Avro object and count its records, checking the stop flag."""
    if stop_processing:
        return 0
    print(f"Processing file: {file_key}\n")
    obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    data = obj['Body'].read()
    with io.BytesIO(data) as file_io:
        avro_reader = reader(file_io)
        row_count = 0
        for _ in avro_reader:
            if stop_processing:
                print(f"Interrupted processing of file: {file_key}")
                return row_count
            row_count += 1
    return row_count


def process_files(bucket_name, avro_files, num_threads):
    """Count rows in all files concurrently and return the aggregated total."""
    s3_client = boto3.client('s3')
    total_rows = 0
    num_files = len(avro_files)
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = {executor.submit(count_rows_in_avro_file, s3_client, bucket_name, file_key): file_key for file_key in avro_files}
        completed_files = 0
        try:
            for future in as_completed(futures):
                if stop_processing:
                    print("Process interrupted. Attempting to cancel remaining tasks...")
                    for f in futures:
                        f.cancel()
                    break
                try:
                    total_rows += future.result()
                    completed_files += 1
                    remaining_files = num_files - completed_files
                    print(f"Processed {completed_files}/{num_files} files. Remaining: {remaining_files}\n")
                except Exception as e:
                    print(f"Error processing file: {e}")
        finally:
            print("Shutting down executor")
            executor.shutdown(wait=False)
    return total_rows


def main():
    global stop_processing
    signal.signal(signal.SIGINT, signal_handler)
    parser = argparse.ArgumentParser(description="Process Avro files in S3 bucket.")
    parser.add_argument('--bucket_name', required=True, help='Name of the S3 bucket')
    parser.add_argument('--prefix', required=True, help='Prefix of the S3 objects')
    parser.add_argument('--threads', type=int, default=10, help='Number of threads to use for processing files')
    args = parser.parse_args()
    bucket_name = args.bucket_name
    prefix = args.prefix
    num_threads = args.threads
    s3_client = boto3.client('s3')
    avro_files = get_all_avro_files(bucket_name, prefix, s3_client)
    total_rows = process_files(bucket_name, avro_files, num_threads)
    if not stop_processing:
        print(f"Total number of rows across all files: {total_rows}")
    else:
        print("Processing was interrupted.")


if __name__ == '__main__':
    main()

deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: avro-processing
spec:
  replicas: 1
  selector:
    matchLabels:
      app: avro-processing
  template:
    metadata:
      labels:
        app: avro-processing
    spec:
      containers:
        - name: avro-reader
          image: your-username/your-image-name:latest
          command: ["/bin/sh"]
          args:
            - -c
            - >-
              python /app/avro-reader.py
              --bucket_name=$(S3_BUCKET_NAME)
              --prefix=$(S3_PREFIX)
              --threads=$(THREADS)
          env:
            - name: AWS_ACCESS_KEY_ID
              value: "your-access-key"
            - name: AWS_SECRET_ACCESS_KEY
              value: "your-secret-key"
            - name: AWS_DEFAULT_REGION
              value: "ru-central1"
            - name: AWS_ENDPOINT_URL
              value: "https://storage.yandexcloud.net"
            - name: S3_BUCKET_NAME
              value: "my-bucket"
            - name: S3_PREFIX
              value: "my/prefix/"
            - name: THREADS
              value: "20"

Dockerfile

FROM python:3.9-slim
RUN pip install --no-cache-dir boto3==1.35.84 fastavro==1.9.7 cramjam==2.9.1
ENV PYTHONUNBUFFERED=1
COPY avro-reader.py /app/avro-reader.py
WORKDIR /app