import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.cluster import DBSCAN
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import seaborn as sns
# --- 1. Dataset preparation ---
# Assumes df_merged has already been built with the required columns.
# Relevant numeric columns
features_cols = ['TV_COUNT', 'TV_TOTAL_AMOUNT', 'TV_AVG_AMOUNT', 'TV_RISQUE_SECTEUR',
                 'TV_ORIGINATOR_COUNT', 'TV_BENEFICIARY_COUNT', 'TV_ORIGINATOR_SUM',
                 'TV_BENEFICIARY_SUM', 'ESPECES_NBR_TRX', 'ESPECES_TOTAL',
                 'RATIO_ESPECES_TO_TV']
# Indicator (one-hot) variables: PTYPE_1, PTYPE_2
one_hot_cols = [col for col in df_merged.columns if col.startswith('PTYPE_')]
# Build the final feature matrix for the models
features = pd.concat([df_merged[features_cols], df_merged[one_hot_cols]], axis=1)
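# Minimal guard (an assumption, not in the original pipeline): merges can leave
# NaN values, and StandardScaler rejects them, so fill them before scaling.
features = features.fillna(0)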
# Scale the features
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)
# --- 2. Isolation Forest ---
# Tree-based method: anomalies are isolated in fewer random splits
iso_forest = IsolationForest(contamination=0.05, random_state=42)
df_merged['iforest_label'] = iso_forest.fit_predict(features_scaled)
df_merged['iforest_score'] = iso_forest.decision_function(features_scaled)
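# Quick sanity check: with contamination=0.05, roughly 5% of clients should
# carry the -1 (anomaly) label.
print(f"IsolationForest flagged {(df_merged['iforest_label'] == -1).sum()} of {len(df_merged)} clients")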
# Plot the Isolation Forest anomaly scores (red line = boundary of the flagged region)
plt.figure(figsize=(10, 5))
sns.histplot(df_merged['iforest_score'], bins=50, kde=True)
plt.axvline(df_merged[df_merged['iforest_label'] == -1]['iforest_score'].max(), color='red', linestyle='--')
plt.title("Distribution des scores d'anomalie - Isolation Forest")
plt.xlabel("Score d'isolement")
plt.ylabel("Nombre de clients")
plt.grid(True)
plt.show()
# --- 3. Autoencoder ---
# Neural network that learns to reconstruct "normal" data; anomalies reconstruct poorly
X_train, X_val = train_test_split(features_scaled, test_size=0.2, random_state=42)
input_dim = features_scaled.shape[1]
encoding_dim = max(int(input_dim / 2), 1)
# Autoencoder architecture (linear output to match the standardized, unbounded inputs)
input_layer = Input(shape=(input_dim,))
encoded = Dense(encoding_dim, activation='relu')(input_layer)
encoded = Dense(max(int(encoding_dim / 2), 1), activation='relu')(encoded)
decoded = Dense(encoding_dim, activation='relu')(encoded)
decoded = Dense(input_dim, activation='linear')(decoded)
autoencoder = Model(inputs=input_layer, outputs=decoded)
autoencoder.compile(optimizer='adam', loss='mse')
# Train with early stopping (restore_best_weights rolls back to the best validation epoch)
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
autoencoder.fit(X_train, X_train,
                epochs=100,
                batch_size=32,
                shuffle=True,
                validation_data=(X_val, X_val),
                callbacks=[early_stop],
                verbose=0)
# Compute the reconstruction errors
reconstructions = autoencoder.predict(features_scaled)
reconstruction_errors = np.mean(np.square(features_scaled - reconstructions), axis=1)
df_merged['ae_reconstruction_error'] = reconstruction_errors
ae_threshold = np.percentile(reconstruction_errors, 95)
df_merged['ae_label'] = (df_merged['ae_reconstruction_error'] > ae_threshold).astype(int)
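# Note: the 95th-percentile cutoff mirrors the contamination=0.05 used for
# IsolationForest, so ~5% of clients are flagged by construction; tune both together.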
# Plot the reconstruction errors
plt.figure(figsize=(10, 5))
sns.histplot(reconstruction_errors, bins=50, kde=True)
plt.axvline(ae_threshold, color='red', linestyle='--', label='Threshold (95th percentile)')
plt.title("Reconstruction error - Autoencoder")
plt.xlabel("Reconstruction error")
plt.ylabel("Number of clients")
plt.legend()
plt.grid(True)
plt.show()
# --- 4. DBSCAN ---
# Unsupervised density-based method: points in low-density regions get label -1
dbscan = DBSCAN(eps=0.5, min_samples=5)
df_merged['dbscan_label'] = dbscan.fit_predict(features_scaled)
df_merged['dbscan_anomaly'] = (df_merged['dbscan_label'] == -1).astype(int)
# Plot the DBSCAN cluster sizes
plt.figure(figsize=(6, 4))
sns.countplot(x='dbscan_label', data=df_merged)
plt.title("Clusters DBSCAN")
plt.xlabel("Label DBSCAN (-1 = anomalie)")
plt.ylabel("Nombre de clients")
plt.grid(True)
plt.show()
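# eps=0.5 above is a generic default. A common heuristic (a sketch, not a tuned
# choice) is to plot the sorted distance to the k-th nearest neighbour
# (k = min_samples) and read eps off the "elbow" of the curve.
from sklearn.neighbors import NearestNeighbors
nn = NearestNeighbors(n_neighbors=5).fit(features_scaled)
k_distances = np.sort(nn.kneighbors(features_scaled)[0][:, -1])
plt.figure(figsize=(8, 4))
plt.plot(k_distances)
plt.title("k-distance plot (k=5) for choosing DBSCAN eps")
plt.xlabel("Points sorted by 5th-NN distance")
plt.ylabel("Distance to 5th nearest neighbour")
plt.grid(True)
plt.show()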
# --- 5. Combining the methods and explaining the alerts ---
df_merged['combined_anomaly'] = (
    (df_merged['iforest_label'] == -1) |
    (df_merged['ae_label'] == 1) |
    (df_merged['dbscan_anomaly'] == 1)
).astype(int)
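# Alternative fusion (a sketch, not part of the original logic): require at least
# two of the three detectors to agree, trading recall for precision. The
# 'consensus_anomaly' column name is illustrative.
df_merged['consensus_anomaly'] = (
    (df_merged['iforest_label'] == -1).astype(int)
    + df_merged['ae_label']
    + df_merged['dbscan_anomaly']
    >= 2
).astype(int)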
# Reason for the alert
def determine_alert_reason(row):
    reasons = []
    if row['iforest_label'] == -1:
        reasons.append("IsolationForest")
    if row['ae_label'] == 1:
        reasons.append("Autoencoder")
    if row['dbscan_anomaly'] == 1:
        reasons.append("DBSCAN")
    return "Anomaly detected by " + ", ".join(reasons) if reasons else ""
df_merged['alert_reason'] = df_merged.apply(determine_alert_reason, axis=1)
# --- 6. Identifying the most contributive variables ---
# Z-scores (standardization) of the numeric features
standardized = (df_merged[features_cols] - df_merged[features_cols].mean()) / df_merged[features_cols].std()
suspects = df_merged[df_merged['combined_anomaly'] == 1].copy()
standardized_suspects = standardized.loc[suspects.index]
# Extract the two most abnormal variables for a given client
def get_top_2_vars(row_std, row_orig):
    top_vars = row_std.abs().sort_values(ascending=False).head(2).index
    return [(var, row_orig[var], "high" if row_std[var] > 0 else "low") for var in top_vars]
top2_by_client = []
for idx in suspects.index:
    top2_by_client.append(get_top_2_vars(standardized_suspects.loc[idx], df_merged.loc[idx]))
# Store the explanations back into df_merged
df_merged.loc[suspects.index, 'top_var_1'] = [f"{v[0]} ({v[2]}, {v[1]:,.2f})" for v in [x[0] for x in top2_by_client]]
df_merged.loc[suspects.index, 'top_var_2'] = [f"{v[0]} ({v[2]}, {v[1]:,.2f})" for v in [x[1] for x in top2_by_client]]
# --- 7. Final alert summary ---
final_alerts = df_merged[df_merged['combined_anomaly'] == 1][[
    'PARTY_KEY', 'alert_reason', 'top_var_1', 'top_var_2',
    'iforest_score', 'ae_reconstruction_error', 'dbscan_label'
]]
# Display the results
print("🔍 Suspect clients:")
print(final_alerts.head(10))
# Export if needed
final_alerts.to_csv("clients_suspects_anomalies.csv", index=False)