Created
October 15, 2024 20:31
-
-
Save cnolanminich/335ab39f8931ae0342a67afcaad5c319 to your computer and use it in GitHub Desktop.
Turn schedules on or off in bulk, or give schedules an expiration date.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import Definitions, load_assets_from_modules, AssetSelection, define_asset_job, op, OpExecutionContext, job, asset, asset_check, ScheduleDefinition | |
import requests | |
from datetime import datetime | |
# Dagster GraphQL endpoint and code location name targeted by this module.
# Can be modified to work with auth in Dagster+.
graphql_endpoint = "http://localhost:3000/graphql"  # Adjust the URL as needed
code_location_name = "turn_on_schedules_in_bulk"  # Adjust the code location name as needed
# Utility function to get all schedules for a code location.
def get_all_schedules(graphql_endpoint, code_location_name, timeout=30):
    """Fetch every schedule defined in one code location via GraphQL.

    Args:
        graphql_endpoint: URL of the Dagster GraphQL endpoint to POST to.
        code_location_name: Name of the code location (repository location)
            whose schedules are requested.
        timeout: Seconds to wait for the server before ``requests`` gives up
            and raises; pass ``None`` to wait indefinitely.

    Returns:
        The raw ``requests.Response``. Neither the HTTP status nor the
        payload is inspected here; callers decide how to handle failures.
    """
    # The code location name travels as a GraphQL variable instead of being
    # spliced into the query text, so a name containing quotes or
    # backslashes cannot corrupt the query.
    get_all_schedules_query = """
    query SchedulesQuery($repositoryLocationName: String!) {
      schedulesOrError(
        repositorySelector: {
          repositoryName: "__repository__"
          repositoryLocationName: $repositoryLocationName
        }
      ) {
        ... on Schedules {
          results {
            id
            name
            cronSchedule
            executionTimezone
            tags {
              key
              value
            }
            scheduleState {
              id
              status
            }
          }
        }
      }
    }
    """
    # Without an explicit timeout a stalled server would block the caller
    # forever.
    return requests.post(
        graphql_endpoint,
        json={
            'query': get_all_schedules_query,
            'variables': {'repositoryLocationName': code_location_name},
        },
        timeout=timeout,
    )
# Dagster op (task) that turns on schedules for a code location if that
# schedule has a "schedule-type" tag with a value of "asset_grouping_1".
@op
def turn_on_asset_grouping_1_schedules(context: OpExecutionContext) -> None:
    """Start every schedule tagged ``schedule-type: asset_grouping_1``.

    Uses the module-level ``graphql_endpoint`` and ``code_location_name``
    (rather than shadowing them with duplicate literals) so the connection
    settings only need to be adjusted in one place.

    Steps: list the schedules, send one ``startSchedule`` mutation per
    matching schedule (logging each reply), then list the schedules again
    and log the refreshed state.
    """
    request_timeout = 30  # seconds; keeps a stalled server from hanging the op

    schedules_response = get_all_schedules(graphql_endpoint, code_location_name)
    schedules = schedules_response.json()['data']['schedulesOrError']['results']

    # Keep only the names of schedules carrying the grouping tag.
    schedule_names = [
        schedule['name']
        for schedule in schedules
        if any(
            tag['key'] == 'schedule-type' and tag['value'] == 'asset_grouping_1'
            for tag in schedule.get('tags', [])
        )
    ]

    # Names are sent as GraphQL variables so they never need escaping.
    start_schedule_mutation = """
    mutation StartSchedule($repositoryLocationName: String!, $scheduleName: String!) {
      startSchedule(
        scheduleSelector: {
          repositoryName: "__repository__"
          repositoryLocationName: $repositoryLocationName
          scheduleName: $scheduleName
        }
      ) {
        __typename
        ... on ScheduleStateResult {
          scheduleState {
            id
            status
          }
        }
      }
    }
    """
    for schedule_name in schedule_names:
        start_response = requests.post(
            graphql_endpoint,
            json={
                'query': start_schedule_mutation,
                'variables': {
                    'repositoryLocationName': code_location_name,
                    'scheduleName': schedule_name,
                },
            },
            timeout=request_timeout,
        )
        context.log.info(start_response.json())

    schedules_responses_updated = get_all_schedules(graphql_endpoint, code_location_name)
    # Check the status of the response that is about to be logged. The
    # earlier version tested the first response instead, so a failed
    # refresh could reach .json() and raise.
    if schedules_responses_updated.status_code == 200:
        context.log.info(f"GraphQL query successful, Schedules here: {schedules_responses_updated.json()}")
    else:
        context.log.error(f"GraphQL query failed: {schedules_responses_updated.text}")
# Dagster op (task) that turns off schedules for a code location if that
# schedule has a "schedule-expiration" tag dated on or before today.
@op
def turn_off_expired_schedules(context: OpExecutionContext) -> None:
    """Stop every schedule whose ``schedule-expiration`` tag has passed.

    Uses the module-level ``graphql_endpoint`` and ``code_location_name``
    (rather than shadowing them with duplicate literals) so the connection
    settings only need to be adjusted in one place.

    A schedule counts as expired when one of its ``schedule-expiration``
    tags parses as ``YYYY-MM-DD`` and that date is on or before the date
    returned by ``datetime.today()``. A tag value that does not parse is
    logged as a warning and ignored, so one bad tag no longer aborts the
    op for every other schedule.
    """
    request_timeout = 30  # seconds; keeps a stalled server from hanging the op

    # One fetch serves both the "All Schedules" log line and the filter.
    schedules_response = get_all_schedules(graphql_endpoint, code_location_name)
    schedules_payload = schedules_response.json()
    context.log.info(f"All Schedules: {schedules_payload}")

    today = datetime.today().date()
    expired_ids = []
    for schedule in schedules_payload['data']['schedulesOrError']['results']:
        for tag in schedule.get('tags', []):
            if tag['key'] != 'schedule-expiration':
                continue
            try:
                expires_on = datetime.strptime(tag['value'], '%Y-%m-%d').date()
            except ValueError:
                context.log.warning(
                    f"Ignoring unparseable schedule-expiration tag "
                    f"{tag['value']!r} on schedule {schedule['name']}"
                )
                continue
            if expires_on <= today:
                expired_ids.append(schedule['id'])
                break  # one expired tag is enough; avoid duplicate ids

    for schedule_id in expired_ids:
        stop_schedule_mutation = f"""
        mutation StopSchedule {{
          stopRunningSchedule(id: "{schedule_id}") {{
            __typename
            ... on ScheduleStateResult {{
              scheduleState {{
                id
                status
              }}
            }}
            ... on PythonError {{
              message
              stack
            }}
          }}
        }}
        """
        stop_response = requests.post(
            graphql_endpoint,
            json={'query': stop_schedule_mutation},
            timeout=request_timeout,
        )
        context.log.info(f"Updated Schedules: {schedule_id} {stop_response.json()}")

    schedules_responses_updated = get_all_schedules(graphql_endpoint, code_location_name)
    # Check the status of the response that is about to be logged. The
    # earlier version tested the first response instead, so a failed
    # refresh could reach .json() and raise.
    if schedules_responses_updated.status_code == 200:
        context.log.info(f"GraphQL query successful: {schedules_responses_updated.json()}")
    else:
        context.log.error(f"GraphQL query failed: {schedules_responses_updated.text}")
# Job whose only step is the op that stops expired schedules.
@job
def turn_off_expired_schedules_job():
    turn_off_expired_schedules()
# Job whose only step is the op that starts the "asset_grouping_1" schedules.
@job
def turn_on_asset_grouping_1_schedules_job():
    turn_on_asset_grouping_1_schedules()
# Dummy asset that gives the scheduled asset jobs something to materialize;
# it only writes a log line.
@asset
def orders(context) -> None:
    message = "Checking the freshness of the average orders asset"
    context.log.info(message)
# Runs every 10 minutes. The "schedule-type" tag makes it a target of
# `turn_on_asset_grouping_1_schedules_job`; the "schedule-expiration" tag
# makes it a target of `turn_off_expired_schedules_job` from 2024-10-14 on.
orders_schedule = ScheduleDefinition(
    job=define_asset_job(
        "orders_job",
        selection=AssetSelection.assets("orders"),
    ),
    name="orders_schedule",
    cron_schedule="*/10 * * * *",
    tags={
        "schedule-type": "asset_grouping_1",
        "schedule-expiration": "2024-10-14",
    },
)
# Runs every minute. The "schedule-type" tag makes it a target of
# `turn_on_asset_grouping_1_schedules_job`; `turn_off_expired_schedules_job`
# leaves it running until its "schedule-expiration" date of 2024-10-16.
orders_schedule_2 = ScheduleDefinition(
    job=define_asset_job(
        "orders_job_2",
        selection=AssetSelection.assets("orders"),
    ),
    name="orders_schedule_2",
    cron_schedule="* * * * *",
    tags={
        "schedule-type": "asset_grouping_1",
        "schedule-expiration": "2024-10-16",
    },
)
# Registers the asset, both schedules and both bulk-management jobs.
defs = Definitions(
    jobs=[
        turn_off_expired_schedules_job,
        turn_on_asset_grouping_1_schedules_job,
    ],
    schedules=[
        orders_schedule,
        orders_schedule_2,
    ],
    assets=[orders],
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment