|
from dagster import (
    asset,
    define_asset_job,
    ScheduleDefinition,
    Definitions,
    OpExecutionContext,
    RunsFilter,
    op,
    job,
)
import psycopg2
from datetime import datetime, timedelta
import random
import time
import pandas as pd
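
# Overview: the Definitions at the bottom of this file schedule two demo asset
# jobs every minute, plus a third job that mirrors this Dagster instance's run
# records (status, start/end times) into an external Postgres table so run
# history can be queried outside of Dagster.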


def _postgres():
    """Connect to the postgres db; assumes a dagster_metadata database containing:

    CREATE TABLE runs (run_id TEXT PRIMARY KEY, job_name TEXT, status TEXT,
        updated_at TIMESTAMP, started_at TIMESTAMP, ended_at TIMESTAMP)
    """
    connection = psycopg2.connect(
        user="postgres",
        password="password",
        host="localhost",
        port="5432",
        database="dagster_metadata",
    )
    cursor = connection.cursor()
    return (connection, cursor)
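
# One-time local setup (assumed commands; adjust credentials/host as needed):
#   createdb dagster_metadata
#   psql -d dagster_metadata -c "CREATE TABLE runs (run_id TEXT PRIMARY KEY,
#       job_name TEXT, status TEXT, updated_at TIMESTAMP,
#       started_at TIMESTAMP, ended_at TIMESTAMP)"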


def _add_to_pg_cursor(cursor, query, params):
    """Quick-and-dirty query runner for demo purposes ONLY: errors are printed
    and swallowed rather than raised or retried."""
    try:
        # params are passed separately so psycopg2 parameterizes the query,
        # rather than interpolating values into the SQL string
        cursor.execute(query, params)
    except (Exception, psycopg2.Error) as error:
        print(f"Error logging {query} operation: {error}")


def _construct_query(run_id, job_name, status, updated_at, started_at, ended_at):
    # upsert keyed on run_id: insert new runs, or refresh the mutable fields
    # (status, updated_at, ended_at) for runs already recorded
    query = """
    INSERT INTO runs (run_id, job_name, status, updated_at, started_at, ended_at)
    VALUES (%s, %s, %s, %s, %s, %s)
    ON CONFLICT (run_id) DO UPDATE
    SET status = EXCLUDED.status, updated_at = EXCLUDED.updated_at, ended_at = EXCLUDED.ended_at
    """
    params = (run_id, job_name, status, updated_at, started_at, ended_at)
    return (query, params)


def _get_max_update_time(cursor):
    """Returns the latest updated_at already synced, so only newer runs are pulled"""
    query = "SELECT MAX(updated_at) FROM runs"
    cursor.execute(query)
    result = cursor.fetchone()
    if result[0] is not None:
        return pd.to_datetime(result[0])

    # if the external metadata table is empty, start from a month ago
    return datetime.now() - timedelta(days=31)


@op
def push_metadata(context: OpExecutionContext):
    """Pushes updates on Dagster run statuses to the external postgres table"""
    connection, cursor = None, None
    try:
        connection, cursor = _postgres()

        last_update = _get_max_update_time(cursor)

        run_records = context.instance.get_run_records(
            filters=RunsFilter(updated_after=last_update)
        )

        context.log.debug(f"Potentially syncing {len(run_records)} records")

        for run_record in run_records:
            # optional: skip records whose job_name is "push_metadata_job"
            # to exclude reporting on this sync job itself

            updated_at = pd.to_datetime(run_record.update_timestamp)
            status = run_record.dagster_run.status.value
            run_id = run_record.dagster_run.run_id
            job_name = run_record.dagster_run.job_name

            # start_time and end_time are unix timestamps, and either may be
            # None for runs that have not yet started or finished
            started_at = run_record.start_time
            if started_at is not None:
                started_at = datetime.fromtimestamp(started_at)

            ended_at = run_record.end_time
            if ended_at is not None:
                ended_at = datetime.fromtimestamp(ended_at)

            query, params = _construct_query(
                run_id, job_name, status, updated_at, started_at, ended_at
            )
            _add_to_pg_cursor(cursor, query, params)

        connection.commit()

    finally:
        # connection stays None if _postgres() itself raised
        if connection:
            cursor.close()
            connection.close()


@job
def push_metadata_job():
    push_metadata()


@asset
def might_fail():
    """Demo asset: takes 45 seconds and fails roughly half the time"""
    time.sleep(45)
    if random.randint(0, 1):
        raise Exception("Random error!")


# two demo jobs that target the same asset, giving the sync job a steady
# stream of run records to push
job1 = define_asset_job(name="job1", selection=["might_fail"])
job2 = define_asset_job(name="job2", selection=["might_fail"])


defs = Definitions(
    assets=[might_fail],
    schedules=[
        ScheduleDefinition(name="sched1", job=job1, cron_schedule="* * * * *"),
        ScheduleDefinition(name="sched2", job=job2, cron_schedule="* * * * *"),
        ScheduleDefinition(
            name="pushmetadata", job=push_metadata_job, cron_schedule="* * * * *"
        ),
    ],
)
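
# Once runs are syncing, the external table can be queried directly; for
# example, a failure report over the last day might look like:
#
#   SELECT job_name, COUNT(*) AS failures
#   FROM runs
#   WHERE status = 'FAILURE' AND updated_at > NOW() - INTERVAL '1 day'
#   GROUP BY job_name;
#
# Run everything locally with: dagster dev -f <this_file>.py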