Skip to content

Instantly share code, notes, and snippets.

@discdiver
Last active August 2, 2022 18:09
Show Gist options
  • Save discdiver/a6739a9d4bb1d26f5b10d67b07bca4c2 to your computer and use it in GitHub Desktop.
Save discdiver/a6739a9d4bb1d26f5b10d67b07bca4c2 to your computer and use it in GitHub Desktop.
ex3.py
import random
from prefect import flow, task # NEW ****
@task  # NEW ****
def call_unreliable_api():
    """Simulate a flaky upstream service.

    One of two outcomes is picked at random: a successful payload
    (``{"data": 42}``) or a failure marker.

    Returns:
        The payload dict when the success outcome is drawn.

    Raises:
        Exception: When the failure outcome is drawn.
    """
    outcomes = [{"data": 42}, "failure"]
    drawn = random.choice(outcomes)

    # Happy path first: anything other than the failure marker is the payload.
    if drawn != "failure":
        return drawn

    raise Exception("Our unreliable service failed")
@task  # NEW ****
def augment_data(data: dict, msg: str):
    """Attach ``msg`` to ``data`` under the ``"message"`` key.

    The dict is modified in place; the same object is handed back so the
    call can be chained.

    Args:
        data: Dict to annotate (mutated by this call).
        msg: Text stored under ``"message"``.

    Returns:
        The same ``data`` object, now containing the ``"message"`` entry.
    """
    data.update({"message": msg})
    return data
@task  # NEW ****
def write_results_to_database(data: dict):
    """Report that ``data`` was written and signal success.

    As written here, this only prints a confirmation line to stdout; no
    database connection is opened in this function.

    Args:
        data: The record to report as written.

    Returns:
        The literal string ``"Success!"``.
    """
    confirmation = f"Wrote {data} to database successfully!"
    print(confirmation)
    return "Success!"
@flow
def pipeline3(msg: str):
    """Run the three tasks in order: fetch, augment, then write.

    Args:
        msg: Message forwarded to ``augment_data`` and attached to the
            fetched payload.

    Returns nothing explicitly; the result of the final write step is not
    propagated.
    """
    # Step 1: fetch from the (randomly failing) service.
    fetched = call_unreliable_api()

    # Step 2: annotate the payload with the caller's message.
    enriched = augment_data(data=fetched, msg=msg)

    # Step 3: persist the annotated payload.
    write_results_to_database(data=enriched)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment