Created
January 24, 2022 14:16
-
-
Save klotzambein/79baad26fed59ea3eca4372071ab1640 to your computer and use it in GitHub Desktop.
semi-supervised.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from copy import copy | |
from os import access | |
from pathlib import Path | |
import pandas as pd | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from sklearn.semi_supervised import LabelPropagation | |
from sklearn.model_selection import train_test_split | |
from sklearn.ensemble import RandomForestClassifier | |
from sklearn.metrics import f1_score | |
def to_xy(df):
    """Split a frame into feature columns and the label column.

    Features are the columns whose name is exactly ``V`` followed by one or
    more digits (``V1``, ``V2``, ...). The pattern is anchored on both ends
    so that a column which merely *contains* such a substring is not picked
    up as a feature by accident.

    Args:
        df: DataFrame holding a ``Class`` column and the ``V<n>`` columns.

    Returns:
        A tuple ``(x, y)`` where ``x`` is a DataFrame of the feature columns
        and ``y`` is a single-column DataFrame containing ``Class``.
    """
    y = df[["Class"]]
    # DataFrame.filter(regex=...) uses search semantics, hence the anchors.
    x = df.filter(regex=r"^V\d+$")
    return x, y
def load_data(path="./creditcard.csv", random_state=None):
    """Load the dataset and build a balanced, partially labelled split.

    Steps:
      1. Read the CSV and shuffle its rows.
      2. Undersample every class to the size of the smallest class.
      3. Hold out 20% of the balanced data as the test set (stratified).
      4. Mark 70% of the remaining training rows as unlabelled by setting
         their ``Class`` to ``-1`` (stratified on the true class).

    Args:
        path: Location of the CSV file. Defaults to ``./creditcard.csv``.
        random_state: Optional seed forwarded to the shuffle and to both
            splits. ``None`` (the default) keeps every run random.

    Returns:
        A dict with the keys ``X_train``, ``y_train`` (labels, ``-1`` for
        unlabelled rows), ``X_test``, ``y_test``, ``X_train_labled`` and
        ``y_train_labled`` (only the rows that kept their label).
    """
    df = pd.read_csv(path)
    df = df.sample(frac=1, random_state=random_state).reset_index(drop=True)

    # Undersample: keep the same number of rows from every class.
    classes = df.groupby("Class")
    min_group_size = classes.size().min()
    balanced = pd.concat(
        [group.iloc[:min_group_size] for _, group in classes]
    )

    train, test = train_test_split(
        balanced,
        test_size=0.2,
        stratify=balanced["Class"],
        random_state=random_state,
    )

    labled, unlabled = train_test_split(
        train,
        test_size=0.7,
        stratify=train["Class"],
        random_state=random_state,
    )
    # assign() returns a new frame, so we never write into a slice of
    # ``train`` (which would trigger SettingWithCopyWarning).
    unlabled = unlabled.assign(Class=-1)

    train = pd.concat((labled, unlabled))

    X_train, y_train = to_xy(train)
    X_test, y_test = to_xy(test)

    labled_at = y_train["Class"] != -1

    return {
        "X_train": X_train,
        "y_train": y_train["Class"],
        "X_test": X_test,
        "y_test": y_test["Class"],
        "X_train_labled": X_train[labled_at],
        "y_train_labled": y_train["Class"][labled_at],
    }
def meassure(data, model):
    """Score ``model`` on the test split and print a one-line report.

    Args:
        data: Dict providing ``X_test`` and ``y_test``.
        model: Object exposing ``predict``; it is not fitted here.

    Returns:
        A tuple ``(accuracy, f1)`` computed on the test split.
    """
    y_true = data["y_test"]
    y_pred = model.predict(data["X_test"])

    f1 = f1_score(y_true, y_pred)
    # Accuracy is the fraction of predictions equal to the true label.
    accuracy = np.mean(y_pred == y_true)

    print(f"{model}: A: {accuracy:.4}, F1: {f1:.4}")
    return accuracy, f1
def meassure_baseline(data, model):
    """Fit ``model`` on the labelled training rows only, then score it.

    Args:
        data: Dict providing ``X_train_labled`` and ``y_train_labled`` plus
            the test keys read by ``meassure``.
        model: Estimator exposing ``fit`` and ``predict``.

    Returns:
        The ``(accuracy, f1)`` tuple produced by ``meassure``.
    """
    features = data["X_train_labled"]
    labels = data["y_train_labled"]
    model.fit(features, labels)
    scores = meassure(data, model)
    return scores
def meassure_semi_superviseed(data, model):
    """Fit ``model`` on the full training set, then score it.

    The full training set is ``X_train`` / ``y_train``, i.e. labelled and
    unlabelled rows together.

    Args:
        data: Dict providing ``X_train`` and ``y_train`` plus the test keys
            read by ``meassure``.
        model: Estimator exposing ``fit`` and ``predict``.

    Returns:
        The ``(accuracy, f1)`` tuple produced by ``meassure``.
    """
    features = data["X_train"]
    labels = data["y_train"]
    model.fit(features, labels)
    scores = meassure(data, model)
    return scores
def run():
    """Run one experiment iteration and return its six scores.

    Three models are evaluated on a freshly loaded split:
      * a random forest trained on the labelled rows only,
      * label propagation trained on labelled and unlabelled rows,
      * a random forest trained on all training rows, using the labels
        inferred by label propagation.

    Returns:
        ``(b_a, b_f1, s_a, s_f1, ub_a, ub_f1)``: accuracy and F1 for the
        baseline, the semi-supervised model and the updated baseline.
    """
    data = load_data()

    print("Data shape:")
    print(", ".join(f"{k}: {data[k].shape}" for k in data))
    print()

    b_a, b_f1 = meassure_baseline(data, RandomForestClassifier())

    label_prop_model = LabelPropagation(kernel="knn", n_neighbors=7)
    s_a, s_f1 = meassure_semi_superviseed(data, label_prop_model)

    # Shallow copy in which the propagated labels replace the partial ones
    # and every training row counts as labelled.
    propagated = label_prop_model.transduction_
    updated_data = {
        **data,
        "y_train": propagated,
        "X_train_labled": data["X_train"],
        "y_train_labled": propagated,
    }

    print()
    print("Updated data:")
    print(", ".join(f"{k}: {updated_data[k].shape}" for k in updated_data))
    print()

    ub_a, ub_f1 = meassure_baseline(updated_data, RandomForestClassifier())

    return b_a, b_f1, s_a, s_f1, ub_a, ub_f1
def summarize_data(data):
    """Print (and return) per-model summary statistics of the results.

    Args:
        data: 2-D array with one row per iteration and six columns in the
            order ``b_a, b_f1, s_a, s_f1, ub_a, ub_f1`` (accuracy and F1 of
            the baseline, semi-supervised and updated-baseline models).

    Returns:
        A DataFrame with one row per model and the columns ``model``,
        ``mean-acc``, ``mean-f1``, ``sd-acc``, ``sd-f1``, ``wins-by-acc``
        and ``wins-by-f1``. Its transpose is what gets printed.
    """
    data = np.asarray(data)
    b_a, b_f1, s_a, s_f1, ub_a, ub_f1 = data.transpose()

    # For every iteration find which model scored highest, then count wins
    # per model. np.argmax resolves ties in favour of the earlier model.
    best_a = np.argmax(data[:, [0, 2, 4]], axis=1)
    best_f1 = np.argmax(data[:, [1, 3, 5]], axis=1)
    winners_a = np.bincount(best_a, minlength=3)
    winners_f1 = np.bincount(best_f1, minlength=3)

    columns = [
        "model",
        "mean-acc",
        "mean-f1",
        "sd-acc",
        "sd-f1",
        "wins-by-acc",
        "wins-by-f1",
    ]
    models = [
        ("baseline", b_a, b_f1),
        ("semi-super", s_a, s_f1),
        ("new-baseline", ub_a, ub_f1),
    ]
    rows = [
        [
            name,
            np.mean(acc),
            np.mean(f1),
            np.std(acc),
            np.std(f1),
            winners_a[i],
            winners_f1[i],
        ]
        for i, (name, acc, f1) in enumerate(models)
    ]

    # Build the frame in one go: DataFrame.append was removed in pandas 2.0.
    df = pd.DataFrame(rows, columns=columns)
    print(df.transpose())
    return df
def plot_data(data):
    """Plot per-iteration accuracy and F1 for the three models.

    Draws three vertically stacked panels sharing the x axis (baseline,
    semi-supervised, full-data random forest). Each panel shows accuracy,
    F1 and a horizontal line at the mean accuracy, then the figure is shown.

    Args:
        data: 2-D array with one row per iteration and six columns in the
            order ``b_a, b_f1, s_a, s_f1, ub_a, ub_f1``.
    """
    b_a, b_f1, s_a, s_f1, ub_a, ub_f1 = data.transpose()

    # Derive the x axis from the data instead of assuming 100 iterations.
    n_iters = len(b_a)
    iters = np.arange(n_iters)
    x_span = [-2, n_iters + 1]

    ax1 = plt.subplot(311)
    plt.plot(iters, b_a, "--x", linewidth=1, label="Accuracy")
    plt.plot(iters, b_f1, "--o", linewidth=1, label="F1", fillstyle="none")
    plt.plot(x_span, np.full((2,), np.mean(b_a)), label="Mean Accuracy")
    plt.tick_params("x", labelbottom=False)
    plt.ylabel("Baseline random forest")
    plt.xlim(*x_span)
    plt.legend()

    # Second panel: shares the x axis with the first one.
    plt.subplot(312, sharex=ax1)
    plt.plot(iters, s_a, "--x", linewidth=1)
    plt.plot(iters, s_f1, "--o", linewidth=1, label="F1", fillstyle="none")
    plt.plot(x_span, np.full((2,), np.mean(s_a)))
    # Hide the tick labels; only the bottom panel shows them.
    plt.tick_params("x", labelbottom=False)
    plt.ylabel("Semi-supervised")

    # Third panel: shares the x axis only (y is independent).
    plt.subplot(313, sharex=ax1)
    plt.plot(iters, ub_a, "--x", linewidth=1)
    plt.plot(iters, ub_f1, "--o", linewidth=1, label="F1", fillstyle="none")
    plt.plot(x_span, np.full((2,), np.mean(ub_a)))
    plt.xlabel("Iterations")
    plt.ylabel("Full data random forest")

    plt.subplots_adjust(hspace=0.0)
    plt.show()
if __name__ == "__main__":
    # No cached results: run the experiment, summarise it and cache the
    # scores. Otherwise load the cache, summarise and plot it.
    if not Path("results.npy").exists():
        N = 100
        data = np.zeros((N, 6))
        for i in range(N):
            print()
            print()
            print(f"Iteration {i}:")
            # One row per iteration: (b_a, b_f1, s_a, s_f1, ub_a, ub_f1).
            data[i, :] = run()
        summarize_data(data)
        # np.save appends ".npy", producing the file checked for above.
        np.save("results", data)
    else:
        data = np.load("results.npy")
        summarize_data(data)
        plot_data(data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment