Last active
June 6, 2023 18:56
-
-
Save amuramatsu/1e56a7dec3d80bbacfb6c8bf3b51e3fa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Copyright (c) 2023 MURAMATSU Atsushi <[email protected]> | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
import itertools | |
import functools | |
import operator | |
from typing import Any, Union, Optional, Iterable, Sequence, Dict | |
import numpy as np | |
import pandas as pd | |
from scipy import interpolate #type: ignore | |
tqdm = None | |
try: | |
from tqdm import tqdm #type: ignore | |
except ImportError: | |
pass | |
def pandas_addnews(
        df: pd.DataFrame,
        x_column: str,
        x_data: Iterable[Union[float, int]],
        fill_na: Optional[Iterable[str]] = None) -> pd.DataFrame:
    """Add room for new data.

    The rows of ``df`` whose ``x_column`` equals the value found in the
    first row are used as a template.  For every value in ``x_data`` a copy
    of that template, with ``x_column`` overwritten by the value, is
    appended to ``df``.  The input frame is never modified.

    Args:
        df: source DataFrame
        x_column: name of the column that receives the new values
        x_data: values to add (one copy of the template per value)
        fill_na: columns that are set to NaN in the added rows

    Returns:
        A new DataFrame holding the original rows followed by the added
        rows.  Original index labels are kept, so labels may repeat.
    """
    x_values = df[x_column]
    if x_values.empty:
        # No first row, hence no template to replicate.
        return df.copy()

    # Positional access: the index is not guaranteed to contain label 0.
    template = df[x_values == x_values.iloc[0]].copy()
    if fill_na is not None:
        for column in fill_na:
            template[column] = np.nan

    frames = [df]
    for value in x_data:
        added = template.copy()
        added[x_column] = value
        frames.append(added)
    # One concat instead of one per value; also guarantees a new object
    # even when x_data is empty.
    return pd.concat(frames)
def pandas_interpolate(
        df: pd.DataFrame,
        x_column: str,
        x_data: Iterable[Union[float, int]],
        fill_columns: Iterable[str],
        kind: str = "polybest",
        progress: Union[None, bool, Dict[str, Any]] = None) -> pd.DataFrame:
    """Add interpolated data.

    New rows are created for every value in ``x_data`` (see
    ``pandas_addnews``).  Every column that is neither ``x_column`` nor one
    of ``fill_columns`` is treated as a parameter column: rows are grouped
    by each combination of parameter values, and inside each group the NaN
    entries of ``fill_columns`` are filled by 1-D interpolation (with
    extrapolation) over ``x_column``.

    Args:
        df: source DataFrame (not modified)
        x_column: name of the column used as the interpolation abscissa
        x_data: values of ``x_column`` to add
        fill_columns: target columns to interpolate
        kind: interpolation kind passed to ``scipy.interpolate.interp1d``,
            or "polybest" to choose "slinear", "quadratic" or "cubic"
            from the number of known points (2, 3, or more)
        progress: if truthy and tqdm is importable, show a progress bar;
            a dict is forwarded to ``tqdm`` as keyword arguments

    Returns:
        New DataFrame sorted by ``x_column`` with a fresh integer index.

    Raises:
        ValueError: if "polybest" meets a group with fewer than two known
            points, or if no parameter combination matches any row.
    """
    # Materialise once: the column list is iterated several times below,
    # which would silently exhaust a generator.
    fill_columns = list(fill_columns)

    df_tmp = pandas_addnews(df, x_column, x_data, fill_na=fill_columns)
    # Not in place: pandas_addnews may hand back the caller's own frame.
    df_tmp = df_tmp.sort_values(x_column, ignore_index=True)

    df_mat = df.drop(columns=[x_column] + fill_columns)
    df_mat_c = list(df_mat.columns)
    df_mat_v = [set(df_mat[n].values) for n in df_mat_c]

    iter_ = itertools.product(*df_mat_v)
    if tqdm is not None and progress:
        df_mat_total = functools.reduce(
            operator.mul, (len(v) for v in df_mat_v), 1)
        tqdm_kwargs = progress if isinstance(progress, dict) else {}
        iter_ = tqdm(iter_, total=df_mat_total, **tqdm_kwargs)

    pieces = []
    for vs in iter_:
        mask = np.ones(len(df_tmp), dtype=bool)
        for c, v in zip(df_mat_c, vs):
            mask &= np.asarray(df_tmp[c] == v, dtype=bool)
        if not mask.any():
            # The cartesian product can contain parameter combinations
            # that never occur in the data; there is nothing to fill.
            continue

        dfx = df_tmp[mask].copy()
        X = dfx[x_column].to_numpy()
        for y_column in fill_columns:
            # Explicit writable copy: arrays exposed by pandas may be
            # read-only (Copy-on-Write).
            Y = dfx[y_column].to_numpy(dtype=float, copy=True)
            nans = np.isnan(Y)
            if not nans.any():
                continue
            X_ = X[~nans]
            if kind == "polybest":
                if len(X_) < 2:
                    raise ValueError(
                        "need at least two known points to interpolate "
                        f"column {y_column!r} (got {len(X_)})")
                if len(X_) == 2:
                    k = "slinear"
                elif len(X_) == 3:
                    k = "quadratic"
                else:
                    k = "cubic"
            else:
                k = kind
            f = interpolate.interp1d(
                X_, Y[~nans],
                kind=k, fill_value="extrapolate",
            )
            Y[nans] = f(X[nans])
            dfx[y_column] = Y
        pieces.append(dfx)

    if not pieces:
        raise ValueError("no rows matched any parameter combination")
    df_new = pd.concat(pieces)
    return df_new.sort_values(x_column, ignore_index=True)
if __name__ == "__main__":
    # Demo: build a small synthetic table over (I, A, B), interpolate three
    # value columns at new I positions, and plot one (A, B) slice.
    import random

    import matplotlib.pyplot as plt  # type: ignore

    i_values = np.arange(6) * 0.5
    a_values = [1, 2, 3]
    b_values = [10, 11]
    m = {
        "I": [], "A": [], "B": [],
        "V1": [], "V2": [], "V3": [],
    }  # type: ignore
    for i, a, b in itertools.product(i_values, a_values, b_values):
        m["I"].append(i)
        m["A"].append(a)
        m["B"].append(b)
        m["V1"].append(i * a + b)
        m["V2"].append(i**a - b)
        # V3 carries random noise so the curve is not exactly polynomial.
        m["V3"].append(i * 2 * a + b - random.random())
    df = pd.DataFrame(m)
    print(df)

    df2 = pandas_interpolate(df, "I", [-1.2, 1.3, 5.0],
                             ["V1", "V2", "V3"],
                             progress={"ascii": True})
    print(df2)

    # Select the slice once and draw all three curves on the same axes.
    selected = df2[(df2.A == 2) & (df2.B == 11)]
    ax = None
    for y_name in ("V1", "V2", "V3"):
        ax = selected.plot(x="I", y=y_name, ax=ax)
    plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment