Skip to content

Instantly share code, notes, and snippets.

@cemdrk
Last active August 5, 2024 08:58
Show Gist options
  • Save cemdrk/80fb9620959c7e6a247ca8eb89c84314 to your computer and use it in GitHub Desktop.
Save cemdrk/80fb9620959c7e6a247ca8eb89c84314 to your computer and use it in GitHub Desktop.
Airflow DAG to run Node.js jobs
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.decorators import dag, task
# Job configuration: replace the <...> placeholders before deploying.
JOB_NAME = "<your_js_job_name>"          # also used as the DAG id and pod name
DOCKER_IMG = "<your_docker_img>"         # image that contains node and the script
JS_RUN_FILE = "<js_file_path_in_docker>" # script path inside the image
K8S_NAMESPACE = "airflow"                # namespace the pod is launched in

# Arguments shared by every task of the DAG.
default_args = dict(
    owner="me",
    start_date=days_ago(1),
)

# Per-job Airflow Variables, looked up when this module is imported
# (i.e. whenever the DAG file is parsed).
# "<JOB_NAME>_schedule": DAG schedule; None when the Variable is not set.
SCHEDULE_INTERVAL = Variable.get(f"{JOB_NAME}_schedule", default_var=None)
# "<JOB_NAME>_params": default CLI argument for the JS script; "{}" when unset.
JS_RUN_PARAMS_VAR = Variable.get(f"{JOB_NAME}_params", default_var="{}")
@task
def run_js_file(**context):
    """Run ``node JS_RUN_FILE <params>`` in a Kubernetes pod.

    The single argument handed to the Node.js script is chosen as follows:

    * the ``JS_RUN_PARAMS`` key of the DAG run's conf (set on a manual
      trigger), when present and truthy;
    * otherwise ``JS_RUN_PARAMS_VAR`` (the ``<JOB_NAME>_params`` Variable).

    Container arguments must be strings, so a non-string conf value
    (e.g. a JSON object typed into the trigger form) is serialized with
    ``json.dumps`` before being passed to the pod.

    Args:
        **context: Airflow task context; ``context["dag_run"]`` is read for
            its ``conf`` and the whole context is forwarded to the pod
            operator's ``execute``.
    """
    import json  # local import keeps DAG-file parsing lightweight

    # Use the manual-trigger conf if provided. Guard against ``conf`` being
    # None, which the original ``.conf.get(...)`` call would crash on.
    run_conf = context["dag_run"].conf or {}
    js_run_params = run_conf.get("JS_RUN_PARAMS") or JS_RUN_PARAMS_VAR

    # Kubernetes container args are a list of strings; serialize dicts,
    # lists, numbers, etc. coming from the trigger conf.
    if not isinstance(js_run_params, str):
        js_run_params = json.dumps(js_run_params)

    # The pod operator is built and executed inline, inside this task.
    k8s = KubernetesPodOperator(
        namespace=K8S_NAMESPACE,
        image=DOCKER_IMG,
        cmds=["node", JS_RUN_FILE],
        arguments=[js_run_params],
        name=JOB_NAME,
        task_id=f"run_{JOB_NAME}",
        in_cluster=True,
        # Keep the finished pod around. NOTE(review): newer cncf.kubernetes
        # provider releases deprecate this flag in favour of
        # ``on_finish_action`` — confirm against the installed version.
        is_delete_operator_pod=False,
        get_logs=True,
        image_pull_secrets=[{"name": "airflow-registry"}],
    )
    k8s.execute(context)
# DAG with id JOB_NAME, scheduled by SCHEDULE_INTERVAL (from the
# "<JOB_NAME>_schedule" Variable). The JS_RUN_PARAMS param exposes the
# override key that run_js_file reads from the manual-trigger conf.
@dag(
    dag_id=JOB_NAME,
    schedule=SCHEDULE_INTERVAL,
    catchup=False,
    default_args=default_args,
    params=dict(JS_RUN_PARAMS=None),
)
def dag_func():
    # The DAG consists of a single task that runs the Node.js script.
    run_js_file()


# Calling the @dag-decorated function builds the DAG object at import time
# so Airflow can discover it when parsing this file.
dag_func()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment