Created
October 31, 2021 16:31
-
-
Save adilkhash/597d91d0e5efe03a5758be9503d186c6 to your computer and use it in GitHub Desktop.
Trigger an external DAG via the Airflow REST API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime as dt
import os
from urllib.parse import urljoin

import requests
from airflow.models import DAG
from airflow.operators.python import PythonOperator
# Passed to the DAG defined in this file as its ``default_args``.
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2021, 1, 20),
}
# Credentials and root URL used for the Airflow REST API calls in this file.
# Each value can be overridden through an environment variable; the defaults
# are the values that were previously hard-coded here.
username = os.environ.get('AIRFLOW_API_USERNAME', 'airflow')
password = os.environ.get('AIRFLOW_API_PASSWORD', 'airflow')
# urljoin() replaces the last path segment of a base URL that does not end in
# '/', so the root is normalised to always carry exactly one trailing slash.
BASE_URL = os.environ.get(
    'AIRFLOW_API_BASE_URL',
    'http://127.0.0.1:8080/api/v1/',
).rstrip('/') + '/'
class Endpoints:
    """Absolute URLs of the Airflow REST API resources, built from ``BASE_URL``.

    ``DAG_RUNS`` is a template: call ``.format(dag_id=...)`` on it to obtain
    the URL for one specific DAG.
    """

    # urljoin() drops the final path segment of a base that lacks a trailing
    # slash (".../api/v1" + "dags" -> ".../api/dags"), so make sure it has one.
    _ROOT = BASE_URL if BASE_URL.endswith('/') else BASE_URL + '/'

    DAGS = urljoin(_ROOT, 'dags')
    DAG_RUNS = urljoin(_ROOT, 'dags/{dag_id}/dagRuns')
    CONFIG = urljoin(_ROOT, 'config')
    CONNECTIONS = urljoin(_ROOT, 'connections')
    PROVIDERS = urljoin(_ROOT, 'providers')
def trigger_dag_run(context):
    """Request a new run of ``example_api_dag`` through the Airflow REST API.

    Used in this file as the ``on_success_callback`` of ``callback_dag``.
    The response body and the received context are printed.

    Args:
        context: The argument Airflow passes to the callback; it is only
            printed here.
    """
    url = Endpoints.DAG_RUNS.format(dag_id='example_api_dag')
    # A fixed execution date can be used for only one run of a DAG; repeated
    # calls would then be rejected as duplicates. The current UTC time gives
    # every call its own date.
    payload = {
        'execution_date': dt.datetime.now(dt.timezone.utc).isoformat(),
    }
    response = requests.post(
        url,
        headers={
            'Content-Type': 'application/json',
        },
        json=payload,
        auth=(username, password),
        # Without a timeout an unresponsive API server blocks the caller
        # indefinitely.
        timeout=30,
    )
    print(response.text)
    print(context)
# Daily DAG with a single task; trigger_dag_run is registered as the DAG's
# success callback.
with DAG(
    dag_id='callback_dag',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
    on_success_callback=trigger_dag_run,
) as dag:
    # Operators created inside the ``with DAG(...)`` block are attached to
    # that DAG automatically, so no explicit ``dag=`` argument is passed.
    even_only = PythonOperator(
        task_id='even_only',
        python_callable=lambda: print("hello world"),
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment