@nehiljain
Created June 14, 2018 14:06
Code Sample for Airflow II blog
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor  # Airflow 1.10 path; Airflow 2+ ships this in the Amazon provider package
from airflow.utils.dates import days_ago

# Run the DAG every five minutes.
schedule = timedelta(minutes=5)

args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
}

dag = DAG(
    dag_id='s3_key_sensor_demo_dag',
    schedule_interval=schedule,
    default_args=args,
)


def new_file_detection(**kwargs):
    print("A new file has arrived in s3 bucket")


# Block downstream tasks until the given key exists in S3.
file_sensor = S3KeySensor(
    task_id='s3_key_sensor_task',
    poke_interval=60 * 30,  # (seconds); check for the file every half an hour
    timeout=60 * 60 * 12,   # give up after 12 hours
    bucket_key="s3://[bucket_name]/[key]",  # full S3 URL, so bucket_name stays None
    bucket_name=None,
    wildcard_match=False,
    dag=dag,
)

print_message = PythonOperator(
    task_id='print_message',
    provide_context=True,
    python_callable=new_file_detection,
    dag=dag,
)

file_sensor >> print_message
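
For a quick local check (assuming an Airflow 1.x install and a working AWS connection; Airflow 2 renamed the command to "airflow tasks test"), each task can be exercised in isolation with the CLI, using the dag_id and task_ids defined above:

airflow test s3_key_sensor_demo_dag s3_key_sensor_task 2018-06-14
airflow test s3_key_sensor_demo_dag print_message 2018-06-14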
@sameerrao20118

Hi Nehil,
Thanks for sharing this. Could you please help me with the following doubts?

  1. bucket_key="s3://[bucket_name]/[key]": can I monitor multiple folders within a bucket?
  2. If I have to monitor multiple folders, how do I manage the dependencies? For example, my next action should trigger only after all folders have files posted within them.
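
One way to address both questions (a sketch, not from the original gist; the folder names and wildcard key pattern are placeholder assumptions) is to create one S3KeySensor per folder and make the downstream task depend on all of them, so it fires only once every folder has a matching file:

folders = ['folder_a', 'folder_b', 'folder_c']  # hypothetical prefixes within the bucket

folder_sensors = [
    S3KeySensor(
        task_id='s3_key_sensor_{}'.format(folder),
        poke_interval=60 * 30,
        timeout=60 * 60 * 12,
        bucket_key='s3://[bucket_name]/{}/*'.format(folder),
        wildcard_match=True,  # match any key under the folder prefix
        dag=dag,
    )
    for folder in folders
]

# print_message runs only after every sensor has succeeded
for sensor in folder_sensors:
    sensor >> print_message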

@cosbor11

How do we print the file name of the file that the S3KeySensor found?
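
As far as I know, the S3KeySensor in these Airflow versions does not push the matched key to XCom, so one workaround (a sketch, assuming an aws_default connection and placeholder bucket/prefix values) is to list the matching keys yourself inside the downstream callable using S3Hook:

from airflow.hooks.S3_hook import S3Hook  # Airflow 1.x path; Airflow 2 uses the Amazon provider hook


def new_file_detection(**kwargs):
    hook = S3Hook(aws_conn_id='aws_default')
    # List keys under the same prefix the sensor was watching (placeholders here).
    keys = hook.list_keys(bucket_name='[bucket_name]', prefix='[key_prefix]')
    for key in keys or []:
        print('Found file: s3://[bucket_name]/{}'.format(key))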

@premanuj

premanuj commented Feb 2, 2022

@sameerrao20118 Did you find a solution? If yes, could you please share how you solved it?
