import numpy as np
import pandas as pd
import pyspark.pandas as ps
# dbt setup and data fetching stuff, feel free to ignore
config_dict = {}


def ref(*args, **kwargs):
    refs = {
        "reporting.ratio_stats_input": '"awsdatacatalog"."reporting"."ratio_stats_input"'
    }
    key = ".".join(args)
    version = kwargs.get("v") or kwargs.get("version")
    if version:
        key += f".v{version}"
    dbt_load_df_function = kwargs.get("dbt_load_df_function")
    return dbt_load_df_function(refs[key])


def source(*args, dbt_load_df_function):
    sources = {}
    key = ".".join(args)
    return dbt_load_df_function(sources[key])
class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)


class this:
    """dbt.this() or dbt.this.identifier"""

    database = "awsdatacatalog"
    schema = "z_dev_dsnow_reporting"
    identifier = "ratio_stats"

    def __repr__(self):
        return '"awsdatacatalog"."z_dev_dsnow_reporting"."ratio_stats"'
class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args, **kwargs: ref(*args, **kwargs, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = False


def get_spark_df(identifier):
    # Strip the catalog prefix and quoting so Spark can resolve the table
    return spark.table(".".join(identifier.split(".")[1:]).replace('"', ""))
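# Illustrative note (not in the original gist): get_spark_df maps the quoted,
# Athena-style identifier returned by ref() to a bare Spark table name, e.g.
#   '"awsdatacatalog"."reporting"."ratio_stats_input"' -> "reporting.ratio_stats_input"
# `spark` is assumed here to be the ambient SparkSession provided by the
# Databricks/dbt Python model runtime.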
class SparkdbtObj(dbtObj):
    def __init__(self):
        super().__init__(load_df_function=get_spark_df)
        self.source = lambda *args: source(*args, dbt_load_df_function=get_spark_df)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=get_spark_df)


def model(dbt, spark_session):
    dbt.config(materialized="table")
    input = dbt.ref("reporting.ratio_stats_input")
    return input
# Define CCAO-specific functions that work with Spark
def boot_ci(fun, nboot=100, alpha=0.05, **kwargs):
    """Bootstrap a (1 - alpha) percentile confidence interval for fun."""
    num_kwargs = len(kwargs)
    kwargs = pd.DataFrame(kwargs)
    n = len(kwargs)
    ests = []
    # Resample with replacement nboot times and recompute the statistic
    # on each resample
    for _ in range(nboot):
        sample = kwargs.sample(n=n, replace=True)
        if fun.__name__ == "cod" or num_kwargs == 1:
            ests.append(fun(sample.iloc[:, 0]))
        elif fun.__name__ in ["prd", "prb", "mki"]:
            ests.append(fun(sample.iloc[:, 0], sample.iloc[:, 1]))
        else:
            raise Exception(
                "Input function should be one of 'cod', 'prd', 'prb', or 'mki'."
            )
    ests = pd.Series(ests)
    ci = [ests.quantile(alpha / 2), ests.quantile(1 - alpha / 2)]
    return ci
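# Quick local sanity check (hypothetical values, not part of the pipeline):
# a tight ratio distribution should yield a narrow interval around a small
# COD, e.g.:
#   rng = np.random.default_rng(0)
#   lo, hi = boot_ci(cod, nboot=200, ratio=pd.Series(rng.normal(1, 0.05, 500)))
#   assert lo < hi  # percentile bounds at alpha / 2 and 1 - alpha / 2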
def cod_boot(ratio, nboot=100, alpha=0.05):
    return boot_ci(cod, ratio=ratio, nboot=nboot, alpha=alpha)


def cod(x):
    n = x.size
    median_ratio = x.median()
    # No numpy in here, as Spark doesn't seem to play well with it
    ratio_minus_med = x - median_ratio
    abs_diff_sum = ratio_minus_med.abs().sum()
    cod = 100 / median_ratio * (abs_diff_sum / n)
    return cod
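# COD (coefficient of dispersion) is the mean absolute deviation from the
# median ratio, expressed as a percentage of the median:
#   COD = 100 / median(x) * (sum(|x_i - median(x)|) / n)
# Worked example: cod(pd.Series([0.9, 1.0, 1.1])) evaluates to
# 100 / 1.0 * (0.2 / 3) ≈ 6.67.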
def ccao_cod(x):
    # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs
    no_outliers = x.between(
        x.quantile(0.05), x.quantile(0.95), inclusive="neither"
    )
    x_no_outliers = x[no_outliers]
    cod_n = x_no_outliers.size
    if cod_n >= 20:
        cod_val = cod(x_no_outliers)
        cod_ci = cod_boot(ratio=x_no_outliers.to_numpy(), nboot=100)
        # Flag whether the COD falls within the 5-15 target range
        met = 5 <= cod_val <= 15
        out = [cod_val, cod_ci[0], cod_ci[1], met, cod_n]
    else:
        # Too few sales to compute a reliable COD
        out = [None, None, None, None, cod_n]
    return out
def ccao_median(x):
    # Remove top and bottom 5% of ratios as per CCAO Data Department SOPs
    no_outliers = x.between(
        x.quantile(0.05), x.quantile(0.95), inclusive="neither"
    )
    x_no_outliers = x[no_outliers]
    median_n = x_no_outliers.size
    median_val = x_no_outliers.median()
    out = [median_val, median_n]
    return out
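# Both ccao_cod and ccao_median trim outliers the same way; a small shared
# helper (a sketch, not in the original gist) could deduplicate that logic:
#
# def trim_outliers(x, lower=0.05, upper=0.95):
#     """Drop ratios outside the (lower, upper) quantile band, exclusive."""
#     return x[x.between(x.quantile(lower), x.quantile(upper), inclusive="neither")]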
# Load the Spark data frame using pandas on spark
# https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html
dbt = SparkdbtObj()
df = model(dbt, spark)
sdf = ps.DataFrame(df.filter(df.ratio.isNotNull()).filter(df.ratio > 0))
# Define the group columns, defaulting to the 'township_code' geography
group_cols = [
"year",
"triad",
"property_group",
"assessment_stage",
"geography_id",
"sale_year",
]
sdf["geography_id"] = sdf["township_code"].astype(str)
sdf.groupby(group_cols).apply(
    lambda x: pd.Series(
        {
            "sale_n": x["triad"].size,
            **dict(zip(["median_val", "median_n"], ccao_median(x["ratio"]))),
            **dict(
                zip(
                    ["cod_val", "cod_ci_l", "cod_ci_u", "cod_met", "cod_n"],
                    ccao_cod(x["ratio"]),
                )
            ),
        }
    )
)
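# Hypothetical next step (not in the original gist): capture the grouped
# result and write it back as the model's target table, e.g.
#
# result = sdf.groupby(group_cols).apply(...)  # the expression above
# result.to_spark().write.mode("overwrite").saveAsTable(
#     f"{dbt.this.schema}.{dbt.this.identifier}"
# )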