Kevin Kho kvnkho

👋
Feel free to message me. Contact info in profile
View GitHub Profile
@kvnkho
kvnkho / pipeline.yaml
Last active August 2, 2022 19:07
Ploomber Pipeline
# Content of pipeline.yaml
tasks:
  - source: extract.py
    product:
      # scripts generate executed notebooks as outputs
      nb: output/extract.html
      # you can define as many outputs as you want
      data: data/raw.parquet
  - source: transform.py
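This pipeline runs with Ploomber's CLI (ploomber build), which executes each task in dependency order and writes the declared products: the executed notebook and the data file.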
@kvnkho
kvnkho / pipeline_with_parameters.yaml
Created July 31, 2022 04:40
Ploomber pipeline with parameters
# Content of pipeline.yaml
tasks:
  - source: extract.py
    product:
      # scripts generate executed notebooks as outputs
      nb: output/extract.html
      # you can define as many outputs as you want
      data: 'data/{{raw_filename}}'
    params:
      engine: '{{engine}}'
@kvnkho
kvnkho / env.yaml
Last active August 1, 2022 17:30
Ploomber env yaml
engine: spark
raw_filename: full_raw.parquet
transformed_filename: full_transformed.parquet
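At build time, Ploomber substitutes these values into the {{raw_filename}} and {{engine}} placeholders in pipeline.yaml, so the same DAG can switch between a Pandas and a Spark backend, or between sample and full datasets, by editing env.yaml alone.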
@kvnkho
kvnkho / transform_with_spark.py
Last active August 11, 2022 02:25
Ploomber with Fugue on Spark
import pandas as pd
from fugue import transform
from sklearn.preprocessing import minmax_scale

# %% tags=["parameters"]
# declare a list of tasks whose products you want to use as inputs
upstream = ["extract"]
product = None
engine = None

from fugue import FugueWorkflow
import pandas as pd

# + tags=["parameters"]
# declare a list of tasks whose products you want to use as inputs
upstream = ["extract"]
product = None
engine = None
# -
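The listing cuts transform_with_spark.py off after the parameters cell. A minimal sketch of how the task body might continue, reusing the normalize function from fugue_transform.py below; the upstream/product access and the schema string are illustrative assumptions, and engine is the value Ploomber injects from env.yaml:

def normalize(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(scaled=minmax_scale(df["col2"]))

# load the product written by the upstream extract task
# (at run time Ploomber replaces upstream with the actual product mapping)
df = pd.read_parquet(upstream["extract"]["data"])

# engine comes from env.yaml: "spark" makes Fugue execute normalize on Spark,
# while None would keep execution on Pandas
result = transform(df, normalize, schema="*, scaled:double", engine=engine)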
@kvnkho
kvnkho / fugue_transform.py
Last active August 2, 2022 19:25
Example of Fugue Transform
import pandas as pd
from fugue import transform
from sklearn.preprocessing import minmax_scale

df = pd.DataFrame({"col1": ["A","A","A","B","B","B"], "col2": [1,2,3,4,5,6]})

def normalize(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(scaled=minmax_scale(df["col2"]))

# run on Pandas
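# (the preview is cut off here; a minimal sketch of how the call typically
# continues on Pandas and then on Spark; the per-group partition and the
# SparkSession setup are illustrative assumptions, not the gist's own code)
transform(
    df,
    normalize,
    schema="*, scaled:double",
    partition={"by": "col1"},
)

# run on Spark: pass a SparkSession as the engine and get back a Spark DataFrame
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
transform(
    df,
    normalize,
    schema="*, scaled:double",
    partition={"by": "col1"},
    engine=spark,
)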
import pandas as pd

# + tags=["parameters"]
# declare a list of tasks whose products you want to use as inputs
upstream = None
product = None
# -

df = pd.DataFrame({"col1": [1,2,3], "col2": [2,3,4]})
df.to_parquet(product["data"])
from time import time
import pandas as pd
from statsforecast.utils import generate_series
from statsforecast.models import AutoARIMA
from statsforecast.core import StatsForecast

series = generate_series(n_series=1000000, seed=1)

# the preview is truncated mid-call; the remaining arguments are assumed
model = StatsForecast(df=series,
                      models=[AutoARIMA()],
                      freq='D',
                      n_jobs=-1)
from fugue import transform

def forecast_series(df: pd.DataFrame, models) -> pd.DataFrame:
    tdf = df.set_index("unique_id")
    model = StatsForecast(df=tdf, models=models, freq='D', n_jobs=1)
    return model.forecast(7).reset_index()

transform(series.reset_index(),
          forecast_series,
          params=dict(models=[AutoARIMA()]),
          # the remaining arguments are assumed; the preview is truncated here
          schema="unique_id:int, ds:datetime, AutoARIMA:float",
          partition={"by": "unique_id"},
          engine=spark)  # spark is the SparkSession created in the next snippet
from statsforecast.distributed.utils import forecast
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import AutoARIMA
from statsforecast.core import StatsForecast
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf": True})
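The preview ends after constructing the backend. A minimal sketch of how the imported forecast utility is typically called with it, so that per-series model fitting is distributed over Spark via Fugue; the input path, horizon, and model list here are illustrative assumptions:

# a Spark DataFrame of (unique_id, ds, y) rows; the path is hypothetical
spark_series = spark.read.parquet("data/full_raw.parquet")

# parallel=backend hands the per-series fitting to the Spark cluster
predictions = forecast(spark_series,
                       models=[AutoARIMA()],
                       freq="D",
                       h=7,
                       parallel=backend)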