mik-laj/a.py (secret gist by @mik-laj), created April 18, 2019 23:04
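# Airflow DAG, apparently generated by the Oozie-to-Airflow ("o2a") converter (note the
# o2a_libs imports and the oozie.wf.application.path parameter), reproducing an Oozie "fs"
# workflow: each HDFS filesystem action is submitted to a Dataproc cluster as a Pig job
# whose script is a single `fs ...` shell command.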
import shlex
from airflow import models
from airflow.operators import dummy_operator
from o2a_libs.el_wf_functions import *
from airflow.utils import dates
import datetime
from airflow.operators import bash_operator
from airflow.utils.trigger_rule import TriggerRule
from o2a_libs.el_basic_functions import *
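# Workflow properties carried over from the Oozie job configuration, plus the
# GCP/Dataproc settings used to build the gcloud commands below.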
PARAMS = {
    "user.name": "fs",
    "nameNode": "hdfs://localhost:8020",
    "resourceManager": "localhost:8032",
    "queueName": "default",
    "examplesRoot": "examples",
    "oozie.wf.application.path": "hdfs://localhost:8020/user/fs/examples/apps/shell",
    "dataproc_cluster": "oozie-o2a-2cpu",
    "gcp_conn_id": "google_cloud_default",
    "gcp_region": "europe-west3",
    "gcp_uri_prefix": "gs://europe-west1-o2a-integratio-f690ede2-bucket/dags",
}
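# A single DAG for the whole translated workflow; schedule_interval=None means it has no
# schedule and only runs when triggered manually.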
with models.DAG(
    "test_fs_dag",
    schedule_interval=None,  # Change to suit your needs
    start_date=dates.days_ago(0),  # Change to suit your needs
) as dag:
    fs_node = dummy_operator.DummyOperator(task_id="fs_node", trigger_rule="one_success")
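    # fs_node is a no-op entry point that fans out to the individual FS test branches.
    # Each branch runs an HDFS command by submitting a Pig job whose script is just
    # `fs <command>` (Pig's built-in fs shell); this assumes the gcloud CLI is installed
    # and authenticated on the Airflow workers executing these BashOperators.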
    mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/fs/examples/test-mkdir-1"),
        ),
        task_id="mkdir",
    )
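    # delete branch: create a directory, then remove it recursively.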
    delete_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/fs/examples/test-delete-1"),
        ),
        task_id="delete_fs_0_mkdir",
    )
    delete_fs_1_delete = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -rm -r /user/fs/examples/test-delete-1"),
        ),
        task_id="delete_fs_1_delete",
    )
    delete_fs_0_mkdir.set_downstream(delete_fs_1_delete)
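    # move branch: create a directory, then rename it.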
    move_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/fs/examples/test-move-1"),
        ),
        task_id="move_fs_0_mkdir",
    )
    move_fs_1_move = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mv /user/fs/examples/test-move-1 /user/fs/examples/test-move-2"),
        ),
        task_id="move_fs_1_move",
    )
    move_fs_0_mkdir.set_downstream(move_fs_1_move)
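    # chmod branch: create four directories, chmod the first three, then recursively
    # chmod the fourth.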
    chmod_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/fs/examples/test-chmod-1"),
        ),
        task_id="chmod_fs_0_mkdir",
    )
    chmod_fs_1_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/fs/examples/test-chmod-2"),
        ),
        task_id="chmod_fs_1_mkdir",
    )
    chmod_fs_2_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/fs/examples/test-chmod-3"),
        ),
        task_id="chmod_fs_2_mkdir",
    )
    chmod_fs_3_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/fs/examples/test-chmod-4"),
        ),
        task_id="chmod_fs_3_mkdir",
    )
    chmod_fs_4_chmod = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chmod 777 /user/fs/examples/test-chmod-1"),
        ),
        task_id="chmod_fs_4_chmod",
    )
    chmod_fs_5_chmod = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chmod 777 /user/fs/examples/test-chmod-2"),
        ),
        task_id="chmod_fs_5_chmod",
    )
    chmod_fs_6_chmod = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chmod 777 /user/fs/examples/test-chmod-3"),
        ),
        task_id="chmod_fs_6_chmod",
    )
    chmod_fs_7_chmod = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chmod -R 777 /user/fs/examples/test-chmod-4"),
        ),
        task_id="chmod_fs_7_chmod",
    )
    chmod_fs_0_mkdir.set_downstream(chmod_fs_1_mkdir)
    chmod_fs_1_mkdir.set_downstream(chmod_fs_2_mkdir)
    chmod_fs_2_mkdir.set_downstream(chmod_fs_3_mkdir)
    chmod_fs_3_mkdir.set_downstream(chmod_fs_4_chmod)
    chmod_fs_4_chmod.set_downstream(chmod_fs_5_chmod)
    chmod_fs_5_chmod.set_downstream(chmod_fs_6_chmod)
    chmod_fs_6_chmod.set_downstream(chmod_fs_7_chmod)
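    # touchz: create an empty file in HDFS.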
    touchz = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -touchz /user/fs/examples/test-touchz-1"),
        ),
        task_id="touchz",
    )
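    # chgrp branch: create a directory, then change its group to "hadoop".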
    chgrp_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir /user/fs/examples/test-chgrp-1"),
        ),
        task_id="chgrp_fs_0_mkdir",
    )
    chgrp_fs_1_chgrp = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -chgrp hadoop /user/fs/examples/test-chgrp-1"),
        ),
        task_id="chgrp_fs_1_chgrp",
    )
    chgrp_fs_0_mkdir.set_downstream(chgrp_fs_1_chgrp)
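    # The dummy entry node fans out to all six independent branches, so they can run in
    # parallel (subject to the Airflow instance's parallelism settings).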
    fs_node.set_downstream(chmod_fs_0_mkdir)
    fs_node.set_downstream(chgrp_fs_0_mkdir)
    fs_node.set_downstream(touchz)
    fs_node.set_downstream(mkdir)
    fs_node.set_downstream(move_fs_0_mkdir)
    fs_node.set_downstream(delete_fs_0_mkdir)