Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
Created October 15, 2024 20:31
Show Gist options
  • Save cnolanminich/335ab39f8931ae0342a67afcaad5c319 to your computer and use it in GitHub Desktop.
Save cnolanminich/335ab39f8931ae0342a67afcaad5c319 to your computer and use it in GitHub Desktop.
Turn schedules on or off in bulk, or give schedules an expiration date.
from dagster import Definitions, load_assets_from_modules, AssetSelection, define_asset_job, op, OpExecutionContext, job, asset, asset_check, ScheduleDefinition
import requests
from datetime import datetime
# Module-level connection settings.
# NOTE: these can be modified to work with auth in Dagster+.

# URL of the Dagster GraphQL endpoint -- adjust as needed.
graphql_endpoint = "http://localhost:3000/graphql"

# Name of the code location to operate on -- adjust as needed.
code_location_name = "turn_on_schedules_in_bulk"
# Utility function to fetch all schedules of a code location.
def get_all_schedules(graphql_endpoint, code_location_name):
    """Ask the GraphQL endpoint for every schedule in a code location.

    For each schedule the query selects ``id``, ``name``, ``cronSchedule``,
    ``executionTimezone``, its ``tags`` (``key``/``value`` pairs) and its
    ``scheduleState`` (``id``/``status``).

    Args:
        graphql_endpoint: URL the GraphQL query is POSTed to.
        code_location_name: Value used as ``repositoryLocationName`` in the
            repository selector (the repository name is fixed to
            ``__repository__``).

    Returns:
        The unmodified ``requests`` response object. Callers inspect
        ``status_code`` and decode ``.json()`` themselves; when the selection
        matched, the schedules are under
        ``["data"]["schedulesOrError"]["results"]``.

    Raises:
        requests.RequestException: If the request cannot be completed,
            including when no response arrives within the timeout.
    """
    # Doubled braces are literal braces inside the f-string; only the code
    # location name is interpolated.
    get_all_schedules_query = f"""
    query SchedulesQuery {{
      schedulesOrError(repositorySelector: {{ repositoryName: "__repository__", repositoryLocationName: "{code_location_name}" }}) {{
        ... on Schedules {{
          results {{
            id
            name
            cronSchedule
            executionTimezone
            tags {{
              key
              value
            }}
            scheduleState {{
              id
              status
            }}
          }}
        }}
      }}
    }}
    """
    # Without a timeout `requests` waits indefinitely on an unresponsive
    # server, which would hang whatever is calling this helper.
    return requests.post(
        graphql_endpoint,
        json={"query": get_all_schedules_query},
        timeout=30,
    )
# Dagster op (task) that turns on every schedule of a code location that carries the tag "schedule-type" with the value "asset_grouping_1"
@op
def turn_on_asset_grouping_1_schedules(context: OpExecutionContext) -> None:
    """Start every schedule tagged ``schedule-type: asset_grouping_1``.

    Steps:
      1. List all schedules of the code location via ``get_all_schedules``.
      2. Send one ``startSchedule`` mutation per matching schedule name and
         log each mutation response.
      3. List the schedules again and log the refreshed state.

    Args:
        context: Op execution context; only ``context.log`` is used.

    Raises:
        RuntimeError: If the initial listing does not return HTTP 200 or its
            payload contains no ``results`` list.
    """
    graphql_endpoint = "http://localhost:3000/graphql"  # Adjust the URL as needed
    code_location_name = "turn_on_schedules_in_bulk"  # Adjust the code location name as needed
    request_timeout_seconds = 30  # never wait forever on an unresponsive server

    schedules_response = get_all_schedules(graphql_endpoint, code_location_name)
    if schedules_response.status_code != 200:
        raise RuntimeError(f"GraphQL query failed: {schedules_response.text}")

    # The query selects `results` only on the `Schedules` type, so any other
    # answer has no `results` key. Fail with the payload in the message
    # instead of an opaque KeyError/TypeError.
    schedules_payload = schedules_response.json()
    schedules_or_error = (schedules_payload.get("data") or {}).get("schedulesOrError") or {}
    if "results" not in schedules_or_error:
        raise RuntimeError(
            f"No schedules returned for code location {code_location_name!r}: {schedules_payload}"
        )

    # startSchedule addresses a schedule by name, so collect names here.
    schedule_names = [
        item["name"]
        for item in schedules_or_error["results"]
        if any(
            tag["key"] == "schedule-type" and tag["value"] == "asset_grouping_1"
            for tag in item.get("tags") or []
        )
    ]

    for schedule_name in schedule_names:
        activate_schedule_mutation = f"""
        mutation StartSchedule {{
          startSchedule(scheduleSelector: {{
            repositoryName: "__repository__",
            repositoryLocationName: "{code_location_name}",
            scheduleName: "{schedule_name}"}}) {{
            __typename
            ... on ScheduleStateResult {{
              scheduleState {{
                id
                status
              }}
            }}
          }}
        }}
        """
        activate_schedules_response = requests.post(
            graphql_endpoint,
            json={"query": activate_schedule_mutation},
            timeout=request_timeout_seconds,
        )
        context.log.info(activate_schedules_response.json())

    # Re-query so the log shows the schedule states after the mutations.
    schedules_responses_updated = get_all_schedules(graphql_endpoint, code_location_name)
    # Check the refreshed response itself, not the initial listing.
    if schedules_responses_updated.status_code == 200:
        context.log.info(f"GraphQL query successful, Schedules here: {schedules_responses_updated.json()}")
    else:
        context.log.error(f"GraphQL query failed: {schedules_responses_updated.text}")
# Dagster op (task) that turns off every schedule of a code location whose "schedule-expiration" tag holds a date that is today or earlier
@op
def turn_off_expired_schedules(context: OpExecutionContext) -> None:
    """Stop every schedule whose ``schedule-expiration`` tag has been reached.

    A schedule counts as expired when its ``schedule-expiration`` tag value,
    parsed as ``YYYY-MM-DD``, is on or before today's local date. Schedules
    without that tag are left alone. A tag value that does not parse is
    logged as a warning and the schedule is skipped, so one bad tag does not
    prevent the other schedules from being stopped.

    Steps:
      1. List all schedules of the code location via ``get_all_schedules``
         and log them.
      2. Send one ``stopRunningSchedule`` mutation per expired schedule id
         and log each mutation response.
      3. List the schedules again and log the refreshed state.

    Args:
        context: Op execution context; only ``context.log`` is used.

    Raises:
        RuntimeError: If the initial listing does not return HTTP 200 or its
            payload contains no ``results`` list.
    """
    graphql_endpoint = "http://localhost:3000/graphql"  # Adjust the URL as needed
    code_location_name = "turn_on_schedules_in_bulk"  # Adjust the code location name as needed
    request_timeout_seconds = 30  # never wait forever on an unresponsive server

    schedules_response = get_all_schedules(graphql_endpoint, code_location_name)
    if schedules_response.status_code != 200:
        raise RuntimeError(f"GraphQL query failed: {schedules_response.text}")

    schedules_payload = schedules_response.json()
    context.log.info(f"All Schedules: {schedules_payload}")

    # The query selects `results` only on the `Schedules` type, so any other
    # answer has no `results` key. Fail with the payload in the message
    # instead of an opaque KeyError/TypeError.
    schedules_or_error = (schedules_payload.get("data") or {}).get("schedulesOrError") or {}
    if "results" not in schedules_or_error:
        raise RuntimeError(
            f"No schedules returned for code location {code_location_name!r}: {schedules_payload}"
        )

    today = datetime.today().date()
    # stopRunningSchedule addresses a schedule by id, so collect ids here.
    expired_schedule_ids = []
    for item in schedules_or_error["results"]:
        for tag in item.get("tags") or []:
            if tag["key"] != "schedule-expiration":
                continue
            try:
                expires_on = datetime.strptime(tag["value"], "%Y-%m-%d").date()
            except ValueError:
                context.log.warning(
                    f"Schedule {item['name']} has an unparseable schedule-expiration tag: {tag['value']!r}"
                )
                continue
            if expires_on <= today:
                expired_schedule_ids.append(item["id"])
                break  # one expired tag is enough; avoid adding the id twice

    for schedule_id in expired_schedule_ids:
        stop_schedule_mutation = f"""
        mutation StopSchedule {{
          stopRunningSchedule( id: "{schedule_id}") {{
            __typename
            ... on ScheduleStateResult {{
              scheduleState {{
                id
                status
              }}
            }}
            ... on PythonError {{
              message
              stack
            }}
          }}
        }}
        """
        stop_schedule_response = requests.post(
            graphql_endpoint,
            json={"query": stop_schedule_mutation},
            timeout=request_timeout_seconds,
        )
        context.log.info(f"Updated Schedules: {schedule_id} {stop_schedule_response.json()}")

    # Re-query so the log shows the schedule states after the mutations.
    schedules_responses_updated = get_all_schedules(graphql_endpoint, code_location_name)
    # Check the refreshed response itself, not the initial listing.
    if schedules_responses_updated.status_code == 200:
        context.log.info(f"GraphQL query successful: {schedules_responses_updated.json()}")
    else:
        context.log.error(f"GraphQL query failed: {schedules_responses_updated.text}")
@job
def turn_off_expired_schedules_job():
    """Job consisting of the single op ``turn_off_expired_schedules``."""
    turn_off_expired_schedules()
@job
def turn_on_asset_grouping_1_schedules_job():
    """Job consisting of the single op ``turn_on_asset_grouping_1_schedules``."""
    turn_on_asset_grouping_1_schedules()
# Dummy asset that serves as the target of the scheduled asset jobs below.
@asset
def orders(context) -> None:
    """Placeholder asset: materializing it only writes one log line."""
    context.log.info("Checking the freshness of the average orders asset")
# Demo schedule (every 10 minutes) targeting the `orders` asset.
# Its "schedule-type" tag makes `turn_on_asset_grouping_1_schedules_job` turn it ON;
# its "schedule-expiration" tag makes `turn_off_expired_schedules_job` turn it OFF
# once 2024-10-14 is today or in the past.
orders_schedule = ScheduleDefinition(
    name="orders_schedule",
    cron_schedule="*/10 * * * *",
    tags={
        "schedule-type": "asset_grouping_1",
        "schedule-expiration": "2024-10-14",
    },
    job=define_asset_job(
        "orders_job",
        selection=AssetSelection.assets("orders"),
    ),
)
# Demo schedule (every minute) targeting the `orders` asset.
# Its "schedule-type" tag makes `turn_on_asset_grouping_1_schedules_job` turn it ON;
# `turn_off_expired_schedules_job` leaves it ON until its "schedule-expiration"
# date, 2024-10-16, is reached.
orders_schedule_2 = ScheduleDefinition(
    name="orders_schedule_2",
    cron_schedule="* * * * *",
    tags={
        "schedule-type": "asset_grouping_1",
        "schedule-expiration": "2024-10-16",
    },
    job=define_asset_job(
        "orders_job_2",
        selection=AssetSelection.assets("orders"),
    ),
)
# Definitions object of this code location: the demo asset, the two demo
# schedules, and the two maintenance jobs that switch schedules on and off.
defs = Definitions(
    assets=[
        orders,
    ],
    schedules=[
        orders_schedule,
        orders_schedule_2,
    ],
    jobs=[
        turn_off_expired_schedules_job,
        turn_on_asset_grouping_1_schedules_job,
    ],
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment