Last active
July 13, 2018 18:53
-
-
Save RockfordWei/15ad8de8611b13b42c2e4635b7649a1a to your computer and use it in GitHub Desktop.
Non-linear scale
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
import random | |
import math | |
class EncodedScaler:
    """Piecewise-linear scaler that spreads observed values evenly over [-1, 1].

    The distinct values found in the training frame (the "milestones") are
    sorted; the smallest maps to -1.0, the largest to 1.0, and neighbouring
    milestones are always ``unit`` apart in scaled space no matter how far
    apart they are in raw space.  Raw values between two milestones are
    interpolated linearly inside that interval; raw values outside the
    observed range are extrapolated using the width of the first (or last)
    interval.
    """

    # Class-level placeholders.  All of them except ``range_`` (which is never
    # assigned anywhere in this class) are replaced per instance in __init__.
    milestones = [0, 0, 0]
    unit = 0
    count = 0
    size = 0
    min_ = 0
    max_ = 0
    range_ = 0

    def __init__(self, dataset: pd.DataFrame, reject: int = 0):
        """Collect the milestones from every column of *dataset*.

        Args:
            dataset: frame whose columns are pooled together to find the
                distinct values.
            reject: a value is kept as a milestone only if it occurs more
                than this many times across the whole frame.

        Raises:
            NameError: fewer than two milestones survive the ``reject``
                filter, so no interval exists to scale against.
        """
        pooled = pd.concat(
            [dataset[name] for name in dataset.columns.values],
            ignore_index=True,
        )
        occurrences = pooled.value_counts()
        # int() mirrors the truncation the former '%d' formatting applied.
        kept = occurrences[occurrences > int(reject)]
        self.milestones = np.sort(kept.index.values)
        self.size = self.milestones.size
        if self.size < 2:
            raise NameError('dataset is not scalable: insufficient data to support a transformation')
        # Number of intervals between consecutive milestones.
        self.count = self.size - 1
        self.min_ = self.milestones[0]
        self.max_ = self.milestones[self.count]
        # Scaled width of one interval: [-1, 1] has length 2.
        self.unit = 2.0 / self.count

    def describe(self):
        """Print the scaler's parameters followed by the list of milestones."""
        print('spec:', self.size, self.min_, self.max_, self.count, self.unit)
        print(list(self.milestones))

    def scale(self, series: pd.Series):
        """Return *series* with ``encode`` applied to every element."""
        return series.apply(self.encode)

    def unscale(self, series: pd.Series):
        """Return *series* with ``decode`` applied to every element."""
        return series.apply(self.decode)

    def fit(self, dataframe: pd.DataFrame):
        """Return a copy of *dataframe* with every column scaled."""
        dat = dataframe.copy()
        for column in dat:
            dat[column] = self.scale(dat[column])
        return dat

    def unfit(self, dataframe: pd.DataFrame):
        """Return a copy of *dataframe* with every column unscaled."""
        dat = dataframe.copy()
        for column in dat:
            dat[column] = self.unscale(dat[column])
        return dat

    def encode(self, raw: int = 0) -> float:
        """Map a raw value to its scaled position.

        Args:
            raw: the value to scale.

        Returns:
            A float in [-1, 1] when *raw* lies within the milestone range,
            otherwise a value below -1 or above 1 extrapolated from the
            first or last interval.

        Raises:
            NameError: *raw* cannot be ordered against the milestones
                (for example NaN).
        """
        if raw < self.min_:
            first_unit = float(self.milestones[1] - self.min_)
            diff = float(self.min_ - raw)
            count = diff / first_unit
            return -1.0 - count * self.unit
        if raw > self.max_:
            last_unit = float(self.max_ - self.milestones[self.count - 1])
            diff = float(raw - self.max_)
            count = diff / last_unit
            return 1 + count * self.unit
        if not (self.min_ <= raw <= self.max_):
            raise NameError('unexpected out of range when encoding')
        # Binary search for the first milestone >= raw; the interval that
        # ends there is the first one satisfying a <= raw <= b.  Clamp to 0
        # for raw == min_, where no interval ends.
        upper = int(np.searchsorted(self.milestones, raw, side='left'))
        i = max(upper - 1, 0)
        a = self.milestones[i]
        b = self.milestones[i + 1]
        actual_unit = float(b - a)
        remain = float(raw - a)
        remain = remain * self.unit / actual_unit
        base = -1.0 + i * self.unit
        return base + remain

    def decode(self, scaled: float = .0) -> int:
        """Map a scaled value back to a raw value, rounded to an integer.

        Args:
            scaled: a value previously produced by ``encode`` (or any float).

        Returns:
            The nearest integer to the reconstructed raw value; always a
            built-in ``int``.
        """
        if scaled < -1.0:
            diff = -1.0 - scaled
            count = diff / self.unit
            first_unit = self.milestones[1] - self.min_
            return int(round(self.min_ - count * first_unit))
        if scaled > 1.0:
            diff = scaled - 1.0
            count = diff / self.unit
            last_unit = self.milestones[self.count] - self.milestones[self.count - 1]
            return int(round(self.max_ + count * last_unit))
        diff = scaled + 1.0  # distance from the low end, -1.0
        measure = diff / self.unit
        index = int(measure)
        if index >= self.count:
            # Top edge of the range: no interval starts here.  Round and cast
            # like every other branch so callers always get a built-in int.
            return int(round(self.max_))
        actual_unit = float(self.milestones[index + 1] - self.milestones[index])
        remain = (measure - float(index)) * actual_unit
        return int(round(self.milestones[index] + remain))
# Demo: build a one-column frame, scale it, then reverse the scaling and
# print each stage so the round trip can be inspected.
testset = pd.DataFrame({
    'target': [
        100, 0, -100, 100, 200, 200, 200, 1000, 100, 300,
        100, 200, 1000, 0, 400, 40, 300, 1000, 100, -100,
        -100, 100, 40, 400, 40, 400,
    ],
})
scaler = EncodedScaler(testset)
scaler.describe()

# Forward transform: raw values -> scaled values.
data = scaler.fit(testset)
print(data)

# Inverse transform: scaled values -> raw values.
dat = scaler.unfit(data)
print(dat)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment