Skip to content

Instantly share code, notes, and snippets.

View anna-geller's full-sized avatar
🎯
Focusing

Anna Geller anna-geller

🎯
Focusing
View GitHub Profile
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
import time
@task
def extract() -> list:
    """Log the extraction step and return the sample integers 1 through 6."""
    run_logger = get_run_logger()
    run_logger.info("extract")
    sample = list(range(1, 7))
    return sample
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
# Define some tasks for us to run in our flow
@task
def extract() -> list:
    """Emit an "extract" log line via the Prefect run logger and return [1..6]."""
    get_run_logger().info("extract")
    return list(range(1, 7))
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
# Define some tasks for us to run in our flow
@task
def extract() -> list:
    """Log the extraction step and hand back a fixed list of six integers."""
    task_logger = get_run_logger()
    task_logger.info("extract")
    values = [n for n in range(1, 7)]
    return values
import asyncio
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket
@flow
async def aws_s3_bucket():
aws_creds = await AwsCredentials.load("default")
s3_bucket = S3Bucket(
###
### A complete description of a Prefect Deployment for flow 'taxi-data'
###
name: yellow
description: null
version: 130746a12ffa6ed31ffe6f21fa338fcc
# The work queue that will handle this deployment's runs
work_queue_name: default
tags: []
parameters:
"""
prefect deployment build week_2_dataflow/main.py:taxi_data -n yellow -q default -a
prefect deployment build week_2_dataflow/main.py:taxi_data -n yellow -q default -a --param table_name=green_tripdata
prefect deployment build week_2_dataflow/main.py:parent -n yellow -q default -a
prefect deployment build week_2_dataflow/main.py:parent -n yellow -q default -a --param table_name=green_tripdata
"""
import awswrangler as wr
import pandas as pd
from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner
from prefect import task, flow, get_run_logger
from dataplatform.blocks import Workspace, SnowflakePandas
from typing import Any, Dict
@task
def update_customers_dashboards() -> None:
    """Placeholder task for refreshing the customers dashboards.

    Currently it only logs a confirmation message. Real refresh logic
    (for example, clearing your BI tool's cache) should be added before
    the log call.
    """
    run_logger = get_run_logger()
    # Add your refresh logic here, e.g. clearing the cache of your BI tool.
    run_logger.info("Customers dashboard extracts updated! 📊")
from datetime import date
from prefect import flow
from flows.transformation.jaffle_shop.dbt_run_from_manifest import dbt_jaffle_shop
from flows.ingestion.ingest_jaffle_shop import raw_data_jaffle_shop
from flows.analytics.dashboards import dashboards
from flows.ml.sales_forecast import sales_forecast
@flow
def jaffle_shop_ingest_transform(
from prefect import flow
from prefect.filesystems import GitHub
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.credentials import DbtCliProfile
def dbt(command: str, path: str) -> None:
trigger_dbt_cli_command.with_options(name=command)(
command,
project_dir=path,
from dotenv import load_dotenv
import os
from prefect.filesystems import GitHub
# Load environment variables (presumably from a local .env file -- confirm)
# so that os.environ.get("GITHUB_PERSONAL_ACCESS_TOKEN", ...) further down
# can pick up the token; otherwise the "dummy" fallback is used.
load_dotenv()
gh_dbt_jaffle_shop = GitHub(
repository="https://github.com/anna-geller/dbt-jaffle-shop.git",
reference="main",
access_token=os.environ.get("GITHUB_PERSONAL_ACCESS_TOKEN", "dummy"),