# Gist by @mik-laj, created April 15, 2019.
"""Airflow DAG that exercises HDFS filesystem actions (mkdir, delete, move,
chmod, touchz, chgrp and setrep) by submitting Pig ``fs`` commands to a
Cloud Dataproc cluster."""
import shlex

from airflow import models
from airflow.operators import bash_operator, dummy_operator
from airflow.utils import dates
from airflow.utils.trigger_rule import TriggerRule
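
# NOTE: the module-level operator imports above follow the Airflow 1.10-era
# package layout this gist was written against; newer Airflow releases moved
# these operators (e.g. airflow.operators.bash), so adjust the imports if you
# run this on a more recent version.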
PARAMS = {
    "user.name": "pig",
    "nameNode": "hdfs://localhost:8020",
    "resourceManager": "localhost:8032",
    "queueName": "default",
    "examplesRoot": "examples",
    "oozie.wf.application.path": "hdfs://localhost:8020/user/pig/examples/apps/shell",
    "dataproc_cluster": "cluster-o2a",
    "gcp_region": "europe-west3",
}
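
# Only "dataproc_cluster" and "gcp_region" are referenced by the tasks below;
# the remaining keys appear to be Hadoop/Oozie job properties carried over
# unchanged from the source workflow configuration.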

with models.DAG(
    "test_fs_dag",
    schedule_interval=None,  # Change to suit your needs
    start_date=dates.days_ago(0),  # Change to suit your needs
) as dag:
    # Entry point that fans out to the independent test branches below.
    fs_node = dummy_operator.DummyOperator(task_id="fs_node", trigger_rule=TriggerRule.ONE_SUCCESS)
    # Standalone mkdir test.
    mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-mkdir-1"),
        ),
        task_id="mkdir",
    )
    # Delete branch: create three directories, then remove them again.
    delete_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-delete-1"),
        ),
        task_id="delete_fs_0_mkdir",
    )
    delete_fs_1_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-delete-2"),
        ),
        task_id="delete_fs_1_mkdir",
    )
    delete_fs_2_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-delete-3"),
        ),
        task_id="delete_fs_2_mkdir",
    )
    delete_fs_3_delete = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -rm -r -skipTrash /user/pig/test-fs/test-delete-1"),
        ),
        task_id="delete_fs_3_delete",
    )
    delete_fs_4_delete = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -rm -r /user/pig/test-fs/test-delete-2"),
        ),
        task_id="delete_fs_4_delete",
    )
    delete_fs_5_delete = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -rm -r /user/pig/test-fs/test-delete-3"),
        ),
        task_id="delete_fs_5_delete",
    )
    delete_fs_0_mkdir.set_downstream(delete_fs_1_mkdir)
    delete_fs_1_mkdir.set_downstream(delete_fs_2_mkdir)
    delete_fs_2_mkdir.set_downstream(delete_fs_3_delete)
    delete_fs_3_delete.set_downstream(delete_fs_4_delete)
    delete_fs_4_delete.set_downstream(delete_fs_5_delete)
    # Move branch: create a directory, then move it to a new location.
    move_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-move-1"),
        ),
        task_id="move_fs_0_mkdir",
    )
    move_fs_1_move = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mv /user/pig/test-fs/test-move-1 /home/pig/test-fs/test-move-2"),
        ),
        task_id="move_fs_1_move",
    )
    move_fs_0_mkdir.set_downstream(move_fs_1_move)
    # Chmod branch: create four directories, chmod the first three, and chmod
    # the fourth recursively.
    chmod_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-chmod-1"),
        ),
        task_id="chmod_fs_0_mkdir",
    )
    chmod_fs_1_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-chmod-2"),
        ),
        task_id="chmod_fs_1_mkdir",
    )
    chmod_fs_2_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-chmod-3"),
        ),
        task_id="chmod_fs_2_mkdir",
    )
    chmod_fs_3_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-chmod-4"),
        ),
        task_id="chmod_fs_3_mkdir",
    )
    chmod_fs_4_chmod = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chmod 777 /user/pig/test-fs/test-chmod-1"),
        ),
        task_id="chmod_fs_4_chmod",
    )
    chmod_fs_5_chmod = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chmod 777 /user/pig/test-fs/test-chmod-2"),
        ),
        task_id="chmod_fs_5_chmod",
    )
    chmod_fs_6_chmod = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chmod 777 /user/pig/test-fs/test-chmod-3"),
        ),
        task_id="chmod_fs_6_chmod",
    )
    chmod_fs_7_chmod = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chmod -R 777 /user/pig/test-fs/test-chmod-4"),
        ),
        task_id="chmod_fs_7_chmod",
    )
    chmod_fs_0_mkdir.set_downstream(chmod_fs_1_mkdir)
    chmod_fs_1_mkdir.set_downstream(chmod_fs_2_mkdir)
    chmod_fs_2_mkdir.set_downstream(chmod_fs_3_mkdir)
    chmod_fs_3_mkdir.set_downstream(chmod_fs_4_chmod)
    chmod_fs_4_chmod.set_downstream(chmod_fs_5_chmod)
    chmod_fs_5_chmod.set_downstream(chmod_fs_6_chmod)
    chmod_fs_6_chmod.set_downstream(chmod_fs_7_chmod)
    # Standalone touchz test: create an empty file.
    touchz = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -touchz /user/pig/test-fs/test-touchz-1"),
        ),
        task_id="touchz",
    )
    # Chgrp branch: create a directory, then change its group to "hadoop".
    chgrp_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-chgrp-1"),
        ),
        task_id="chgrp_fs_0_mkdir",
    )
    chgrp_fs_1_chgrp = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chgrp hadoop /user/pig/test-fs/test-chgrp-1"),
        ),
        task_id="chgrp_fs_1_chgrp",
    )
    chgrp_fs_0_mkdir.set_downstream(chgrp_fs_1_chgrp)
    # Setrep branch: create a directory, then set its replication factor to 2.
    setrep_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/pig/test-fs/test-setrep-1"),
        ),
        task_id="setrep_fs_0_mkdir",
    )
    setrep_fs_1_setrep = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -setrep 2 /user/pig/test-fs/test-setrep-1"),
        ),
        task_id="setrep_fs_1_setrep",
    )
    setrep_fs_0_mkdir.set_downstream(setrep_fs_1_setrep)
    # "fail" runs "exit 1" as soon as at least one upstream task fails, so any
    # branch failure also fails this task.
    fail = bash_operator.BashOperator(bash_command="exit 1", task_id="fail", trigger_rule=TriggerRule.ONE_FAILED)
    # Fan out from the entry node to every branch...
    fs_node.set_downstream(mkdir)
    fs_node.set_downstream(delete_fs_0_mkdir)
    fs_node.set_downstream(move_fs_0_mkdir)
    fs_node.set_downstream(chmod_fs_0_mkdir)
    fs_node.set_downstream(touchz)
    fs_node.set_downstream(chgrp_fs_0_mkdir)
    fs_node.set_downstream(setrep_fs_0_mkdir)
    # ...and fan back in: the final task of each branch feeds "fail".
    mkdir.set_downstream(fail)
    delete_fs_5_delete.set_downstream(fail)
    move_fs_1_move.set_downstream(fail)
    chmod_fs_7_chmod.set_downstream(fail)
    touchz.set_downstream(fail)
    chgrp_fs_1_chgrp.set_downstream(fail)
    setrep_fs_1_setrep.set_downstream(fail)
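
# A minimal way to exercise this DAG, assuming an Airflow 1.10-era CLI and that
# this file sits on the configured DAGs folder path (both are assumptions, not
# part of the original gist):
#
#   airflow list_dags                # check that test_fs_dag is parsed
#   airflow trigger_dag test_fs_dag  # queue a manual run (no schedule is set)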