Created
March 30, 2023 19:39
-
-
Save lukeorland/cf21724b35f406417e51e8869e41696c to your computer and use it in GitHub Desktop.
Prefect Cloud 1.0 - fetch logs for flow run
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Retrieve all the logs for a flow run. | |
E.g. | |
FLOW_RUN_ID=adff99c1-7dcf-40c1-bd1d-1c026ad930cb \ | |
python \ | |
notes-synced/snippets/fetch_flow_run_logs.py \ | |
print_concatenated_log_messages \ | |
$FLOW_RUN_ID \ | |
> logs-flow-run-id-$FLOW_RUN_ID.txt | |
""" | |
from typing import List | |
import fire | |
import prefect | |
# Page size: number of log records requested per GraphQL query.
LIMIT = 200
# Prefect client instance shared by every query in this module; it is
# constructed once, when the module is imported.
CLIENT = prefect.client.Client()
def fetch_some_logs(flow_run_id: str, limit: int, offset: int) -> List[dict]:
    """Fetch one page of log records for a flow run.

    Args:
        flow_run_id: ID of the flow run whose logs are requested.
        limit: Maximum number of log records to request in this page.
        offset: Number of log records to skip before the first one returned.

    Returns:
        The list found under ``data.log`` in the GraphQL response, one dict
        per log record with the fields selected in the query below.

    Raises:
        Whatever ``CLIENT.graphql`` raises; it is called with
        ``raise_on_error=True``.
    """
    # limit/offset paging is only reliable over a deterministic row order:
    # without an explicit ``order_by`` consecutive pages may overlap or skip
    # records. Order by creation time, using the id as a tie-breaker.
    query = """
        query {
          log (
            where: {
              flow_run_id: {_eq: "%s"}
            }
            order_by: [{created: asc}, {id: asc}]
            limit: %s
            offset: %s
          ) {
            created
            flow_run_id
            id
            info
            is_audit_log
            is_loaded_from_archive
            level
            message
            name
            object_id
            object_table
            task_run_id
          }
        }""" % (flow_run_id, limit, offset)
    response = CLIENT.graphql(query=query, raise_on_error=True)
    return response["data"]["log"]
def fetch_all_logs(flow_run_id: str) -> List[dict]:
    """Fetch every log record of a flow run, page by page.

    Pages of up to ``LIMIT`` records are requested through
    ``fetch_some_logs`` until an empty page is returned.

    Args:
        flow_run_id: ID of the flow run whose logs are requested.

    Returns:
        All log records collected. If the client raises
        ``prefect.exceptions.ClientError`` part-way through, the records
        gathered so far are returned and a warning is written to stderr.
    """
    import sys  # local import: only needed for the warning below

    logs: List[dict] = []
    offset = 0
    try:
        while True:
            page = fetch_some_logs(flow_run_id, LIMIT, offset)
            if not page:
                # The list of logs is empty when the offset is past the
                # last log.
                break
            logs.extend(page)
            # Advance by the number of records actually received rather
            # than by LIMIT: if the server returns fewer rows than were
            # requested, stepping by LIMIT would silently skip records.
            offset += len(page)
    except prefect.exceptions.ClientError as err:
        # Stay best-effort (return what was collected) but do not hide the
        # failure. stderr is used so redirected stdout output stays clean.
        print(
            f"warning: stopped fetching logs at offset {offset}: {err}",
            file=sys.stderr,
        )
    return logs
def concatenate_log_messages(log_messages: List[dict]) -> str:
    """Return the ``"message"`` value of each log record, newline-separated.

    Args:
        log_messages: Log records; each must have a ``"message"`` key.

    Returns:
        The messages joined with ``"\\n"`` in input order; an empty string
        when ``log_messages`` is empty.
    """
    messages = (entry["message"] for entry in log_messages)
    return "\n".join(messages)
def print_concatenated_log_messages(flow_run_id):
    """Print all log messages of a flow run to stdout, newline-separated.

    Args:
        flow_run_id: ID of the flow run whose logs are printed.
    """
    all_logs = fetch_all_logs(flow_run_id)
    text = concatenate_log_messages(all_logs)
    print(text)
# Command-line entry point: hand control to python-fire. The module
# docstring shows the intended invocation, which names the function to run
# (e.g. ``print_concatenated_log_messages``) followed by its arguments.
if __name__ == "__main__":
    fire.Fire()
Author
lukeorland
commented
Mar 30, 2023
- You'll need an API key to authenticate with Prefect Cloud.
- This script uses the `fire` Python package to expose its functions on the command line.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment