semi-supervised.py
@klotzambein, created January 24, 2022 14:16
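
"""Semi-supervised learning experiment on a credit card fraud dataset
(presumably the Kaggle "Credit Card Fraud Detection" data: PCA features
V1..V28 and a binary Class label).

Each iteration balances the classes by undersampling, hides 70% of the
training labels, and compares three models:

  1. a RandomForestClassifier trained only on the remaining labeled 30%,
  2. a LabelPropagation model (k-NN kernel) fit on the full training set
     with the hidden labels marked as -1, and
  3. a fresh RandomForestClassifier retrained on the full training set
     using the labels inferred by label propagation.
"""
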
from copy import copy
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from sklearn.semi_supervised import LabelPropagation


def to_xy(df):
    """Split a dataframe into the feature columns (V1, V2, ...) and the Class label."""
    y = df[["Class"]]
    x = df.filter(regex=r"V\d+")
    return x, y


def load_data():
    """Load and balance the data, then hide 70% of the training labels.

    Returns a dict with the full train/test split plus the labeled subset.
    """
    df = pd.read_csv("./creditcard.csv")
    df = df.sample(frac=1).reset_index(drop=True)  # shuffle the rows

    # Balance the classes by undersampling each class down to the size of
    # the smallest one.
    classes = df.groupby(df["Class"])
    min_group_size = min(map(len, classes.groups.values()))
    balanced = pd.concat(group.iloc[:min_group_size] for _, group in classes)

    train, test = train_test_split(
        balanced, test_size=0.2, stratify=balanced[["Class"]]
    )

    # Keep 30% of the training labels; mark the remaining 70% as unlabeled.
    labeled, unlabeled = train_test_split(
        train, test_size=0.7, stratify=train[["Class"]]
    )
    unlabeled = unlabeled.copy()  # avoid mutating a view of `train`
    unlabeled["Class"] = -1
    train = pd.concat((labeled, unlabeled))

    X_train, y_train = to_xy(train)
    X_test, y_test = to_xy(test)
    labeled_at = y_train["Class"] != -1
    return {
        "X_train": X_train,
        "y_train": y_train["Class"],
        "X_test": X_test,
        "y_test": y_test["Class"],
        "X_train_labeled": X_train[labeled_at],
        "y_train_labeled": y_train["Class"][labeled_at],
    }
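
# scikit-learn convention used above: a label of -1 marks an unlabeled
# sample. A minimal sketch of the idea (X and y here are hypothetical,
# for illustration only):
#
#     y = np.array([0, 1, -1, -1])            # -1 means "no label"
#     model = LabelPropagation(kernel="knn").fit(X, y)
#     model.transduction_                     # inferred labels for every row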


def measure(data, model):
    """Evaluate a fitted model on the test set; print and return (accuracy, F1)."""
    prediction = model.predict(data["X_test"])
    accuracy = np.mean(prediction == data["y_test"])
    f1 = f1_score(data["y_test"], prediction)
    print(f"{model}: A: {accuracy:.4}, F1: {f1:.4}")
    return accuracy, f1


def measure_baseline(data, model):
    """Supervised baseline: fit on the labeled subset only."""
    model.fit(data["X_train_labeled"], data["y_train_labeled"])
    return measure(data, model)


def measure_semi_supervised(data, model):
    """Semi-supervised: fit on the full training set, -1 rows included."""
    model.fit(data["X_train"], data["y_train"])
    return measure(data, model)


def run():
    data = load_data()
    print("Data shape:")
    print(", ".join(f"{k}: {v.shape}" for k, v in data.items()))
    print()

    # Model 1: random forest trained on the labeled subset only.
    b_a, b_f1 = measure_baseline(data, RandomForestClassifier())

    # Model 2: label propagation over a k-NN graph.
    label_prop_model = LabelPropagation(kernel="knn", n_neighbors=7)
    s_a, s_f1 = measure_semi_supervised(data, label_prop_model)

    # Model 3: random forest retrained on the full training set, with the
    # hidden labels replaced by the ones label propagation inferred.
    updated_data = copy(data)
    updated_data["y_train"] = label_prop_model.transduction_
    updated_data["X_train_labeled"] = updated_data["X_train"]
    updated_data["y_train_labeled"] = updated_data["y_train"]
    print()
    print("Updated data:")
    print(", ".join(f"{k}: {v.shape}" for k, v in updated_data.items()))
    print()
    ub_a, ub_f1 = measure_baseline(updated_data, RandomForestClassifier())

    return b_a, b_f1, s_a, s_f1, ub_a, ub_f1
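
# Model 3 above is a simple self-training-style setup: the labels that
# LabelPropagation inferred for the unlabeled rows (its `transduction_`
# attribute) are treated as ground truth, and a fresh random forest is
# trained on the now fully labeled training set.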


def summarize_data(data):
    """Print mean/std of accuracy and F1 per model, plus how often each
    model had the best accuracy/F1 across the iterations."""
    b_a, b_f1, s_a, s_f1, ub_a, ub_f1 = data.transpose()

    # Per iteration, index of the winning model
    # (0 = baseline, 1 = semi-supervised, 2 = new baseline).
    best_a = np.argmax(data[:, [0, 2, 4]], axis=1)
    best_f1 = np.argmax(data[:, [1, 3, 5]], axis=1)
    winners_a = np.bincount(best_a, minlength=3)
    winners_f1 = np.bincount(best_f1, minlength=3)

    columns = [
        "model",
        "mean-acc",
        "mean-f1",
        "sd-acc",
        "sd-f1",
        "wins-by-acc",
        "wins-by-f1",
    ]
    # Build the rows up front; DataFrame.append was deprecated and has been
    # removed in pandas 2.0.
    rows = [
        [
            "baseline",
            np.mean(b_a),
            np.mean(b_f1),
            np.std(b_a),
            np.std(b_f1),
            winners_a[0],
            winners_f1[0],
        ],
        [
            "semi-super",
            np.mean(s_a),
            np.mean(s_f1),
            np.std(s_a),
            np.std(s_f1),
            winners_a[1],
            winners_f1[1],
        ],
        [
            "new-baseline",
            np.mean(ub_a),
            np.mean(ub_f1),
            np.std(ub_a),
            np.std(ub_f1),
            winners_a[2],
            winners_f1[2],
        ],
    ]
    df = pd.DataFrame(rows, columns=columns)
    print(df.transpose())


def plot_data(data):
    """Plot accuracy and F1 per iteration for the three models in three
    vertically stacked subplots sharing the x-axis."""
    b_a, b_f1, s_a, s_f1, ub_a, ub_f1 = data.transpose()
    iters = np.arange(len(b_a))

    ax1 = plt.subplot(311)
    plt.plot(iters, b_a, "--x", linewidth=1, label="Accuracy")
    plt.plot(iters, b_f1, "--o", linewidth=1, label="F1", fillstyle="none")
    plt.plot([-2, 101], np.full((2,), np.mean(b_a)), label="Mean Accuracy")
    plt.tick_params("x", labelbottom=False)
    plt.ylabel("Baseline random forest")
    plt.xlim(-2, 101)
    plt.legend()

    # Share the x-axis with the first subplot and hide its tick labels.
    plt.subplot(312, sharex=ax1)
    plt.plot(iters, s_a, "--x", linewidth=1)
    plt.plot(iters, s_f1, "--o", linewidth=1, label="F1", fillstyle="none")
    plt.plot([-2, 101], np.full((2,), np.mean(s_a)))
    plt.tick_params("x", labelbottom=False)
    plt.ylabel("Semi-supervised")

    plt.subplot(313, sharex=ax1)
    plt.plot(iters, ub_a, "--x", linewidth=1)
    plt.plot(iters, ub_f1, "--o", linewidth=1, label="F1", fillstyle="none")
    plt.plot([-2, 101], np.full((2,), np.mean(ub_a)))
    plt.xlabel("Iterations")
    plt.ylabel("Full data random forest")

    plt.subplots_adjust(hspace=0.0)
    plt.show()


if __name__ == "__main__":
    if Path("results.npy").exists():
        # Results from an earlier run exist: just summarize and plot them.
        data = np.load("results.npy")
        summarize_data(data)
        plot_data(data)
    else:
        # Run the full experiment N times and store the per-iteration scores.
        N = 100
        data = np.zeros((N, 6))
        for i in range(N):
            print()
            print()
            print(f"Iteration {i}:")
            data[i, :] = run()
        summarize_data(data)
        np.save("results", data)  # written as results.npy
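
# Usage sketch (assumes creditcard.csv is in the working directory):
#   python semi-supervised.py   # first run: 100 iterations, writes results.npy
#   python semi-supervised.py   # later runs: summarize and plot results.npy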