Forked from rodrigofp-cit/black_friday_challenge_custom_transformer.py
Created
June 30, 2023 17:56
-
-
Save Sandy4321/f542c7a003c4d8853bdf85f05ea8b1a4 to your computer and use it in GitHub Desktop.
black_friday_challenge_custom_transformer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
from sklearn.base import BaseEstimator | |
from sklearn import preprocessing | |
import json | |
class BlackFridayPreprocess(BaseEstimator):
    """Stateless step that turns the input into a DataFrame with stringified numbers.

    ``transform`` accepts a pandas DataFrame, a dict or a numpy array, fills
    missing values with ``-1`` and rewrites every float/int cell as the string
    form of its integer value. Cells of any other type pass through unchanged.
    """

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Return ``self`` unchanged; nothing is learned from ``X``."""
        return self

    def transform(self, X):
        """Return a cleaned copy of ``X`` as a pandas DataFrame.

        Args:
            X: a pandas DataFrame, a dict accepted by ``pd.DataFrame`` or a
                numpy array (see ``_to_frame`` for how arrays are parsed).

        Returns:
            A new DataFrame in which missing values were replaced by ``-1``
            and every column was passed cell by cell through
            ``_convert_to_str``.

        Raises:
            TypeError: if ``X`` is none of the accepted types.
            ValueError: if a numpy array cannot be converted to a DataFrame.
        """
        if not self._check_input(X):
            raise TypeError("""
                X is not a pandas DataFrame nor dict nor np.ndarray,
                """)
        _X = self._to_frame(X)
        # Missing values are filled first so that int() in _convert_to_str
        # never receives NaN from a float column.
        _X = _X.fillna(-1)
        for col_name in _X.columns:
            _X[col_name] = _X[col_name].apply(self._convert_to_str)
        return _X

    def _to_frame(self, X):
        """Build a fresh DataFrame from an already type-checked ``X``."""
        if isinstance(X, np.ndarray):  # Parse request at CMLE
            try:
                # The array is rendered as text, quotes are rewritten to
                # JSON double quotes and the result is parsed as JSON.
                # NOTE(review): the replace chain is purely textual, so it
                # also rewrites "u'" and "nan" when they occur inside values
                # (e.g. a value ending in "u", or "banana") - confirm that
                # real requests never contain such values.
                X_str = str(X.astype(str)).replace(
                    "u'", "\"").replace("'", "\"").replace("nan", "NaN")
                return pd.DataFrame(json.loads(X_str))
            except Exception:
                raise ValueError(
                    "An error occurred on converting X to DataFrame {}".format(X))
        if isinstance(X, dict):
            return pd.DataFrame(X)
        # Copy so the caller's DataFrame is never modified in place.
        return X.copy()

    def _check_input(self, X):
        """Tell whether ``X`` is a DataFrame, a dict or a numpy array."""
        return isinstance(X, (pd.DataFrame, dict, np.ndarray))

    def _convert_to_str(self, v):
        """Return numeric ``v`` as the string of its integer value.

        Floats are truncated with ``int()`` before being stringified.
        Python and numpy numeric scalars are both handled; booleans and
        every other type are returned unchanged.
        """
        # bool is a subclass of int, so it has to be ruled out explicitly to
        # keep booleans untouched.
        if isinstance(v, (bool, np.bool_)):
            return v
        if isinstance(v, (float, np.floating)):
            return str(int(v))
        if isinstance(v, (int, np.integer)):
            return str(v)
        return v
class BlackFridayIdTransformer(BaseEstimator):
    """Replace infrequent ID values with a single placeholder string.

    ``fit`` records, for each column named in ``min_freq``, the values that
    occur at least ``min_freq[column]`` times. ``transform`` keeps those
    values and maps every other value to ``unknown_str`` (``'<UNKNOWN>'``).
    """

    def __init__(self, min_freq):
        """Store the per-column frequency thresholds.

        Args:
            min_freq: dict mapping a column name to the minimum number of
                occurrences a value needs in order to be kept.

        Raises:
            TypeError: if ``min_freq`` is not a dict.
        """
        if not isinstance(min_freq, dict):
            raise TypeError('min_freq should be a dict str -> int')
        # One (initially empty) list of kept values per configured column.
        self.id_list = {name: [] for name in min_freq}
        self.min_freq = min_freq
        self.unknown_str = '<UNKNOWN>'

    def fit(self, X, y=None):
        """Collect, per configured column, the values that are frequent enough.

        Args:
            X: object supporting ``groupby`` on the configured columns
                (a pandas DataFrame).
            y: ignored.

        Returns:
            ``self``.
        """
        for id_name in self.id_list:
            counts = X.groupby(id_name)[id_name].count()
            frequent = counts[counts >= self.min_freq[id_name]]
            self.id_list[id_name] = frequent.index.to_list()
        return self

    def transform(self, X):
        """Return a copy of ``X`` with non-kept IDs replaced by ``unknown_str``.

        Args:
            X: a pandas DataFrame, a dict accepted by ``pd.DataFrame`` or a
                numpy array (see ``_to_frame`` for how arrays are parsed).

        Raises:
            TypeError: if ``X`` is none of the accepted types.
            ValueError: if a numpy array cannot be converted to a DataFrame.
        """
        if not self._check_input(X):
            raise TypeError("""
                X is not a pandas DataFrame nor dict nor np.ndarray,
                """)
        _X = self._to_frame(X)
        unknown = self.unknown_str
        for id_name in self.id_list:
            # Build the lookup set once per column instead of scanning the
            # list for every row; this requires the cell values to be hashable.
            known = set(self.id_list[id_name])
            _X[id_name] = _X[id_name].apply(
                lambda id_value, known=known:
                    id_value if id_value in known else unknown)
        return _X

    def _to_frame(self, X):
        """Build a fresh DataFrame from an already type-checked ``X``."""
        if isinstance(X, np.ndarray):  # Parse request at CMLE
            try:
                # The array is rendered as text, quotes are rewritten to
                # JSON double quotes and the result is parsed as JSON.
                X_str = str(X.astype(str)).replace(
                    "u'", "\"").replace("'", "\"")
                return pd.DataFrame(json.loads(X_str))
            except Exception:
                raise ValueError(
                    "An error occurred on converting X to DataFrame")
        if isinstance(X, dict):
            return pd.DataFrame(X)
        # Copy so the caller's DataFrame is never modified in place.
        return X.copy()

    def _check_input(self, X):
        """Tell whether ``X`` is a DataFrame, a dict or a numpy array."""
        return isinstance(X, (pd.DataFrame, dict, np.ndarray))

    def _filter_id(self, id_name, id_value):
        """Return ``id_value`` if it was kept for ``id_name``, else ``unknown_str``."""
        if id_value in self.id_list[id_name]:
            return id_value
        return self.unknown_str
class BlackFridayLabelEncoder(BaseEstimator):
    """Apply one ``preprocessing.LabelEncoder`` per feature column.

    ``fit`` trains a separate encoder for every name in ``feat_name_list``
    (or for every key/column of ``X`` when no list was given) and
    ``transform`` encodes exactly those features.
    """

    def __init__(self, feat_name_list=None):
        """Store the optional list of feature names to encode.

        Args:
            feat_name_list: names of the features to encode, or ``None`` to
                take them from the data passed to ``fit``.
        """
        self.label_encoder_dict = {}
        self.feat_name_list = feat_name_list

    def fit(self, X, y=None):
        """Fit one label encoder per feature.

        Args:
            X: a dict or pandas DataFrame indexable by feature name.
            y: ignored.

        Returns:
            ``self``.

        Raises:
            TypeError: if no feature list was given and ``X`` is neither a
                dict nor a DataFrame.
        """
        if self.feat_name_list is None:
            # No explicit list: encode every key/column found in X.
            if isinstance(X, dict):
                self.feat_name_list = list(X.keys())
            elif isinstance(X, pd.DataFrame):
                self.feat_name_list = list(X.columns)
            else:
                raise TypeError('X is not a pandas DataFrame nor a dictionary')
        for feat_name in self.feat_name_list:
            self.label_encoder_dict[feat_name] = preprocessing.LabelEncoder()
            self.label_encoder_dict[feat_name].fit(X[feat_name])
        return self

    def transform(self, X):
        """Encode the fitted features of ``X``.

        Args:
            X: a dict or pandas DataFrame containing every fitted feature.

        Returns:
            A DataFrame when ``X`` is a DataFrame, otherwise a dict mapping
            each feature name to its encoded values. Only the features in
            ``feat_name_list`` appear in the result.

        Raises:
            TypeError: if ``X`` is neither a dict nor a DataFrame.
        """
        if not self._check_input(X):
            raise TypeError('X is not a pandas DataFrame nor a dictionary')
        X_transformed = {}
        for feat_name in self.feat_name_list:
            X_transformed[feat_name] = self.label_encoder_dict[feat_name].transform(
                X[feat_name])
        if isinstance(X, pd.DataFrame):
            X_transformed = pd.DataFrame(X_transformed)
        return X_transformed

    def _check_input(self, X):
        """Tell whether ``X`` is a dict or a pandas DataFrame."""
        # The previous "not dict or not DataFrame" expression was true for
        # every value, so the TypeError guard in transform could never fire.
        return isinstance(X, (dict, pd.DataFrame))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment