Skip to content

Instantly share code, notes, and snippets.

@discdiver
Created August 2, 2022 15:17
Show Gist options
  • Save discdiver/aa8d2991b29e9341088ba00e90c7c83c to your computer and use it in GitHub Desktop.
Save discdiver/aa8d2991b29e9341088ba00e90c7c83c to your computer and use it in GitHub Desktop.
import random
def call_unreliable_api() -> dict:
    """Simulate a flaky upstream service.

    One of two outcomes is picked at random on every call: a payload
    dictionary or a failure marker.

    Returns:
        The payload ``{"data": 42}`` when the simulated call succeeds. A
        new dictionary is built on each call, so callers may mutate it.

    Raises:
        Exception: When the failure marker is picked.
    """
    choices = [{"data": 42}, "failure"]
    res = random.choice(choices)
    # Guard clause: fail first so the success path is a plain return.
    if res == "failure":
        raise Exception("Our unreliable service failed")
    return res
def augment_data(data: dict, msg: str) -> dict:
    """Attach a message to a payload dictionary.

    Args:
        data: Dictionary to augment. It is modified in place.
        msg: Value stored under the ``"message"`` key; any existing value
            for that key is overwritten.

    Returns:
        The same ``data`` object that was passed in, now carrying the
        ``"message"`` entry.
    """
    data["message"] = msg
    return data
def write_results_to_database(data: dict) -> str:
    """Report that ``data`` was written and return a status string.

    No database access happens here: the function only prints a
    confirmation line to standard output.

    Args:
        data: The record to report as written.

    Returns:
        The literal string ``"Success!"``.
    """
    print(f"Wrote {data} to database successfully!")
    return "Success!"
def pipeline1(msg: str) -> None:
    """Run the fetch, augment and write steps once, in order.

    There is no retry or error handling: an exception raised by
    ``call_unreliable_api`` propagates to the caller and the later steps
    are skipped.

    Args:
        msg: Message attached to the fetched payload before it is written.
    """
    api_result = call_unreliable_api()
    augmented_data = augment_data(data=api_result, msg=msg)
    # The status string returned by the write step is intentionally unused.
    write_results_to_database(augmented_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment