Skip to content

Instantly share code, notes, and snippets.

View anna-geller's full-sized avatar
🎯
Focusing

Anna Geller anna-geller

🎯
Focusing
View GitHub Profile
from datetime import date
from prefect import flow
from prefect.deployments import run_deployment
@flow
def parent(
start_date: date = date(2022, 11, 1), # parametrized for backfills
end_date: date = date.today(),
deployment_name: str = "local-process",
from datetime import date
from prefect import flow
from flows.transformation.jaffle_shop.dbt_run_from_manifest import dbt_jaffle_shop
from flows.ingestion.ingest_jaffle_shop import raw_data_jaffle_shop
from flows.analytics.dashboards import dashboards
from flows.ml.sales_forecast import sales_forecast
@flow
def jaffle_shop_ingest_transform(
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
@flow
def run_dbt_job_flow():
trigger_dbt_cloud_job_run_and_wait_for_completion(
dbt_cloud_credentials=DbtCloudCredentials.load("default"), job_id=154217
from prefect import task, flow, get_run_logger
@task
def get_training_set() -> dict:
    """Return the training set as a dict with a single ``data`` entry.

    Returns:
        A dict mapping ``"data"`` to the constant ``21``.
    """
    return {"data": 21}
@task
def apply_ml_model(training_set):
from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner
import random
from typing import List
@task
def ingest():
if random.random() > 0.5:
raise ValueError("Non-deterministic error has occured.")
import random
from typing import List
def ingest():
    """Fail at random about half of the time, otherwise return ``42``.

    A single draw from ``random.random()`` decides the outcome.

    Returns:
        The constant ``42`` when the draw is at most 0.5.

    Raises:
        ValueError: When the draw is greater than 0.5.
    """
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")
    # Guard clause above already left the function on failure.
    return 42
from prefect import task, flow
from prefect import get_run_logger
from typing import Any
@task
def say_hi(user_name: str, question: str, answer: Any) -> None:
    """Log a greeting for the user and the answer to their question.

    Both messages are written at INFO level to the logger returned by
    ``get_run_logger()``.

    Args:
        user_name: Name inserted into the greeting message.
        question: Label of the question being answered.
        answer: Value reported as the answer; formatted with ``%s``.
    """
    logger = get_run_logger()
    # Lazy %-style arguments: formatting is left to the logging machinery.
    logger.info("Hello from Prefect, %s! 👋", user_name)
    logger.info("The answer to the %s question is %s! 🤖", question, answer)
###
### A complete description of a Prefect Deployment for flow 'parametrized'
###
name: dev
description: null
version: 77ecff5259bc026121213109401965ae
# The work queue that will handle this deployment's runs
work_queue_name: dev
tags: []
parameters: {}
from pathlib import Path
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.credentials import DbtCliProfile
def dbt(command: str = "dbt debug") -> None:
trigger_dbt_cli_command.with_options(name=command)(
command,
project_dir=Path(__file__)
prefect deployment run raw-data-jaffle-shop/local-process --param start_date="2022-11-11"