Kevin Kho (kvnkho)
from prefect import Flow, unmapped

with Flow("distributed") as flow:
    X_train, X_test, y_train, y_test = create_data()
    models = get_models()
    # map train_model over every model; the data splits stay constant across runs
    training_runs = train_model.map(
        models,
        unmapped(X_train),
        unmapped(X_test),
        unmapped(y_train),
        unmapped(y_test),
    )
    get_results(training_runs)
import pandas as pd
df = pd.read_csv("titanic.csv")
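The flow above also references create_data and train_model, which don't appear in these snippets. A minimal sketch of both, assuming the Titanic CSV loaded here and scikit-learn estimators (the feature columns are a hypothetical choice):

from prefect import task
from sklearn.model_selection import train_test_split

@task(nout=4)  # nout=4 lets the Flow block unpack the four splits
def create_data():
    df = pd.read_csv("titanic.csv")
    X = df[["Pclass", "Fare"]].fillna(0)  # hypothetical feature choice
    y = df["Survived"]
    return train_test_split(X, y, test_size=0.25, random_state=0)

@task
def train_model(model, X_train, X_test, y_train, y_test):
    model.fit(X_train, y_train)
    # one record per model; get_results collects these into a DataFrame
    return {"model": type(model).__name__, "score": model.score(X_test, y_test)}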
from prefect import task
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from tune import Space, Grid, Rand, RandInt

@task
def get_models():
    # a grid-search space for logistic regression plus a random-search space for random forest
    space1 = Space(model=LogisticRegression, solver="lbfgs", C=Grid(10, 20), penalty=Grid("l2", "none"))
    space2 = Space(model=RandomForestClassifier, max_samples=Rand(0.8, 1), max_depth=RandInt(3, 4)).sample(4)
    space = [x.simple_value for x in list(space1 + space2)]
    models = []
    for model_params in space:
        model = model_params.pop("model")  # the estimator class itself
        models.append(model(**model_params))
    return models
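Since space1 expands to the 2 x 2 cross product of C and penalty and space2 is sampled 4 times, get_models should return 8 estimators. Each entry of space is a plain dict, along these illustrative lines:

{"model": LogisticRegression, "solver": "lbfgs", "C": 10, "penalty": "l2"}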
import pandas as pd

@task
def get_results(results):
    from prefect.backend.artifacts import create_markdown_artifact

    # collect the per-model records into a DataFrame and publish it to the Prefect UI
    res = pd.DataFrame(results)
    create_markdown_artifact(res.to_markdown())
    return res
from prefect.executors import LocalDaskExecutor
flow.executor = LocalDaskExecutor()
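With the executor attached, calling flow.run() executes the mapped train_model tasks in parallel on a local Dask pool:

state = flow.run()
assert state.is_successful()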
# Pandas
df.groupby("col1")["col2"].median()

# PySpark
import pyspark.sql.functions as F

# percentile_approx(column, percentile, accuracy); a higher accuracy is more
# precise but uses more memory
med_func = F.expr("percentile_approx(col2, 0.5, 20)")
df.groupBy("col1").agg(med_func).show()
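To try the comparison end to end, a toy frame can be built on the Pandas side and converted to Spark (the values here are made up for illustration):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
pdf = pd.DataFrame({"col1": ["a", "a", "a", "b"], "col2": [1.0, 2.0, 5.0, 3.0]})
pdf.groupby("col1")["col2"].median()  # Pandas: a -> 2.0, b -> 3.0
df = spark.createDataFrame(pdf)       # Spark DataFrame for the snippet above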
import pandas as pd
from typing import Dict

from fugue import transform
from pyspark.sql import SparkSession

input_df = pd.DataFrame({"id": [0, 1, 2], "value": ["A", "B", "C"]})
mapping = {"A": "Apple", "B": "Banana", "C": "Carrot"}

# Pandas-native logic: add a "food" column by looking each letter up in mapping
def map_letter_to_food(df: pd.DataFrame, mapping: Dict) -> pd.DataFrame:
    df["food"] = df["value"].map(mapping)
    return df

spark_session = SparkSession.builder.getOrCreate()

# run the same function on Spark; the schema declares the added column
df = transform(
    input_df,
    map_letter_to_food,
    schema="*, food:str",
    params=dict(mapping=mapping),
    engine=spark_session,
)
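Because a SparkSession is passed as the engine, transform returns a Spark DataFrame; omitting the engine runs the same function on plain Pandas:

df.show()  # columns: id, value, food

# same call with no engine -> executes locally and returns a Pandas DataFrame
pandas_df = transform(input_df, map_letter_to_food,
                      schema="*, food:str", params=dict(mapping=mapping))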