Elijah ben Izzy (elijahbenizzy)

from typing import Dict
from hamilton.htypes import Parallelizable

def stargazer_url(
    stars_by_repo: Dict[str, int], per_page: int = 100
) -> Parallelizable[str]:
    """Generates query objects for each repository, with the correct pagination and offset.
    :param stars_by_repo: The star count for each repo
    :param per_page: The number of results per page
    :return: A query object for each repo, formatted as a generator.
    """
    for repo_name, stars in stars_by_repo.items():
        # body truncated in the gist preview; assumed reconstruction yielding one query URL per page
        num_pages = stars // per_page + (1 if stars % per_page else 0)
        for page in range(num_pages):
            yield f"https://api.github.com/repos/{repo_name}/stargazers?per_page={per_page}&page={page + 1}"
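
Not shown in this preview: a Parallelizable producer like this is normally paired with a per-item node and a Collect aggregator downstream. A minimal sketch, assuming a requests-based fetch and hypothetical node names:

import requests
from hamilton.htypes import Collect

def stargazer_page(stargazer_url: str) -> list:
    # runs once per URL yielded by stargazer_url above (assumed fetch, not from the gist)
    return requests.get(stargazer_url).json()

def all_stargazers(stargazer_page: Collect[list]) -> list:
    # gathers every per-page result back into one flat list
    return [user for page in stargazer_page for user in page]
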

import pyspark.sql as ps
from hamilton.function_modifiers import load_from, value, source

@load_from.csv(path=value("data_1.csv"), inject_="raw_data_1", spark=source("spark"))
@load_from.parquet(path=value("data_2.parquet"), inject_="raw_data_2", spark=source("spark"))
def all_initial_data(raw_data_1: ps.DataFrame, raw_data_2: ps.DataFrame) -> ps.DataFrame:
    """Combines the two loaded dataframes"""
    return _custom_join(raw_data_1, raw_data_2)  # _custom_join assumed to be defined elsewhere in this module

# map_transforms.py
import pandas as pd

def column_3(column_1_from_dataframe: pd.Series) -> pd.Series:
    return _some_transform(column_1_from_dataframe)

def column_4(column_2_from_dataframe: pd.Series) -> pd.Series:
    return _some_other_transform(column_2_from_dataframe)

def column_5(column_3: pd.Series, column_4: pd.Series) -> pd.Series:
    return _yet_another_transform(column_3, column_4)

from hamilton.plugins.h_spark import with_columns
import pyspark.sql as ps

import map_transforms  # file defined above

@with_columns(
    map_transforms,
    columns_to_pass=["column_1_from_dataframe", "column_2_from_dataframe"],
)
def final_result(all_initial_data: ps.DataFrame) -> ps.DataFrame:
    # body truncated in the gist preview; with_columns has already appended the
    # map_transforms columns, so an assumed minimal body just returns the dataframe
    return all_initial_data
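
A hedged sketch of running this dataflow end to end; the module name spark_dataflow and passing the SparkSession as the "spark" input are assumptions, not part of the gist:

from pyspark.sql import SparkSession

from hamilton import driver

import spark_dataflow  # hypothetical module containing the functions above

spark = SparkSession.builder.master("local[*]").getOrCreate()
dr = driver.Builder().with_modules(spark_dataflow).build()
# returns the requested nodes; the exact result container depends on the driver's result builder
results = dr.execute(["final_result"], inputs={"spark": spark})
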
elijahbenizzy / features.py
Created September 2, 2023 22:15
Feature Engineering -- Common Operations

# features.py
import pandas as pd

def is_male(gender: pd.Series) -> pd.Series:
    return gender == "male"

def is_female(gender: pd.Series) -> pd.Series:
    return gender == "female"

def is_high_roller(budget: pd.Series) -> pd.Series:
    return budget > 100

# data_loaders.py
import pandas as pd
from hamilton.function_modifiers import config, extract_columns

import utils

@config.when(mode="batch")
@extract_columns("budget", "age", "gender", "client_id")
def survey_results__batch(survey_results_table: str, survey_results_db: str) -> pd.DataFrame:
    """Map operation to explode survey results to all fields.
    Data comes in JSON, we've grouped it into a series.
    """
    return utils.query_table(table=survey_results_table, db=survey_results_db)
@config.when(mode="online")
@extract_columns(
"budget",
"age",
"gender",
)
def survey_results__online(client_id: int) -> pd.DataFrame:
"""Map operation to explode survey results to all fields
Data comes in JSON, we've grouped it into a series.
"""
@config.when(mode="batch")
def age_mean__batch(age: pd.Series) -> float:
return age.mean()
@config.when(mode="batch")
def age_stddev__batch(age: pd.Series) -> float:
return age.std()
@config.when(mode="online")
def age_mean__online() -> float:
return utils.query_scalar("age_mean")
@config.when(mode="online")
def age_stddev__online() -> float:
return utils.query_scalar("age_stddev")
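
The features function below also consumes age_normalized, which is not shown in this preview; a plausible definition, given the age_mean/age_stddev nodes above, is a simple z-score:

# assumed definition (not in the gist preview) -- normalizes age using whichever
# age_mean/age_stddev implementation the mode config selects
def age_normalized(age: pd.Series, age_mean: float, age_stddev: float) -> pd.Series:
    return (age - age_mean) / age_stddev
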

def features(
    time_since_last_login: pd.Series,
    is_male: pd.Series,
    is_female: pd.Series,
    is_high_roller: pd.Series,
    age_normalized: pd.Series,
) -> pd.DataFrame:
    return pd.DataFrame(locals())
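
Not part of the gist, but a hedged sketch of how the mode config selects between the __batch and __online variants when building a Hamilton driver; the module names and input values here are assumptions:

import pandas as pd

from hamilton import driver

import data_loaders  # hypothetical module names for the files above
import features

dr = (
    driver.Builder()
    .with_modules(features, data_loaders)
    .with_config({"mode": "batch"})  # or {"mode": "online"}
    .build()
)
results = dr.execute(
    ["features"],
    inputs={
        "survey_results_table": "survey_results",  # assumed example values
        "survey_results_db": "warehouse",
        "time_since_last_login": pd.Series([0.5, 2.0, 7.0]),
    },
)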