Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
Created August 23, 2024 21:13
Show Gist options
  • Save cnolanminich/9b967c6effb8df6ff13b0494291213db to your computer and use it in GitHub Desktop.
Save cnolanminich/9b967c6effb8df6ff13b0494291213db to your computer and use it in GitHub Desktop.
sensor that checks multiple snowflake tables and ensures they're all
import json
from datetime import datetime

from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    SkipReason,
    sensor,
)

# this would be your job
from hooli_data_eng.jobs import predict_job
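# You'll need a SnowflakeResource registered in your Definitions (the sensor
# below requests it through its `snowflake` parameter). SnowflakeResource and
# fetch_last_updated_timestamps are not imported in this file; both are
# provided by the dagster-snowflake package and must be imported for it to run.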
# Define the Snowflake tables you want to monitor
# Schema passed to the freshness query for every monitored table.
TABLE_SCHEMA = "RAW_DATA"

# Tables whose last-updated timestamps the sensor compares between evaluations.
table_names = [
    "LOCATIONS",
    "ORDERS",
    "USERS",
]

# One asset key per monitored table, in the same order as table_names.
asset_keys = [AssetKey(name) for name in table_names]
def _is_all_newer(context, previous_result: dict, current_result: dict) -> bool:
    """Return True if every table in ``previous_result`` is strictly newer now.

    Args:
        context: Object exposing ``log.info``; used to report the first table
            that has not advanced.
        previous_result: Mapping of table name to the value recorded at the
            last trigger.
        current_result: Mapping of table name to the value just observed.

    Returns:
        False as soon as one table from ``previous_result`` is missing from
        ``current_result`` or its current value is less than or equal to the
        previous one; True otherwise. An empty ``previous_result`` therefore
        yields True, and tables present only in ``current_result`` are not
        checked.
    """
    for key, previous_value in previous_result.items():
        if key not in current_result or current_result[key] <= previous_value:
            context.log.info(f"Table {key} is not updated")
            return False
    return True
@sensor(
    job=predict_job,
)
def snowflake_table_sensor(context: SensorEvaluationContext,
                           snowflake: SnowflakeResource):
    """Request a run of ``predict_job`` once all monitored tables have advanced.

    Each evaluation fetches the last-updated timestamp of every table in
    ``table_names`` (schema ``TABLE_SCHEMA``) and compares it with the snapshot
    stored as JSON in the sensor cursor by the previous triggering evaluation.

    Args:
        context: Sensor evaluation context; supplies the cursor and the logger.
        snowflake: Resource used to open the Snowflake connection.

    Returns:
        ``SkipReason`` when at least one previously recorded table has not
        advanced (the cursor is left unchanged), otherwise a ``RunRequest``
        whose ``run_key`` is the newly stored cursor value. With no cursor
        yet, the comparison passes and a run is requested.
    """
    # Hold the connection only for the freshness query itself.
    with snowflake.get_connection() as conn:
        freshness_results = fetch_last_updated_timestamps(
            snowflake_connection=conn,
            tables=table_names,
            schema=TABLE_SCHEMA,
        )

    previous_freshness_results = json.loads(context.cursor) if context.cursor else {}

    # Store timestamps as fixed-width, second-resolution strings: they are
    # JSON-serializable and, being zero-padded, order chronologically under
    # plain string comparison (assuming all values share one timezone).
    freshness_results = {
        table: timestamp.strftime("%Y-%m-%d %H:%M:%S")
        for table, timestamp in freshness_results.items()
    }

    context.log.info(f"Previous freshness results: {previous_freshness_results}")
    context.log.info(f"Current freshness results: {freshness_results}")

    tables_fresher = _is_all_newer(context, previous_freshness_results, freshness_results)
    context.log.info(f"Tables are fresher: {tables_fresher}")
    if not tables_fresher:
        return SkipReason("Not all tables are updated")

    # Serialize once and reuse the same string for both the cursor and the
    # run key, rather than reading the cursor back from the context.
    new_cursor = json.dumps(freshness_results)
    context.update_cursor(new_cursor)
    return RunRequest(run_key=new_cursor)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment