Push Metadata to External Postgres

Dagster maintains a metadata DB that allows users to filter, search, and take action on the status of past runs. However, in some cases it is useful to view this metadata outside of Dagster. This example shows how a scheduled Dagster job can be used to push the run metadata into another Postgres DB.
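
The heavy lifting is done by Dagster's run records API, which any op can reach via context.instance. As a minimal preview of the call the full example below builds on (the op name peek_recent_runs is illustrative only):

from datetime import datetime, timedelta

from dagster import OpExecutionContext, RunsFilter, op

@op
def peek_recent_runs(context: OpExecutionContext):
    # context.instance is the Dagster instance backing the metadata DB;
    # get_run_records returns RunRecord objects matching the RunsFilter
    records = context.instance.get_run_records(
        filters=RunsFilter(updated_after=datetime.now() - timedelta(hours=1))
    )
    for record in records:
        context.log.info(
            f"{record.dagster_run.job_name}: {record.dagster_run.status.value}"
        )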

Get Started

To run this example:

  1. Create a Postgres DB to be the metadata target:

docker pull postgres
docker run --name test-run-sensor -p 5432:5432 -e POSTGRES_PASSWORD=password -d postgres

Using your favorite Postgres client:

CREATE DATABASE dagster_metadata;
\c dagster_metadata
CREATE TABLE runs (run_id TEXT PRIMARY KEY, job_name TEXT, status TEXT, updated_at TIMESTAMP, started_at TIMESTAMP, ended_at TIMESTAMP);
  2. Install the Python dependencies for this example:
pip install dagster pandas psycopg2-binary
  3. Run the example:
dagster dev -f push_metadata_example.py

With Dagster running, you can either turn on all of the schedules and watch your Postgres DB sync the run statuses, or manually trigger runs and then launch push_metadata_job to see the results.
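
To spot-check the sync from Python, a short script like this one (reusing the example's connection settings; the query itself is just a convenience, not part of the example) lists the most recently updated rows:

import psycopg2

connection = psycopg2.connect(
    user="postgres",
    password="password",
    host="localhost",
    port="5432",
    database="dagster_metadata",
)
with connection.cursor() as cursor:
    # show the ten most recently updated runs that have been synced
    cursor.execute(
        "SELECT run_id, job_name, status, updated_at FROM runs ORDER BY updated_at DESC LIMIT 10"
    )
    for row in cursor.fetchall():
        print(row)
connection.close()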

Extensions

This example syncs some of the relevant run data, but you could also sync any of the data available on run_record.dagster_run, including things like run tags or targeted assets; a sketch of the tags variant follows.
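
For instance, run tags could be synced with a variant of _construct_query like the sketch below; it assumes the runs table was created with an extra tags JSONB column, which is not part of the schema above:

import json

def _construct_query_with_tags(
    run_id, job_name, status, updated_at, started_at, ended_at, tags
):
    # hypothetical variant of _construct_query; assumes a `tags JSONB` column
    query = """
        INSERT INTO runs (run_id, job_name, status, updated_at, started_at, ended_at, tags)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (run_id) DO UPDATE
        SET status = EXCLUDED.status, updated_at = EXCLUDED.updated_at,
            ended_at = EXCLUDED.ended_at, tags = EXCLUDED.tags
    """
    # run_record.dagster_run.tags is a dict of string tags on each run
    vars = (run_id, job_name, status, updated_at, started_at, ended_at, json.dumps(tags))
    return (query, vars)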

push_metadata_example.py

from dagster import (
    asset,
    define_asset_job,
    ScheduleDefinition,
    Definitions,
    OpExecutionContext,
    RunsFilter,
    op,
    job,
)
import psycopg2
from datetime import datetime, timedelta
import random
import time
import pandas as pd


def _postgres():
    """Connect to the postgres db, assumes:

    CREATE DATABASE dagster_metadata;
    CREATE TABLE runs (run_id TEXT PRIMARY KEY, job_name TEXT, status TEXT, updated_at TIMESTAMP, started_at TIMESTAMP, ended_at TIMESTAMP);
    """
    connection = psycopg2.connect(
        user="postgres",
        password="password",
        host="localhost",
        port="5432",
        database="dagster_metadata",
    )
    cursor = connection.cursor()
    return (connection, cursor)


def _add_to_pg_cursor(cursor, query, vars):
    """Fake and insecure way to run a postgres query for demo purposes ONLY"""
    try:
        # scary sql injection opportunity
        cursor.execute(query, vars)
    except (Exception, psycopg2.Error) as error:
        print(f"Error logging {query} operation: {error}")


def _construct_query(run_id, job_name, status, updated_at, started_at, ended_at):
    """Upsert a run row, keyed on run_id"""
    query = """
        INSERT INTO runs (run_id, job_name, status, updated_at, started_at, ended_at)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (run_id) DO UPDATE
        SET status = EXCLUDED.status, updated_at = EXCLUDED.updated_at, ended_at = EXCLUDED.ended_at
    """
    vars = (run_id, job_name, status, updated_at, started_at, ended_at)
    return (query, vars)


def _get_max_update_time(cursor):
    """Find the high-water mark from the last sync"""
    query = "SELECT MAX(updated_at) FROM runs"
    cursor.execute(query)
    result = cursor.fetchone()
    if result[0] is not None:
        return pd.to_datetime(result[0])
    # if the external metadata table is empty, start from a month ago
    return datetime.now() - timedelta(days=31)


@op
def push_metadata(context: OpExecutionContext):
    """Pushes updates on Dagster run statuses to external postgres table"""
    connection = None
    try:
        connection, cursor = _postgres()
        last_update = _get_max_update_time(cursor)
        run_records = context.instance.get_run_records(
            filters=RunsFilter(updated_after=pd.to_datetime(last_update))
        )
        context.log.debug(f"Potentially syncing {len(run_records)} records")
        for run_record in run_records:
            # optional ... filter out runs for "push_metadata_job" to exclude reporting on this job
            updated_at = pd.to_datetime(run_record.update_timestamp)
            status = run_record.dagster_run.status.value
            run_id = run_record.dagster_run.run_id
            job_name = run_record.dagster_run.job_name
            # runs that are queued but not yet started have no start or end time
            started_at = run_record.start_time
            if started_at is not None:
                started_at = datetime.fromtimestamp(started_at)
            ended_at = run_record.end_time
            if ended_at is not None:
                ended_at = datetime.fromtimestamp(ended_at)
            query, vars = _construct_query(
                run_id, job_name, status, updated_at, started_at, ended_at
            )
            _add_to_pg_cursor(cursor, query, vars)
        connection.commit()
    finally:
        if connection:
            cursor.close()
            connection.close()


@job
def push_metadata_job():
    push_metadata()


@asset
def might_fail():
    """Example asset that sleeps, then fails about half the time"""
    time.sleep(45)
    if random.randint(0, 1):
        raise Exception("Random error!")


job1 = define_asset_job(name="job1", selection=["might_fail"])
job2 = define_asset_job(name="job2", selection=["might_fail"])

defs = Definitions(
    assets=[might_fail],
    schedules=[
        ScheduleDefinition(name="sched1", job=job1, cron_schedule="* * * * *"),
        ScheduleDefinition(name="sched2", job=job2, cron_schedule="* * * * *"),
        ScheduleDefinition(
            name="pushmetadata", job=push_metadata_job, cron_schedule="* * * * *"
        ),
    ],
)