Gists from Kevin Kho (kvnkho)

from typing import Any, Dict, Iterable, List

# Fugue-style transformer: consumes rows as dicts, yields dicts with a new "food" key
def map_letter_to_food2(df: List[Dict[str, Any]], mapping: Dict[str, str]) -> Iterable[Dict[str, Any]]:
    for row in df:
        row["food"] = mapping[row["value"]]
        yield row

# Fugue-style transformer: consumes rows as lists, appends the mapped value
def map_letter_to_food3(df: List[List[Any]], mapping: Dict[str, str]) -> List[List[Any]]:
    for row in df:
        row.append(mapping[row[1]])
    return df
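
A minimal usage sketch, assuming these functions are meant to be driven by Fugue's transform(); the sample DataFrame, mapping, and output schema below are illustrative, not from the gist:

import pandas as pd
from fugue import transform

mapping = {"A": "Apple", "B": "Banana"}
df = pd.DataFrame({"id": [0, 1], "value": ["A", "B"]})

# Fugue converts the input to List[Dict[str, Any]] based on the type hint,
# and params supplies the extra `mapping` argument
result = transform(df, map_letter_to_food2, schema="*,food:str", params={"mapping": mapping})
print(result)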

import pandas as pd
from pyspark.sql import SparkSession
from fugue import FugueWorkflow
from fugue_sql import fsql

spark_session = SparkSession.builder.getOrCreate()

data = pd.DataFrame({'col1': [1, 2, 3], 'col2': [2, 3, 4]})

# Pandas function that Fugue applies to each partition
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df

# Fugue SQL on the Spark engine; `data` and `make_new_col` are picked up from the local scope
fsql("""SELECT col1, col2
        FROM data
        TRANSFORM USING make_new_col SCHEMA *,col3:int
        PRINT""").run(spark_session)

# Equivalent programmatic API: top 5 rows per col1 group, ordered by col2 descending
with FugueWorkflow() as dag:
    df = dag.df(data)
    df.partition_by("col1", presort="col2 desc").take(5).show()
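
The workflow is portable across engines as well; a sketch, assuming the standard FugueWorkflow.run() API, of pointing the same DAG at Spark:

dag = FugueWorkflow()
df = dag.df(data)
df.partition_by("col1", presort="col2 desc").take(5).show()
dag.run(spark_session)  # executes the lazily built DAG on the Spark engine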

from prefect import flow, task
import time

@task
def add_one(x):
    time.sleep(2)  # simulate slow work
    return x + 1

@task
def times_two(x):
    # body was cut off in the gist preview; doubling is the natural completion
    return x * 2
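
A minimal flow to exercise both tasks, as a sketch assuming Prefect 2.x, where task.submit() returns a future:

@flow
def pipeline(x):
    a = add_one.submit(x)    # returns a PrefectFuture immediately
    b = times_two.submit(a)  # Prefect resolves the upstream future before running
    return b.result()

if __name__ == "__main__":
    print(pipeline(1))  # (1 + 1) * 2 == 4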

from prefect import task, flow

# A task that always fails, to show how Prefect surfaces task errors
@task()
def hello_task(name_input):
    raise ValueError()
    return name_input  # unreachable; the intended happy path

@flow
def hello_flow():
    x = hello_task("kevin")
    return x

hello_flow()
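
If the failure were transient, the usual Prefect fix is retry options on the task decorator; retries and retry_delay_seconds are documented task arguments (the task name below is illustrative):

@task(retries=3, retry_delay_seconds=10)
def hello_task_with_retries(name_input):
    # Prefect re-runs this up to 3 times, 10 seconds apart, before marking it Failed
    raise ValueError()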

# Select two columns first, then aggregate
# case 1: min of 2 columns
sub = backend.read_parquet(path)[["c0", "c1"]]
sub.min()

# case 2: min, max, and mean of 2 columns
sub = backend.read_parquet(path)[["c0", "c1"]]
sub.min()
sub.max()
sub.mean()

# Read and aggregate in one expression
# case 1: read file and min of all columns
backend.read_parquet(path).min()

# case 2: read file and min of two columns
backend.read_parquet(path)[["c0", "c1"]].min()
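
One way these fragments run as-is: bind backend to a pandas-compatible module. That binding is an assumption, as the gist never names the library, and path stays a placeholder:

import pandas as backend  # or another pandas-API library such as modin.pandas

path = "data.parquet"  # hypothetical file
print(backend.read_parquet(path)[["c0", "c1"]].min())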

# Four ways to take 10 rows of two columns
# case 1: first 10 rows, columns by label
df.head(10)[["c", "d"]]
# case 2: last 10 rows, columns by label
df.tail(10)[["c", "d"]]
# case 3: first 10 rows, columns by position
df.iloc[:10, [2, 3]]
# case 4: last 10 rows, columns by position (inferred; the preview cut off here)
df.iloc[-10:, [2, 3]]

# Keep the row with the largest "c" within each "d" group
# case 1 (more shuffle): sort the whole frame, then dedupe
df.sort_values(["c", "d"]).drop_duplicates(subset=["d"], keep="last")

# case 2 (less shuffle): find each group's argmax, then join back on the row index
# (the series is renamed so the join key does not collide with column "c")
idx = df.groupby("d")["c"].idxmax().rename("max_idx")
df.merge(idx, left_index=True, right_on="max_idx")
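
In single-machine pandas the same selection is simply df.loc on the argmax index; the merge form is a common workaround on distributed engines, where label-based .loc lookups are expensive. A self-contained check with made-up data:

import pandas as pd

df = pd.DataFrame({"d": ["a", "a", "b"], "c": [1, 3, 2]})
idx = df.groupby("d")["c"].idxmax()
print(df.loc[idx])  # one row per "d", carrying its max "c"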