Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created October 31, 2022 12:03
Show Gist options
  • Save anna-geller/117dfef25f015537e24f8747a91bdcd3 to your computer and use it in GitHub Desktop.
Save anna-geller/117dfef25f015537e24f8747a91bdcd3 to your computer and use it in GitHub Desktop.
from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner
import random
from typing import List
@task
def ingest():
    """Simulate a flaky extraction step.

    Draws one number from ``random.random()``; a draw above 0.5 raises,
    anything else yields the constant payload.

    Returns:
        The integer 42.

    Raises:
        ValueError: When the random draw is greater than 0.5.
    """
    draw = random.random()
    if draw <= 0.5:
        return 42
    raise ValueError("Non-deterministic error has occured.")
@task
def transform(x: int) -> int:
    """Return ``x`` multiplied by 42."""
    scaled = x * 42
    return scaled
@task
def load(x):
    """Print ``x`` to standard output; returns nothing."""
    print(x)
@task
def clean_up():
    """Log a clean-up message at INFO level through the run logger."""
    get_run_logger().info("Cleaning up 🧹")
@task
def refresh(dashboard: str):
    """Log, at INFO level, that ``dashboard`` is being refreshed.

    Args:
        dashboard: Name of the dashboard, interpolated into the log message.
    """
    # Lazy %-style arguments: the logger formats the message itself.
    get_run_logger().info("Refreshing %s 📊", dashboard)
@task
def get_dashboards():
    """Return the hard-coded list of dashboard names."""
    dashboard_names = ["sales", "growth", "margins", "COGS", "visitors"]
    return dashboard_names
@flow
def refresh_bi_dashboards(dashboards: List[str]):
    """Fan the ``refresh`` task out over every entry in ``dashboards``.

    Args:
        dashboards: Dashboard names; each one is passed to ``refresh``
            through ``refresh.map``.
    """
    # The result of the mapped call is not used or returned.
    refresh.map(dashboards)
@flow(task_runner=SequentialTaskRunner())
def orchestrate():
    """Run the ETL tasks, conditionally refresh dashboards, then clean up.

    Submits ``ingest``, ``transform`` and ``load`` in a chain, each one
    receiving the previous submission's result. The dashboard sub-flow is
    triggered only when the ``load`` task ended in a completed state.
    ``clean_up`` is submitted last regardless of that outcome.
    """
    data = ingest.submit()
    final = transform.submit(data)
    # return_state=True hands back the task's state object so the outcome
    # can be inspected below.
    load_state = load.submit(final, return_state=True)

    # Update dashboards only if ETL completed without errors. Checking the
    # state *type* via is_completed() rather than the display name avoids
    # missing completed-type states whose name differs from "Completed".
    if load_state.is_completed():
        dashboards = get_dashboards.submit()
        refresh_bi_dashboards(dashboards)

    clean_up.submit()  # always runs at the very end
if __name__ == "__main__":
    # Trigger the flow only when this file is executed as a script,
    # not when it is imported.
    orchestrate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment