from airflow import DAG
from airflow.operators.sensors import S3KeySensor
from airflow.operators import BashOperator
from datetime import datetime, timedelta

# Midnight of the previous day, used as the DAG's start_date.
yday = datetime.combine(datetime.today() - timedelta(1),
                        datetime.min.time())

default_args = {
    'owner': 'msumit',
    'depends_on_past': False,
    'start_date': yday,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_file_sensor', default_args=default_args, schedule_interval='@daily')

# Poke S3 for the key; soft_fail=True marks the task as skipped
# instead of failed when the timeout is reached.
t1 = S3KeySensor(
    task_id='s3_file_test',
    poke_interval=0,
    timeout=10,
    soft_fail=True,
    bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py',
    bucket_name=None,
    dag=dag)

# Runs only when every upstream task has failed (trigger_rule='all_failed').
t2 = BashOperator(
    task_id='task2',
    depends_on_past=False,
    bash_command='echo a big hadoop job putting files on s3',
    trigger_rule='all_failed',
    dag=dag)

# Runs once all upstream tasks are done, regardless of their state.
t3 = BashOperator(
    task_id='task3',
    depends_on_past=False,
    bash_command='echo im next job using s3 files',
    trigger_rule='all_done',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t3.set_upstream(t2)
Thank you for this. One minor thing: I think seven_days_ago should be yesterday.
@Anmolk7 I think for that you can extend the base sensor class (BaseSensorOperator) and write a poke method with some simple Python code that returns True/False based on the presence of the file(s).
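A minimal sketch of such a sensor, assuming the Airflow 1.x import paths used in the gist above; the class name LocalFileSensor, the filepath argument, and the path in the usage example are illustrative, not part of the gist:

import os

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class LocalFileSensor(BaseSensorOperator):
    """Waits until a file shows up on the local filesystem."""

    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super(LocalFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # Return True to succeed, False to poke again after poke_interval.
        return os.path.isfile(self.filepath)


t_local = LocalFileSensor(
    task_id='local_file_test',
    filepath='/tmp/incoming/data.csv',  # illustrative path
    poke_interval=60,
    timeout=600,
    dag=dag)

Newer Airflow releases also ship a ready-made FileSensor (airflow.contrib.sensors.file_sensor.FileSensor in the 1.10 line) that does essentially the same thing.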
@owenmyerscsm nice catch :)
Sumit, I am trying to run this example and I am getting the error:
ssl.CertificateError: hostname u'dev.canopydata.com.s3.amazonaws.com' doesn't match either of '*.s3.amazonaws.com', 's3.amazonaws.com'
This is a known issue with bucket names that include dots. I tried one known workaround (adding "calling_format": "boto.s3.connection.OrdinaryCallingFormat" to the connection), but it did not help: the certificate mismatch goes away, but now I get a "301 Moved Permanently" response.
Which versions of boto and Python are you using? My freshly installed development Airflow runs on Python 2.7.10 and has boto-2.48.0 installed.
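For reference, a sketch of how that workaround can be wired up, assuming an Airflow 1.x metadata database and the boto2-based S3Hook reading the connection's extra JSON; the conn_id and credential values are placeholders:

import json

from airflow import settings
from airflow.models import Connection

# Placeholder credentials; 'calling_format' is the workaround discussed
# above for bucket names containing dots.
s3_conn = Connection(
    conn_id='s3_default',
    conn_type='s3',
    extra=json.dumps({
        'aws_access_key_id': 'YOUR_ACCESS_KEY',
        'aws_secret_access_key': 'YOUR_SECRET_KEY',
        'calling_format': 'boto.s3.connection.OrdinaryCallingFormat',
    }))

session = settings.Session()
session.add(s3_conn)
session.commit()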
Where are we setting up the S3 connection? We use temporary credentials, and I'm wondering how we can do that at runtime in a DAG.
Hi Sumit, can you please explain a little about bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py'?
I'm trying to use this, but it only works for my buckets in the west region; for my buckets in the east I get S3ResponseError: 400 Bad Request. Is there any workaround for this?
Thanks.
@rublinetsky it's sample code, so the file might not exist there, or you might not have access to it.
@anilkulkarni87 I guess you can provide extra information (role & external_id) while setting up the default S3 connection, and boto should take care of that.
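I haven't verified this myself, but with the boto3-based AWS hook in later Airflow releases the idea looks roughly like the connection extras below; the exact key names (role_arn, external_id) depend on your Airflow version, so treat them as assumptions to check against your install:

import json

# Extra JSON on the AWS/S3 connection for assuming an IAM role with an
# external id; the ARN and id values are placeholders.
extra = json.dumps({
    'role_arn': 'arn:aws:iam::123456789012:role/airflow-s3-access',
    'external_id': 'YOUR_EXTERNAL_ID',
})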
@RahulJupelly that's the name of a file I'm sensing for in S3.
@tonyofleon can't say for sure, but it generally happens due to the signature version the S3 region expects, i.e. v2 or v4.
The bucket_key in S3 has a suffix (generated with a timestamp); how do I reference it in S3KeySensor? I don't want to pass None, as I'm handling exceptions as well.
I also tried adding "*" (asterisk) at the end, but it just keeps poking and never matches any path or file.
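If the key name ends in a generated suffix, S3KeySensor's wildcard_match flag lets bucket_key be treated as a Unix-style wildcard pattern. A sketch reusing the dag object and import from the gist above; the bucket and the suffix pattern are illustrative:

t_suffix = S3KeySensor(
    task_id='s3_file_with_suffix',
    bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator_*.py',
    wildcard_match=True,  # interpret bucket_key as an fnmatch-style pattern
    poke_interval=60,
    timeout=600,
    dag=dag)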
What about sensing files on a local drive on the local host?