Skip to content

Instantly share code, notes, and snippets.

@adilkhash
Created October 31, 2021 16:31
Show Gist options
  • Save adilkhash/597d91d0e5efe03a5758be9503d186c6 to your computer and use it in GitHub Desktop.
Save adilkhash/597d91d0e5efe03a5758be9503d186c6 to your computer and use it in GitHub Desktop.
Trigger an external DAG via the Airflow REST API
import datetime as dt
from urllib.parse import urljoin
import requests
from airflow.models import DAG
from airflow.operators.python import PythonOperator
# Arguments applied to every task of the DAG defined in this file.
default_args = dict(
    owner='airflow',
    start_date=dt.datetime(2021, 1, 20),
)

# Basic-auth credentials sent with each REST API request.
# NOTE(review): hard-coded values — presumably demo placeholders; confirm
# before pointing this at a real deployment.
username, password = 'airflow', 'airflow'

# Root of the REST API. The trailing slash matters: endpoint paths are
# resolved against this value with urljoin, which would otherwise replace
# the last path segment instead of appending to it.
BASE_URL = 'http://127.0.0.1:8080/api/v1/'
class Endpoints:
    """Absolute URLs of REST API resources, each resolved against BASE_URL.

    DAG_RUNS keeps a literal ``{dag_id}`` placeholder that callers fill in
    with ``str.format``.
    """

    def _resolve(path):
        # Class-body helper only: resolve a relative path against BASE_URL.
        return urljoin(BASE_URL, path)

    DAGS = _resolve('dags')
    DAG_RUNS = _resolve('dags/{dag_id}/dagRuns')
    CONFIG = _resolve('config')
    CONNECTIONS = _resolve('connections')
    PROVIDERS = _resolve('providers')

    # Drop the helper so the class exposes only the URL constants.
    del _resolve
def trigger_dag_run(context, timeout=10.0):
    """Trigger a run of ``example_api_dag`` through the REST API.

    POSTs a new DAG run to the ``dagRuns`` endpoint using basic auth, then
    prints the response body followed by the received context.

    Args:
        context: Context object handed to the callback; it is only printed.
        timeout: Seconds to wait for the API before ``requests`` gives up.
            Without a timeout an unreachable server could block the
            callback indefinitely.

    Raises:
        requests.RequestException: If the request cannot be completed
            (connection failure, timeout, ...). HTTP error statuses are not
            raised; the response body is printed as-is.
    """
    url = Endpoints.DAG_RUNS.format(dag_id='example_api_dag')
    # NOTE(review): the execution date is fixed, so every invocation asks
    # for the same run — presumably the API rejects repeats as duplicates;
    # confirm whether this is intended.
    payload = {
        'execution_date': f'{dt.datetime(2021, 10, 1, 0, 0, 0).isoformat()}Z',
    }
    response = requests.post(
        url,
        headers={
            'Content-Type': 'application/json',
        },
        json=payload,
        auth=(username, password),
        timeout=timeout,
    )
    print(response.text)
    print(context)
def _say_hello():
    """Print the greeting emitted by the DAG's only task."""
    print("hello world")


# DAG scheduled with '@daily' holding a single Python task; trigger_dag_run
# is registered as the DAG-level success callback.
with DAG(
    dag_id='callback_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    on_success_callback=trigger_dag_run,
) as dag:
    even_only = PythonOperator(
        python_callable=_say_hello,
        task_id='even_only',
        dag=dag,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment