Created October 27, 2020 02:57
-
-
Save asandeep/bc48f2b3af1f12259ba55d2af5ff5fca to your computer and use it in GitHub Desktop.
Flow to clean up old Prefect flow runs.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pendulum | |
import prefect | |
from dynaconf import settings | |
from prefect import schedules | |
from prefect.schedules import clocks | |
# Cron expression controlling when the cleanup flow fires: minute 0, hour 0,
# any day of month, any month, day-of-week 0 -- i.e. weekly, at midnight (UTC)
# each Sunday.
DATA_CLEANUP_SCHEDULE_CRON_STRING = "0 0 * * 0"
@prefect.task
def get_expired_flow_runs():
    """Return the IDs of flow runs that are expired under the retention policy.

    A flow run counts as expired when its state is anything other than
    ``Scheduled`` and its ``updated`` timestamp is earlier than the start of
    the UTC day ``settings.RETENTION_DAYS`` days ago.

    Returns:
        list: IDs of the expired flow runs, as returned by the Prefect API.
    """
    logger = prefect.context.get("logger")

    # Prefect's dict-based GraphQL syntax: the outer key declares the
    # operation and its variables, the inner key is the `flow_run` selection
    # with its `where` filter, and the innermost set lists returned fields.
    query = {
        "query($updated_before: timestamptz)": {
            """
            flow_run(where: {
                _and: {
                    state: {_neq: "Scheduled"},
                    updated: {_lt: $updated_before}
                }
            })
            """: {
                "id"
            }
        }
    }

    # Cut-off instant: start of the UTC day RETENTION_DAYS ago. It is
    # serialised as ISO 8601 *with* a UTC offset. A naive
    # "YYYY-MM-DD HH:MM:SS" string compared against a `timestamptz` column is
    # interpreted in the database server's TimeZone, which would silently
    # shift the cut-off whenever that zone is not UTC.
    updated_before = (
        pendulum.now("UTC")
        .subtract(days=settings.RETENTION_DAYS)
        .start_of("day")
        .to_iso8601_string()
    )

    client = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
    flow_runs = client.graphql(
        query, variables={"updated_before": updated_before}
    ).data.flow_run

    expired_flow_run_ids = [flow_run.id for flow_run in flow_runs]
    logger.info("Expired flow run count: %s", len(expired_flow_run_ids))
    logger.debug("Expired flow run ids: %s", expired_flow_run_ids)
    return expired_flow_run_ids
@prefect.task
def delete_flow_run(flow_run_id):
    """Delete a single flow run via a Prefect GraphQL mutation.

    According to this Prefect community Slack thread:
    https://prefect-community.slack.com/?redir=%2Farchives%2FCL09KU1K7%2Fp1598535130019500,
    deleting just the flow run is sufficient; database cascades take care of
    the related objects.

    Args:
        flow_run_id: ID of the flow run to be deleted.

    Returns:
        bool: the ``success`` flag reported by the ``delete_flow_run``
        mutation (``False`` if the payload is missing). Failures are also
        logged as warnings so they are visible without debug logging.
    """
    logger = prefect.context.get("logger")
    mutation = """
        mutation($input: delete_flow_run_input!) {
            delete_flow_run(input: $input) {
                success
            }
        }
    """
    client = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
    response = client.graphql(
        mutation, variables=dict(input=dict(flow_run_id=flow_run_id))
    )
    logger.debug(
        "Flow run id: %s deletion success: %s", flow_run_id, response.data
    )

    # The mutation returns an explicit `success` flag; check it instead of
    # only dumping it at debug level, so a failed deletion is not silently
    # ignored. A null payload is treated as a failure.
    payload = response.data.delete_flow_run
    success = bool(payload and payload.success)
    if not success:
        logger.warning(
            "Failed to delete flow run %s: %s", flow_run_id, response.data
        )
    return success
with prefect.Flow(
    name="cleanup_expired_data",
    schedule=schedules.Schedule(
        clocks=[clocks.CronClock(cron=DATA_CLEANUP_SCHEDULE_CRON_STRING)]
    ),
) as cleanup_expired_data:
    # There is no bulk-delete endpoint for flow runs in the Prefect API yet,
    # so every expired run gets its own mapped deletion task. Switch to a
    # single bulk call once the upstream issue below is resolved.
    # @see: https://github.com/PrefectHQ/server/issues/62
    expired_flow_runs = get_expired_flow_runs()
    delete_flow_run.map(flow_run_id=expired_flow_runs)
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment