Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created January 24, 2023 02:57
Show Gist options
  • Save anna-geller/5c6dd40cf6c902773378d177e0684e34 to your computer and use it in GitHub Desktop.
Save anna-geller/5c6dd40cf6c902773378d177e0684e34 to your computer and use it in GitHub Desktop.
import logging
import uuid

import pendulum
import requests
from airflow.hooks.base import BaseHook
def get_prefect_event(context):
    """Build the Prefect event body for an Airflow callback context.

    Args:
        context: Mapping passed to the callback. The keys read here are
            ``"task_instance"``, ``"ds"``, ``"run_id"`` and
            ``"task_instance_key_str"``.

    Returns:
        A list holding a single event dict with a freshly generated ``id``,
        the current UTC time as ``occurred``, and ``resource``, ``related``
        and ``payload`` sections filled from the task instance and context.
    """
    # Generated first so the id/timestamp are produced before any context
    # lookup, exactly as in a single literal evaluated top to bottom.
    event_id = str(uuid.uuid4())
    occurred_at = pendulum.now("UTC").to_iso8601_string()

    ti = context.get("task_instance")

    resource = {
        "prefect.resource.id": f"airflow.dag.{ti.dag_id}",
        "prefect.resource.role": "airflow",
    }
    related = [
        {
            "prefect.resource.id": "airflow.dag.*",
            "prefect.resource.role": "airflow",
        }
    ]
    payload = {
        "logical_date": context["ds"],
        "log_url": ti.log_url,
        "mark_success_url": ti.mark_success_url,
        "dag": ti.dag_id,
        "dag_run_id": context["run_id"],
        "task": context["task_instance_key_str"],
        "final_state": ti.state,
    }

    event = {
        "id": event_id,
        "occurred": occurred_at,
        "event": "airflow.dag",
        "resource": resource,
        "related": related,
        "payload": payload,
    }
    return [event]
def send_prefect_event_callback(context, *, timeout=30.0):
    """Send a Prefect event describing the given Airflow callback context.

    The event body comes from ``get_prefect_event(context)``. Credentials are
    read from the Airflow connection named ``"prefect"``: its ``password`` is
    used as the bearer token and its ``login`` as the API base URL, to which
    ``/events`` is appended.

    Args:
        context: Airflow callback context, forwarded to ``get_prefect_event``.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` may block indefinitely on an
            unresponsive endpoint.

    Raises:
        requests.exceptions.RequestException: If the request cannot be sent
            or times out. Non-2xx responses are logged, not raised.
    """
    prefect_event = get_prefect_event(context)
    conn = BaseHook.get_connection("prefect")
    api_key = conn.password
    # Strip a trailing slash so a base URL configured as ".../" does not
    # produce ".../​/events"-style double slashes.
    api_url = (conn.login or "").rstrip("/")
    url = f"{api_url}/events"
    response = requests.post(
        url=url,
        json=prefect_event,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        timeout=timeout,
    )
    # Surface rejected events (bad key, wrong URL, ...) instead of dropping
    # them silently; a warning keeps the callback best-effort.
    if not response.ok:
        logging.getLogger(__name__).warning(
            "Prefect events API returned HTTP %s for %s",
            response.status_code,
            url,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment