import logging
import os
import threading

import boto3
import yaml
from boto3.dynamodb.conditions import Key
from datadog import api, initialize

# Set up the logging configuration: records at INFO and above are written to
# upload.log as "<timestamp> <level> <message>".
logging.basicConfig(
    filename='upload.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)

# REF: https://docs.datadoghq.com/metrics/custom_metrics/
# init DD: credentials are read from the environment; os.environ.get returns
# None for a variable that is not set.
api_key = os.environ.get('DATADOG_API_KEY')
app_key = os.environ.get('DATADOG_APP_KEY')

# `initialize` is imported by name from the datadog package. The previous
# `datadog.initialize(...)` spelling raised NameError because only
# `datadog.api` had been imported, leaving the name `datadog` unbound.
initialize(api_key=api_key, app_key=app_key)

# Define the name of your DynamoDB table
TABLE_NAME = "uploaded-files"

# Connect to DynamoDB
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(TABLE_NAME)

def _send_metric(name):
    """Submit one data point of value 1 for the custom Datadog metric *name*.

    Delivery is best-effort: any error raised while sending is logged and
    swallowed so that a metrics problem cannot change the outcome of an
    upload or terminate the calling thread.
    """
    try:
        # `points` is mandatory for api.Metric.send; leaving it out raises
        # KeyError("'points' parameter is required").
        api.Metric.send(metric=name, points=1, type='count')
    except Exception:
        logging.exception(f'Failed to send Datadog metric {name}')


def upload_file(bucket, path, key):
    """Upload one local file to S3, then report and record the result.

    On success a ``s3.upload.success`` metric is sent and ``key`` is stored
    in the DynamoDB table as ``Filename``. On failure the error is logged
    and a ``s3.upload.failed`` metric is sent.

    Args:
        bucket: Name of the destination S3 bucket.
        path: Local path of the file to upload.
        key: Object key to upload to; also the ``Filename`` value written to
            DynamoDB.

    Returns:
        None. Errors are logged rather than raised.
    """
    s3 = boto3.client('s3', region_name='us-east-1')

    # Only the S3 call decides whether the upload succeeded. Keeping the
    # metric and DynamoDB calls outside this try means their errors are no
    # longer misreported as upload failures.
    try:
        s3.upload_file(path, bucket, key)
    except Exception as e:
        logging.error(f'Failed to upload {key}: {str(e)}')
        # send the upload failed events to DD api as custom metric
        _send_metric('s3.upload.failed')
        return

    logging.info(f'Successfully uploaded {key}')

    # send the upload success events to DD api as custom metric
    _send_metric('s3.upload.success')

    # Add the uploaded file name to DynamoDB.
    # NOTE(review): `table` is a module-level boto3 resource; boto3 documents
    # resource instances as not thread-safe. Confirm whether this function is
    # called concurrently and, if so, whether each caller needs its own.
    try:
        table.put_item(Item={"Filename": key})
    except Exception:
        logging.exception(f'Uploaded {key} but failed to record it in DynamoDB')


# Bucket that every file listed in files.yaml is uploaded to.
BUCKET_NAME = 'fueled-fun'

# Load the YAML file
with open('files.yaml', 'r') as f:
    # A bare `files:` key parses to None; treat that as "nothing to upload"
    # instead of failing later when iterating over it.
    files = yaml.safe_load(f)['files'] or []

# Get the list of files that have already been uploaded from DynamoDB.
# A single Scan call returns at most 1 MB of data, so keep requesting pages
# until the response no longer carries a LastEvaluatedKey. Reading only the
# first page would make already-uploaded files look new.
uploaded_files = set()
scan_kwargs = {'ProjectionExpression': 'Filename'}
while True:
    response = table.scan(**scan_kwargs)
    uploaded_files.update(item['Filename'] for item in response.get('Items', []))
    last_key = response.get('LastEvaluatedKey')
    if last_key is None:
        break
    # Resume the scan where the previous page stopped.
    scan_kwargs['ExclusiveStartKey'] = last_key

# Identify the net-new files that need to be uploaded
net_new_files = [file for file in files if file['name'] not in uploaded_files]

# Create a thread for each net-new file and start uploading
threads = []
for file in net_new_files:
    thread = threading.Thread(
        target=upload_file,
        args=(BUCKET_NAME, file['path'], file['name']),
    )
    threads.append(thread)
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()