Airflow Sample DAG with ECS Operator
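The DAG below polls an SQS queue for an S3 event notification, pushes the new object's bucket and key to XCom, passes them to an ECS task as container environment variables, and finishes by logging a completion message.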
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.ecs_operator import ECSOperator
from datetime import datetime, timedelta
import boto3
import json
from config import *  # supplies ENVIRONMENT

default_args = {
    'owner': 'ubuntu',
    'start_date': datetime(2019, 8, 14),
    'retry_delay': timedelta(seconds=60 * 60)
}

def process_sqs_message(**context):
    """Poll the SQS queue once and push the new S3 object's bucket/key to XCom."""
    queue_url = "https://sqs.us-east-1.amazonaws.com/aws_account_id/airflow_test_" + ENVIRONMENT
    sqs = boto3.client('sqs', region_name='us-east-1')
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,
        VisibilityTimeout=0,
        WaitTimeSeconds=0
    )
    print("RESPONSE", response)
    if "Messages" in response:
        message = response['Messages'][0]
        receipt_handle = message['ReceiptHandle']
        # Delete the received message from the queue
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )
        # The body is an S3 event notification; parse it once
        body = json.loads(message["Body"])
        bucket = body['Records'][0]['s3']['bucket']['name']
        key = body['Records'][0]['s3']['object']['key']
        context['ti'].xcom_push(key='bucket', value=bucket)
        context['ti'].xcom_push(key='key', value=key)
    else:
        # No message available: push None so downstream tasks can no-op
        context['ti'].xcom_push(key='bucket', value=None)
        context['ti'].xcom_push(key='key', value=None)
    return True
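
# For reference, the SQS body parsed above is an S3 event notification of
# roughly this shape (illustrative values; unused fields omitted):
# {
#   "Records": [
#     {"s3": {"bucket": {"name": "my-bucket"},
#             "object": {"key": "input/file.csv"}}}
#   ]
# }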

def write_output(**kwargs):
    """Download the object referenced by XCom and re-upload it under output/.

    Note: not referenced by any task below; the write_output_ecs ECSOperator
    is expected to do the equivalent work in a container.
    """
    bucket = kwargs['ti'].xcom_pull(key='bucket')
    key = kwargs['ti'].xcom_pull(key='key')
    if bucket is not None:
        filename = '/home/ubuntu/Downloads/' + key.split("/")[-1]
        s3 = boto3.resource('s3')
        s3.Bucket(bucket).download_file(key, filename)
        client = boto3.client('s3')
        client.upload_file(filename, bucket, 'output/' + key.split('/')[-1])
    return True

def print_message(**kwargs):
    print('Completed All Tasks')
    return True

with DAG('airflow_dag_test',
         catchup=False,
         default_args=default_args,
         schedule_interval=timedelta(minutes=60)) as dag:

    # Poll SQS for the latest S3 event notification
    message_reader = PythonOperator(
        task_id="message_reader",
        python_callable=process_sqs_message,
        provide_context=True
    )

    # Named print_message_task to avoid shadowing the print_message callable
    print_message_task = PythonOperator(
        task_id="print_message",
        python_callable=print_message,
        provide_context=True
    )

    # Run the processing step as an ECS task; the templated overrides
    # pass the XCom values in as container environment variables
    write_output_ecs = ECSOperator(
        task_id="test_task_" + ENVIRONMENT,
        task_definition="test_task_" + ENVIRONMENT,
        cluster='ecs_cluster',
        aws_conn_id="aws_default",
        overrides={
            'containerOverrides': [
                {
                    'name': "test_task_" + ENVIRONMENT,
                    'environment': [
                        {
                            'name': 'bucket',
                            'value': "{{ task_instance.xcom_pull(key='bucket') }}"
                        },
                        {
                            'name': 'key',
                            'value': "{{ task_instance.xcom_pull(key='key') }}"
                        },
                    ]
                }
            ]
        },
        region_name='us-east-1',
        launch_type='EC2'
    )

    message_reader >> write_output_ecs >> print_message_task
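
The ECS task definition referenced above is not part of this gist. As a minimal sketch of the container side (the script name and processing step are hypothetical; only the 'bucket' and 'key' environment variable names come from the containerOverrides above), the entrypoint might look like this:

# container_task.py: hypothetical entrypoint for the test_task container
import os

import boto3

def main():
    bucket = os.environ.get('bucket')
    key = os.environ.get('key')
    # xcom_pull returns None when the queue was empty; the Jinja template
    # then renders the literal string "None"
    if not bucket or bucket == 'None':
        print('No new object to process')
        return
    s3 = boto3.client('s3')
    local_path = '/tmp/' + key.split('/')[-1]
    s3.download_file(bucket, key, local_path)
    # ... transform the file here, then write the result back under output/
    s3.upload_file(local_path, bucket, 'output/' + key.split('/')[-1])
    print('Processed', bucket, key)

if __name__ == '__main__':
    main()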