Created August 22, 2016 20:59
Sample DAG to download from S3, sleep, and reupload
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# default arguments for each task
default_args = {
    'owner': 'nthomas',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('test_s3_download',
          default_args=default_args,
          schedule_interval=None)  # schedule_interval=None means this DAG is only run by external commands

TEST_BUCKET = '23andme-nthomas'
TEST_KEY = 'foo.txt'
LOCAL_FILE = '/Users/nthomas/scratch/pipeline/airflow/foo.txt'

# simple download task
def download_file(bucket, key, destination):
    import boto3
    s3 = boto3.resource('s3')
    s3.meta.client.download_file(bucket, key, destination)

# simple upload task
def upload_file(source, bucket, key):
    import boto3
    s3 = boto3.resource('s3')
    s3.Bucket(bucket).upload_file(source, key)

download_from_s3 = PythonOperator(
    task_id='download_from_s3',
    python_callable=download_file,
    op_kwargs={'bucket': TEST_BUCKET, 'key': TEST_KEY, 'destination': LOCAL_FILE},
    dag=dag)

sleep_task = BashOperator(
    task_id='sleep_for_1',
    bash_command='sleep 1',
    dag=dag)

upload_to_s3 = PythonOperator(
    task_id='upload_to_s3',
    python_callable=upload_file,
    op_kwargs={'bucket': TEST_BUCKET, 'key': TEST_KEY, 'source': LOCAL_FILE},
    dag=dag)

download_from_s3.set_downstream(sleep_task)
sleep_task.set_downstream(upload_to_s3)
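Because schedule_interval is None, this DAG never runs on a schedule; you trigger it manually. With the Airflow 1.x CLI, for example (assuming the scheduler and a worker are running):

airflow trigger_dag test_s3_download

You can also exercise a single task without the scheduler, e.g. airflow test test_s3_download download_from_s3 2016-08-22.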
Nice example script.
How would you perform something like this if you were using CeleryExecutor, where each task will run on a different machine?

@quencha: In that case you can follow either of the methods below.
- Mount a common file system like GlusterFS on all your workers, so that the file system is shared across workers.
- Use a dedicated queue for your workers and specify that queue in your DAG tasks; this way you only push certain tasks to a single worker (see the sketch below). You can find a detailed explanation here.
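A minimal sketch of the queue approach, assuming Airflow 1.x with CeleryExecutor and assuming only one worker consumes the queue; the queue name s3_queue is illustrative. Setting queue on the two S3 tasks routes them to that worker, so LOCAL_FILE is written and read on the same machine:

# hypothetical queue name; start a dedicated worker with: airflow worker -q s3_queue
SHARED_QUEUE = 's3_queue'

download_from_s3 = PythonOperator(
    task_id='download_from_s3',
    python_callable=download_file,
    op_kwargs={'bucket': TEST_BUCKET, 'key': TEST_KEY, 'destination': LOCAL_FILE},
    queue=SHARED_QUEUE,  # CeleryExecutor sends this task only to workers listening on s3_queue
    dag=dag)

upload_to_s3 = PythonOperator(
    task_id='upload_to_s3',
    python_callable=upload_file,
    op_kwargs={'bucket': TEST_BUCKET, 'key': TEST_KEY, 'source': LOCAL_FILE},
    queue=SHARED_QUEUE,  # same queue, so the same worker sees LOCAL_FILE
    dag=dag)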