
@msumit
Last active June 20, 2024 07:42
Airflow file sensor example
from airflow import DAG
from airflow.operators.sensors import S3KeySensor  # Airflow 2.x: airflow.providers.amazon.aws.sensors.s3
from airflow.operators import BashOperator         # Airflow 2.x: airflow.operators.bash
from datetime import datetime, timedelta

# Midnight of the previous day, used as the DAG's start_date.
yday = datetime.combine(datetime.today() - timedelta(1),
                        datetime.min.time())

default_args = {
    'owner': 'msumit',
    'depends_on_past': False,
    'start_date': yday,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_file_sensor', default_args=default_args, schedule_interval='@daily')

# Poke once for the given key; with soft_fail=True the task is marked
# skipped rather than failed when the key isn't found within the timeout.
t1 = S3KeySensor(
    task_id='s3_file_test',
    poke_interval=0,
    timeout=10,
    soft_fail=True,
    bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py',
    bucket_name=None,
    dag=dag)

# trigger_rule='all_failed': run only when the upstream task failed,
# e.g. to kick off the job that produces the missing file.
t2 = BashOperator(
    task_id='task2',
    depends_on_past=False,
    bash_command='echo a big hadoop job putting files on s3',
    trigger_rule='all_failed',
    dag=dag)

# trigger_rule='all_done': run once all upstream tasks have finished,
# regardless of their outcome.
t3 = BashOperator(
    task_id='task3',
    depends_on_past=False,
    bash_command='echo im next job using s3 files',
    trigger_rule='all_done',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t3.set_upstream(t2)
@anilkulkarni87
Copy link

When are we setting the S3 connection? We use temporary credentials. Wondering how we can do that in real time in a DAG.

@RahulJupelly
Copy link

Hi Sumit, can you please explain a little about "bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py'"

@tonyofleon
Copy link

I'm trying to use this, but it only works for my buckets in the west region; for my buckets in the east I get S3ResponseError: 400 Bad Request. Any workaround for this?

Thanks.

@msumit
Copy link
Author

msumit commented Apr 10, 2019

@rublinetsky it's sample code, so the file might not exist there, or you won't have access to it.
@anilkulkarni87 I guess you can provide the extra information while setting up the default S3 connection with role & external_id, and boto should take care of that.
@RahulJupelly that's the name of a file I'm sensing for in S3.
@tonyofleon can't say for sure, but it generally happens due to the signature version the S3 region expects, i.e. v2 or v4.
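On the role & external_id suggestion above, that information usually goes into the Extra field of the Airflow AWS/S3 connection as JSON. A hedged sketch of building that JSON; the ARN, external ID, and region are placeholders, and the exact key names depend on the AwsHook version in use:

```python
import json

# Hypothetical Extra payload for an Airflow AWS/S3 connection.
# role_arn / external_id / region_name are placeholder values; check
# the AwsHook docs for the exact keys your Airflow version supports.
extra = {
    "role_arn": "arn:aws:iam::123456789012:role/example-role",
    "external_id": "example-external-id",
    "region_name": "us-east-1",
}
print(json.dumps(extra))
```

With temporary credentials, letting the hook assume a role this way avoids hard-coding short-lived keys into the connection itself.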

@Kuvanil
Copy link

Kuvanil commented Jan 24, 2020

The bucket_key in S3 has a suffix (generated with a timestamp); how do I reference it in the S3KeySensor? I don't want to specify None, as I'm keeping exceptions as well.
I also tried a "*" (asterisk) at the end, but it pokes without matching any path or file.
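For the timestamp-suffix case above, S3KeySensor accepts a wildcard_match parameter; when it is True, bucket_key is treated (as far as I recall) as a Unix shell-style pattern matched with fnmatch, so an asterisk only takes effect with that flag set. The matching logic itself can be sketched with made-up key names:

```python
import fnmatch

# Made-up S3 keys with timestamp suffixes, standing in for a real listing.
keys = [
    "airflow/report_20200123.csv",
    "airflow/report_20200124.csv",
    "airflow/notes.txt",
]

# Unix shell-style pattern, the style used when wildcard_match=True.
pattern = "airflow/report_*.csv"
matches = [k for k in keys if fnmatch.fnmatch(k, pattern)]
print(matches)
```

So something like S3KeySensor(bucket_key='s3://my-bucket/airflow/report_*.csv', wildcard_match=True, ...) should poke for any key matching the pattern, rather than for a literal "*" in the key name.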
