PowerGenome: Prepare clusters and cluster profiles
clusters.py: shared clustering utilities (the "clusters" module imported by the two scripts below).
import itertools

import pandas as pd
import numpy as np
import scipy.cluster.hierarchy

WEIGHT = "gw"
MEANS = [
    "lcoe",
    "interconnect_annuity",
    "spur_miles",
    "tx_miles",
    "site_substation_spur_miles",
    "substation_metro_tx_miles",
    "site_metro_spur_miles",
]
SUMS = ["area", "gw"]
PROFILE_KEYS = ["cbsa_id", "cluster_level", "cluster"]
# Hours in a leap year (366 * 24)
HOURS_IN_YEAR = 8784
def format_metadata(df, cap_multiplier=None, by="gw"):
    # Convert all column names to lowercase
    df.columns = [name.lower() for name in df.columns]
    if cap_multiplier:
        # Solar capacity comes in at 45 MW/km2. Derating happens here;
        # more complicated methods (e.g. using m_popden) can be added later.
        df["gw"] = df["gw"] * cap_multiplier
    # Initialize sequential unique cluster id
    cluster_id = 1
    all_clusters = []
    for cbsa in df["cbsa_id"].unique():
        sdf = df[df["cbsa_id"] == cbsa]
        levels = sdf["cluster_level"].drop_duplicates().sort_values(ascending=False)
        # Start from base clusters
        clusters = sdf[sdf["cluster_level"] == levels.max()].to_dict(orient="records")
        for i, x in enumerate(clusters, start=cluster_id):
            x["id"] = i
        for level in levels[1:]:
            parent_id = cluster_id + len(clusters)
            new = sdf[sdf["cluster_level"] == level]
            # Old cluster is a child if:
            # - not already assigned to a parent
            # - capacity not duplicated in current cluster level
            children = [
                x
                for x in clusters
                if not x.get("parent_id") and (x[by] != new[by]).all()
            ]
            if len(children) != 2:
                raise ValueError(
                    f"Found {len(children)} children for level {level} in cbsa_id {cbsa}"
                )
            for x in children:
                x["parent_id"] = parent_id
            # New cluster is a parent if:
            # - capacity not present in previous cluster level
            # (assumes cluster levels are consecutive integers)
            is_parent = ~new[by].isin(sdf[sdf["cluster_level"] == level + 1][by])
            if sum(is_parent) == 1:
                parent = new[is_parent].iloc[0].to_dict()
                parent["id"] = parent_id
            else:
                raise ValueError(
                    f"Found {sum(is_parent)} parents at level {level} in cbsa_id {cbsa}"
                )
            clusters.append(parent)
        all_clusters.extend(clusters)
        cluster_id += len(clusters)
    return pd.DataFrame(all_clusters).set_index("id", drop=False)
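
# Example usage (a sketch; file name and arguments mirror the scripts below,
# and the input must provide cbsa_id, cluster_level, gw, and the `by` column):
#     raw = pd.read_csv("solarpv_base_cluster_metadata.csv")
#     metadata = format_metadata(raw, cap_multiplier=0.2, by="lcoe")
#     metadata.head()  # one row per cluster, indexed by sequential "id"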
def format_profiles_inplace(df):
    # Convert column names to lowercase
    df.columns = [name.lower() for name in df.columns]
    # Prepare index
    # NOTE: Assumes already sorted by hour (ascending)
    df.set_index(PROFILE_KEYS, inplace=True)
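
# Example usage (a sketch, assuming one full year of hourly rows per key):
#     format_profiles_inplace(profiles)
#     cf = profiles.loc[(cbsa_id, level, cluster), "capacity_factor"]  # hourly values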
def build_clusters(metadata, ipm_regions, min_capacity=None, max_clusters=np.inf):
    if max_clusters < 1:
        raise ValueError("Max clusters must be greater than zero")
    df = metadata
    cdf = _get_base_clusters(df, ipm_regions).sort_values("lcoe")
    if min_capacity:
        # Drop clusters with highest LCOE until min_capacity reached
        end = cdf["gw"].cumsum().searchsorted(min_capacity) + 1
        if end > len(cdf):
            raise ValueError(
                f"Capacity in {ipm_regions} ({cdf['gw'].sum()} GW) less than minimum ({min_capacity} GW)"
            )
        else:
            cdf = cdf[:end]
    # Track ids of base clusters through aggregation
    cdf["ids"] = [[x] for x in cdf["id"]]
    # Aggregate clusters within each metro area (cbsa_id)
    while len(cdf) > max_clusters:
        # Sort parents by number of children (descending), then by the
        # LCOE spread of their children (ascending)
        diff = lambda x: abs(x.max() - x.min())
        parents = (
            cdf.groupby("parent_id", sort=False)
            .agg(child_ids=("id", list), n=("id", "count"), lcoe=("lcoe", diff))
            .sort_values(["n", "lcoe"], ascending=[False, True])
        )
        if parents.empty:
            break
        if parents["n"].iloc[0] == 2:
            # Choose parent with lowest LCOE spread between its two children
            best = parents.iloc[0]
            # Compute parent
            parent = pd.Series(
                _merge_children(
                    cdf.loc[best["child_ids"]],
                    ids=_flat(*cdf.loc[best["child_ids"], "ids"]),
                    **df.loc[best.name],
                )
            )
            # Add parent
            cdf.loc[best.name] = parent
            # Drop children
            cdf.drop(best["child_ids"], inplace=True)
        else:
            # Promote child with deepest parent
            parent_id = df.loc[parents.index, "cluster_level"].idxmax()
            parent = df.loc[parent_id]
            child_id = parents.loc[parent_id, "child_ids"][0]
            # Update child
            columns = ["id", "parent_id", "cluster_level"]
            cdf.loc[child_id, columns] = parent[columns]
            # Update index
            cdf.rename(index={child_id: parent_id}, inplace=True)
    # Keep only computed columns
    columns = _flat(MEANS, SUMS, "ids")
    cdf = cdf[columns]
    cdf.reset_index(inplace=True, drop=True)
    if len(cdf) > max_clusters:
        # Aggregate singleton metro area clusters by hierarchical clustering
        # on LCOE. Each row of Z merges two cluster indices; SciPy labels the
        # merged cluster len(cdf) + step, matching the rows appended below.
        Z = scipy.cluster.hierarchy.linkage(cdf[["lcoe"]].values, method="ward")
        cdf["_keep"] = True
        for child_idx in Z[:, 0:2].astype(int):
            cdf.loc[child_idx, "_keep"] = False
            parent = _merge_children(
                cdf.loc[child_idx], _keep=True, ids=_flat(*cdf.loc[child_idx, "ids"])
            )
            cdf.loc[len(cdf)] = parent
            if cdf["_keep"].sum() <= max_clusters:
                break
        cdf = cdf[cdf["_keep"]]
    return cdf[columns]
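
# Example usage (values taken from the SCENARIOS tables in the scripts below):
#     clusters = build_clusters(
#         metadata,
#         ipm_regions=["WEC_CALN", "WECC_BANC"],
#         min_capacity=200,
#         max_clusters=4,
#     )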
def build_cluster_profiles(clusters, metadata, profiles):
    results = np.zeros((HOURS_IN_YEAR, len(clusters)), dtype=float)
    for i, ids in enumerate(clusters["ids"]):
        # Look up each base cluster's (cbsa_id, cluster_level, cluster) key
        idx = [tuple(x) for x in metadata.loc[ids, profiles.index.names].values]
        # Column-major reshape assumes each key's hours are contiguous and
        # sorted ascending (see format_profiles_inplace)
        capacities = profiles.loc[idx, "capacity_factor"].values.reshape(
            HOURS_IN_YEAR, -1, order="F"
        )
        # Capacity-weighted average of the base cluster profiles
        weights = metadata.loc[ids, "gw"].values
        weights /= weights.sum()
        results[:, i] = (capacities * weights).sum(axis=1)
    return results
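
# The result is an (HOURS_IN_YEAR, n_clusters) array of capacity factors,
# one column per aggregated cluster:
#     cluster_profiles = build_cluster_profiles(clusters, metadata, profiles)
#     cluster_profiles.shape  # (8784, len(clusters))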
def clusters_to_capacity_transmission_table(clusters, region, technology):
    columns = [
        "gw",
        "area",
        "lcoe",
        "interconnect_annuity",
        "spur_miles",
        "tx_miles",
        "site_metro_spur_miles",
        "site_substation_spur_miles",
        "substation_metro_tx_miles",
        "ids",
    ]
    return (
        clusters[columns]
        .assign(region=region, technology=technology, cluster=clusters.index)
        .reset_index(drop=True)
        .rename(columns={"gw": "max_capacity"})
    )
def clusters_to_variable_resource_profiles_table(
    clusters, cluster_profiles, region, technology
):
    n_hours, n_clusters = cluster_profiles.shape
    cols = {}
    cols[("region", "Resource", "cluster")] = np.arange(n_hours) + 1
    for i in range(n_clusters):
        cols[(region, technology, clusters.index[i])] = cluster_profiles[:, i]
    return pd.DataFrame(cols)
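
# Output layout: the first column holds the hour index (1..n_hours) under the
# tuple key ("region", "Resource", "cluster"); every other column is keyed by
# a (region, technology, cluster index) tuple.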
def _get_base_clusters(df, ipm_regions):
    return (
        df[df["ipm_region"].isin(ipm_regions)]
        .groupby("cbsa_id")
        .apply(lambda g: g[g["cluster_level"] == g["cluster_level"].max()])
        .reset_index(level=["cbsa_id"], drop=True)
    )
def _merge_children(df, **kwargs):
    # Sums for extensive columns, capacity-weighted means for intensive ones
    parent = kwargs
    for key in SUMS:
        parent[key] = df[key].sum()
    for key in MEANS:
        parent[key] = (df[key] * df[WEIGHT]).sum() / df[WEIGHT].sum()
    return parent
def _flat(*args):
    args = [x if np.iterable(x) and not isinstance(x, str) else [x] for x in args]
    return list(itertools.chain(*args))
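
# Example: strings are treated as scalars, other iterables are spliced:
#     _flat([1, 2], 3, "ab")  # -> [1, 2, 3, "ab"]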
Example script: build clusters and profiles for a single technology and region, then export them as PowerGenome inputs.
import pandas as pd

from clusters import (
    format_metadata,
    format_profiles_inplace,
    build_clusters,
    build_cluster_profiles,
    clusters_to_capacity_transmission_table,
    clusters_to_variable_resource_profiles_table,
)
# ---- Constants ----
METADATA_PATHS = {
    "OnshoreWind": "preliminary_base_wind_metadata.csv",
    "UtilityPV": "solarpv_base_cluster_metadata.csv",
}
PROFILE_PATHS = {
    "OnshoreWind": "preliminary_base_wind_cluster_profiles.parquet",
    "UtilityPV": "solarpv_base_cluster_profiles.parquet",
}
CAPACITY_MULTIPLIER = {
    "OnshoreWind": 1,
    "UtilityPV": 0.2,
}
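# Each SCENARIOS entry maps technology -> region -> keyword arguments for
# build_clusters: candidate IPM regions, the maximum number of aggregated
# clusters, and the minimum total capacity (GW) to retain.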
SCENARIOS = {
    "UtilityPV": {
        "CA_N": {
            "ipm_regions": ["WEC_CALN", "WECC_BANC"],
            "max_clusters": 4,
            "min_capacity": 200,
        },
        "CA_S": {
            "ipm_regions": ["WEC_SCE", "WEC_LADW", "WECC_SCE", "WEC_SDGE", "WECC_IID"],
            "max_clusters": 5,
            "min_capacity": 150,
        },
        "WECC_N": {
            "ipm_regions": ["WECC_ID", "WECC_MT", "WECC_NNV", "WECC_SNV", "WECC_UT"],
            "max_clusters": 20,
            "min_capacity": 250,
        },
        "WECC_NMAZ": {
            "ipm_regions": ["WECC_AZ", "WECC_NM"],
            "max_clusters": 15,
            "min_capacity": 300,
        },
        # "WECC_PNW": {
        #     "ipm_regions": [],
        #     "max_clusters": 1,
        #     "min_capacity": 4.5,
        # },
        "WECC_WYCO": {
            "ipm_regions": ["WECC_WY", "WECC_CO"],
            "max_clusters": 15,
            "min_capacity": 180,
        },
    },
    "OnshoreWind": {
        "CA_N": {
            "ipm_regions": ["WEC_CALN", "WECC_BANC"],
            "max_clusters": 1,
            "min_capacity": 22,
        },
        "CA_S": {
            "ipm_regions": ["WEC_SCE", "WEC_LADW", "WECC_SCE", "WEC_SDGE", "WECC_IID"],
            "max_clusters": 10,
            "min_capacity": 45,
        },
        "WECC_N": {
            "ipm_regions": ["WECC_ID", "WECC_MT", "WECC_NNV", "WECC_SNV", "WECC_UT"],
            "max_clusters": 15,
            "min_capacity": 100,
        },
        "WECC_NMAZ": {
            "ipm_regions": ["WECC_AZ", "WECC_NM"],
            "max_clusters": 15,
            "min_capacity": 120,
        },
        # "WECC_PNW": {
        #     "ipm_regions": [],
        #     "max_clusters": 1,
        #     "min_capacity": 6,
        # },
        "WECC_WYCO": {
            "ipm_regions": ["WECC_WY", "WECC_CO"],
            "max_clusters": 20,
            "min_capacity": 200,
        },
    },
}
# ---- Processing ----
technology = "UtilityPV"
region = "CA_N"

# Prepare resource metadata and profiles
# NOTE: by="lcoe" matches parent and child clusters on LCOE values rather
# than on capacity ("gw")
metadata = pd.read_csv(METADATA_PATHS[technology]).pipe(
    format_metadata, cap_multiplier=CAPACITY_MULTIPLIER[technology], by="lcoe"
)
profiles = pd.read_parquet(PROFILE_PATHS[technology])
format_profiles_inplace(profiles)

# Build clusters
scenario = SCENARIOS[technology][region]
clusters = build_clusters(metadata, **scenario)

# Build cluster profiles
cluster_profiles = build_cluster_profiles(clusters, metadata, profiles)

# Export results as PowerGenome input
tab1 = clusters_to_capacity_transmission_table(clusters, region, technology)
tab1.to_csv("test_resource_capacity_spur_line.csv", index=False, float_format="%.3f")
tab2 = clusters_to_variable_resource_profiles_table(
    clusters, cluster_profiles, region, technology
)
tab2.to_csv("test_variable_resource_profiles.csv", index=False, float_format="%.3f")
Batch script: build clusters and profiles for every technology and region, optionally in parallel, and export the combined PowerGenome inputs.
import logging

import pandas as pd
from joblib import Parallel, delayed
from IPython import embed as IP

from clusters import (
    format_metadata,
    format_profiles_inplace,
    build_clusters,
    build_cluster_profiles,
    clusters_to_capacity_transmission_table,
    clusters_to_variable_resource_profiles_table,
)

logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter(
    # Verbose format: timestamp, level, logger name and line number, message
    "%(asctime)s [%(levelname)8s] %(name)s:%(lineno)s %(message)s",
    # Datetime format string
    "%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(formatter)
logger.addHandler(handler)
# ---- Constants ----
METADATA_PATHS = {
    "OnshoreWind": "wind_base_cluster_metadata.csv",
    "UtilityPV": "solarpv_base_cluster_metadata.csv",
}
PROFILE_PATHS = {
    "OnshoreWind": "wind_base_cluster_profiles_partition.parquet",
    "UtilityPV": "solarpv_base_cluster_profiles.parquet",
}
CAPACITY_MULTIPLIER = {
    "OnshoreWind": 1,
    "UtilityPV": 0.2,
}
SCENARIOS = {
    "UtilityPV": {
        "CA_N": {
            "ipm_regions": ["WEC_CALN", "WECC_BANC"],
            "max_clusters": 4,
            "min_capacity": 200,
        },
        "CA_S": {
            "ipm_regions": ["WEC_SCE", "WEC_LADW", "WECC_SCE", "WEC_SDGE", "WECC_IID"],
            "max_clusters": 5,
            "min_capacity": 150,
        },
        "WECC_N": {
            "ipm_regions": ["WECC_ID", "WECC_MT", "WECC_NNV", "WECC_SNV", "WECC_UT"],
            "max_clusters": 20,
            "min_capacity": 250,
        },
        "WECC_NMAZ": {
            "ipm_regions": ["WECC_AZ", "WECC_NM"],
            "max_clusters": 15,
            "min_capacity": 300,
        },
        # "WECC_PNW": {
        #     "ipm_regions": [],
        #     "max_clusters": 1,
        #     "min_capacity": 4.5,
        # },
        "WECC_WYCO": {
            "ipm_regions": ["WECC_WY", "WECC_CO"],
            "max_clusters": 15,
            "min_capacity": 180,
        },
    },
    "OnshoreWind": {
        "CA_N": {
            "ipm_regions": ["WEC_CALN", "WECC_BANC"],
            "max_clusters": 1,
            "min_capacity": 22,
        },
        "CA_S": {
            "ipm_regions": ["WEC_SCE", "WEC_LADW", "WECC_SCE", "WEC_SDGE", "WECC_IID"],
            "max_clusters": 10,
            "min_capacity": 45,
        },
        "WECC_N": {
            "ipm_regions": ["WECC_ID", "WECC_MT", "WECC_NNV", "WECC_SNV", "WECC_UT"],
            "max_clusters": 15,
            "min_capacity": 100,
        },
        "WECC_NMAZ": {
            "ipm_regions": ["WECC_AZ", "WECC_NM"],
            "max_clusters": 15,
            "min_capacity": 120,
        },
        # "WECC_PNW": {
        #     "ipm_regions": [],
        #     "max_clusters": 1,
        #     "min_capacity": 6,
        # },
        "WECC_WYCO": {
            "ipm_regions": ["WECC_WY", "WECC_CO"],
            "max_clusters": 20,
            "min_capacity": 200,
        },
    },
}
def make_region_data(technology, region):
    print(f"{technology}, {region}")
    # Prepare resource metadata and profiles
    metadata = pd.read_csv(METADATA_PATHS[technology]).pipe(
        format_metadata, cap_multiplier=CAPACITY_MULTIPLIER[technology], by="lcoe"
    )
    # (Was `if tech == ...`, which relied on a global loop variable)
    if technology == "OnshoreWind":
        # Wind profiles are partitioned by IPM region; read only those needed
        filters = [
            [("IPM_Region", "=", r)]
            for r in SCENARIOS[technology][region]["ipm_regions"]
        ]
        profiles = pd.read_parquet(
            PROFILE_PATHS[technology], use_pandas_metadata=True, filters=filters
        )
    else:
        profiles = pd.read_parquet(PROFILE_PATHS[technology])
    format_profiles_inplace(profiles)
    # Build clusters
    scenario = SCENARIOS[technology][region]
    clusters = build_clusters(metadata, **scenario)
    # Build cluster profiles
    cluster_profiles = build_cluster_profiles(clusters, metadata, profiles)
    # Export results as PowerGenome input
    tab1 = clusters_to_capacity_transmission_table(clusters, region, technology)
    # NOTE: format_metadata already applied the capacity multiplier, so this
    # scales capacity a second time
    tab1["max_capacity"] = tab1["max_capacity"] * CAPACITY_MULTIPLIER[technology]
    # spur_line = pd.concat([spur_line, tab1], ignore_index=True)
    tab2 = clusters_to_variable_resource_profiles_table(
        clusters, cluster_profiles, region, technology
    )
    # col = tab2[("region", "Resource", "cluster")]
    tab2 = tab2.drop([("region", "Resource", "cluster")], axis=1)
    # resource_variability = pd.concat([resource_variability, tab2], axis=1)
    return tab1, tab2
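
# Example usage (a sketch):
#     tab1, tab2 = make_region_data("UtilityPV", "CA_N")
#     # tab1: capacity/transmission table; tab2: hourly profile columns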
PARALLEL = True
USE_IP = True

# ---- Processing ----
spur_line = pd.DataFrame()
resource_variability = pd.DataFrame()
cap_tx = []
profile_tables = []  # per-region profile tables (kept distinct from the "profiles" DataFrame)
for tech in SCENARIOS:
    technology = tech
    if PARALLEL:
        _tech_results = Parallel(n_jobs=-2)(
            delayed(make_region_data)(technology, region) for region in SCENARIOS[tech]
        )
        _cap_tx, _profiles = zip(*_tech_results)
        cap_tx.extend(_cap_tx)
        profile_tables.extend(_profiles)
    else:
        for reg in SCENARIOS[tech]:
            region = reg
            logger.info(f"{technology}, {region}")
            # Prepare resource metadata and profiles
            # IP()
            metadata = pd.read_csv(METADATA_PATHS[technology]).pipe(
                format_metadata,
                cap_multiplier=CAPACITY_MULTIPLIER[technology],
                by="lcoe",
            )
            if technology == "OnshoreWind":
                filters = [
                    [("IPM_Region", "=", r)]
                    for r in SCENARIOS[technology][region]["ipm_regions"]
                ]
                profiles = pd.read_parquet(
                    PROFILE_PATHS[technology], use_pandas_metadata=True, filters=filters
                )
            else:
                profiles = pd.read_parquet(PROFILE_PATHS[technology])
            format_profiles_inplace(profiles)
            # Build clusters
            scenario = SCENARIOS[technology][region]
            clusters = build_clusters(metadata, **scenario)
            # Build cluster profiles
            cluster_profiles = build_cluster_profiles(clusters, metadata, profiles)
            # Export results as PowerGenome input
            tab1 = clusters_to_capacity_transmission_table(clusters, region, technology)
            spur_line = pd.concat([spur_line, tab1], ignore_index=True)
            tab2 = clusters_to_variable_resource_profiles_table(
                clusters, cluster_profiles, region, technology
            )
            # Keep the hour-index column separately; it is identical across regions
            col = tab2[("region", "Resource", "cluster")]
            tab2 = tab2.drop([("region", "Resource", "cluster")], axis=1)
            resource_variability = pd.concat([resource_variability, tab2], axis=1)

if USE_IP:
    IP()
if PARALLEL:
    resource_variability = pd.concat(profile_tables, axis=1)
    spur_line = pd.concat(cap_tx)
else:
    resource_variability = pd.concat([col, resource_variability], axis=1)
spur_line.to_csv(
    "test_resource_capacity_spur_line.csv", index=False, float_format="%.3f"
)
resource_variability.to_csv(
    "test_variable_resource_profiles.csv", index=False, float_format="%.4f"
)