Created
August 23, 2024 21:13
-
-
Save cnolanminich/9b967c6effb8df6ff13b0494291213db to your computer and use it in GitHub Desktop.
Sensor that checks multiple Snowflake tables and ensures they're all updated since the last check before requesting a run.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
from datetime import datetime

from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    SkipReason,
    sensor,
)

# this would be your job
from hooli_data_eng.jobs import predict_job
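# You'll need a SnowflakeResource defined in your Definitions. Note that this
# snippet does not import SnowflakeResource or fetch_last_updated_timestamps;
# both are provided by the dagster-snowflake package and must be imported.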
# Define the Snowflake tables you want to monitor.
# TABLE_SCHEMA is the schema passed to the freshness query and table_names is
# the list of tables whose last-updated timestamps are fetched by the sensor.
TABLE_SCHEMA = "RAW_DATA"
table_names = ["LOCATIONS", "ORDERS", "USERS"]
# One Dagster asset key per monitored table (requires AssetKey from dagster).
asset_keys = [AssetKey(table_name) for table_name in table_names]
# returns true if dict1 is | |
def _is_all_newer(context, previous_result, current_result): | |
for key in previous_result: | |
if key not in current_result or current_result[key] <= previous_result[key]: | |
context.log.info(f"Table {key} is not updated") | |
return False | |
return True | |
@sensor(
    job=predict_job,
)
def snowflake_table_sensor(context: SensorEvaluationContext,
                           snowflake: SnowflakeResource):
    """Request a run of ``predict_job`` once every monitored table is fresher.

    The sensor cursor stores a JSON object mapping table name to the
    last-updated timestamp (formatted ``%Y-%m-%d %H:%M:%S``) observed when the
    sensor last requested a run. On each evaluation the current timestamps
    are fetched for ``table_names`` in ``TABLE_SCHEMA`` and compared with the
    cursor via ``_is_all_newer``.

    Args:
        context: Sensor evaluation context; provides the cursor and logger.
        snowflake: Resource used to open the Snowflake connection.

    Returns:
        ``SkipReason`` when at least one table has not advanced since the
        cursor was written; otherwise a ``RunRequest`` whose ``run_key`` is
        the newly stored cursor value. With an empty cursor (first
        evaluation) a run is requested.
    """
    # Hold the connection only for the metadata query.
    with snowflake.get_connection() as conn:
        latest_timestamps = fetch_last_updated_timestamps(
            snowflake_connection=conn,
            tables=table_names,
            schema=TABLE_SCHEMA,
        )

    previous_freshness_results = json.loads(context.cursor) if context.cursor else {}
    # Ignore cursor entries for tables that are no longer reported (e.g. a
    # table removed from table_names). Otherwise _is_all_newer would return
    # False on every tick and, because the cursor is only rewritten on
    # success, the sensor could never fire again.
    previous_freshness_results = {
        table: timestamp
        for table, timestamp in previous_freshness_results.items()
        if table in latest_timestamps
    }

    # Timestamps are stored as fixed-width strings so they are JSON
    # serializable and still order chronologically under string comparison.
    freshness_results = {
        table: timestamp.strftime("%Y-%m-%d %H:%M:%S")
        for table, timestamp in latest_timestamps.items()
    }

    context.log.info(f"Previous freshness results: {previous_freshness_results}")
    context.log.info(f"Current freshness results: {freshness_results}")

    tables_fresher = _is_all_newer(context, previous_freshness_results, freshness_results)
    context.log.info(f"Tables are fresher: {tables_fresher}")
    if not tables_fresher:
        return SkipReason("Not all tables are updated")

    # Use the serialized value directly as the run key instead of reading it
    # back from the context after update_cursor.
    new_cursor = json.dumps(freshness_results)
    context.update_cursor(new_cursor)
    return RunRequest(run_key=new_cursor)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment