@msumit
Last active June 20, 2024 07:42
Airflow file sensor example
from airflow import DAG
from airflow.operators.sensors import S3KeySensor  # Airflow 1.x import paths
from airflow.operators import BashOperator
from datetime import datetime, timedelta

# Start the DAG at midnight yesterday so the first @daily run fires right away.
yday = datetime.combine(datetime.today() - timedelta(1),
                        datetime.min.time())

default_args = {
    'owner': 'msumit',
    'depends_on_past': False,
    'start_date': yday,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_file_sensor', default_args=default_args, schedule_interval='@daily')

# Wait for the S3 key; soft_fail=True skips (rather than fails) the task if the
# key does not appear before the timeout.
t1 = S3KeySensor(
    task_id='s3_file_test',
    poke_interval=0,
    timeout=10,
    soft_fail=True,
    bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py',
    bucket_name=None,
    dag=dag)

# Runs only when the sensor fails (trigger_rule='all_failed'): produce the file.
t2 = BashOperator(
    task_id='task2',
    depends_on_past=False,
    bash_command='echo a big hadoop job putting files on s3',
    trigger_rule='all_failed',
    dag=dag)

# Runs once both upstream tasks have finished, whatever their final state.
t3 = BashOperator(
    task_id='task3',
    depends_on_past=False,
    bash_command='echo im next job using s3 files',
    trigger_rule='all_done',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t3.set_upstream(t2)
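
The imports above are for Airflow 1.x. On Airflow 2.x the same DAG would use different import paths; a minimal sketch, assuming a recent apache-airflow-providers-amazon release is installed (older provider releases expose the sensor from airflow.providers.amazon.aws.sensors.s3_key instead):

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor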
Kuvanil commented Jan 24, 2020

The bucket_key in S3 has a timestamp-generated suffix; how do I reference it in the S3KeySensor? I don't want to pass None, since I'm also handling exceptions.
I also tried appending "*" (an asterisk) to the key, but the sensor keeps poking without ever matching any path or file.
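
One way to handle a key with a variable suffix is the sensor's wildcard_match flag, which treats bucket_key as a Unix-style glob pattern. A minimal sketch, assuming the files land under a fixed prefix with a timestamp suffix (the bucket, prefix, and intervals below are illustrative, not from the original gist):

t1 = S3KeySensor(
    task_id='s3_file_test',
    poke_interval=60,
    timeout=600,
    soft_fail=True,
    # Glob pattern: matches e.g. .../example_qubole_operator_20200124.py
    bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator_*.py',
    bucket_name=None,
    wildcard_match=True,
    dag=dag)

Without wildcard_match=True the "*" is taken literally as part of the key, which would explain the sensor poking forever without finding anything.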
