@nehiljain
Created June 14, 2018 14:06
Code Sample for Airflow II blog
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor  # Airflow 1.10 import path
from airflow.utils.dates import days_ago

schedule = timedelta(minutes=5)

args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
}

dag = DAG(
    dag_id='s3_key_sensor_demo_dag',
    schedule_interval=schedule,
    default_args=args,
)


def new_file_detection(**kwargs):
    print("A new file has arrived in s3 bucket")


file_sensor = S3KeySensor(
    task_id='s3_key_sensor_task',
    poke_interval=60 * 30,  # in seconds; check for the file every half hour
    timeout=60 * 60 * 12,   # in seconds; fail the task after 12 hours
    bucket_key="s3://[bucket_name]/[key]",
    bucket_name=None,       # bucket name is parsed from the full S3 URL above
    wildcard_match=False,
    dag=dag,
)

print_message = PythonOperator(
    task_id='print_message',
    provide_context=True,
    python_callable=new_file_detection,
    dag=dag,
)

file_sensor >> print_message
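
As a rough sanity check on the sensor settings above, the `poke_interval` and `timeout` values bound how many times the sensor will poke S3 before Airflow fails the task. The `max_pokes` helper below is hypothetical (not part of the gist or the Airflow API), just integer division over the two settings:

```python
def max_pokes(poke_interval: int, timeout: int) -> int:
    """Upper bound on sensor pokes before Airflow raises a timeout."""
    return timeout // poke_interval

# With the settings from the DAG above: 30-minute pokes, 12-hour timeout
print(max_pokes(60 * 30, 60 * 60 * 12))  # 24 pokes at most
```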
premanuj commented Feb 2, 2022

@sameerrao20118 Did you find a solution? If so, could you please share how you solved it?
