Skip to content

Instantly share code, notes, and snippets.

@discdiver
Created August 2, 2022 18:10
Show Gist options
  • Save discdiver/58f89b5fe13c42df42ed43fdea4893ba to your computer and use it in GitHub Desktop.
Save discdiver/58f89b5fe13c42df42ed43fdea4893ba to your computer and use it in GitHub Desktop.
ex4.py
import random
from prefect import flow, task
@task(name="Get data from API", retries=4, retry_delay_seconds=2) # NEW ****
def call_unreliable_api():
choices = [{"data": 42}, "failure"]
res = random.choice(choices)
if res == "failure":
raise Exception("Our unreliable service failed")
else:
return res
@task(name="Add message to data") # NEW ****
def augment_data(data: dict, msg: str):
data["message"] = msg
return data
@task(name="Write results to database") # NEW ****
def write_results_to_database(data: dict):
print(f"Wrote {data} to database successfully!")
return "Success!"
@flow(name="Previously unreliable pipeline") # NEW ****
def pipeline4(msg: str):
api_result = call_unreliable_api()
augmented_data = augment_data(data=api_result, msg=msg)
write_results_to_database(augmented_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment