Created
December 17, 2015 11:55
-
-
Save babo/ea813e3ce31b40f30c15 to your computer and use it in GitHub Desktop.
Simple AWS lambda job to create an SQS event for each S3 events.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import logging | |
try: | |
from urllib import splittype | |
except ImportError: | |
from urllib.parse import splittype | |
import boto3 | |
LOGGER_NAME = None | |
class Config(object): | |
queue = 'data-etl' | |
profile = None | |
def log(): | |
return logging.getLogger(LOGGER_NAME) | |
def config_logger(): | |
logger = log() | |
if not logger.handlers: | |
logger.addHandler(logging.StreamHandler()) | |
logger.setLevel(logging.INFO) | |
def send_to_sqs(uris): | |
session = boto3.session.Session(profile_name=Config.profile) | |
sqs = session.resource('sqs') | |
queue = sqs.get_queue_by_name(QueueName=Config.queue) | |
for s3_path in uris: | |
url_type, _ = splittype(s3_path) | |
if url_type == 's3': | |
resp = queue.send_message(MessageBody=s3_path) | |
log().info('Sent %s to %s with MessageId %s', s3_path, Config.queue, resp['MessageId']) | |
else: | |
log().info('Unknown uri %s', s3_path) | |
def lambda_handler(event, context): | |
config_logger() | |
uris = ['s3://{}/{}'.format(ev['s3']['bucket']['name'], ev['s3']['object']['key']) for ev in event['Records']] | |
if Config.queue: | |
send_to_sqs(uris) | |
else: | |
log().info('There is no queue configured') | |
log().info('Remaining time at exit: %d ms', context.get_remaining_time_in_millis()) | |
def process_args(source=None): | |
parser = argparse.ArgumentParser(description='Replicate S3 events into SQS') | |
parser.add_argument('-p', '--profile', action='store', type=str, default=Config.profile, help='AWS profile') | |
parser.add_argument('-q', '--queue', action='store', type=str, default=Config.queue, help='SQS queue or empty') | |
parser.add_argument('input', type=str, nargs='+', help='URI to process') | |
args = parser.parse_args(args=source) | |
Config.profile = args.profile | |
Config.queue = args.queue | |
return args | |
def main(): | |
config_logger() | |
send_to_sqs(process_args().input) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi, can you write lambda func in python that reads the file line by line from S3 and sends it to SQS topic?