Skip to content

Instantly share code, notes, and snippets.

@Xnuvers007
Last active March 30, 2026 10:29
Show Gist options
  • Select an option

  • Save Xnuvers007/6abef6edf37709a668344ee56145f76b to your computer and use it in GitHub Desktop.

Select an option

Save Xnuvers007/6abef6edf37709a668344ee56145f76b to your computer and use it in GitHub Desktop.
Spam or scammer message detection. There are a first and a second version (recommended: main, main2 and main3; main3.1 is powerful). main4 is okay and powerful but too slow. This is my Colab: https://colab.research.google.com/drive/1doTrwTWvQ3eRL5Lc-1xsoHdjTVJm7Lah?usp=sharing
# Interactive loop: classify user messages until a quit command is entered.
while True:
    print("kirim pesan 'q' (tanpa kutip) untuk keluar")
    questions = input("Masukan Pesan : ")
    if questions.lower() in ['keluar', 'exit', 'quit', 'q', '']:
        print("Sesi selesai.")
        # BUGFIX: without break the loop never terminated, and detect_scam
        # was still called on the quit command itself.
        break
    detect_scam(questions)
# Interactive loop: explain user messages until a quit command is entered.
while True:
    print("kirim pesan 'q' (tanpa kutip) untuk keluar")
    questions = input("Masukan Pesan : ")
    if questions.lower() in ['keluar', 'exit', 'quit', 'q', '']:
        print("Sesi selesai.")
        # BUGFIX: without break the loop never terminated, and
        # explain_for_colab was still called on the quit command itself.
        break
    explain_for_colab(questions)
import pandas as pd
import numpy as np
import re
import warnings
warnings.filterwarnings('ignore')
from datasets import load_dataset
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import VotingClassifier, RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.metrics import (
accuracy_score, classification_report, confusion_matrix,
roc_auc_score, f1_score
)
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.calibration import CalibratedClassifierCV
from imblearn.over_sampling import SMOTE
import xgboost as xgb
print("βœ… Semua library berhasil diimport!")
# ## πŸ—„οΈ CELL 2 β€” Load & Gabungkan Multi-Dataset
# ============================================================
# CELL 2 β€” Load multi-dataset dari Hugging Face
# ============================================================
# Accumulators for the merged corpus: parallel lists of raw texts and
# binary labels (1 = spam/scam, 0 = ham).
all_texts = []
all_labels = []

# --- Dataset 1: Spam Detection ---
try:
    print("📥 [1/3] Mengunduh dataset spam detection...")
    ds1 = load_dataset("Deysi/spam-detection-dataset", split="train")
    df1 = pd.DataFrame(ds1)
    # Fall back to positional columns if the expected names are missing.
    col_text = 'text' if 'text' in df1.columns else df1.columns[0]
    col_label = 'label' if 'label' in df1.columns else df1.columns[1]
    df1 = df1[[col_text, col_label]].rename(columns={col_text: 'text', col_label: 'label'})
    # Normalize labels to 1 (spam) / 0 (ham); the source may use strings or ints.
    df1['label'] = df1['label'].apply(lambda x: 1 if str(x).lower() in ['spam', '1', 'true'] else 0)
    all_texts.extend(df1['text'].tolist())
    all_labels.extend(df1['label'].tolist())
    print(f" ✅ Dataset 1 berhasil: {len(df1)} baris (spam: {df1['label'].sum()}, ham: {(df1['label']==0).sum()})")
except Exception as e:
    # Best-effort: one failed download must not abort the whole pipeline.
    print(f" ⚠️ Dataset 1 gagal: {e}")

# --- Dataset 2: SMS Spam Collection ---
try:
    print("📥 [2/3] Mengunduh SMS spam collection...")
    ds2 = load_dataset("ucirvine/sms_spam", split="train")
    df2 = pd.DataFrame(ds2)
    # NOTE(review): positional fallbacks are swapped relative to dataset 1
    # (text at column 1, label at column 0) — presumably matches this
    # dataset's layout; confirm against the Hub schema.
    col_text = 'sms' if 'sms' in df2.columns else df2.columns[1]
    col_label = 'label' if 'label' in df2.columns else df2.columns[0]
    df2 = df2[[col_text, col_label]].rename(columns={col_text: 'text', col_label: 'label'})
    df2['label'] = df2['label'].apply(lambda x: 1 if str(x).lower() in ['spam', '1', 'true'] else 0)
    all_texts.extend(df2['text'].tolist())
    all_labels.extend(df2['label'].tolist())
    print(f" ✅ Dataset 2 berhasil: {len(df2)} baris")
except Exception as e:
    print(f" ⚠️ Dataset 2 gagal: {e}")

# --- Dataset 3: Enron Email (Phishing/Spam) ---
try:
    print("📥 [3/3] Mengunduh Enron email dataset...")
    ds3 = load_dataset("SetFit/enron_spam", split="train")
    df3 = pd.DataFrame(ds3)
    # Build a single 'text' column from whatever fields are available.
    if 'subject' in df3.columns and 'message' in df3.columns:
        df3['text'] = df3['subject'].fillna('') + ' ' + df3['message'].fillna('')
    elif 'text' in df3.columns:
        pass
    else:
        df3['text'] = df3.iloc[:, 0]
    col_label = 'label' if 'label' in df3.columns else 'spam'
    df3 = df3[['text', col_label]].rename(columns={col_label: 'label'})
    df3['label'] = df3['label'].apply(lambda x: 1 if str(x).lower() in ['spam', '1', 'true'] else 0)
    df3 = df3.sample(min(5000, len(df3)), random_state=42)  # cap size to keep the corpus balanced
    all_texts.extend(df3['text'].tolist())
    all_labels.extend(df3['label'].tolist())
    print(f" ✅ Dataset 3 berhasil: {len(df3)} baris")
except Exception as e:
    print(f" ⚠️ Dataset 3 gagal: {e}")

# --- Merge everything into one labelled DataFrame ---
df_all = pd.DataFrame({'text': all_texts, 'label': all_labels})
df_all = df_all.dropna(subset=['text'])
df_all['text'] = df_all['text'].astype(str)
print(f"\n📊 TOTAL DATASET GABUNGAN: {len(df_all)} baris")
print(f" 🔴 SCAM/SPAM : {df_all['label'].sum()} ({df_all['label'].mean()*100:.1f}%)")
print(f" 🟢 AMAN/HAM : {(df_all['label']==0).sum()} ({(1-df_all['label'].mean())*100:.1f}%)")
# ## βš™οΈ CELL 3 β€” Feature Engineering Khusus Scam
# ============================================================
# CELL 3 β€” Feature Engineering: ekstrak sinyal scam secara manual
# ============================================================
class ScamFeatureExtractor(BaseEstimator, TransformerMixin):
    """
    Stateless transformer that maps each text to 20 numeric features
    targeting scam/phishing signals:
    - dangerous URLs & links
    - urgency & threat wording
    - remote-access tooling
    - financial-fraud vocabulary
    - suspicious text statistics
    """
    # Scam keyword lists, grouped by category (English + Indonesian).
    URGENCY_WORDS = [
        'urgent', 'immediately', 'asap', 'right now', 'limited time',
        'expires', 'deadline', 'act now', 'don\'t wait', 'hurry',
        'segera', 'sekarang juga', 'cepat', 'batas waktu', 'darurat'
    ]
    THREAT_WORDS = [
        'suspended', 'blocked', 'terminated', 'banned', 'closed',
        'account locked', 'verify now', 'confirm identity', 'suspension',
        'diblokir', 'ditangguhkan', 'verifikasi', 'konfirmasi'
    ]
    MONEY_WORDS = [
        'prize', 'winner', 'won', 'lottery', 'jackpot', 'free money',
        'cash', 'reward', 'bonus', 'gift card', 'bitcoin', 'crypto',
        'transfer', 'wire', 'western union', 'moneygram', 'bank account',
        'hadiah', 'menang', 'gratis', 'uang', 'transfer', 'rekening'
    ]
    REMOTE_ACCESS = [
        'anydesk', 'teamviewer', 'remote', 'screen share', 'remote access',
        'install', 'download app', 'access your computer', 'take control',
        'remote desktop', 'vnc', 'rustdesk', 'ultraviewer'
    ]
    PHISHING_WORDS = [
        'click here', 'login', 'password', 'username', 'credential',
        'sign in', 'verify your', 'update your', 'confirm your',
        'account information', 'billing info', 'credit card', 'cvv',
        'social security', 'ssn', 'otp', 'pin', 'kode otp'
    ]
    IMPERSONATION = [
        'amazon', 'paypal', 'apple', 'microsoft', 'google', 'facebook',
        'instagram', 'netflix', 'bank', 'irs', 'government', 'police',
        'bri', 'bca', 'mandiri', 'ojk', 'polisi', 'pemerintah'
    ]

    def fit(self, X, y=None):
        """Nothing to learn — returns self unchanged."""
        return self

    def _count_matches(self, text, word_list):
        """Count how many keywords from word_list occur in text (case-insensitive substring match)."""
        haystack = text.lower()
        return sum(1 for keyword in word_list if keyword in haystack)

    def transform(self, X):
        """Return an (n_samples, 20) ndarray of scam-signal features for texts X."""
        rows = []
        for raw in X:
            raw = str(raw)
            lowered = raw.lower()

            # --- keyword-category signals ---
            n_urgency = self._count_matches(lowered, self.URGENCY_WORDS)
            n_threat = self._count_matches(lowered, self.THREAT_WORDS)
            n_money = self._count_matches(lowered, self.MONEY_WORDS)
            n_remote = self._count_matches(lowered, self.REMOTE_ACCESS)
            n_phishing = self._count_matches(lowered, self.PHISHING_WORDS)
            n_imperson = self._count_matches(lowered, self.IMPERSONATION)

            # --- URL & link signals ---
            n_urls = len(re.findall(r'http[s]?://', lowered))
            n_short_urls = len(re.findall(r'(bit\.ly|tinyurl|goo\.gl|t\.co|ow\.ly|buff\.ly)', lowered))
            has_ip_url = int(bool(re.search(r'http[s]?://\d+\.\d+\.\d+\.\d+', lowered)))
            n_bad_tlds = len(re.findall(r'\.(xyz|top|win|click|download|review|loan|work|party)', lowered))

            # --- suspicious typography (measured on the ORIGINAL casing) ---
            caps_ratio = sum(1 for ch in raw if ch.isupper()) / max(len(raw), 1)
            n_excl = raw.count('!')
            n_currency = raw.count('$') + raw.count('€') + raw.count('£')
            has_phone = int(bool(re.search(r'(\+62|\+1|\+44)?[\s.-]?\(?\d{3}\)?[\s.-]?\d{3,4}[\s.-]?\d{4}', raw)))
            n_long_nums = len(re.findall(r'\b\d{4,}\b', raw))  # long digit runs (account numbers etc.)

            # --- text statistics ---
            tokens = lowered.split()
            text_len = len(raw)
            n_words = len(tokens)
            avg_word_len = np.mean([len(tok) for tok in tokens]) if tokens else 0
            unique_ratio = len(set(tokens)) / max(len(tokens), 1)

            # --- aggregate danger score: threat/remote/phishing weighted up ---
            danger = n_urgency + n_threat * 2 + n_money + n_remote * 3 + n_phishing * 2

            rows.append([
                n_urgency, n_threat, n_money, n_remote, n_phishing, n_imperson,
                n_urls, n_short_urls, has_ip_url, n_bad_tlds,
                caps_ratio, n_excl, n_currency, has_phone, n_long_nums,
                text_len, n_words, avg_word_len, unique_ratio,
                danger
            ])
        return np.array(rows)
print("✅ ScamFeatureExtractor siap digunakan!")
print(f" 📝 Jumlah fitur manual: 20 fitur numerik")
# Preview the manual features on a sample scam-style message.
test_msg = ["Install Anydesk for me to access remotely. You should pass the video/ID verification to avoid suspension."]
extractor = ScamFeatureExtractor()
feat = extractor.transform(test_msg)[0]
# Human-readable names, in the exact order the extractor emits them.
nama_fitur = ['urgency','threat','money','remote','phishing','impersonation',
              'url_count','short_url','ip_url','bad_tld',
              'caps_ratio','exclamation','currency_symbol','phone_number','long_numbers',
              'text_length','word_count','avg_word_len','unique_word_ratio','danger_score']
print("\n🔍 Preview fitur untuk pesan uji:")
for name, val in zip(nama_fitur, feat):
    if val > 0:  # only show signals that actually fired
        print(f" ⚠️ {name:25s}: {val:.3f}")
# ## πŸ€– CELL 4 β€” Bangun & Latih Ensemble Model
# ============================================================
# CELL 4 β€” Buat Ensemble Model Powerfull
# ============================================================
class TextSelector(BaseEstimator, TransformerMixin):
    """Pass-through selector that yields texts as a plain list."""

    def fit(self, X, y=None):
        # Stateless: nothing to learn.
        return self

    def transform(self, X):
        # Lists and ndarrays pass through untouched; anything else
        # (e.g. a pandas Series) is converted via .tolist().
        if isinstance(X, (list, np.ndarray)):
            return X
        return X.tolist()
# --- Text preprocessing ---
def preprocess_text(text):
    """Normalize a message: lowercase, strip HTML, tokenize URLs and long numbers.

    Keeps expressive punctuation (!, ?, $) since it carries scam signal.
    Returns the cleaned, whitespace-collapsed string.
    """
    cleaned = str(text).lower()
    cleaned = re.sub(r'<[^>]+>', ' ', cleaned)              # drop HTML tags
    cleaned = re.sub(r'http\S+|www\S+', ' URL ', cleaned)   # URLs -> sentinel token
    cleaned = re.sub(r'\b\d{10,}\b', ' LONGNUM ', cleaned)  # long digit runs -> sentinel token
    cleaned = re.sub(r'[^\w\s!?$]', ' ', cleaned)           # strip other punctuation
    return re.sub(r'\s+', ' ', cleaned).strip()
print("πŸ”§ Mempersiapkan data...")
df_all['text_clean'] = df_all['text'].apply(preprocess_text)
X = df_all['text_clean'].values
y = df_all['label'].values
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f" Train: {len(X_train)} | Test: {len(X_test)}")
# --- Komponen TF-IDF (2 level: unigram + bigram) ---
tfidf_word = TfidfVectorizer(
analyzer='word',
ngram_range=(1, 2), # unigram + bigram
max_features=50000,
min_df=2,
max_df=0.95,
sublinear_tf=True, # log normalisasi
strip_accents='unicode'
)
tfidf_char = TfidfVectorizer(
analyzer='char_wb',
ngram_range=(3, 5), # karakter n-gram (menangkap typo scammer)
max_features=20000,
min_df=3,
sublinear_tf=True
)
# --- Bangun fitur gabungan ---
from scipy.sparse import hstack
print("πŸ“Š Membangun matriks fitur TF-IDF...")
X_train_w = tfidf_word.fit_transform(X_train)
X_test_w = tfidf_word.transform(X_test)
X_train_c = tfidf_char.fit_transform(X_train)
X_test_c = tfidf_char.transform(X_test)
# Ekstrak fitur manual
feat_extractor = ScamFeatureExtractor()
X_train_f = feat_extractor.transform(X_train)
X_test_f = feat_extractor.transform(X_test)
# Gabungkan semua fitur
from scipy.sparse import csr_matrix
X_train_combined = hstack([X_train_w, X_train_c, csr_matrix(X_train_f)])
X_test_combined = hstack([X_test_w, X_test_c, csr_matrix(X_test_f)])
print(f" βœ… Dimensi fitur gabungan: {X_train_combined.shape[1]:,} kolom")
# --- Definisi Model ---
print("\nπŸ—οΈ Membangun model-model...")
clf_lr = LogisticRegression(
C=5.0, max_iter=1000, solver='lbfgs',
class_weight='balanced', random_state=42
)
clf_svm = CalibratedClassifierCV(
LinearSVC(C=1.0, max_iter=2000, class_weight='balanced', random_state=42)
)
clf_xgb = xgb.XGBClassifier(
n_estimators=300, max_depth=6,
learning_rate=0.1, subsample=0.8,
colsample_bytree=0.8,
scale_pos_weight=len(y_train[y_train==0]) / max(len(y_train[y_train==1]), 1),
use_label_encoder=False, eval_metric='logloss',
random_state=42, n_jobs=-1
)
# Latih masing-masing model
print("⏳ Melatih Logistic Regression...")
clf_lr.fit(X_train_combined, y_train)
print("⏳ Melatih SVM (dikalibrasi)...")
clf_svm.fit(X_train_combined, y_train)
print("⏳ Melatih XGBoost...")
clf_xgb.fit(X_train_combined, y_train)
print("\nβœ… Semua model selesai dilatih!")
# ## πŸ“Š CELL 5 β€” Evaluasi Lengkap
# ============================================================
# CELL 5 β€” Evaluasi detail semua model
# ============================================================
def evaluate_model(name, clf, X_tr, X_te, y_te):
    """Print a detailed evaluation report for a fitted binary classifier.

    Parameters:
        name : str — display name used in the report header
        clf  : fitted estimator exposing predict() and predict_proba()
        X_tr : training features (currently unused; kept for call-site symmetry)
        X_te : test feature matrix
        y_te : true test labels (0 = ham, 1 = spam/scam)

    Returns:
        (accuracy, weighted_f1, auc_roc) as floats.
    """
    y_pred = clf.predict(X_te)
    y_prob = clf.predict_proba(X_te)[:, 1]  # P(spam), used for ROC-AUC
    acc = accuracy_score(y_te, y_pred)
    f1 = f1_score(y_te, y_pred, average='weighted')
    auc = roc_auc_score(y_te, y_prob)
    print(f"\n{'='*55}")
    print(f" Model: {name}")
    print(f"{'='*55}")
    print(f" Akurasi : {acc*100:.2f}%")
    print(f" F1-Score : {f1*100:.2f}%")
    print(f" AUC-ROC : {auc*100:.2f}%")
    print(f"\n{classification_report(y_te, y_pred, target_names=['HAM (aman)', 'SPAM/SCAM'])}")
    # 2x2 confusion matrix: rows = actual, columns = predicted.
    cm = confusion_matrix(y_te, y_pred)
    print(f" Confusion Matrix:")
    print(f" Prediksi HAM Prediksi SCAM")
    print(f" Asli HAM {cm[0,0]:6d} {cm[0,1]:6d}")
    print(f" Asli SCAM {cm[1,0]:6d} {cm[1,1]:6d}")
    return acc, f1, auc
# Evaluate every trained model; each entry is an (accuracy, f1, auc) tuple.
results = {}
results['Logistic Regression'] = evaluate_model('Logistic Regression', clf_lr, X_train_combined, X_test_combined, y_test)
results['SVM (Calibrated)'] = evaluate_model('SVM (Calibrated)', clf_svm, X_train_combined, X_test_combined, y_test)
results['XGBoost'] = evaluate_model('XGBoost', clf_xgb, X_train_combined, X_test_combined, y_test)
# Side-by-side comparison table.
print("\n" + "="*55)
print(" RINGKASAN PERBANDINGAN MODEL")
print("="*55)
print(f" {'Model':<25} {'Akurasi':>8} {'F1':>8} {'AUC':>8}")
print("-"*55)
for name, (acc, f1, auc) in results.items():
    print(f" {name:<25} {acc*100:>7.2f}% {f1*100:>7.2f}% {auc*100:>7.2f}%")
# ## πŸ† CELL 6 β€” Voting Ensemble (Model Terbaik)
# ============================================================
# CELL 6 β€” Gabungkan semua model jadi Voting Ensemble
# ============================================================
print("πŸ—³οΈ Membangun Voting Ensemble (soft voting)...")
class EnsembleScamDetector:
    """
    Final ensemble: weighted average of LR + SVM + XGBoost probabilities,
    default weights LR=0.3, SVM=0.3, XGBoost=0.4.
    """

    def __init__(self, lr, svm, xgb_model, weights=(0.3, 0.3, 0.4)):
        self.lr = lr
        self.svm = svm
        self.xgb = xgb_model
        self.weights = weights

    def predict_proba(self, X):
        """Weighted mean of the three members' class-probability matrices."""
        w_lr, w_svm, w_xgb = self.weights
        blended = (w_lr * self.lr.predict_proba(X)
                   + w_svm * self.svm.predict_proba(X)
                   + w_xgb * self.xgb.predict_proba(X))
        return blended

    def predict(self, X, threshold=0.5):
        """Binary decision: 1 (scam) where P(scam) >= threshold, else 0."""
        return (self.predict_proba(X)[:, 1] >= threshold).astype(int)

    def classes_(self):
        # NOTE(review): defined as a method, though sklearn convention is an
        # attribute — callers here must invoke it as classes_().
        return np.array([0, 1])
ensemble = EnsembleScamDetector(clf_lr, clf_svm, clf_xgb)
# Evaluate the ensemble on the held-out test set.
y_pred_ens = ensemble.predict(X_test_combined)
y_prob_ens = ensemble.predict_proba(X_test_combined)[:, 1]
acc_ens = accuracy_score(y_test, y_pred_ens)
f1_ens = f1_score(y_test, y_pred_ens, average='weighted')
auc_ens = roc_auc_score(y_test, y_prob_ens)
print("\n" + "="*55)
print(" 🏆 HASIL VOTING ENSEMBLE")
print("="*55)
print(f" Akurasi : {acc_ens*100:.2f}%")
print(f" F1-Score : {f1_ens*100:.2f}%")
print(f" AUC-ROC : {auc_ens*100:.2f}%")
print(f"\n{classification_report(y_test, y_pred_ens, target_names=['HAM (aman)', 'SPAM/SCAM'])}")
# Pick the final model by test accuracy.
# BUGFIX: the original else-branch left FINAL_MODEL undefined when an
# individual model out-scored the ensemble; the winner's name is now
# always recorded.
best_acc = max(acc_ens, results['Logistic Regression'][0], results['SVM (Calibrated)'][0], results['XGBoost'][0])
if acc_ens == best_acc:
    print("✅ Menggunakan VOTING ENSEMBLE sebagai model final")
    FINAL_MODEL = 'ensemble'
else:
    FINAL_MODEL = max(results, key=lambda m: results[m][0])
    print(f"ℹ️ Menggunakan model individual terbaik: {FINAL_MODEL}")
# ## πŸ” CELL 7 β€” Fungsi Deteksi Interaktif
# ============================================================
# CELL 7 β€” Fungsi deteksi pesan dengan laporan lengkap
# ============================================================
def detect_scam(pesan, threshold=0.45):
    """
    Detect whether a message is SCAM/SPAM or safe (HAM) and print a report.

    Parameters:
        pesan : str — the message to analyse
        threshold : float — decision threshold (default 0.45, more sensitive than 0.5)

    Returns:
        dict with the full detection result: verdict, probabilities,
        risk level, and the manual danger signals that fired.
    """
    # Same preprocessing as training, then the same three feature views.
    pesan_clean = preprocess_text(pesan)
    X_w = tfidf_word.transform([pesan_clean])
    X_c = tfidf_char.transform([pesan_clean])
    X_f = csr_matrix(feat_extractor.transform([pesan_clean]))
    X_combined = hstack([X_w, X_c, X_f])
    proba = ensemble.predict_proba(X_combined)[0]
    p_aman = proba[0]   # P(safe)
    p_scam = proba[1]   # P(scam)
    is_scam = p_scam >= threshold
    # Manual feature analysis for the human-readable report.
    # NOTE(review): this uses the RAW message, not pesan_clean, so reported
    # signals can differ slightly from what the model saw — confirm intended.
    raw_feat = feat_extractor.transform([pesan])[0]
    nama_fitur = ['urgency','threat','money','remote_access','phishing',
                  'impersonation','url_count','short_url','ip_url','bad_tld',
                  'caps_ratio','exclamation','currency','phone','long_numbers',
                  'text_length','word_count','avg_word_len','unique_ratio','danger_score']
    # Keep only danger signals; plain text statistics are not warnings.
    sinyal_aktif = [(n, v) for n, v in zip(nama_fitur, raw_feat) if v > 0 and n not in
                    ['text_length', 'word_count', 'avg_word_len', 'unique_ratio', 'caps_ratio']]
    # Map the scam probability to a coarse risk level.
    if p_scam >= 0.85:
        risk_level = "🔴 SANGAT TINGGI"
    elif p_scam >= 0.65:
        risk_level = "🟠 TINGGI"
    elif p_scam >= 0.45:
        risk_level = "🟡 SEDANG"
    elif p_scam >= 0.25:
        risk_level = "🔵 RENDAH"
    else:
        risk_level = "🟢 SANGAT RENDAH"
    # Print the report.
    print("\n" + "═"*60)
    print(" 🛡️ LAPORAN DETEKSI SCAM/PHISHING")
    print("═"*60)
    print(f" Pesan : {pesan[:100]}{'...' if len(pesan)>100 else ''}")
    print(f" Panjang : {len(pesan)} karakter | {len(pesan.split())} kata")
    print("─"*60)
    print(f" KEPUTUSAN : {'⚠️ SCAM / SPAM / PHISHING' if is_scam else '✅ PESAN AMAN (HAM)'}")
    print(f" Risk Level: {risk_level}")
    print("─"*60)
    print(f" Probabilitas SCAM : {p_scam*100:>6.2f}% {'█' * int(p_scam*30)}")
    print(f" Probabilitas AMAN : {p_aman*100:>6.2f}% {'█' * int(p_aman*30)}")
    print(f" Threshold : {threshold*100:.0f}%")
    print("─"*60)
    if sinyal_aktif:
        print(" ⚠️ SINYAL BAHAYA YANG TERDETEKSI:")
        for nama, val in sinyal_aktif:
            label = nama.replace('_', ' ').upper()
            print(f" • {label:<25}: {val:.2f}")
    else:
        print(" ✅ Tidak ada sinyal bahaya yang terdeteksi")
    print("═"*60)
    return {
        'is_scam': bool(is_scam),
        'probability_scam': float(p_scam),
        'probability_safe': float(p_aman),
        'risk_level': risk_level,
        'signals': sinyal_aktif
    }
print("✅ Fungsi detect_scam() siap digunakan!")
print("\nCara pakai: detect_scam('isi pesan kamu di sini')")
# ## πŸ§ͺ CELL 8 β€” Uji Coba dengan Berbagai Pesan
# ============================================================
# CELL 8 β€” Uji dengan berbagai jenis pesan
# ============================================================
# Known scam and safe messages used as a quick sanity check of the detector.
pesan_uji = [
    # --- SCAM CASES: remote access, account threat, prize, bank phishing ---
    "Install Anydesk for me to access remotely. You should be able to pass the video/ID verification to avoid suspension.",
    "URGENT: Your account has been suspended. Click here to verify your identity immediately or your account will be terminated.",
    "Congratulations! You have won a $5,000 prize. Click http://bit.ly/claim-prize to claim your reward now!",
    "Halo, kami dari tim keamanan BCA. Mohon konfirmasi nomor rekening dan PIN Anda untuk menghindari pemblokiran akun.",
    "Download TeamViewer and give me remote access to fix your computer problem ASAP. Limited time offer!",
    "Dear customer, your PayPal account has been limited. Please login at http://paypal-verify.xyz to restore access.",
    # --- SAFE CASES: ordinary personal / transactional messages ---
    "Hey! Just wanted to check if you're coming to the team lunch tomorrow at 12pm? Let me know!",
    "Reminder: Your dentist appointment is scheduled for next Tuesday at 3:00 PM. Please confirm your attendance.",
    "Thank you for your order #12345. Your package will be delivered within 3-5 business days.",
    "Hi, selamat pagi! Besok ada rapat tim jam 9. Tolong siapkan laporan mingguan kamu ya.",
]
# Run the detector on each sample and print its report.
for i, pesan in enumerate(pesan_uji, 1):
    print(f"\n{'#'*60}")
    print(f" PESAN KE-{i}")
    detect_scam(pesan)
input_cont = ''  # non-interactive loop
# ## πŸ’¬ CELL 9 β€” Mode Interaktif (Input Manual)
# ============================================================
# CELL 9 β€” Masukkan pesan kamu sendiri untuk dicek!
# ============================================================
print("πŸ›‘οΈ SCAM DETECTOR β€” Mode Interaktif")
print("Ketik pesan yang ingin kamu cek. Ketik 'keluar' untuk berhenti.\n")
while True:
try:
pesan = input("πŸ“© Masukkan pesan: ").strip()
if pesan.lower() in ['keluar', 'exit', 'quit', 'q', '']:
print("πŸ‘‹ Terima kasih telah menggunakan Scam Detector!")
break
detect_scam(pesan)
except (KeyboardInterrupt, EOFError):
print("\nπŸ‘‹ Sesi selesai.")
break
# ## πŸ’Ύ CELL 10 (Opsional) β€” Simpan Model
# ============================================================
# CELL 10 β€” Simpan model ke Google Drive (opsional)
# ============================================================
import pickle, os
# Mount Google Drive first if you want to persist the model there instead:
# from google.colab import drive
# drive.mount('/content/drive')
# SAVE_PATH = '/content/drive/MyDrive/scam_detector/'
SAVE_PATH = '/content/scam_detector_model/'
os.makedirs(SAVE_PATH, exist_ok=True)
# Bundle every artifact needed for standalone inference.
# SECURITY NOTE: pickle files can execute code on load — only unpickle
# bundles you created yourself.
model_bundle = {
    'tfidf_word': tfidf_word,
    'tfidf_char': tfidf_char,
    'feat_extractor': feat_extractor,
    'clf_lr': clf_lr,
    'clf_svm': clf_svm,
    'clf_xgb': clf_xgb,
    'ensemble': ensemble,
    'metadata': {
        'accuracy': acc_ens,
        'f1': f1_ens,
        'auc': auc_ens,
        'train_size': len(X_train)
    }
}
with open(SAVE_PATH + 'scam_detector.pkl', 'wb') as f:
    pickle.dump(model_bundle, f)
print(f"✅ Model disimpan ke: {SAVE_PATH}scam_detector.pkl")
print(f" Akurasi: {acc_ens*100:.2f}% | F1: {f1_ens*100:.2f}% | AUC: {auc_ens*100:.2f}%")
import pandas as pd
import numpy as np
import re
import warnings
warnings.filterwarnings('ignore')
from datasets import load_dataset
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import VotingClassifier, RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.metrics import (
accuracy_score, classification_report, confusion_matrix,
roc_auc_score, f1_score
)
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.calibration import CalibratedClassifierCV
from imblearn.over_sampling import SMOTE
import xgboost as xgb
import difflib
from urllib.parse import urlparse
print("βœ… Semua library berhasil diimport!")
# ## πŸ—„οΈ CELL 2 β€” Load & Gabungkan Multi-Dataset
# ============================================================
# CELL 2 β€” Load multi-dataset dari Hugging Face
# ============================================================
# Accumulators for the merged corpus: parallel lists of raw texts and
# binary labels (1 = spam/scam, 0 = ham).
all_texts = []
all_labels = []

# --- Dataset 1: Spam Detection ---
try:
    print("📥 [1/4] Mengunduh dataset spam detection...")
    ds1 = load_dataset("Deysi/spam-detection-dataset", split="train")
    df1 = pd.DataFrame(ds1)
    # Fall back to positional columns if the expected names are missing.
    col_text = 'text' if 'text' in df1.columns else df1.columns[0]
    col_label = 'label' if 'label' in df1.columns else df1.columns[1]
    df1 = df1[[col_text, col_label]].rename(columns={col_text: 'text', col_label: 'label'})
    # Normalize labels to 1 (spam) / 0 (ham).
    df1['label'] = df1['label'].apply(lambda x: 1 if str(x).lower() in ['spam', '1', 'true'] else 0)
    all_texts.extend(df1['text'].tolist())
    all_labels.extend(df1['label'].tolist())
    print(f" ✅ Dataset 1 berhasil: {len(df1)} baris (spam: {df1['label'].sum()}, ham: {(df1['label']==0).sum()})")
except Exception as e:
    # Best-effort: one failed download must not abort the whole pipeline.
    print(f" ⚠️ Dataset 1 gagal: {e}")

# --- Dataset 2: SMS Spam Collection ---
try:
    print("📥 [2/4] Mengunduh SMS spam collection...")
    ds2 = load_dataset("ucirvine/sms_spam", split="train")
    df2 = pd.DataFrame(ds2)
    col_text = 'sms' if 'sms' in df2.columns else df2.columns[1]
    col_label = 'label' if 'label' in df2.columns else df2.columns[0]
    df2 = df2[[col_text, col_label]].rename(columns={col_text: 'text', col_label: 'label'})
    df2['label'] = df2['label'].apply(lambda x: 1 if str(x).lower() in ['spam', '1', 'true'] else 0)
    all_texts.extend(df2['text'].tolist())
    all_labels.extend(df2['label'].tolist())
    print(f" ✅ Dataset 2 berhasil: {len(df2)} baris")
except Exception as e:
    print(f" ⚠️ Dataset 2 gagal: {e}")

# --- Dataset 3: Enron Email (Phishing/Spam) ---
try:
    print("📥 [3/4] Mengunduh Enron email dataset...")
    ds3 = load_dataset("SetFit/enron_spam", split="train")
    df3 = pd.DataFrame(ds3)
    # Build a single 'text' column from whatever fields are available.
    if 'subject' in df3.columns and 'message' in df3.columns:
        df3['text'] = df3['subject'].fillna('') + ' ' + df3['message'].fillna('')
    elif 'text' in df3.columns:
        pass
    else:
        df3['text'] = df3.iloc[:, 0]
    col_label = 'label' if 'label' in df3.columns else 'spam'
    df3 = df3[['text', col_label]].rename(columns={col_label: 'label'})
    df3['label'] = df3['label'].apply(lambda x: 1 if str(x).lower() in ['spam', '1', 'true'] else 0)
    df3 = df3.sample(min(5000, len(df3)), random_state=42)  # cap size to keep the corpus balanced
    all_texts.extend(df3['text'].tolist())
    all_labels.extend(df3['label'].tolist())
    print(f" ✅ Dataset 3 berhasil: {len(df3)} baris")
except Exception as e:
    print(f" ⚠️ Dataset 3 gagal: {e}")

# --- Dataset 4: Indonesian SMS Spam (CSV hosted on a GitHub Gist) ---
try:
    print("📥 [4/4] Mengunduh dataset Spam/Scam Bahasa Indonesia...")
    url_indo = "https://gist.githubusercontent.com/Xnuvers007/ba91613fe98deb8d09bb0abdfb17ef88/raw/8bb5c883e8c0e589efe359f6342e25b5754ff5c9/sms_spam_indo.csv"
    df4 = pd.read_csv(url_indo)
    # Columns are Indonesian: 'Pesan' = message, 'Kategori' = category.
    df4 = df4[['Pesan', 'Kategori']].rename(columns={'Pesan': 'text'})
    df4['label'] = df4['Kategori'].apply(lambda x: 1 if str(x).strip().lower() == 'spam' else 0)
    all_texts.extend(df4['text'].tolist())
    all_labels.extend(df4['label'].tolist())
    print(f" ✅ Dataset 4 berhasil: {len(df4)} baris (Scam lokal berhasil ditambahkan!)")
except Exception as e:
    print(f" ⚠️ Dataset 4 gagal: {e}")

# --- Merge everything into one labelled DataFrame ---
df_all = pd.DataFrame({'text': all_texts, 'label': all_labels})
df_all = df_all.dropna(subset=['text'])
df_all['text'] = df_all['text'].astype(str)
print(f"\n📊 TOTAL DATASET GABUNGAN: {len(df_all)} baris")
print(f" 🔴 SCAM/SPAM : {df_all['label'].sum()} ({df_all['label'].mean()*100:.1f}%)")
print(f" 🟢 AMAN/HAM : {(df_all['label']==0).sum()} ({(1-df_all['label'].mean())*100:.1f}%)")
# ## βš™οΈ CELL 3 β€” Feature Engineering Khusus Scam
# ============================================================
# CELL 3 β€” Feature Engineering: ekstrak sinyal scam secara manual
# ============================================================
class ScamFeatureExtractor(BaseEstimator, TransformerMixin):
"""
Custom transformer yang mengekstrak 30+ fitur numerik khusus scam:
- URL & link berbahaya
- Kata urgensi & ancaman
- Remote access tools
- Pola penipuan finansial
- Statistik teks mencurigakan
"""
# Kata-kata sinyal scam (dikelompokkan per kategori)
URGENCY_WORDS = [
'urgent', 'immediately', 'asap', 'right now', 'limited time',
'expires', 'deadline', 'act now', 'don\'t wait', 'hurry',
'segera', 'sekarang juga', 'cepat', 'batas waktu', 'darurat'
]
THREAT_WORDS = [
'suspended', 'blocked', 'terminated', 'banned', 'closed',
'account locked', 'verify now', 'confirm identity', 'suspension',
'diblokir', 'ditangguhkan', 'verifikasi', 'konfirmasi'
]
MONEY_WORDS = [
'prize', 'winner', 'won', 'lottery', 'jackpot', 'free money',
'cash', 'reward', 'bonus', 'gift card', 'bitcoin', 'crypto',
'transfer', 'wire', 'western union', 'moneygram', 'bank account',
'hadiah', 'menang', 'gratis', 'uang', 'transfer', 'rekening'
]
REMOTE_ACCESS = [
'anydesk', 'teamviewer', 'remote', 'screen share', 'remote access',
'install', 'download app', 'access your computer', 'take control',
'remote desktop', 'vnc', 'rustdesk', 'ultraviewer'
]
PHISHING_WORDS = [
'click here', 'login', 'password', 'username', 'credential',
'sign in', 'verify your', 'update your', 'confirm your',
'account information', 'billing info', 'credit card', 'cvv',
'social security', 'ssn', 'otp', 'pin', 'kode otp'
]
IMPERSONATION = [
'amazon', 'paypal', 'apple', 'microsoft', 'google', 'facebook',
'instagram', 'netflix', 'bank', 'irs', 'government', 'police',
'bri', 'bca', 'mandiri', 'ojk', 'polisi', 'pemerintah', 'ecommerce', 'belanja'
]
TARGET_DOMAINS = [
'paypal.com', 'google.com', 'apple.com', 'microsoft.com',
'facebook.com', 'bca.co.id', 'bri.co.id', 'mandiri.co.id',
'klikbca.com', 'tokopedia.com', 'shopee.co.id', 'gojek.com',
'lazada.com','lazada.co.id', 'instagram.com', 'twitter.com', 'x.com', 'linkedin.com',
'fb.com', 'blogger.com', 'youtube.com', 'youtu.be','wordpress.com','apple.com',
'wordpress.org', 'googleusercontent.com', 'whatsapp.com', 'play.google.com', 'support.google.com',
'policies.google.com', 'cloudflare.com', 'docs.google.com', 'en.wikipedia.org', 'drive.google.com',
'tiktok.com', 'maps.google.com', 't.me', 'bp.blogspot.com', 'accounts.google.com', 'wa.me', 'europa.eu',
'plus.google.com', 'mozilla.org', 'sites.google.com', 'istockphoto.com', 'facebook.com', 'pt.wikipedia.org',
'vk.com', 'es.wikipedia.org', 'vimeo.com', 'adobe.com', 'weebly.com', 'github.com', 'globo.com', 'forms.gle',
'wikimedia.org', 'afternic.com', 'google.com.br', 'mediafire.com', 'news.google.com', 'yahoo.com', 'jimdofree.com',
'mail.ru', 'files.wordpress.com', 'medium.com', 'who.int', 'opera.com', 'gravatar.com', 'dropbox.com', 'dailymotion.com',
'amazon.com', 'cpanel.net', 'tools.google.com', 'google.es', 'draft.blogger.com', 'uol.com.br', 'bbc.co.uk', 'ok.ru', 'abril.com.br',
'netvibes.com', 'nih.gov', 'nytimes.com', 'cnn.com', 'developers.google.com', 'fr.wikipedia.org', 'google.de', 'paypal.com', 'shopify.com',
'feedburner.com', 'imdb.com', 'gstatic.com', 'googleblog.com', 'myspace.com', 'goo.gl', 'brandbucket.com', 'line.me', 'live.com', 'foxnews.com',
'oracle.com', 'get.google.com', 'amazon.co.uk', 'picasaweb.google.com', '4shared.com', 'ft.com', 'twitch.tv', 'gov.uk', 'huffingtonpost.com',
'ytimg.com', 'namebright.com', 'businessinsider.com', 'slideshare.net', 'issuu.com', 'nature.com', 'nicsell.com',
'domainmarket.com', 'cdc.gov', 'ig.com.br', 'tinyurl.com', 'hugedomains.com', 'dailymail.co.uk', 'estadao.com.br', 'expireddomains.com',
'messenger.com', 'aliexpress.com', 'independent.co.uk', 'discord.com', 'pixabay.com', 'instagram.com', 'usatoday.com', 'photos.google.com',
'researchgate.net', 'theguardian.com', 'wikia.com', 'scribd.com', 'storage.googleapis.com', 'google.it', 'telegram.me', 'correios.com.br', 'archive.org',
'washingtonpost.com', 'bloomberg.com', 'google.fr', 'fandom.com', 'bbc.com', 'linktr.ee', 'myaccount.google.com', 'buydomains.com', 'google.co.jp',
'msn.com', 'wiley.com', 'dan.com', 'amazon.co.jp', 'list-manage.com', 'webmd.com', 'indiatimes.com', 'nginx.com', 'fb.com', 'google.co.uk', 'wix.com',
'un.org', 'forbes.com', 'thesun.co.uk', '3ds.com', 'adssettings.google.com', 'dropcatch.com', 'mail.google.com', 'hatena.ne.jp', 'w3.org', 'plesk.com',
'spotify.com', 'mirror.co.uk', 'telegraph.co.uk', 'youronlinechoices.com', 'marketingplatform.google.com', 'typepad.com', 'news.yahoo.com', 'nginx.org',
'bit.ly', 't.co', 'booking.com', 'terra.com.br', 'huffpost.com', 'pinterest.com', 'reuters.com', 'wsj.com', 'creativecommons.org', 'office.com',
'ovhcloud.com', 'time.com', 'sedo.com', 'ru.wikipedia.org', 'wp.com', 'aboutads.info', 'huawei.com', 'planalto.gov.br', 'elpais.com', 'gov.br',
'de.wikipedia.org', 'enable-javascript.com', 'ibm.com', 'techcrunch.com', 'nhk.or.jp', 'ebay.com', 'zoom.us', 'lemonde.fr', 'buzzfeed.com',
'home.pl', 'photos1.blogger.com', 'britannica.com', 'secureserver.net', 'hp.com', 'imageshack.us', 'newsweek.com', 'amazon.es', 'economist.com',
'nasa.gov', 'livejournal.com', 'tmz.com', 'amzn.to', 'example.com', 'yahoo.co.jp', 'akamaihd.net', 'addthis.com', 'perfectdomain.com', 'm.wikipedia.org',
'bandcamp.com', 'ssl-images-amazon.com', 'latimes.com', 'steampowered.com', 'liveinternet.ru', 'change.org', 'walmart.com', 'ign.com',
'instructables.com', 'ouest-france.fr', 'cointernet.com.co', 'abc.net.au', 'hotmart.com', 'npr.org', 'dreamstime.com', 'groups.google.com',
'calameo.com', 'kickstarter.com', 'ovh.com', 'clickbank.net', 'hollywoodreporter.com', 'trustpilot.com', 'guardian.co.uk', 'samsung.com',
'francetvinfo.fr', 'canva.com', 'cnet.com', 'as.com', 'berkeley.edu', 'cbsnews.com', 'playstation.com', 'namecheap.com', 'google.nl', 'plos.org',
'thenai.org', 'networkadvertising.org', 'lin.ee', 'ted.com', 'yelp.com', 'amazon.fr', 'search.yahoo.com', 'discord.gg', 'news.com.au', 'disqus.com',
'loc.gov', 'my.yahoo.com', 'php.net', 'id.wikipedia.org', 'rakuten.co.jp', 'bloglovin.com', 'it.wikipedia.org', 'telegram.org', 'g.page', 'ipv4.google.com',
'books.google.com', 'netflix.com', 'leparisien.fr', 'ja.wikipedia.org', 'express.co.uk', 'g.co', 'privacyshield.gov', 'ggpht.com', 'themeforest.net',
'yandex.ru', 'picasa.google.com', 'abcnews.go.com', 'dw.com', 'lefigaro.fr', 'zippyshare.com', 'detik.com', 'nydailynews.com', 'sagepub.com', 'mega.nz',
't-online.de', 'unesco.org', 'arxiv.org', 'mystrikingly.com', 'deezer.com', 'pexels.com', 'addtoany.com', 'code.google.com', 'shutterstock.com',
'unsplash.com', 'outlook.com', 'dailystar.co.uk', 'sky.com', 'abc.es', 'pl.wikipedia.org', 'psychologytoday.com', 'quora.com', 'gizmodo.com', 'weibo.com',
'business.google.com', 'workspace.google.com', 'skype.com', 'cpanel.com', 'gofundme.com', 'rtve.es', 'welt.de', 'cornell.edu', 'pbs.org', 'eventbrite.com',
'nypost.com', 'hubspot.com', 'tripadvisor.com', 'timeweb.ru', 'wikihow.com', 'stanford.edu', 'rambler.ru', 'soundcloud.com', 'google.pl', 'mozilla.com',
'cnil.fr', 'rt.com', 'bing.com', 'google.ru', 'sakura.ne.jp', 'metro.co.uk', 'android.com', 'safety.google', 'cnbc.com', 'academia.edu', 'godaddy.com',
'nbcnews.com', 'apache.org', 'lavanguardia.com', 'offset.com', 'surveymonkey.com', 'springer.com', 'netlify.app', 'sapo.pt', 'amazon.de', 'gmail.com',
'sendspace.com', 'cambridge.org', 'redbull.com', 'taringa.net', 'ikea.com', 'qq.com', 'thetimes.co.uk', 'wiktionary.org', 'vistaprint.com',
'zendesk.com', 'amazonaws.com', 'aol.com', 'wikipedia.org', 'engadget.com', 'translate.google.com', 'firefox.com', 'photobucket.com', 'cbc.ca',
'behance.net', 'ameblo.jp', 'sciencedirect.com', 'nationalgeographic.com', 'spiegel.de', '20minutos.es', 'mashable.com', 'mit.edu',
'rapidshare.com', 'wired.com', 'dovendi.com', 'icann.org', 'doi.org', 'alicdn.com', 'harvard.edu', 'espn.com', 'finance.yahoo.com',
'marca.com', 'nintendo.com', 'ziddu.com', 'hindustantimes.com', 'statista.com', 'amazon.it', 'elmundo.es', 'goodreads.com', 'doubleclick.net',
'variety.com', 'sciencedaily.com', 'insider.com', 'theverge.com', 'clarin.com', 'naver.com', 'theatlantic.com', 'about.com', 'sputniknews.com',
'yadi.sk', 'cutt.ly', 'telegra.ph', 'yandex.com', 'reg.ru', 'oup.com', 'franceinfo.fr', 'google.ca', 'corriere.it', 'airbnb.com', 'pages.dev',
'strato-hosting.eu', 'slate.com', 'google.co.id', 'pornhub.com', 'thestar.com', 'over-blog.com', 'kotaku.com', 'bp1.blogger.com', 'kompas.com',
'onamae.com', 'getbootstrap.com', 'barnesandnoble.com', 'openai.com', 'orange.fr', 'focus.de', 'worldbank.org', 'scholar.google.com', 'hatena.blog',
'onelink.me', 'natro.com', '123rf.com', 'oecd.org', 'bfmtv.com', 'e-monsite.com', 'wallpapers.com', 'mailchi.mp', 'usgs.gov', 'politico.com', 'pnas.org',
'xinhuanet.com', 'substack.com', 't.ly', 'weforum.org', 'thefreedictionary.com', 'amazon.ca', 'dell.com', 'house.gov', 'xbox.com', 'giphy.com',
'hoax.com', 'itch.io', 'm.me', 'ebay.co.uk', 'fifa.com', 'washington.edu', 'geocities.com', 'fbsbx.com', 'bild.de', 'ndtv.com', 'hilton.com',
'udemy.com', 'asus.com', 'greenpeace.org', 'search.google.com', 'merriam-webster.com', 'prtimes.jp', 'target.com', 'fortune.com', 'ca.gov',
'sfgate.com', 'legifrance.gouv.fr', 'news.livejournal.com', 'lycos.com', 'dribbble.com', 'utexas.edu', 'wetransfer.com', 'automattic.com',
'rollingstone.com', 'newyorker.com', 'canada.ca', 'nymag.com', 'epa.gov', 'thedailybeast.com', 'ea.com', 'xing.com', 'usda.gov', 'allaboutcookies.org',
'pcmag.com', 'mdpi.com', 'zdnet.com', 'evernote.com', 'history.com', 'box.com', 'public-api.wordpress.com', 'answers.com', 'nikkei.com',
'vice.com', 'ox.ac.uk', 'impress.co.jp', 'bp0.blogger.com', 'cbslocal.com', 'ads.google.com', 'intel.com','shopee.co.id','bukalapak.com','akulaku.com'
]
    def fit(self, X, y=None):
        """No-op fit: this extractor is stateless; returns self per the sklearn transformer API."""
        return self
def _count_matches(self, text, word_list):
text_lower = text.lower()
return sum(1 for w in word_list if w in text_lower)
def transform(self, X):
features = []
for text in X:
text = str(text)
t = text.lower()
# --- Sinyal kata kunci ---
f_urgency = self._count_matches(t, self.URGENCY_WORDS)
f_threat = self._count_matches(t, self.THREAT_WORDS)
f_money = self._count_matches(t, self.MONEY_WORDS)
f_remote = self._count_matches(t, self.REMOTE_ACCESS)
f_phishing = self._count_matches(t, self.PHISHING_WORDS)
f_imperson = self._count_matches(t, self.IMPERSONATION)
# --- Sinyal URL & Link ---
# f_url_count = len(re.findall(r'http[s]?://', t))
# f_short_url = len(re.findall(r'(bit\.ly|tinyurl|goo\.gl|t\.co|ow\.ly|buff\.ly)', t))
# f_has_ip_url = int(bool(re.search(r'http[s]?://\d+\.\d+\.\d+\.\d+', t)))
# f_suspicious_tld = len(re.findall(r'\.(xyz|top|win|click|download|review|loan|work|party)', t))
# --- Sinyal URL & Link ---
# --- Sinyal URL & Link ---
urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', t)
f_url_count = len(urls)
f_short_url = len(re.findall(r'(bit\.ly|tinyurl|goo\.gl|t\.co|ow\.ly|buff\.ly|s\.id|shorturl\.at|cutt\.ly)', t))
f_has_ip_url = int(bool(re.search(r'http[s]?://\d+\.\d+\.\d+\.\d+', t)))
f_suspicious_tld = len(re.findall(r'\.(xyz|top|win|click|download|review|loan|work|party|cc|biz|info)', t))
# Logika Typosquatting & Subdomain Spoofing (ULTRA STRICT)
f_typosquat = 0
for url in urls:
try:
import tldextract
ext = tldextract.extract(url)
# root_domain = "google.com" atau "bca.co.id"
root_domain = f"{ext.domain}.{ext.suffix}".lower()
# full_domain = "mail.google.com" atau "bca.co.id.scam.net"
full_domain = f"{ext.subdomain}.{ext.domain}.{ext.suffix}".strip('.').lower()
for brand in self.TARGET_DOMAINS:
nama_brand = brand.split('.')[0] # Ambil 'bca' dari 'bca.co.id'
# 1. CEK AMAN: Apakah root domain-nya sama persis dengan brand asli?
if root_domain == brand:
# Jika ya, berarti ini aman (bahkan jika ada subdomain resminya seperti klik.bca.co.id)
continue
# 2. CEK IMPERSONATION / SUBDOMAIN SPOOFING
# Jika root domain-nya BEDA, tapi ada nama brand nangkring di dalam URL-nya
# Contoh bahaya: bca.co.id.scam.com (mengandung 'bca')
# Contoh bahaya: login-paypal-update.com (mengandung 'paypal')
if nama_brand in full_domain:
f_typosquat += 1
break # Langsung vonis scam, lanjut ke URL berikutnya
# 3. CEK TYPOSQUATTING (Salah ketik / mirip)
# Contoh bahaya: paypaI.com (mirip paypal.com)
kemiripan = difflib.SequenceMatcher(None, root_domain, brand).ratio()
if 0.80 < kemiripan < 1.0:
f_typosquat += 1
break
except:
pass
# --- Sinyal tipografi mencurigakan ---
f_caps_ratio = sum(1 for c in text if c.isupper()) / max(len(text), 1)
f_excl_count = text.count('!')
f_dollar = text.count('$') + text.count('€') + text.count('Β£')
f_has_phone = int(bool(re.search(r'(\+62|\+1|\+44)?[\s.-]?\(?\d{3}\)?[\s.-]?\d{3,4}[\s.-]?\d{4}', text)))
f_num_count = len(re.findall(r'\b\d{4,}\b', text)) # angka panjang (nomor rekening, dsb)
# --- Statistik teks ---
words = t.split()
f_text_len = len(text)
f_word_count = len(words)
f_avg_word_len = np.mean([len(w) for w in words]) if words else 0
f_unique_ratio = len(set(words)) / max(len(words), 1)
# --- Skor agregat ---
f_total_danger = f_urgency + f_threat * 2 + f_money + f_remote * 3 + f_phishing * 2 + (f_typosquat * 5)
features.append([
f_urgency, f_threat, f_money, f_remote, f_phishing, f_typosquat, f_imperson,
f_url_count, f_short_url, f_has_ip_url, f_suspicious_tld,
f_caps_ratio, f_excl_count, f_dollar, f_has_phone, f_num_count,
f_text_len, f_word_count, f_avg_word_len, f_unique_ratio,
f_total_danger
])
return np.array(features)
print("βœ… ScamFeatureExtractor siap digunakan!")
# Bug fix: transform() emits 21 features, not 20.
print(f" πŸ“ Jumlah fitur manual: 21 fitur numerik")
# Preview the features on a sample message
test_msg = ["Install Anydesk for me to access remotely. You should pass the video/ID verification to avoid suspension."]
extractor = ScamFeatureExtractor()
feat = extractor.transform(test_msg)[0]
# Bug fix: 'typosquat' was missing from this list, so every label after
# 'phishing' was shifted by one against transform()'s 21 output columns
# and 'danger_score' fell out of the zip entirely.
nama_fitur = ['urgency','threat','money','remote','phishing','typosquat','impersonation',
              'url_count','short_url','ip_url','bad_tld',
              'caps_ratio','exclamation','currency_symbol','phone_number','long_numbers',
              'text_length','word_count','avg_word_len','unique_word_ratio','danger_score']
print("\nπŸ” Preview fitur untuk pesan uji:")
for name, val in zip(nama_fitur, feat):
    if val > 0:
        print(f" ⚠️ {name:25s}: {val:.3f}")
# ## πŸ€– CELL 4 β€” Bangun & Latih Ensemble Model
# ============================================================
# CELL 4 β€” Buat Ensemble Model Powerfull
# ============================================================
class TextSelector(BaseEstimator, TransformerMixin):
    """Pass-through selector for the text column (sklearn transformer)."""

    def fit(self, X, y=None):
        # Stateless: nothing to learn.
        return self

    def transform(self, X):
        # Lists and numpy arrays pass through untouched; anything else
        # (e.g. a pandas Series) is converted via its .tolist() method.
        if isinstance(X, (list, np.ndarray)):
            return X
        return X.tolist()
# --- Text preprocessing ---
def preprocess_text(text):
    """Normalise raw message text for the TF-IDF vectorizers.

    Lower-cases, strips HTML tags, replaces URLs with the token 'URL' and
    10+ digit numbers with 'LONGNUM', keeps only word characters plus
    the expressive punctuation ! ? $, and collapses runs of whitespace.
    """
    cleaned = str(text).lower()
    cleaned = re.sub(r'<[^>]+>', ' ', cleaned)              # drop HTML tags
    cleaned = re.sub(r'http\S+|www\S+', ' URL ', cleaned)   # URLs -> token
    cleaned = re.sub(r'\b\d{10,}\b', ' LONGNUM ', cleaned)  # long digit runs -> token
    cleaned = re.sub(r'[^\w\s!?$]', ' ', cleaned)           # keep expressive punctuation only
    return re.sub(r'\s+', ' ', cleaned).strip()
print("πŸ”§ Mempersiapkan data...")
# Clean every message once up front; the cleaned column feeds all vectorizers.
df_all['text_clean'] = df_all['text'].apply(preprocess_text)
X = df_all['text_clean'].values
y = df_all['label'].values
# Stratified split keeps the HAM/SCAM class ratio identical in train and test.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
print(f" Train: {len(X_train)} | Test: {len(X_test)}")
# --- TF-IDF components (two views: word n-grams + character n-grams) ---
tfidf_word = TfidfVectorizer(
    analyzer='word',
    ngram_range=(1, 2),  # unigrams + bigrams
    max_features=50000,
    min_df=2,
    max_df=0.95,
    sublinear_tf=True,  # log-scaled term frequency
    strip_accents='unicode'
)
tfidf_char = TfidfVectorizer(
    analyzer='char_wb',
    ngram_range=(3, 5),  # character n-grams (catch scammer typos)
    max_features=20000,
    min_df=3,
    sublinear_tf=True
)
# --- Build the combined feature matrices ---
from scipy.sparse import hstack
print("πŸ“Š Membangun matriks fitur TF-IDF...")
X_train_w = tfidf_word.fit_transform(X_train)
X_test_w = tfidf_word.transform(X_test)
X_train_c = tfidf_char.fit_transform(X_train)
X_test_c = tfidf_char.transform(X_test)
# Hand-crafted scam-signal features (keyword hits, URL heuristics, ...).
feat_extractor = ScamFeatureExtractor()
X_train_f = feat_extractor.transform(X_train)
X_test_f = feat_extractor.transform(X_test)
# Stack the sparse TF-IDF blocks with the dense manual features (made sparse).
from scipy.sparse import csr_matrix
X_train_combined = hstack([X_train_w, X_train_c, csr_matrix(X_train_f)])
X_test_combined = hstack([X_test_w, X_test_c, csr_matrix(X_test_f)])
print(f" βœ… Dimensi fitur gabungan: {X_train_combined.shape[1]:,} kolom")
# ==========================================
# βš–οΈ SMOTE (minority-class oversampling)
# ==========================================
print("βš–οΈ Menyeimbangkan data minoritas dengan SMOTE...")
smote = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train_combined, y_train)
print(f" βœ… Dimensi sebelum SMOTE : {X_train_combined.shape[0]} baris")
print(f" βœ… Dimensi sesudah SMOTE : {X_train_resampled.shape[0]} baris")
# ==========================================
# --- Model definitions ---
print("\nπŸ—οΈ Membangun model-model...")
clf_lr = LogisticRegression(
    C=5.0, max_iter=1000, solver='lbfgs',
    class_weight='balanced', random_state=42
)
# LinearSVC has no predict_proba; calibration wraps it to provide one.
clf_svm = CalibratedClassifierCV(
    LinearSVC(C=1.0, max_iter=2000, class_weight='balanced', random_state=42)
)
# NOTE(review): scale_pos_weight is computed from the PRE-SMOTE class ratio,
# but the model is trained on the SMOTE-balanced set, so the positive class
# is effectively compensated twice — confirm this is intentional.
# NOTE(review): use_label_encoder was deprecated and removed in xgboost >= 2.0;
# drop it if training fails on a newer version.
clf_xgb = xgb.XGBClassifier(
    n_estimators=300, max_depth=6,
    learning_rate=0.1, subsample=0.8,
    colsample_bytree=0.8,
    scale_pos_weight=len(y_train[y_train==0]) / max(len(y_train[y_train==1]), 1),
    use_label_encoder=False, eval_metric='logloss',
    random_state=42, n_jobs=-1
)
# Train each base model on the SMOTE-balanced training set.
print("⏳ Melatih Logistic Regression...")
clf_lr.fit(X_train_resampled, y_train_resampled)
print("⏳ Melatih SVM (dikalibrasi)...")
clf_svm.fit(X_train_resampled, y_train_resampled)
print("⏳ Melatih XGBoost...")
clf_xgb.fit(X_train_resampled, y_train_resampled)
print("\nβœ… Semua model selesai dilatih!")
# ## πŸ“Š CELL 5 β€” Full evaluation
# ============================================================
# CELL 5 β€” Detailed evaluation of every model
# ============================================================
def evaluate_model(name, clf, X_tr, X_te, y_te):
    """Print accuracy / weighted-F1 / ROC-AUC, a classification report and
    the confusion matrix for *clf* on the test split, then return the
    (accuracy, f1, auc) tuple.

    NOTE(review): X_tr is accepted only for call-site compatibility; it is
    never used inside this function.
    """
    predictions = clf.predict(X_te)
    scam_scores = clf.predict_proba(X_te)[:, 1]
    acc = accuracy_score(y_te, predictions)
    f1 = f1_score(y_te, predictions, average='weighted')
    auc = roc_auc_score(y_te, scam_scores)
    separator = '=' * 55
    print(f"\n{separator}")
    print(f" Model: {name}")
    print(f"{separator}")
    print(f" Akurasi : {acc*100:.2f}%")
    print(f" F1-Score : {f1*100:.2f}%")
    print(f" AUC-ROC : {auc*100:.2f}%")
    print(f"\n{classification_report(y_te, predictions, target_names=['HAM (aman)', 'SPAM/SCAM'])}")
    cm = confusion_matrix(y_te, predictions)
    print(f" Confusion Matrix:")
    print(f" Prediksi HAM Prediksi SCAM")
    print(f" Asli HAM {cm[0,0]:6d} {cm[0,1]:6d}")
    print(f" Asli SCAM {cm[1,0]:6d} {cm[1,1]:6d}")
    return acc, f1, auc
results = {}
# Evaluate each base model on the same train/test split (insertion order matters
# for the summary table below and for the final-model selection later).
for model_name, fitted_clf in (
    ('Logistic Regression', clf_lr),
    ('SVM (Calibrated)', clf_svm),
    ('XGBoost', clf_xgb),
):
    results[model_name] = evaluate_model(model_name, fitted_clf, X_train_combined, X_test_combined, y_test)
print("\n" + "="*55)
print(" RINGKASAN PERBANDINGAN MODEL")
print("="*55)
print(f" {'Model':<25} {'Akurasi':>8} {'F1':>8} {'AUC':>8}")
print("-"*55)
for name, (acc, f1, auc) in results.items():
    print(f" {name:<25} {acc*100:>7.2f}% {f1*100:>7.2f}% {auc*100:>7.2f}%")
# ## πŸ† CELL 6 β€” Voting Ensemble (best model)
# ============================================================
# CELL 6 β€” Combine all models into a soft-voting ensemble
# ============================================================
print("πŸ—³οΈ Membangun Voting Ensemble (soft voting)...")
class EnsembleScamDetector:
    """Soft-voting ensemble over LR + SVM + XGBoost.

    The final probability is the weighted average of the three members'
    predict_proba outputs, with default weights LR=0.3, SVM=0.3, XGB=0.4.
    """

    def __init__(self, lr, svm, xgb_model, weights=(0.3, 0.3, 0.4)):
        self.lr = lr
        self.svm = svm
        self.xgb = xgb_model
        self.weights = weights

    def predict_proba(self, X):
        # Weighted sum of each member's probability matrix, in member order.
        members = (self.lr, self.svm, self.xgb)
        blended = sum(w * m.predict_proba(X) for w, m in zip(self.weights, members))
        return blended

    def predict(self, X, threshold=0.5):
        # Binary decision on the scam-class (column 1) probability.
        scam_proba = self.predict_proba(X)[:, 1]
        return (scam_proba >= threshold).astype(int)

    def classes_(self):
        # NOTE: defined as a method (call it), not the usual sklearn attribute.
        return np.array([0, 1])
ensemble = EnsembleScamDetector(clf_lr, clf_svm, clf_xgb)
# Evaluate the ensemble on the held-out test split.
y_pred_ens = ensemble.predict(X_test_combined)
y_prob_ens = ensemble.predict_proba(X_test_combined)[:, 1]
acc_ens = accuracy_score(y_test, y_pred_ens)
f1_ens = f1_score(y_test, y_pred_ens, average='weighted')
auc_ens = roc_auc_score(y_test, y_prob_ens)
print("\n" + "="*55)
print(" πŸ† HASIL VOTING ENSEMBLE")
print("="*55)
print(f" Akurasi : {acc_ens*100:.2f}%")
print(f" F1-Score : {f1_ens*100:.2f}%")
print(f" AUC-ROC : {auc_ens*100:.2f}%")
print(f"\n{classification_report(y_test, y_pred_ens, target_names=['HAM (aman)', 'SPAM/SCAM'])}")
# Pick the final model automatically by test accuracy.
best_acc = max(acc_ens, results['Logistic Regression'][0], results['SVM (Calibrated)'][0], results['XGBoost'][0])
if acc_ens == best_acc:
    print("βœ… Menggunakan VOTING ENSEMBLE sebagai model final")
    FINAL_MODEL = 'ensemble'
else:
    # Bug fix: FINAL_MODEL was previously left undefined on this branch.
    # Choose the individual model with the highest accuracy (results[name][0]).
    FINAL_MODEL = max(results, key=lambda name: results[name][0])
    print(f"ℹ️ Menggunakan model individual terbaik: {FINAL_MODEL}")
# ## πŸ” CELL 7 β€” Fungsi Deteksi Interaktif
# ============================================================
# CELL 7 β€” Fungsi deteksi pesan dengan laporan lengkap
# ============================================================
def detect_scam(pesan, threshold=0.45):
    """
    Detect whether a message is SCAM/SPAM/PHISHING or safe (HAM).

    Parameters:
        pesan : str β€” the message to analyse
        threshold : float β€” decision threshold on the scam probability
            (default 0.45, slightly more sensitive than 0.5)

    Returns:
        dict with the full detection result: verdict, both probabilities,
        the coarse risk level, and the manual danger signals that fired.
    """
    # Model features use the cleaned text (same preprocessing as training).
    pesan_clean = preprocess_text(pesan)
    X_w = tfidf_word.transform([pesan_clean])
    X_c = tfidf_char.transform([pesan_clean])
    X_f = csr_matrix(feat_extractor.transform([pesan_clean]))
    X_combined = hstack([X_w, X_c, X_f])
    proba = ensemble.predict_proba(X_combined)[0]
    p_aman = proba[0]
    p_scam = proba[1]
    is_scam = p_scam >= threshold
    # The signal report is computed on the RAW message so URLs, casing and
    # punctuation (partly stripped by preprocessing) remain visible.
    raw_feat = feat_extractor.transform([pesan])[0]
    # Bug fix: 'typosquat' was missing here, so every label after 'phishing'
    # was shifted by one against the extractor's 21 output columns and the
    # final 'danger_score' value was silently dropped from the report.
    nama_fitur = ['urgency','threat','money','remote_access','phishing','typosquat',
                  'impersonation','url_count','short_url','ip_url','bad_tld',
                  'caps_ratio','exclamation','currency','phone','long_numbers',
                  'text_length','word_count','avg_word_len','unique_ratio','danger_score']
    # Report only danger-type signals; plain text statistics are excluded.
    sinyal_aktif = [(n, v) for n, v in zip(nama_fitur, raw_feat) if v > 0 and n not in
                    ['text_length', 'word_count', 'avg_word_len', 'unique_ratio', 'caps_ratio']]
    # Map the scam probability onto a coarse, human-readable risk level.
    if p_scam >= 0.85:
        risk_level = "πŸ”΄ SANGAT TINGGI"
    elif p_scam >= 0.65:
        risk_level = "🟠 TINGGI"
    elif p_scam >= 0.45:
        risk_level = "🟑 SEDANG"
    elif p_scam >= 0.25:
        risk_level = "πŸ”΅ RENDAH"
    else:
        risk_level = "🟒 SANGAT RENDAH"
    # Print the report.
    print("\n" + "═"*60)
    print(" πŸ›‘οΈ LAPORAN DETEKSI SCAM/PHISHING")
    print("═"*60)
    print(f" Pesan : {pesan[:100]}{'...' if len(pesan)>100 else ''}")
    print(f" Panjang : {len(pesan)} karakter | {len(pesan.split())} kata")
    print("─"*60)
    print(f" KEPUTUSAN : {'⚠️ SCAM / SPAM / PHISHING' if is_scam else 'βœ… PESAN AMAN (HAM)'}")
    print(f" Risk Level: {risk_level}")
    print("─"*60)
    print(f" Probabilitas SCAM : {p_scam*100:>6.2f}% {'β–ˆ' * int(p_scam*30)}")
    print(f" Probabilitas AMAN : {p_aman*100:>6.2f}% {'β–ˆ' * int(p_aman*30)}")
    print(f" Threshold : {threshold*100:.0f}%")
    print("─"*60)
    if sinyal_aktif:
        print(" ⚠️ SINYAL BAHAYA YANG TERDETEKSI:")
        for nama, val in sinyal_aktif:
            label = nama.replace('_', ' ').upper()
            print(f" β€’ {label:<25}: {val:.2f}")
    else:
        print(" βœ… Tidak ada sinyal bahaya yang terdeteksi")
    print("═"*60)
    return {
        'is_scam': bool(is_scam),
        'probability_scam': float(p_scam),
        'probability_safe': float(p_aman),
        'risk_level': risk_level,
        'signals': sinyal_aktif
    }
print("βœ… Fungsi detect_scam() siap digunakan!")
print("\nCara pakai: detect_scam('isi pesan kamu di sini')")
# ## πŸ§ͺ CELL 8 β€” Try the detector on a variety of messages
# ============================================================
# CELL 8 β€” Exercise the detector with scam and safe samples
# ============================================================
pesan_uji = [
    # --- SCAM CASES ---
    "Install Anydesk for me to access remotely. You should be able to pass the video/ID verification to avoid suspension.",
    "URGENT: Your account has been suspended. Click here to verify your identity immediately or your account will be terminated.",
    "Congratulations! You have won a $5,000 prize. Click http://bit.ly/claim-prize to claim your reward now!",
    "Halo, kami dari tim keamanan BCA. Mohon konfirmasi nomor rekening dan PIN Anda untuk menghindari pemblokiran akun.",
    "Download TeamViewer and give me remote access to fix your computer problem ASAP. Limited time offer!",
    "Dear customer, your PayPal account has been limited. Please login at http://paypal-verify.xyz to restore access.",
    # --- SAFE CASES ---
    "Hey! Just wanted to check if you're coming to the team lunch tomorrow at 12pm? Let me know!",
    "Reminder: Your dentist appointment is scheduled for next Tuesday at 3:00 PM. Please confirm your attendance.",
    "Thank you for your order #12345. Your package will be delivered within 3-5 business days.",
    "Hi, selamat pagi! Besok ada rapat tim jam 9. Tolong siapkan laporan mingguan kamu ya.",
]
# Run the detector over every sample with a numbered banner per message.
for urutan, contoh in enumerate(pesan_uji, start=1):
    print(f"\n{'#'*60}")
    print(f" PESAN KE-{urutan}")
    detect_scam(contoh)
input_cont = ''  # placeholder: keeps this cell non-interactive
# ## πŸ’¬ CELL 9 β€” Interactive mode (manual input)
# ============================================================
# CELL 9 β€” Type your own messages to check them!
# ============================================================
print("πŸ›‘οΈ SCAM DETECTOR β€” Mode Interaktif")
print("Ketik pesan yang ingin kamu cek. Ketik 'keluar' untuk berhenti.\n")
# Any of these inputs (case-insensitive) ends the session.
EXIT_COMMANDS = {'keluar', 'exit', 'quit', 'q', ''}
while True:
    try:
        user_msg = input("πŸ“© Masukkan pesan: ").strip()
        if user_msg.lower() in EXIT_COMMANDS:
            print("πŸ‘‹ Terima kasih telah menggunakan Scam Detector!")
            break
        detect_scam(user_msg)
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C / end-of-input ends the session gracefully.
        print("\nπŸ‘‹ Sesi selesai.")
        break
# ## πŸ’Ύ CELL 10 (Optional) β€” Save the model
# ============================================================
# CELL 10 β€” Persist the trained bundle (optionally to Google Drive)
# ============================================================
import os
import pickle
# Mount Google Drive first if you want to save there:
# from google.colab import drive
# drive.mount('/content/drive')
# SAVE_PATH = '/content/drive/MyDrive/scam_detector/'
SAVE_PATH = '/content/scam_detector_model/'
os.makedirs(SAVE_PATH, exist_ok=True)
# Everything inference needs, in one pickle: both vectorizers, the manual
# feature extractor, the three base models, the ensemble and key metrics.
model_bundle = {
    'tfidf_word': tfidf_word,
    'tfidf_char': tfidf_char,
    'feat_extractor': feat_extractor,
    'clf_lr': clf_lr,
    'clf_svm': clf_svm,
    'clf_xgb': clf_xgb,
    'ensemble': ensemble,
    'metadata': {
        'accuracy': acc_ens,
        'f1': f1_ens,
        'auc': auc_ens,
        'train_size': len(X_train)
    }
}
# Build the path with os.path.join instead of string concatenation.
model_path = os.path.join(SAVE_PATH, 'scam_detector.pkl')
with open(model_path, 'wb') as f:
    pickle.dump(model_bundle, f)
print(f"βœ… Model disimpan ke: {model_path}")
print(f" Akurasi: {acc_ens*100:.2f}% | F1: {f1_ens*100:.2f}% | AUC: {auc_ens*100:.2f}%")
# --- Standard library ---
import difflib
import html
# Bug fix: 'import importlib' alone does not guarantee the importlib.util
# submodule is loaded; importlib.util.find_spec below could raise
# AttributeError. Importing the submodule explicitly also binds 'importlib'.
import importlib.util
import os
import pickle
import re
import warnings
from dataclasses import dataclass
from datetime import datetime
from urllib.parse import urlparse

# Silence warnings before the heavyweight third-party imports.
warnings.filterwarnings('ignore')

# --- Third-party ---
import numpy as np
import pandas as pd
from scipy.sparse import hstack, csr_matrix
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.calibration import CalibratedClassifierCV
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, average_precision_score, f1_score, fbeta_score, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.svm import LinearSVC

# --- Optional dependencies ---
# Resolved at runtime so the script still loads when they are not installed;
# each name falls back to None and callers must check before use.
datasets_spec = importlib.util.find_spec('datasets')
if datasets_spec is not None:
    load_dataset = importlib.import_module('datasets').load_dataset
else:
    load_dataset = None
tldextract_spec = importlib.util.find_spec('tldextract')
tldextract = importlib.import_module('tldextract') if tldextract_spec is not None else None
xgboost_spec = importlib.util.find_spec('xgboost')
xgb = importlib.import_module('xgboost') if xgboost_spec is not None else None
print("βœ… Semua library berhasil diimport!")
# ============================================================
# 1. DEFINISI KELAS & FUNGSI (Wajib di atas agar dikenali saat Load)
# ============================================================
class ScamFeatureExtractor(BaseEstimator, TransformerMixin):
    """Hand-crafted scam/phishing signal extractor (English + Indonesian).

    transform() maps an iterable of raw texts to a float32 matrix with one
    column per entry in FEATURE_NAMES: keyword-group hit counts, URL/domain
    heuristics (shorteners, raw-IP hosts, suspicious TLDs, typosquatting
    against TARGET_DOMAINS) and surface statistics, plus a weighted
    'danger_score' aggregate.
    """
    # Keyword groups counted by plain substring matching on the lowercased text.
    URGENCY_WORDS = ['urgent', 'immediately', 'asap', 'right now', 'limited time', 'expires', 'deadline', 'act now', 'don\'t wait', 'hurry', 'segera', 'sekarang juga', 'cepat', 'batas waktu', 'darurat']
    THREAT_WORDS = ['suspended', 'blocked', 'terminated', 'banned', 'closed', 'account locked', 'verify now', 'confirm identity', 'suspension', 'diblokir', 'ditangguhkan', 'verifikasi', 'konfirmasi']
    MONEY_WORDS = ['prize', 'winner', 'won', 'lottery', 'jackpot', 'free money', 'cash', 'reward', 'bonus', 'gift card', 'bitcoin', 'crypto', 'transfer', 'wire', 'western union', 'moneygram', 'bank account', 'hadiah', 'menang', 'gratis', 'uang', 'transfer', 'rekening']
    REMOTE_ACCESS = ['anydesk', 'teamviewer', 'remote', 'screen share', 'remote access', 'install', 'download app', 'access your computer', 'take control', 'remote desktop', 'vnc', 'rustdesk', 'ultraviewer']
    PHISHING_WORDS = ['click here', 'login', 'password', 'username', 'credential', 'sign in', 'verify your', 'update your', 'confirm your', 'account information', 'billing info', 'credit card', 'cvv', 'social security', 'ssn', 'otp', 'pin', 'kode otp']
    IMPERSONATION = ['amazon', 'paypal', 'apple', 'microsoft', 'google', 'facebook', 'instagram', 'netflix', 'bank', 'irs', 'government', 'police', 'bri', 'bca', 'mandiri', 'ojk', 'polisi', 'pemerintah', 'ecommerce', 'belanja']
    # Allow-list of legitimate domains; any URL whose domain is close to (but
    # not exactly) one of these is treated as a typosquatting attempt.
    TARGET_DOMAINS = ['paypal.com', 'google.com', 'apple.com', 'microsoft.com', 'facebook.com', 'bca.co.id', 'bri.co.id', 'mandiri.co.id', 'klikbca.com', 'tokopedia.com', 'shopee.co.id', 'gojek.com', 'lazada.com','lazada.co.id', 'instagram.com', 'twitter.com', 'x.com', 'linkedin.com', 'fb.com', 'blogger.com', 'youtube.com', 'youtu.be','wordpress.com','apple.com', 'wordpress.org', 'googleusercontent.com', 'whatsapp.com', 'play.google.com', 'support.google.com', 'policies.google.com', 'cloudflare.com', 'docs.google.com', 'en.wikipedia.org', 'drive.google.com', 'tiktok.com', 'maps.google.com', 't.me', 'bp.blogspot.com', 'accounts.google.com', 'wa.me', 'europa.eu', 'plus.google.com', 'mozilla.org', 'sites.google.com', 'istockphoto.com', 'facebook.com', 'pt.wikipedia.org', 'vk.com', 'es.wikipedia.org', 'vimeo.com', 'adobe.com', 'weebly.com', 'github.com', 'globo.com', 'forms.gle', 'wikimedia.org', 'afternic.com', 'google.com.br', 'mediafire.com', 'news.google.com', 'yahoo.com', 'jimdofree.com', 'mail.ru', 'files.wordpress.com', 'medium.com', 'who.int', 'opera.com', 'gravatar.com', 'dropbox.com', 'dailymotion.com', 'amazon.com', 'cpanel.net', 'tools.google.com', 'google.es', 'draft.blogger.com', 'uol.com.br', 'bbc.co.uk', 'ok.ru', 'abril.com.br', 'netvibes.com', 'nih.gov', 'nytimes.com', 'cnn.com', 'developers.google.com', 'fr.wikipedia.org', 'google.de', 'paypal.com', 'shopify.com', 'feedburner.com', 'imdb.com', 'gstatic.com', 'googleblog.com', 'myspace.com', 'goo.gl', 'brandbucket.com', 'line.me', 'live.com', 'foxnews.com', 'oracle.com', 'get.google.com', 'amazon.co.uk', 'picasaweb.google.com', '4shared.com', 'ft.com', 'twitch.tv', 'gov.uk', 'huffingtonpost.com', 'ytimg.com', 'namebright.com', 'businessinsider.com', 'slideshare.net', 'issuu.com', 'nature.com', 'nicsell.com', 'domainmarket.com', 'cdc.gov', 'ig.com.br', 'tinyurl.com', 'hugedomains.com', 'dailymail.co.uk', 'estadao.com.br', 'expireddomains.com', 'messenger.com', 'aliexpress.com', 'independent.co.uk', 
    'discord.com', 'pixabay.com', 'instagram.com', 'usatoday.com', 'photos.google.com', 'researchgate.net', 'theguardian.com', 'wikia.com', 'scribd.com', 'storage.googleapis.com', 'google.it', 'telegram.me', 'correios.com.br', 'archive.org', 'washingtonpost.com', 'bloomberg.com', 'google.fr', 'fandom.com', 'bbc.com', 'linktr.ee', 'myaccount.google.com', 'buydomains.com', 'google.co.jp', 'msn.com', 'wiley.com', 'dan.com', 'amazon.co.jp', 'list-manage.com', 'webmd.com', 'indiatimes.com', 'nginx.com', 'fb.com', 'google.co.uk', 'wix.com', 'un.org', 'forbes.com', 'thesun.co.uk', '3ds.com', 'adssettings.google.com', 'dropcatch.com', 'mail.google.com', 'hatena.ne.jp', 'w3.org', 'plesk.com', 'spotify.com', 'mirror.co.uk', 'telegraph.co.uk', 'youronlinechoices.com', 'marketingplatform.google.com', 'typepad.com', 'news.yahoo.com', 'nginx.org', 'bit.ly', 't.co', 'booking.com', 'terra.com.br', 'huffpost.com', 'pinterest.com', 'reuters.com', 'wsj.com', 'creativecommons.org', 'office.com', 'ovhcloud.com', 'time.com', 'sedo.com', 'ru.wikipedia.org', 'wp.com', 'aboutads.info', 'huawei.com', 'planalto.gov.br', 'elpais.com', 'gov.br', 'de.wikipedia.org', 'enable-javascript.com', 'ibm.com', 'techcrunch.com', 'nhk.or.jp', 'ebay.com', 'zoom.us', 'lemonde.fr', 'buzzfeed.com', 'home.pl', 'photos1.blogger.com', 'britannica.com', 'secureserver.net', 'hp.com', 'imageshack.us', 'newsweek.com', 'amazon.es', 'economist.com', 'nasa.gov', 'livejournal.com', 'tmz.com', 'amzn.to', 'example.com', 'yahoo.co.jp', 'akamaihd.net', 'addthis.com', 'perfectdomain.com', 'm.wikipedia.org', 'bandcamp.com', 'ssl-images-amazon.com', 'latimes.com', 'steampowered.com', 'liveinternet.ru', 'change.org', 'walmart.com', 'ign.com', 'instructables.com', 'ouest-france.fr', 'cointernet.com.co', 'abc.net.au', 'hotmart.com', 'npr.org', 'dreamstime.com', 'groups.google.com', 'calameo.com', 'kickstarter.com', 'ovh.com', 'clickbank.net', 'hollywoodreporter.com', 'trustpilot.com', 'guardian.co.uk', 'samsung.com', 
    'francetvinfo.fr', 'canva.com', 'cnet.com', 'as.com', 'berkeley.edu', 'cbsnews.com', 'playstation.com', 'namecheap.com', 'google.nl', 'plos.org', 'thenai.org', 'networkadvertising.org', 'lin.ee', 'ted.com', 'yelp.com', 'amazon.fr', 'search.yahoo.com', 'discord.gg', 'news.com.au', 'disqus.com', 'loc.gov', 'my.yahoo.com', 'php.net', 'id.wikipedia.org', 'rakuten.co.jp', 'bloglovin.com', 'it.wikipedia.org', 'telegram.org', 'g.page', 'ipv4.google.com', 'books.google.com', 'netflix.com', 'leparisien.fr', 'ja.wikipedia.org', 'express.co.uk', 'g.co', 'privacyshield.gov', 'ggpht.com', 'themeforest.net', 'yandex.ru', 'picasa.google.com', 'abcnews.go.com', 'dw.com', 'lefigaro.fr', 'zippyshare.com', 'detik.com', 'nydailynews.com', 'sagepub.com', 'mega.nz', 't-online.de', 'unesco.org', 'arxiv.org', 'mystrikingly.com', 'deezer.com', 'pexels.com', 'addtoany.com', 'code.google.com', 'shutterstock.com', 'unsplash.com', 'outlook.com', 'dailystar.co.uk', 'sky.com', 'abc.es', 'pl.wikipedia.org', 'psychologytoday.com', 'quora.com', 'gizmodo.com', 'weibo.com', 'business.google.com', 'workspace.google.com', 'skype.com', 'cpanel.com', 'gofundme.com', 'rtve.es', 'welt.de', 'cornell.edu', 'pbs.org', 'eventbrite.com', 'nypost.com', 'hubspot.com', 'tripadvisor.com', 'timeweb.ru', 'wikihow.com', 'stanford.edu', 'rambler.ru', 'soundcloud.com', 'google.pl', 'mozilla.com', 'cnil.fr', 'rt.com', 'bing.com', 'google.ru', 'sakura.ne.jp', 'metro.co.uk', 'android.com', 'safety.google', 'cnbc.com', 'academia.edu', 'godaddy.com', 'nbcnews.com', 'apache.org', 'lavanguardia.com', 'offset.com', 'surveymonkey.com', 'springer.com', 'netlify.app', 'sapo.pt', 'amazon.de', 'gmail.com', 'sendspace.com', 'cambridge.org', 'redbull.com', 'taringa.net', 'ikea.com', 'qq.com', 'thetimes.co.uk', 'wiktionary.org', 'vistaprint.com', 'zendesk.com', 'amazonaws.com', 'aol.com', 'wikipedia.org', 'engadget.com', 'translate.google.com', 'firefox.com', 'photobucket.com', 'cbc.ca', 'behance.net', 'ameblo.jp', 'sciencedirect.com', 
    'nationalgeographic.com', 'spiegel.de', '20minutos.es', 'mashable.com', 'mit.edu', 'rapidshare.com', 'wired.com', 'dovendi.com', 'icann.org', 'doi.org', 'alicdn.com', 'harvard.edu', 'espn.com', 'finance.yahoo.com', 'marca.com', 'nintendo.com', 'ziddu.com', 'hindustantimes.com', 'statista.com', 'amazon.it', 'elmundo.es', 'goodreads.com', 'doubleclick.net', 'variety.com', 'sciencedaily.com', 'insider.com', 'theverge.com', 'clarin.com', 'naver.com', 'theatlantic.com', 'about.com', 'sputniknews.com', 'yadi.sk', 'cutt.ly', 'telegra.ph', 'yandex.com', 'reg.ru', 'oup.com', 'franceinfo.fr', 'google.ca', 'corriere.it', 'airbnb.com', 'pages.dev', 'strato-hosting.eu', 'slate.com', 'google.co.id', 'pornhub.com', 'thestar.com', 'over-blog.com', 'kotaku.com', 'bp1.blogger.com', 'kompas.com', 'onamae.com', 'getbootstrap.com', 'barnesandnoble.com', 'openai.com', 'orange.fr', 'focus.de', 'worldbank.org', 'scholar.google.com', 'hatena.blog', 'onelink.me', 'natro.com', '123rf.com', 'oecd.org', 'bfmtv.com', 'e-monsite.com', 'wallpapers.com', 'mailchi.mp', 'usgs.gov', 'politico.com', 'pnas.org', 'xinhuanet.com', 'substack.com', 't.ly', 'weforum.org', 'thefreedictionary.com', 'amazon.ca', 'dell.com', 'house.gov', 'xbox.com', 'giphy.com', 'hoax.com', 'itch.io', 'm.me', 'ebay.co.uk', 'fifa.com', 'washington.edu', 'geocities.com', 'fbsbx.com', 'bild.de', 'ndtv.com', 'hilton.com', 'udemy.com', 'asus.com', 'greenpeace.org', 'search.google.com', 'merriam-webster.com', 'prtimes.jp', 'target.com', 'fortune.com', 'ca.gov', 'sfgate.com', 'legifrance.gouv.fr', 'news.livejournal.com', 'lycos.com', 'dribbble.com', 'utexas.edu', 'wetransfer.com', 'automattic.com', 'rollingstone.com', 'newyorker.com', 'canada.ca', 'nymag.com', 'epa.gov', 'thedailybeast.com', 'ea.com', 'xing.com', 'usda.gov', 'allaboutcookies.org', 'pcmag.com', 'mdpi.com', 'zdnet.com', 'evernote.com', 'history.com', 'box.com', 'public-api.wordpress.com', 'answers.com', 'nikkei.com', 'vice.com', 'ox.ac.uk', 'impress.co.jp', 
    'bp0.blogger.com', 'cbslocal.com', 'ads.google.com', 'intel.com','shopee.co.id','bukalapak.com','akulaku.com']
    # Column order of the matrix produced by transform().
    FEATURE_NAMES = [
        'urgency', 'threat', 'money', 'remote_access', 'phishing', 'typosquat', 'impersonation',
        'url_count', 'short_url', 'ip_url', 'bad_tld',
        'caps_ratio', 'exclamation', 'currency', 'phone', 'long_numbers',
        'text_length', 'word_count', 'avg_word_len', 'unique_ratio', 'danger_score'
    ]
    # Precompiled patterns for URL / phone / number / currency heuristics.
    URL_RE = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    SHORT_URL_RE = re.compile(r'(bit\.ly|tinyurl|goo\.gl|t\.co|ow\.ly|buff\.ly|s\.id|shorturl\.at|cutt\.ly)')
    IP_URL_RE = re.compile(r'http[s]?://\d+\.\d+\.\d+\.\d+')
    BAD_TLD_RE = re.compile(r'\.(xyz|top|win|click|download|review|loan|work|party|cc|biz|info)(/|\b)')
    PHONE_RE = re.compile(r'(\+62|\+1|\+44)?[\s.-]?\(?\d{3}\)?[\s.-]?\d{3,4}[\s.-]?\d{4}')
    LONG_NUM_RE = re.compile(r'\b\d{4,}\b')
    CURRENCY_RE = re.compile(r'(\$|€|Β£|\brp\b|\busd\b)', re.IGNORECASE)
    def __init__(self, similarity_threshold=0.84):
        # similarity_threshold: lower bound on difflib ratio for a domain to
        # count as "suspiciously similar" to a whitelisted one.
        self.similarity_threshold = similarity_threshold
        self.target_domains = sorted(set(d.lower().strip() for d in self.TARGET_DOMAINS))
        # Index whitelisted domains by their suffix ("everything after the
        # first label") so typosquat checks only compare same-suffix brands.
        self.target_by_suffix = {}
        for domain in self.target_domains:
            parts = domain.split('.')
            suffix = '.'.join(parts[1:]) if len(parts) > 1 else domain
            self.target_by_suffix.setdefault(suffix, []).append(domain)
        # suffix_list_urls=None keeps tldextract from fetching the public
        # suffix list over the network (bundled snapshot only).
        self._extractor = tldextract.TLDExtract(suffix_list_urls=None) if tldextract is not None else None
    def fit(self, X, y=None):
        # Stateless transformer: nothing to learn.
        return self
    @staticmethod
    def _count_matches(text, word_list):
        # Number of keywords occurring as substrings of the lowercased text.
        text_lower = text.lower()
        return sum(1 for w in word_list if w in text_lower)
    def _count_typosquat(self, urls):
        # Count URLs whose domain either embeds a whitelisted brand name or
        # is near-identical (but not equal) to a whitelisted domain.
        f_typosquat = 0
        for url in urls:
            try:
                if self._extractor is not None:
                    ext = self._extractor(url)
                    if not ext.domain or not ext.suffix:
                        continue
                    root_domain = f"{ext.domain}.{ext.suffix}".lower()
                    full_domain = f"{ext.subdomain}.{ext.domain}.{ext.suffix}".strip('.').lower()
                    suffix_key = ext.suffix.lower()
                else:
                    # Fallback without tldextract: naive "last two labels" split.
                    host = urlparse(url).netloc.split(':')[0].lower().strip('.')
                    parts = host.split('.')
                    if len(parts) < 2:
                        continue
                    root_domain = '.'.join(parts[-2:])
                    full_domain = host
                    suffix_key = root_domain.split('.', 1)[1]
                # Exact whitelist hit -> definitely not typosquatting.
                if root_domain in self.target_domains:
                    continue
                candidates = self.target_by_suffix.get(suffix_key, self.target_domains)
                for brand in candidates:
                    nama_brand = brand.split('.')[0]
                    # NOTE(review): for one-letter brand stems ('t.me' -> 't',
                    # 'x.com' -> 'x', 'g.co' -> 'g') this substring test fires
                    # on almost any domain β€” confirm this is intended.
                    if nama_brand in full_domain:
                        f_typosquat += 1
                        break
                    kemiripan = difflib.SequenceMatcher(None, root_domain, brand).ratio()
                    if self.similarity_threshold < kemiripan < 1.0:
                        f_typosquat += 1
                        break
            except Exception:
                # Malformed URL: skip rather than fail the whole batch.
                continue
        return f_typosquat
    def transform(self, X):
        """Return an (n_samples, len(FEATURE_NAMES)) float32 feature matrix."""
        features = []
        for text in X:
            text = str(text)
            t = text.lower()
            # Keyword-group counts (lowercased text).
            f_urgency = self._count_matches(t, self.URGENCY_WORDS)
            f_threat = self._count_matches(t, self.THREAT_WORDS)
            f_money = self._count_matches(t, self.MONEY_WORDS)
            f_remote = self._count_matches(t, self.REMOTE_ACCESS)
            f_phishing = self._count_matches(t, self.PHISHING_WORDS)
            f_imperson = self._count_matches(t, self.IMPERSONATION)
            # URL heuristics.
            urls = self.URL_RE.findall(t)
            f_url_count = len(urls)
            f_short_url = len(self.SHORT_URL_RE.findall(t))
            f_has_ip_url = int(bool(self.IP_URL_RE.search(t)))
            f_suspicious_tld = len(self.BAD_TLD_RE.findall(t))
            f_typosquat = self._count_typosquat(urls)
            # Surface statistics; caps ratio uses the original-case text.
            f_caps_ratio = sum(1 for c in text if c.isupper()) / max(len(text), 1)
            f_excl_count = text.count('!')
            f_currency = len(self.CURRENCY_RE.findall(text))
            f_has_phone = int(bool(self.PHONE_RE.search(text)))
            f_num_count = len(self.LONG_NUM_RE.findall(text))
            words = t.split()
            f_text_len = len(text)
            f_word_count = len(words)
            f_avg_word_len = np.mean([len(w) for w in words]) if words else 0.0
            f_unique_ratio = len(set(words)) / max(len(words), 1)
            # Weighted aggregate; remote access and typosquatting weigh most.
            f_total_danger = f_urgency + (f_threat * 2) + f_money + (f_remote * 3) + (f_phishing * 2) + (f_typosquat * 5)
            features.append([
                f_urgency, f_threat, f_money, f_remote, f_phishing, f_typosquat, f_imperson,
                f_url_count, f_short_url, f_has_ip_url, f_suspicious_tld,
                f_caps_ratio, f_excl_count, f_currency, f_has_phone, f_num_count,
                f_text_len, f_word_count, f_avg_word_len, f_unique_ratio,
                f_total_danger
            ])
        return np.array(features, dtype=np.float32)
class EnsembleScamDetector:
    """Weighted soft-voting ensemble over three probabilistic binary classifiers."""
    def __init__(self, lr, svm, xgb_model, weights=(0.3, 0.3, 0.4)):
        self.lr = lr
        self.svm = svm
        self.xgb = xgb_model
        # Normalize the weights to sum to 1; fall back to the default split
        # when the provided weights sum to zero or less.
        raw = np.asarray(weights, dtype=np.float64)
        weight_sum = raw.sum()
        if weight_sum > 0:
            self.weights = raw / weight_sum
        else:
            self.weights = np.asarray([0.3, 0.3, 0.4], dtype=np.float64)
    @property
    def classes_(self):
        # Fixed binary label space: 0 = ham, 1 = scam.
        return np.array([0, 1])
    def predict_proba(self, X):
        """Return the weighted average of the base models' class probabilities."""
        w_lr, w_svm, w_xgb = self.weights
        blended = w_lr * self.lr.predict_proba(X)
        blended = blended + w_svm * self.svm.predict_proba(X)
        blended = blended + w_xgb * self.xgb.predict_proba(X)
        # Renormalize each row defensively (guards against degenerate rows).
        row_total = np.clip(blended.sum(axis=1, keepdims=True), 1e-12, None)
        return blended / row_total
    def predict(self, X, threshold=0.45):
        """Binarize the scam probability at the given threshold."""
        scam_prob = self.predict_proba(X)[:, 1]
        return (scam_prob >= threshold).astype(int)
def preprocess_text(text):
text = html.unescape(str(text)).lower()
text = re.sub(r'<[^>]+>', ' ', text)
text = re.sub(r'\b[\w\.-]+@[\w\.-]+\.\w+\b', ' EMAIL ', text)
text = re.sub(r'http\S+|www\S+', ' URL ', text)
text = re.sub(r'\b\d{10,}\b', ' LONGNUM ', text)
text = re.sub(r'(.)\1{3,}', r'\1\1', text)
text = re.sub(r'[^\w\s!?$]', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def evaluate_model(name, y_true, y_prob, threshold=0.5):
    """Print and return standard binary-classification metrics at a threshold.

    Returns a dict with accuracy / precision / recall / f1 / roc_auc /
    pr_auc / threshold, all as plain floats.
    """
    y_pred = (y_prob >= threshold).astype(int)
    metrics = {
        'accuracy': float(accuracy_score(y_true, y_pred)),
        'precision': float(precision_score(y_true, y_pred, zero_division=0)),
        'recall': float(recall_score(y_true, y_pred, zero_division=0)),
        'f1': float(f1_score(y_true, y_pred, zero_division=0)),
        'roc_auc': float(roc_auc_score(y_true, y_prob)),
        'pr_auc': float(average_precision_score(y_true, y_prob)),
        'threshold': float(threshold)
    }
    cm = confusion_matrix(y_true, y_pred)
    print(f"\n{'='*70}")
    print(f" Model: {name}")
    print(
        f" Akurasi: {metrics['accuracy']*100:.2f}% | Precision: {metrics['precision']*100:.2f}% | Recall: {metrics['recall']*100:.2f}% | "
        f"F1: {metrics['f1']*100:.2f}% | AUC: {metrics['roc_auc']*100:.2f}% | PR-AUC: {metrics['pr_auc']*100:.2f}%"
    )
    print(f" Confusion Matrix:\n{cm}")
    return metrics
@dataclass
class ThresholdResult:
    """Best decision threshold found on validation data plus its metrics."""
    # threshold: decision cut-off on the scam probability
    # fbeta: F-beta score achieved at that threshold
    # precision / recall: companion metrics at the same threshold
    threshold: float
    fbeta: float
    precision: float
    recall: float
def normalize_label(value):
    """Map a heterogeneous dataset label onto 1 (spam), 0 (ham) or None (unknown).

    Numeric labels become 1 when positive; common English spam/ham strings
    are recognized case-insensitively; anything else yields None.
    """
    if pd.isna(value):
        return None
    if isinstance(value, (int, np.integer)):
        return int(value > 0)
    if isinstance(value, (float, np.floating)):
        return int(value > 0.0)
    token = str(value).strip().lower()
    positive = {'spam', 'scam', 'phishing', '1', 'true', 'yes', 'fraud'}
    negative = {'ham', 'safe', 'normal', '0', 'false', 'no', 'legit'}
    if token in positive:
        return 1
    if token in negative:
        return 0
    return None
def search_best_threshold(y_true, y_prob, beta=1.5):
    """Scan thresholds 0.20..0.80 (step 0.01) and return the ThresholdResult
    with the highest F-beta; exact ties are broken in favor of higher recall."""
    best = ThresholdResult(threshold=0.45, fbeta=-1.0, precision=0.0, recall=0.0)
    for th in np.arange(0.2, 0.81, 0.01):
        preds = (y_prob >= th).astype(int)
        cur_precision = precision_score(y_true, preds, zero_division=0)
        cur_recall = recall_score(y_true, preds, zero_division=0)
        cur_fbeta = fbeta_score(y_true, preds, beta=beta, zero_division=0)
        improves = cur_fbeta > best.fbeta
        tie_with_better_recall = np.isclose(cur_fbeta, best.fbeta) and cur_recall > best.recall
        if improves or tie_with_better_recall:
            best = ThresholdResult(threshold=float(th), fbeta=float(cur_fbeta), precision=float(cur_precision), recall=float(cur_recall))
    return best
def tune_ensemble_weights(clf_lr, clf_svm, clf_xgb, X_val, y_val):
    """Grid-search convex blend weights (step 0.1) for the three base models.

    For every feasible (w_lr, w_svm, w_xgb) with w_xgb = 1 - w_lr - w_svm >= 0,
    the blended validation probabilities are scored via search_best_threshold;
    returns (best_weights, best ThresholdResult).
    """
    prob_lr = clf_lr.predict_proba(X_val)[:, 1]
    prob_svm = clf_svm.predict_proba(X_val)[:, 1]
    prob_xgb = clf_xgb.predict_proba(X_val)[:, 1]
    best_w = (0.3, 0.3, 0.4)
    best_th = ThresholdResult(threshold=0.45, fbeta=-1.0, precision=0.0, recall=0.0)
    steps = np.arange(0.0, 1.01, 0.1)
    for w_lr in steps:
        for w_svm in steps:
            w_xgb = 1.0 - w_lr - w_svm
            if w_xgb < 0:
                continue
            blended = (w_lr * prob_lr) + (w_svm * prob_svm) + (w_xgb * prob_xgb)
            candidate = search_best_threshold(y_val, blended, beta=1.5)
            if candidate.fbeta > best_th.fbeta:
                best_w = (float(w_lr), float(w_svm), float(w_xgb))
                best_th = candidate
    # Defensive renormalization (weights already sum to ~1 by construction).
    total = max(sum(best_w), 1e-12)
    best_w = tuple(round(w / total, 4) for w in best_w)
    return best_w, best_th
def apply_rule_boost(base_prob, raw_feat):
    """Add fixed heuristic boosts to the model probability for high-confidence
    scam patterns; returns (boosted_prob capped at 0.995, list of reasons)."""
    idx = {name: i for i, name in enumerate(ScamFeatureExtractor.FEATURE_NAMES)}
    rules = (
        (raw_feat[idx['typosquat']] > 0 and raw_feat[idx['url_count']] > 0,
         0.14, 'Domain mirip brand resmi (typosquatting)'),
        (raw_feat[idx['remote_access']] > 0 and raw_feat[idx['phishing']] > 0,
         0.12, 'Permintaan remote access + indikator phishing'),
        (raw_feat[idx['threat']] > 0 and raw_feat[idx['urgency']] > 0,
         0.08, 'Pola ancaman + urgensi tinggi'),
        (raw_feat[idx['short_url']] > 0 and raw_feat[idx['phishing']] > 0,
         0.08, 'Short URL + indikator phishing'),
        (raw_feat[idx['danger_score']] >= 8,
         0.10, 'Skor bahaya tinggi'),
    )
    boost = 0.0
    reasons = []
    for fired, amount, reason in rules:
        if fired:
            boost += amount
            reasons.append(reason)
    final_prob = min(0.995, max(0.0, base_prob + boost))
    return final_prob, reasons
def _extract_svm_linear_coef(calibrated_svm):
    """Return the mean LinearSVC coefficient vector across calibration folds,
    or None when no fold exposes linear coefficients."""
    if not hasattr(calibrated_svm, 'calibrated_classifiers_'):
        return None
    collected = []
    for fold in calibrated_svm.calibrated_classifiers_:
        # sklearn renamed `base_estimator` to `estimator`; support both names.
        inner = getattr(fold, 'estimator', None)
        if inner is None:
            inner = getattr(fold, 'base_estimator', None)
        if inner is not None and hasattr(inner, 'coef_'):
            collected.append(np.asarray(inner.coef_[0], dtype=np.float64))
    if not collected:
        return None
    return np.mean(np.vstack(collected), axis=0)
def explain_prediction_tokens(pesan_clean, top_n=8):
    """
    Explain per-token contributions to the scam prediction using the linear
    models (weighted combination of Logistic Regression + LinearSVC).

    Relies on the module-level `tfidf_word` and `ensemble` objects. Returns a
    dict with 'method', 'scam_tokens' (positive contributions), 'safe_tokens'
    (negative contributions) and a 'note' string; never raises.
    """
    try:
        # Word-level TF-IDF vector of the preprocessed message.
        X_word = tfidf_word.transform([pesan_clean])
        non_zero_idx = X_word.nonzero()[1]
        if non_zero_idx.size == 0:
            return {
                'method': 'LR+SVM-linear',
                'scam_tokens': [],
                'safe_tokens': [],
                'note': 'Tidak ada token dari pesan yang cocok dengan vocabulary TF-IDF.'
            }
        token_names = tfidf_word.get_feature_names_out()
        # Pull linear coefficients from both linear base models when available.
        lr_coef = np.asarray(ensemble.lr.coef_[0], dtype=np.float64) if hasattr(ensemble.lr, 'coef_') else None
        svm_coef = _extract_svm_linear_coef(ensemble.svm)
        if lr_coef is None and svm_coef is None:
            return {
                'method': 'unavailable',
                'scam_tokens': [],
                'safe_tokens': [],
                'note': 'Koefisien linear tidak tersedia untuk explainability token-level.'
            }
        # Combine whatever coefficient vectors exist, weighted like the ensemble.
        if lr_coef is None:
            combined_coef = svm_coef
            method = 'SVM-linear'
        elif svm_coef is None:
            combined_coef = lr_coef
            method = 'LR-linear'
        else:
            combined_coef = (ensemble.weights[0] * lr_coef) + (ensemble.weights[1] * svm_coef)
            method = 'Weighted(LR+SVM)-linear'
        # Element-wise contribution = tfidf weight * linear coefficient.
        contrib_dense = np.asarray(X_word.multiply(combined_coef).toarray()).ravel()
        positive_idx = [i for i in non_zero_idx if contrib_dense[i] > 0]
        negative_idx = [i for i in non_zero_idx if contrib_dense[i] < 0]
        top_pos = sorted(positive_idx, key=lambda i: contrib_dense[i], reverse=True)[:top_n]
        top_neg = sorted(negative_idx, key=lambda i: contrib_dense[i])[:top_n]
        scam_tokens = [(str(token_names[i]), float(contrib_dense[i])) for i in top_pos]
        safe_tokens = [(str(token_names[i]), float(contrib_dense[i])) for i in top_neg]
        return {
            'method': method,
            'scam_tokens': scam_tokens,
            'safe_tokens': safe_tokens,
            'note': ''
        }
    except Exception as e:
        # Explainability is best-effort: report the failure instead of raising.
        return {
            'method': 'error',
            'scam_tokens': [],
            'safe_tokens': [],
            'note': f'Explainability gagal: {e}'
        }
def explain_for_colab(pesan, top_n=8, threshold=None):
    """
    Convenience wrapper for Google Colab.

    Runs detect_scam() quietly and returns:
        - summary: dict with the decision, risk level, probabilities and
          the explainability method/note
        - token_table: DataFrame of token contributions sorted by magnitude
    """
    hasil = detect_scam(pesan, threshold=threshold, show_detail=False)
    info = hasil.get('token_explanation', {})
    rows = [
        {'token': token, 'kontribusi': score, 'arah': 'menaikkan_skor_scam'}
        for token, score in info.get('scam_tokens', [])
    ]
    rows.extend(
        {'token': token, 'kontribusi': score, 'arah': 'menurunkan_skor_scam'}
        for token, score in info.get('safe_tokens', [])
    )
    token_table = pd.DataFrame(rows)
    if not token_table.empty:
        # Order rows by absolute contribution, then drop the helper column.
        token_table['abs_kontribusi'] = token_table['kontribusi'].abs()
        token_table = token_table.sort_values('abs_kontribusi', ascending=False).drop(columns=['abs_kontribusi'])
    summary = {
        'decision': hasil['decision'],
        'risk_level': hasil['risk_level'],
        'threshold': hasil['threshold'],
        'model_prob': hasil['model_prob'],
        'final_prob': hasil['final_prob'],
        'safe_prob': hasil['safe_prob'],
        'explain_method': info.get('method', 'unknown'),
        'explain_note': info.get('note', '')
    }
    return summary, token_table.head(max(1, top_n * 2))
# ============================================================
# 2. BYPASS LOGIC (LOAD THE .PKL FILE IF PRESENT, ELSE TRAIN)
# ============================================================
MODEL_PATH = 'scam_detector.pkl'  # on-disk location of the pickled model bundle
MODEL_VERSION = '2.0.0'  # bump when the bundle layout / training recipe changes
RANDOM_STATE = 42  # single seed shared by all splits and estimators
DEFAULT_THRESHOLD = 0.45  # fallback decision threshold when the bundle has none
def _make_dataset_frame(df, text_col, label_col, source_name):
    """Standardize one raw dataset into a ['text', 'label'] frame with binary labels.

    Raises ValueError when the expected columns are missing; drops rows whose
    label cannot be normalized or whose text is shorter than 3 characters.
    """
    if text_col not in df.columns or label_col not in df.columns:
        raise ValueError(f"Kolom tidak cocok untuk {source_name}: text={text_col}, label={label_col}")
    frame = df[[text_col, label_col]].copy()
    frame.columns = ['text', 'label']
    frame['label'] = frame['label'].apply(normalize_label)
    frame = frame.dropna(subset=['text', 'label'])
    frame['text'] = frame['text'].astype(str).str.strip()
    # Discard degenerate ultra-short messages.
    frame = frame[frame['text'].str.len() >= 3]
    frame['label'] = frame['label'].astype(int)
    print(f"βœ… {source_name}: {len(frame)} data")
    return frame
def load_training_data():
    """Download and merge all available training corpora into one DataFrame.

    Tries four sources independently (three HuggingFace datasets plus an
    Indonesian SMS spam CSV hosted on a gist); each failure is collected
    rather than fatal. Raises RuntimeError only when every source fails.
    Returns a deduplicated frame with columns ['text', 'label'].
    """
    frames = []
    errors = []
    try:
        print("πŸ“₯ [1/4] Mengunduh dataset spam detection...")
        ds1 = load_dataset("Deysi/spam-detection-dataset", split="train")
        df1 = pd.DataFrame(ds1)
        # Fall back to positional columns when the expected names are absent.
        col_text = 'text' if 'text' in df1.columns else df1.columns[0]
        col_label = 'label' if 'label' in df1.columns else df1.columns[1]
        frames.append(_make_dataset_frame(df1, col_text, col_label, 'Deysi/spam-detection-dataset'))
    except Exception as e:
        errors.append(f"Dataset 1 gagal: {e}")
    try:
        print("πŸ“₯ [2/4] Mengunduh SMS spam collection...")
        ds2 = load_dataset("ucirvine/sms_spam", split="train")
        df2 = pd.DataFrame(ds2)
        col_text = 'sms' if 'sms' in df2.columns else df2.columns[0]
        col_label = 'label' if 'label' in df2.columns else df2.columns[1]
        frames.append(_make_dataset_frame(df2, col_text, col_label, 'ucirvine/sms_spam'))
    except Exception as e:
        errors.append(f"Dataset 2 gagal: {e}")
    try:
        print("πŸ“₯ [3/4] Mengunduh Enron email dataset...")
        ds3 = load_dataset("SetFit/enron_spam", split="train")
        df3 = pd.DataFrame(ds3)
        # Enron rows carry subject + message; concatenate them into one text.
        if 'subject' in df3.columns and 'message' in df3.columns:
            df3['text'] = df3['subject'].fillna('') + ' ' + df3['message'].fillna('')
        elif 'text' not in df3.columns:
            df3['text'] = df3.iloc[:, 0]
        label_col = 'label' if 'label' in df3.columns else 'spam'
        # Subsample to keep the (large) Enron corpus from dominating training.
        df3 = df3.sample(min(5000, len(df3)), random_state=RANDOM_STATE)
        frames.append(_make_dataset_frame(df3, 'text', label_col, 'SetFit/enron_spam'))
    except Exception as e:
        errors.append(f"Dataset 3 gagal: {e}")
    try:
        print("πŸ“₯ [4/4] Mengunduh dataset Spam/Scam Bahasa Indonesia...")
        url_indo = "https://gist.githubusercontent.com/Xnuvers007/ba91613fe98deb8d09bb0abdfb17ef88/raw/8bb5c883e8c0e589efe359f6342e25b5754ff5c9/sms_spam_indo.csv"
        df4 = pd.read_csv(url_indo)
        frames.append(_make_dataset_frame(df4, 'Pesan', 'Kategori', 'SMS spam Indonesia'))
    except Exception as e:
        errors.append(f"Dataset 4 gagal: {e}")
    if not frames:
        raise RuntimeError('Semua dataset gagal dimuat. ' + ' | '.join(errors))
    df_all = pd.concat(frames, ignore_index=True)
    df_all = df_all.drop_duplicates(subset=['text', 'label']).reset_index(drop=True)
    print(f"βœ… Total data training setelah dedup: {len(df_all)}")
    print(f"ℹ️ Distribusi label: {df_all['label'].value_counts().to_dict()}")
    if errors:
        print('⚠️ Sebagian dataset gagal dimuat:')
        for err in errors:
            print(f" - {err}")
    return df_all
def train_and_save_model():
    """Train the full detection pipeline end-to-end and persist it to MODEL_PATH.

    Steps: load + merge datasets; stratified split (20% test, then 15% of the
    remainder as validation); fit word/char TF-IDF and handcrafted features;
    train LR, calibrated LinearSVC and XGBoost (or an LR fallback when
    xgboost is absent); tune ensemble weights and decision threshold on the
    validation set; evaluate on the held-out test set; pickle the bundle.
    Returns the bundle dict.
    """
    print("\nβš™οΈ Memulai training model...")
    df_all = load_training_data()
    df_all['text_clean'] = df_all['text'].apply(preprocess_text)
    X_clean = df_all['text_clean'].values
    X_raw = df_all['text'].values
    y = df_all['label'].values
    # Split on row indices so clean and raw views stay aligned.
    idx = np.arange(len(df_all))
    idx_dev, idx_test, y_dev, y_test = train_test_split(
        idx,
        y,
        test_size=0.2,
        random_state=RANDOM_STATE,
        stratify=y
    )
    idx_train, idx_val, y_train, y_val = train_test_split(
        idx_dev,
        y_dev,
        test_size=0.15,
        random_state=RANDOM_STATE,
        stratify=y_dev
    )
    X_train_clean, X_val_clean, X_test_clean = X_clean[idx_train], X_clean[idx_val], X_clean[idx_test]
    X_train_raw, X_val_raw, X_test_raw = X_raw[idx_train], X_raw[idx_val], X_raw[idx_test]
    print("πŸ“Š Membangun matriks fitur TF-IDF...")
    tfidf_word = TfidfVectorizer(
        analyzer='word',
        ngram_range=(1, 2),
        max_features=60000,
        min_df=2,
        max_df=0.95,
        sublinear_tf=True,
        strip_accents='unicode'
    )
    tfidf_char = TfidfVectorizer(
        analyzer='char_wb',
        ngram_range=(3, 5),
        max_features=30000,
        min_df=2,
        sublinear_tf=True
    )
    feat_extractor = ScamFeatureExtractor()
    # Vectorizers are fit on training data only; val/test are transformed.
    X_train_w = tfidf_word.fit_transform(X_train_clean)
    X_val_w = tfidf_word.transform(X_val_clean)
    X_test_w = tfidf_word.transform(X_test_clean)
    X_train_c = tfidf_char.fit_transform(X_train_clean)
    X_val_c = tfidf_char.transform(X_val_clean)
    X_test_c = tfidf_char.transform(X_test_clean)
    # Handcrafted features use the RAW text so URL/domain signals survive preprocessing.
    X_train_f = csr_matrix(feat_extractor.transform(X_train_raw))
    X_val_f = csr_matrix(feat_extractor.transform(X_val_raw))
    X_test_f = csr_matrix(feat_extractor.transform(X_test_raw))
    X_train_combined = hstack([X_train_w, X_train_c, X_train_f], format='csr')
    X_val_combined = hstack([X_val_w, X_val_c, X_val_f], format='csr')
    X_test_combined = hstack([X_test_w, X_test_c, X_test_f], format='csr')
    print("\nπŸ—οΈ Membangun dan melatih model-model...")
    clf_lr = LogisticRegression(C=4.0, max_iter=1500, solver='lbfgs', class_weight='balanced', random_state=RANDOM_STATE)
    clf_lr.fit(X_train_combined, y_train)
    # LinearSVC has no predict_proba; sigmoid calibration provides one.
    clf_svm = CalibratedClassifierCV(
        estimator=LinearSVC(C=1.0, max_iter=3000, class_weight='balanced', random_state=RANDOM_STATE),
        method='sigmoid',
        cv=3,
        n_jobs=-1
    )
    clf_svm.fit(X_train_combined, y_train)
    if xgb is not None:
        # Compensate class imbalance via scale_pos_weight = neg/pos ratio.
        scale_pos = float(np.sum(y_train == 0) / max(np.sum(y_train == 1), 1))
        clf_xgb = xgb.XGBClassifier(
            n_estimators=420,
            max_depth=6,
            learning_rate=0.06,
            subsample=0.85,
            colsample_bytree=0.85,
            reg_alpha=0.15,
            reg_lambda=1.2,
            scale_pos_weight=scale_pos,
            objective='binary:logistic',
            eval_metric='logloss',
            random_state=RANDOM_STATE,
            n_jobs=-1,
            tree_method='hist'
        )
        clf_xgb.fit(X_train_combined, y_train)
        print("βœ… XGBoost aktif")
    else:
        # Fallback third model keeps the 3-member ensemble shape intact.
        clf_xgb = LogisticRegression(C=1.5, max_iter=1200, solver='lbfgs', class_weight='balanced', random_state=RANDOM_STATE)
        clf_xgb.fit(X_train_combined, y_train)
        print("⚠️ xgboost tidak tersedia. Menggunakan fallback model.")
    print("\nπŸ“ˆ Evaluasi base model di test set...")
    metrics_lr = evaluate_model('Logistic Regression', y_test, clf_lr.predict_proba(X_test_combined)[:, 1], threshold=0.5)
    metrics_svm = evaluate_model('SVM (Calibrated)', y_test, clf_svm.predict_proba(X_test_combined)[:, 1], threshold=0.5)
    metrics_xgb = evaluate_model('XGBoost/Fallback', y_test, clf_xgb.predict_proba(X_test_combined)[:, 1], threshold=0.5)
    print("\n🎯 Tuning bobot ensemble + threshold di validation set...")
    best_weights, best_th = tune_ensemble_weights(clf_lr, clf_svm, clf_xgb, X_val_combined, y_val)
    ensemble = EnsembleScamDetector(clf_lr, clf_svm, clf_xgb, weights=best_weights)
    print(
        f"Bobot terbaik -> LR={best_weights[0]:.2f}, SVM={best_weights[1]:.2f}, XGB={best_weights[2]:.2f} | "
        f"Threshold={best_th.threshold:.2f} | F-beta(val)={best_th.fbeta:.4f}"
    )
    metrics_ensemble = evaluate_model(
        'Ensemble (Tuned)',
        y_test,
        ensemble.predict_proba(X_test_combined)[:, 1],
        threshold=best_th.threshold
    )
    # NOTE(review): datetime.utcnow() is deprecated in Python 3.12+;
    # consider datetime.now(timezone.utc) when upgrading.
    model_bundle = {
        'version': MODEL_VERSION,
        'created_at': datetime.utcnow().isoformat(timespec='seconds') + 'Z',
        'tfidf_word': tfidf_word,
        'tfidf_char': tfidf_char,
        'feat_extractor': feat_extractor,
        'ensemble': ensemble,
        'threshold': float(best_th.threshold),
        'weights': tuple(float(v) for v in best_weights),
        'validation_search': {
            'threshold': float(best_th.threshold),
            'fbeta': float(best_th.fbeta),
            'precision': float(best_th.precision),
            'recall': float(best_th.recall)
        },
        'test_metrics': {
            'lr': metrics_lr,
            'svm': metrics_svm,
            'xgb': metrics_xgb,
            'ensemble': metrics_ensemble
        }
    }
    with open(MODEL_PATH, 'wb') as f:
        pickle.dump(model_bundle, f)
    print(f"\nπŸ’Ύ Model berhasil disimpan ke: {MODEL_PATH}")
    return model_bundle
def load_or_train_model():
    """Load the pickled bundle from MODEL_PATH when present and complete;
    otherwise (or on any load/validation failure) train and persist a new one."""
    if os.path.exists(MODEL_PATH):
        print(f"\nβœ… FILE MODEL DITEMUKAN: {MODEL_PATH}")
        print("⏳ Sedang memuat model dari disk...\n")
        try:
            with open(MODEL_PATH, 'rb') as f:
                bundle = pickle.load(f)
            # Sanity-check that every component the detector needs is present.
            required_keys = ['tfidf_word', 'tfidf_char', 'feat_extractor', 'ensemble']
            missing = [key for key in required_keys if key not in bundle]
            if missing:
                raise ValueError(f"Model bundle tidak lengkap, key hilang: {missing}")
            loaded_version = bundle.get('version', '1.x')
            print(f"πŸš€ AI SIAP DIGUNAKAN! Versi model: {loaded_version}")
            if loaded_version != MODEL_VERSION:
                print("⚠️ Versi model lama terdeteksi. Disarankan retrain untuk kualitas terbaru.")
            return bundle
        except Exception as e:
            # Corrupt or incompatible bundle: fall through to a fresh training run.
            print(f"⚠️ Gagal memuat model lama: {e}")
            print("πŸ” Memulai training ulang model...\n")
            return train_and_save_model()
    print("\n⚠️ File model belum ada. Memulai Download & Training...\n")
    return train_and_save_model()
# Load (or train) the bundle once at import time and unpack the components
# used by the detection helpers below.
model_bundle = load_or_train_model()
tfidf_word = model_bundle['tfidf_word']
tfidf_char = model_bundle['tfidf_char']
feat_extractor = model_bundle['feat_extractor']
ensemble = model_bundle['ensemble']
# Older bundles may lack a tuned threshold; fall back to the default.
model_threshold = float(model_bundle.get('threshold', DEFAULT_THRESHOLD))
print("\nTip Colab: gunakan explain_for_colab('teks pesan') untuk melihat token pemicu prediksi.")
# ============================================================
# 3. FUNGSI DETEKSI & MODE INTERAKTIF (SELALU BERJALAN)
# ============================================================
def get_risk_level(prob):
    """Map a scam probability to a coarse risk label (Indonesian)."""
    bands = (
        (0.85, "SANGAT TINGGI"),
        (0.65, "TINGGI"),
        (0.45, "SEDANG"),
        (0.25, "RENDAH"),
    )
    for lower_bound, label in bands:
        if prob >= lower_bound:
            return label
    return "SANGAT RENDAH"
def get_recommendation(prob):
    """Return an actionable recommendation (Indonesian) for the given scam probability."""
    tiers = (
        (0.85, "Blokir pengirim, jangan klik link, jangan beri OTP/PIN, lalu laporkan."),
        (0.65, "Verifikasi via kanal resmi sebelum merespons atau transfer."),
        (0.45, "Jangan berikan data sensitif sebelum validasi manual."),
    )
    for floor, advice in tiers:
        if prob >= floor:
            return advice
    return "Risiko rendah, tetap waspada bila diminta data pribadi."
def detect_scam(pesan, threshold=None, show_detail=True):
    """Classify one message as scam/ham and optionally print a full report.

    Parameters:
        pesan       : str   β€” raw message text
        threshold   : float β€” decision boundary; defaults to the threshold
                              stored in the loaded model bundle
        show_detail : bool  β€” print the human-readable report when True

    Returns a dict with the decision, probabilities, active signals and
    token-level explanations.
    """
    if threshold is None:
        threshold = model_threshold
    pesan_clean = preprocess_text(pesan)
    # Rebuild the same feature layout used at training time:
    # word TF-IDF + char TF-IDF + hand-crafted numeric features.
    X_w = tfidf_word.transform([pesan_clean])
    X_c = tfidf_char.transform([pesan_clean])
    X_f = csr_matrix(feat_extractor.transform([pesan]))
    X_combined = hstack([X_w, X_c, X_f], format='csr')
    base_prob = float(ensemble.predict_proba(X_combined)[0, 1])
    # Raw feature values feed the heuristic boost and the report below.
    raw_feat = feat_extractor.transform([pesan])[0]
    p_scam, rule_reasons = apply_rule_boost(base_prob, raw_feat)
    p_aman = 1.0 - p_scam
    is_scam = p_scam >= threshold
    nama_fitur = ScamFeatureExtractor.FEATURE_NAMES
    # Generic length/shape statistics are not shown as danger signals.
    non_core = {'text_length', 'word_count', 'avg_word_len', 'unique_ratio', 'caps_ratio'}
    sinyal_aktif = [(n, float(v)) for n, v in zip(nama_fitur, raw_feat) if v > 0 and n not in non_core]
    sinyal_aktif.sort(key=lambda x: x[1], reverse=True)
    token_explanation = explain_prediction_tokens(pesan_clean, top_n=8)
    risk_level = get_risk_level(p_scam)
    keputusan = 'SCAM / SPAM / PHISHING' if is_scam else 'PESAN AMAN (HAM)'
    if show_detail:
        print("\n" + "=" * 70)
        print("LAPORAN DETEKSI SCAM/PHISHING")
        print("=" * 70)
        print(f"Pesan : {pesan[:140]}{'...' if len(pesan) > 140 else ''}")
        print(f"Keputusan : {keputusan}")
        print(f"Risk Level : {risk_level}")
        print(f"Threshold : {threshold:.2f}")
        print("-" * 70)
        print(f"Prob SCAM (model) : {base_prob * 100:6.2f}%")
        print(f"Prob SCAM (akhir) : {p_scam * 100:6.2f}%")
        print(f"Prob AMAN : {p_aman * 100:6.2f}%")
        if rule_reasons:
            print("Rule-based boost:")
            for r in rule_reasons:
                print(f" - {r}")
        if sinyal_aktif:
            print("Sinyal bahaya terdeteksi (top 8):")
            for nama, val in sinyal_aktif[:8]:
                print(f" - {nama.replace('_', ' '):<24}: {val:.2f}")
        else:
            print("Tidak ada sinyal bahaya dominan.")
        print("Token-level explainability:")
        if token_explanation.get('scam_tokens'):
            print(f" - Token pemicu SCAM ({token_explanation.get('method', 'N/A')}):")
            for token, score in token_explanation['scam_tokens']:
                print(f" + {token:<24}: {score:.5f}")
        if token_explanation.get('safe_tokens'):
            print(" - Token yang menurunkan skor scam:")
            for token, score in token_explanation['safe_tokens']:
                print(f" - {token:<24}: {score:.5f}")
        if (not token_explanation.get('scam_tokens')) and (not token_explanation.get('safe_tokens')):
            note = token_explanation.get('note', 'Token explainability tidak tersedia untuk input ini.')
            print(f" - {note}")
        print("Rekomendasi : " + get_recommendation(p_scam))
        print("=" * 70)
    return {
        'is_scam': bool(is_scam),
        'decision': keputusan,
        'risk_level': risk_level,
        'threshold': float(threshold),
        'model_prob': float(base_prob),
        'final_prob': float(p_scam),
        'safe_prob': float(p_aman),
        'signals': sinyal_aktif,
        'rule_reasons': rule_reasons,
        'token_explanation': token_explanation
    }
# Quick smoke test with a known remote-access scam sample.
print("\n" + "#"*60)
print(" UJI COBA SISTEM OTOMATIS")
detect_scam("Install Anydesk for me to access remotely. You should pass the video/ID verification to avoid suspension.")
# Interactive mode
print("\nSCAM DETECTOR - Mode Interaktif")
print("Ketik pesan yang ingin dicek. Perintah:")
print(" - 'keluar' untuk berhenti")
print(" - '/threshold 0.50' untuk mengubah ambang keputusan")
print(" - '/detail on' atau '/detail off' untuk toggle detail output\n")
# input() blocks notebook cells, so the REPL is skipped when running in Colab.
is_colab = importlib.util.find_spec('google.colab') is not None
if is_colab:
    print("Google Colab terdeteksi. Mode interaktif otomatis dilewati.")
    print("Gunakan detect_scam(...) atau explain_for_colab(...) langsung di cell berikutnya.")
else:
    detail_mode = True
    while True:
        try:
            pesan_input = input("Masukkan pesan: ").strip()
            if pesan_input.lower() in ['keluar', 'exit', 'quit', 'q', '']:
                print("Sesi selesai.")
                break
            # /threshold <float>: change the decision boundary on the fly.
            if pesan_input.lower().startswith('/threshold'):
                parts = pesan_input.split()
                if len(parts) == 2:
                    try:
                        new_th = float(parts[1])
                        if 0 < new_th < 1:
                            model_threshold = new_th
                            print(f"Threshold diubah menjadi {model_threshold:.2f}")
                        else:
                            print("Threshold harus di antara 0 dan 1")
                    except ValueError:
                        print("Format salah. Contoh: /threshold 0.50")
                else:
                    print("Format salah. Contoh: /threshold 0.50")
                continue
            # /detail on|off: toggle the verbose report.
            if pesan_input.lower().startswith('/detail'):
                parts = pesan_input.split()
                if len(parts) == 2 and parts[1].lower() in ['on', 'off']:
                    detail_mode = (parts[1].lower() == 'on')
                    print(f"Detail mode: {'ON' if detail_mode else 'OFF'}")
                else:
                    print("Format salah. Gunakan /detail on atau /detail off")
                continue
            hasil = detect_scam(pesan_input, threshold=model_threshold, show_detail=detail_mode)
            if not detail_mode:
                # Compact one-line summary when detail mode is off.
                print(
                    f"Keputusan={hasil['decision']} | Risk={hasil['risk_level']} | "
                    f"Prob={hasil['final_prob'] * 100:.2f}%"
                )
        except (KeyboardInterrupt, EOFError):
            print("\nSesi selesai.")
            break
# Generated from: scam_detector_v3.ipynb
# Converted at: 2026-03-26T23:50:16.036Z
# Next step (optional): refactor into modules & generate tests with RunCell
# Quick start: pip install runcell
# # πŸ›‘οΈ Advanced AI Scam & Phishing Detector β€” v3.0
#
# **Perubahan dari v2.0:**
# - βœ… **BERT / IndoBERT** β€” transformer model untuk akurasi maksimal
# - βœ… **SHAP Explainability** β€” visualisasi kata-kata yang paling mempengaruhi deteksi
# - βœ… **Optuna Hyperparameter Tuning** β€” pencarian parameter otomatis
# - βœ… **StratifiedKFold Cross-Validation** β€” evaluasi yang lebih terpercaya
# - βœ… **Feedback Loop (Active Learning)** β€” model makin pintar dari koreksi user
# - βœ… Semua fitur v2.0 tetap ada (ensemble, SMOTE, typosquatting, dll)
#
# > πŸ’‘ **Tip**: Aktifkan GPU di Colab β†’ Runtime β†’ Change runtime type β†’ T4 GPU
# > Diperlukan untuk BERT (Cell 6). Model TF-IDF tetap jalan tanpa GPU.
# ## πŸ“¦ CELL 1 β€” Install & Import Library
# ============================================================
# CELL 1 β€” Install semua library (jalankan sekali)
# ============================================================
!pip install datasets xgboost imbalanced-learn shap optuna tldextract transformers accelerate -q
import pandas as pd
import numpy as np
import re
import os
import json
import pickle
import difflib
import warnings
warnings.filterwarnings('ignore')
from datasets import load_dataset
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.metrics import (
accuracy_score, classification_report, confusion_matrix,
roc_auc_score, f1_score, precision_score, recall_score
)
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.calibration import CalibratedClassifierCV
from sklearn.preprocessing import StandardScaler
from scipy.sparse import hstack, csr_matrix
from imblearn.over_sampling import SMOTE
import xgboost as xgb
import shap
import optuna
import tldextract
# Silence per-trial Optuna logging; only warnings and errors remain.
optuna.logging.set_verbosity(optuna.logging.WARNING)
import torch
print(f"PyTorch version : {torch.__version__}")
print(f"GPU tersedia : {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU : {torch.cuda.get_device_name(0)}")
print("\nβœ… Semua library berhasil diimport!")
# ## πŸ—„οΈ CELL 2 β€” Load & Gabungkan Multi-Dataset
# ============================================================
# CELL 2 β€” Load multi-dataset dari Hugging Face
# ============================================================
# Running pools that every dataset loader appends into.
all_texts, all_labels = [], []

def safe_load(name, loader_fn):
    """Invoke *loader_fn* and merge its (texts, labels) into the global pool.

    A failing loader is reported but never raises, so one broken source
    cannot abort the whole download step.
    """
    try:
        batch_texts, batch_labels = loader_fn()
        all_texts.extend(batch_texts)
        all_labels.extend(batch_labels)
        n_spam = sum(batch_labels)
        n_ham = len(batch_texts) - n_spam
        print(f" βœ… {name}: {len(batch_texts)} baris (spam={n_spam}, ham={n_ham})")
    except Exception as e:
        print(f" ⚠️ {name} gagal: {e}")
# Raw label spellings that count as spam across the source datasets.
_SPAM_TOKENS = {'spam', '1', 'true', '1.0'}

def norm_label(x):
    """Normalize a raw label of any type to 1 (spam) or 0 (ham)."""
    return int(str(x).strip().lower() in _SPAM_TOKENS)
print("πŸ“₯ Mengunduh dataset...")

def load_ds1():
    """Hugging Face 'Deysi/spam-detection-dataset' β†’ (texts, labels)."""
    frame = pd.DataFrame(load_dataset("Deysi/spam-detection-dataset", split="train"))
    text_col = 'text' if 'text' in frame.columns else frame.columns[0]
    label_col = 'label' if 'label' in frame.columns else frame.columns[1]
    return frame[text_col].tolist(), [norm_label(v) for v in frame[label_col]]

safe_load("[1/4] Spam Detection Dataset", load_ds1)
def load_ds2():
    """UCI SMS spam corpus β†’ (texts, labels); column names vary by release."""
    frame = pd.DataFrame(load_dataset("ucirvine/sms_spam", split="train"))
    text_col = 'sms' if 'sms' in frame.columns else frame.columns[1]
    label_col = 'label' if 'label' in frame.columns else frame.columns[0]
    return frame[text_col].tolist(), [norm_label(v) for v in frame[label_col]]

safe_load("[2/4] SMS Spam (UCI)", load_ds2)
def load_ds3():
    """SetFit/enron_spam: merge subject+message, subsample to ≀5000 rows."""
    frame = pd.DataFrame(load_dataset("SetFit/enron_spam", split="train"))
    if 'subject' in frame.columns and 'message' in frame.columns:
        frame['text'] = frame['subject'].fillna('') + ' ' + frame['message'].fillna('')
    elif 'text' not in frame.columns:
        frame['text'] = frame.iloc[:, 0]
    label_col = 'label' if 'label' in frame.columns else 'spam'
    frame = frame.sample(min(5000, len(frame)), random_state=42)
    return frame['text'].tolist(), [norm_label(v) for v in frame[label_col]]

safe_load("[3/4] Enron Spam", load_ds3)
def load_ds4():
    """Indonesian SMS spam CSV hosted on a GitHub gist β†’ (texts, labels)."""
    url = "https://gist.githubusercontent.com/Xnuvers007/ba91613fe98deb8d09bb0abdfb17ef88/raw/8bb5c883e8c0e589efe359f6342e25b5754ff5c9/sms_spam_indo.csv"
    frame = pd.read_csv(url)[['Pesan', 'Kategori']].dropna()
    labels = [1 if str(kategori).strip().lower() == 'spam' else 0
              for kategori in frame['Kategori']]
    return frame['Pesan'].tolist(), labels

safe_load("[4/4] SMS Spam Indonesia", load_ds4)
# Assemble the combined corpus: drop empty rows and exact duplicate texts.
df_all = pd.DataFrame({'text': all_texts, 'label': all_labels})
df_all = df_all.dropna(subset=['text'])
df_all['text'] = df_all['text'].astype(str)
df_all = df_all.drop_duplicates(subset=['text']).reset_index(drop=True)
print(f"\nπŸ“Š TOTAL DATASET GABUNGAN : {len(df_all):,} baris (setelah dedup)")
print(f" πŸ”΄ SCAM/SPAM : {df_all['label'].sum():,} ({df_all['label'].mean()*100:.1f}%)")
print(f" 🟒 AMAN/HAM : {(df_all['label']==0).sum():,} ({(1-df_all['label'].mean())*100:.1f}%)")
# ## βš™οΈ CELL 3 β€” ScamFeatureExtractor (Feature Engineering)
# ============================================================
# CELL 3 β€” Feature Engineering 21 fitur numerik khusus scam
# ============================================================
class ScamFeatureExtractor(BaseEstimator, TransformerMixin):
    """Sklearn-style transformer emitting 21 hand-crafted numeric scam signals.

    Column order matches the module-level FEAT_NAMES list: keyword-family
    counts (urgency/threat/money/remote/phishing/typosquat/impersonation),
    URL statistics, style statistics (caps, '!', currency, phone, long
    numbers), length statistics, and a weighted composite danger score.
    """

    # Bilingual (English/Indonesian) keyword families, matched as substrings.
    URGENCY_WORDS = [
        'urgent','immediately','asap','right now','limited time','expires',
        'deadline','act now','don\'t wait','hurry','segera','sekarang juga',
        'cepat','batas waktu','darurat'
    ]
    THREAT_WORDS = [
        'suspended','blocked','terminated','banned','closed','account locked',
        'verify now','confirm identity','suspension','diblokir','ditangguhkan',
        'verifikasi','konfirmasi'
    ]
    MONEY_WORDS = [
        'prize','winner','won','lottery','jackpot','free money','cash','reward',
        'bonus','gift card','bitcoin','crypto','transfer','wire','western union',
        'moneygram','bank account','hadiah','menang','gratis','uang','rekening'
    ]
    REMOTE_ACCESS = [
        'anydesk','teamviewer','remote','screen share','remote access','install',
        'download app','access your computer','take control','remote desktop',
        'vnc','rustdesk','ultraviewer'
    ]
    PHISHING_WORDS = [
        'click here','login','password','username','credential','sign in',
        'verify your','update your','confirm your','account information',
        'billing info','credit card','cvv','social security','ssn','otp','pin','kode otp'
    ]
    IMPERSONATION = [
        'amazon','paypal','apple','microsoft','google','facebook','instagram',
        'netflix','bank','irs','government','police','bri','bca','mandiri',
        'ojk','polisi','pemerintah','tokopedia','shopee','gojek'
    ]
    # Legitimate domains used as typosquatting targets.
    TARGET_DOMAINS = [
        'paypal.com','google.com','apple.com','microsoft.com','facebook.com',
        'bca.co.id','bri.co.id','mandiri.co.id','klikbca.com','tokopedia.com',
        'shopee.co.id','gojek.com','lazada.co.id','instagram.com','twitter.com',
        'amazon.com','netflix.com','youtube.com','tiktok.com','whatsapp.com',
        'linkedin.com','discord.com','spotify.com','ebay.com','dropbox.com',
        'bukalapak.com','akulaku.com'
    ]

    def fit(self, X, y=None):
        """Stateless transformer β€” nothing to learn."""
        return self

    def _count(self, text, word_list):
        # Substring (not token) matching: e.g. 'won' also fires on 'wonder'.
        t = text.lower()
        return sum(1 for w in word_list if w in t)

    def transform(self, X):
        """Return an (n_samples, 21) float32 matrix of scam features."""
        features = []
        for text in X:
            text = str(text)
            t = text.lower()
            # Keyword-family counts.
            f_urgency = self._count(t, self.URGENCY_WORDS)
            f_threat = self._count(t, self.THREAT_WORDS)
            f_money = self._count(t, self.MONEY_WORDS)
            f_remote = self._count(t, self.REMOTE_ACCESS)
            f_phishing = self._count(t, self.PHISHING_WORDS)
            f_imperson = self._count(t, self.IMPERSONATION)
            # URL statistics.
            urls = re.findall(r'http[s]?://(?:[a-zA-Z0-9$\-_.+!*(),]|(?:%[0-9a-fA-F]{2}))+', t)
            f_url_count = len(urls)
            f_short_url = len(re.findall(r'(bit\.ly|tinyurl|goo\.gl|t\.co|ow\.ly|s\.id|cutt\.ly)', t))
            f_has_ip_url = int(bool(re.search(r'http[s]?://\d+\.\d+\.\d+\.\d+', t)))
            f_bad_tld = len(re.findall(r'\.(xyz|top|win|click|download|review|loan|work|party|cc|biz)', t))
            # Typosquatting: URL contains/resembles a known brand but is not
            # the genuine root domain.
            f_typosquat = 0
            for url in urls:
                try:
                    ext = tldextract.extract(url)
                    root = f"{ext.domain}.{ext.suffix}".lower()
                    full = f"{ext.subdomain}.{ext.domain}.{ext.suffix}".strip('.').lower()
                    for brand in self.TARGET_DOMAINS:
                        brand_name = brand.split('.')[0]
                        if root == brand:
                            continue  # genuine domain, not a squat
                        if brand_name in full:
                            f_typosquat += 1
                            break
                        # Near-identical root (e.g. paypa1.com vs paypal.com).
                        sim = difflib.SequenceMatcher(None, root, brand).ratio()
                        if 0.80 < sim < 1.0:
                            f_typosquat += 1
                            break
                except Exception:
                    # BUGFIX: was a bare 'except:' which also swallowed
                    # KeyboardInterrupt/SystemExit; malformed URLs are skipped.
                    pass
            # Style statistics.
            f_caps = sum(1 for c in text if c.isupper()) / max(len(text), 1)
            f_excl = text.count('!')
            f_dollar = text.count('$') + text.count('€') + text.count('Β£')
            f_phone = int(bool(re.search(r'(\+62|\+1|\+44)?[\s.-]?\(?\d{3}\)?[\s.-]?\d{3,4}[\s.-]?\d{4}', text)))
            f_longnum = len(re.findall(r'\b\d{4,}\b', text))
            # Length statistics.
            words = t.split()
            f_len = len(text)
            f_words = len(words)
            f_avgwl = float(np.mean([len(w) for w in words])) if words else 0.0
            f_unique = len(set(words)) / max(len(words), 1)
            # Composite score: heavier weights for the stronger scam markers
            # (remote-access and typosquatting highest).
            f_danger = (f_urgency + f_threat*2 + f_money + f_remote*3 +
                        f_phishing*2 + f_typosquat*5)
            features.append([
                f_urgency, f_threat, f_money, f_remote, f_phishing, f_typosquat, f_imperson,
                f_url_count, f_short_url, f_has_ip_url, f_bad_tld,
                f_caps, f_excl, f_dollar, f_phone, f_longnum,
                f_len, f_words, f_avgwl, f_unique, f_danger
            ])
        return np.array(features, dtype=np.float32)
# Human-readable names for the 21 columns emitted by ScamFeatureExtractor;
# the order must match the feature vector built in transform().
FEAT_NAMES = [
    'urgency','threat','money','remote','phishing','typosquat','impersonation',
    'url_count','short_url','ip_url','bad_tld',
    'caps_ratio','exclamation','currency','phone','long_numbers',
    'text_length','word_count','avg_word_len','unique_ratio','danger_score'
]
# Quick self-test on a known remote-access scam sample.
_ext = ScamFeatureExtractor()
_feat = _ext.transform(["Install Anydesk for remote access. Avoid account suspension NOW!"])[0]
print("βœ… ScamFeatureExtractor OK β€” fitur aktif:")
# Only show genuine danger signals, not the generic length statistics.
for n, v in zip(FEAT_NAMES, _feat):
    if v > 0 and n not in ['text_length','word_count','avg_word_len','unique_ratio','caps_ratio']:
        print(f" ⚠️ {n:<22}: {v:.2f}")
# ## πŸ”§ CELL 4 β€” Preprocessing & Bangun Fitur TF-IDF
# ============================================================
# CELL 4 β€” Preprocessing teks + build feature matrix
# ============================================================
# Patterns compiled once at import time; order of application matters.
_HTML_TAG_RE = re.compile(r'<[^>]+>')
_URL_RE = re.compile(r'http\S+|www\S+')
_LONG_NUM_RE = re.compile(r'\b\d{10,}\b')
_STRIP_RE = re.compile(r'[^\w\s!?$@#]')
_WS_RE = re.compile(r'\s+')

def preprocess(text):
    """Normalize raw text: lowercase, strip HTML, mask URLs and long numbers,
    drop punctuation except !?$@#, and collapse whitespace."""
    s = str(text).lower()
    s = _HTML_TAG_RE.sub(' ', s)
    s = _URL_RE.sub(' _URL_ ', s)
    s = _LONG_NUM_RE.sub(' _LONGNUM_ ', s)
    s = _STRIP_RE.sub(' ', s)
    return _WS_RE.sub(' ', s).strip()
print("πŸ”§ Preprocessing teks...")
df_all['text_clean'] = df_all['text'].apply(preprocess)
X_raw = df_all['text_clean'].values
X_orig = df_all['text'].values # untouched text kept for SHAP & BERT
y = df_all['label'].values
# train_test_split splits every array in argument order, so the returns are
# (raw train, raw test, y train, y test, orig train, orig test).
X_train_raw, X_test_raw, y_train, y_test, X_train_orig, X_test_orig = train_test_split(
    X_raw, y, X_orig, test_size=0.2, random_state=42, stratify=y
)
print(f" Train: {len(X_train_raw):,} | Test: {len(X_test_raw):,}")
# ---- TF-IDF ----
print("πŸ“Š Membangun TF-IDF...")
tfidf_word = TfidfVectorizer(
    analyzer='word', ngram_range=(1,2), max_features=50000,
    min_df=2, max_df=0.95, sublinear_tf=True, strip_accents='unicode'
)
tfidf_char = TfidfVectorizer(
    analyzer='char_wb', ngram_range=(3,5), max_features=20000,
    min_df=3, sublinear_tf=True
)
feat_ext = ScamFeatureExtractor()
# Vectorizers are fit on the TRAIN split only; test is transform-only.
Xtw_tr = tfidf_word.fit_transform(X_train_raw)
Xtw_te = tfidf_word.transform(X_test_raw)
Xtc_tr = tfidf_char.fit_transform(X_train_raw)
Xtc_te = tfidf_char.transform(X_test_raw)
Xf_tr = csr_matrix(feat_ext.transform(X_train_raw))
Xf_te = csr_matrix(feat_ext.transform(X_test_raw))
X_tr = hstack([Xtw_tr, Xtc_tr, Xf_tr])
X_te = hstack([Xtw_te, Xtc_te, Xf_te])
print(f" βœ… Dimensi fitur: {X_tr.shape[1]:,} kolom")
# ---- SMOTE: oversample the minority class on the train split only ----
print("βš–οΈ SMOTE class balancing...")
smote = SMOTE(random_state=42)
X_tr_sm, y_tr_sm = smote.fit_resample(X_tr, y_train)
print(f" Before: {X_tr.shape[0]:,} | After SMOTE: {X_tr_sm.shape[0]:,}")
# ## πŸ“ CELL 5 β€” StratifiedKFold Cross-Validation
# ============================================================
# CELL 5 β€” StratifiedKFold: evaluasi lebih terpercaya
# ============================================================
print("πŸ“ Menjalankan 5-Fold Stratified Cross-Validation...")
print(" (menggunakan data SEBELUM SMOTE untuk evaluasi yang realistis)\n")
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# Hanya pakai TF-IDF word (cepat) untuk CV baseline
clf_cv = LogisticRegression(C=5.0, max_iter=500, class_weight='balanced', random_state=42)
cv_acc = cross_val_score(clf_cv, Xtw_tr, y_train, cv=skf, scoring='accuracy', n_jobs=-1)
cv_f1 = cross_val_score(clf_cv, Xtw_tr, y_train, cv=skf, scoring='f1_weighted', n_jobs=-1)
cv_auc = cross_val_score(clf_cv, Xtw_tr, y_train, cv=skf, scoring='roc_auc', n_jobs=-1)
print(" Cross-Validation Results (Logistic Regression, TF-IDF word):")
print(f" {'Fold':<8} {'Accuracy':>10} {'F1':>10} {'AUC':>10}")
print(" " + "-"*42)
for i, (a, f, u) in enumerate(zip(cv_acc, cv_f1, cv_auc), 1):
print(f" Fold {i:<4} {a*100:>9.2f}% {f*100:>9.2f}% {u*100:>9.2f}%")
print(" " + "-"*42)
print(f" {'Mean':<8} {cv_acc.mean()*100:>9.2f}% {cv_f1.mean()*100:>9.2f}% {cv_auc.mean()*100:>9.2f}%")
print(f" {'Β±Std':<8} {cv_acc.std()*100:>9.2f}% {cv_f1.std()*100:>9.2f}% {cv_auc.std()*100:>9.2f}%")
print(f"\nβœ… Std rendah = model stabil, tidak overfit ke satu split")
# ## πŸ” CELL 6 β€” Optuna Hyperparameter Tuning (XGBoost)
# ============================================================
# CELL 6 β€” Optuna: cari hyperparameter XGBoost terbaik
# ============================================================
print("πŸ” Optuna Hyperparameter Tuning untuk XGBoost...")
print(" Menjalankan 30 trial otomatis (Β±3–5 menit)...\n")
def objective(trial):
params = {
'n_estimators' : trial.suggest_int('n_estimators', 100, 500),
'max_depth' : trial.suggest_int('max_depth', 3, 9),
'learning_rate' : trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
'subsample' : trial.suggest_float('subsample', 0.6, 1.0),
'colsample_bytree' : trial.suggest_float('colsample_bytree', 0.5, 1.0),
'min_child_weight' : trial.suggest_int('min_child_weight', 1, 10),
'gamma' : trial.suggest_float('gamma', 0, 0.5),
'reg_alpha' : trial.suggest_float('reg_alpha', 1e-5, 1.0, log=True),
'reg_lambda' : trial.suggest_float('reg_lambda', 1e-5, 1.0, log=True),
'use_label_encoder': False,
'eval_metric' : 'logloss',
'random_state' : 42,
'n_jobs' : 1, #-1,
'scale_pos_weight' : (y_train==0).sum() / max((y_train==1).sum(), 1)
}
clf = xgb.XGBClassifier(**params)
# 3-fold CV di dalam Optuna (lebih cepat dari 5-fold)
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
# scores = cross_val_score(clf, X_tr_sm, y_tr_sm, cv=cv, scoring='roc_auc', n_jobs=-1)
scores = cross_val_score(clf, X_tr_sm, y_tr_sm, cv=cv, scoring='roc_auc', n_jobs=1)
return scores.mean()
# Seeded TPE sampler keeps the 30-trial search reproducible.
study = optuna.create_study(direction='maximize',
                            sampler=optuna.samplers.TPESampler(seed=42))
study.optimize(objective, n_trials=30, show_progress_bar=True)
BEST_PARAMS = study.best_params
# Re-attach the fixed (non-searched) parameters before final training.
BEST_PARAMS.update({
    'use_label_encoder': False,
    'eval_metric': 'logloss',
    'random_state': 42,
    'n_jobs': -1,
    'scale_pos_weight': (y_train==0).sum() / max((y_train==1).sum(), 1)
})
print(f"\nπŸ† Best AUC dari Optuna : {study.best_value*100:.3f}%")
print(" Parameter terbaik:")
for k, v in study.best_params.items():
    print(f" {k:<22}: {v}")
# ## πŸ€– CELL 7 β€” Latih Semua Model dengan Parameter Terbaik
# ============================================================
# CELL 7 β€” Latih LR + SVM + XGBoost (tuned)
# ============================================================
print("πŸ—οΈ Melatih model dengan parameter terbaik...")
# Logistic Regression
print("⏳ Logistic Regression...")
clf_lr = LogisticRegression(
C=5.0, max_iter=1000, solver='lbfgs',
class_weight='balanced', random_state=42
)
clf_lr.fit(X_tr_sm, y_tr_sm)
# SVM
print("⏳ SVM (calibrated)...")
clf_svm = CalibratedClassifierCV(
LinearSVC(C=1.0, max_iter=2000, class_weight='balanced', random_state=42)
)
clf_svm.fit(X_tr_sm, y_tr_sm)
# XGBoost dengan parameter dari Optuna
print("⏳ XGBoost (Optuna-tuned)...")
clf_xgb = xgb.XGBClassifier(**BEST_PARAMS)
clf_xgb.fit(X_tr_sm, y_tr_sm)
print("\nβœ… Semua model selesai dilatih!")
# ---- Evaluasi masing-masing ----
def eval_model(name, clf):
    """Score *clf* on the held-out test split and return a metrics dict.

    Keys: name, acc, f1 (weighted), auc, prec, rec (binary, zero_division=0).
    """
    predictions = clf.predict(X_te)
    scam_scores = clf.predict_proba(X_te)[:, 1]
    metrics = {'name': name}
    metrics['acc'] = accuracy_score(y_test, predictions)
    metrics['f1'] = f1_score(y_test, predictions, average='weighted')
    metrics['auc'] = roc_auc_score(y_test, scam_scores)
    metrics['prec'] = precision_score(y_test, predictions, zero_division=0)
    metrics['rec'] = recall_score(y_test, predictions, zero_division=0)
    return metrics
# Per-model metrics, then a fixed-width comparison table.
results = [
    eval_model('Logistic Regression', clf_lr),
    eval_model('SVM (calibrated)', clf_svm),
    eval_model('XGBoost (tuned)', clf_xgb)
]
print(f"\n{'='*65}")
print(f" {'Model':<24} {'Acc':>7} {'F1':>7} {'AUC':>7} {'Prec':>7} {'Recall':>7}")
print("-"*65)
for r in results:
    print(f" {r['name']:<24} {r['acc']*100:>6.2f}% {r['f1']*100:>6.2f}% "
          f"{r['auc']*100:>6.2f}% {r['prec']*100:>6.2f}% {r['rec']*100:>6.2f}%")
print('='*65)
# ## πŸ† CELL 8 β€” Voting Ensemble Final
# ============================================================
# CELL 8 β€” Ensemble: gabungkan LR + SVM + XGBoost
# ============================================================
class EnsembleDetector:
    """Weighted soft-voting ensemble over LR, calibrated SVM and XGBoost.

    Probabilities are blended with fixed weights (default 25/25/50) rather
    than hard majority voting, so the decision threshold stays tunable.
    """

    def __init__(self, lr, svm, xgb_m, w=(0.25, 0.25, 0.50)):
        self.lr = lr
        self.svm = svm
        self.xgb = xgb_m
        self.w = w

    def predict_proba(self, X):
        """Return the weighted average of the members' class probabilities."""
        members = (self.lr, self.svm, self.xgb)
        return sum(weight * member.predict_proba(X)
                   for weight, member in zip(self.w, members))

    def predict(self, X, thr=0.45):
        """Binary decision: 1 when the blended scam probability >= *thr*."""
        scam_probs = self.predict_proba(X)[:, 1]
        return (scam_probs >= thr).astype(int)
# Build the ensemble from the three trained models and evaluate on the test set.
ensemble = EnsembleDetector(clf_lr, clf_svm, clf_xgb)
yp_ens = ensemble.predict(X_te)
ypr_ens = ensemble.predict_proba(X_te)[:, 1]
acc_ens = accuracy_score(y_test, yp_ens)
f1_ens = f1_score(y_test, yp_ens, average='weighted')
auc_ens = roc_auc_score(y_test, ypr_ens)
prec_ens = precision_score(y_test, yp_ens, zero_division=0)
rec_ens = recall_score(y_test, yp_ens, zero_division=0)
print("="*55)
print(" πŸ† VOTING ENSEMBLE β€” HASIL FINAL")
print("="*55)
print(f" Akurasi : {acc_ens*100:.2f}%")
print(f" F1-Score : {f1_ens*100:.2f}%")
print(f" AUC-ROC : {auc_ens*100:.2f}%")
print(f" Precision: {prec_ens*100:.2f}%")
print(f" Recall : {rec_ens*100:.2f}%")
print("="*55)
print(f"\n{classification_report(y_test, yp_ens, target_names=['HAM','SPAM/SCAM'])}")
# 2x2 confusion matrix: rows = true class, columns = predicted class.
cm = confusion_matrix(y_test, yp_ens)
print(" Confusion Matrix:")
print(f" Pred HAM Pred SCAM")
print(f" Asli HAM {cm[0,0]:>8d} {cm[0,1]:>9d}")
print(f" Asli SCAM {cm[1,0]:>8d} {cm[1,1]:>9d}")
# ## πŸ€— CELL 9 β€” IndoBERT / DistilBERT (Transformer, butuh GPU)
# ============================================================
# CELL 9 β€” Fine-tuning IndoBERT untuk Bahasa Indonesia
# (DistilBERT untuk teks Inggris)
#
# ⚑ BUTUH GPU β€” aktifkan: Runtime β†’ Change runtime type β†’ T4 GPU
# ============================================================
USE_GPU = torch.cuda.is_available()
TRAIN_BERT = True # <-- set to False to skip this cell
if not TRAIN_BERT:
    print("⏭️ Cell ini di-skip (TRAIN_BERT=False)")
elif not USE_GPU:
    # Without CUDA the fine-tuning is skipped and BERT_AVAILABLE stays False,
    # so detect_scam(use_bert=True) later falls back to the TF-IDF ensemble.
    print("⚠️ GPU tidak ditemukan β€” BERT di-skip.")
    print(" Aktifkan di: Runtime β†’ Change runtime type β†’ T4 GPU")
    BERT_AVAILABLE = False
else:
    from transformers import (
        AutoTokenizer, AutoModelForSequenceClassification,
        TrainingArguments, Trainer
    )
    from torch.utils.data import Dataset as TorchDataset
    from sklearn.metrics import accuracy_score
    # Model choice: IndoBERT for Indonesian-heavy data, DistilBERT for English
    # MODEL_NAME = "indolem/indobert-base-uncased" # better for Bahasa Indonesia
    MODEL_NAME = "distilbert-base-uncased" # faster, universal
    print(f"πŸ€— Fine-tuning: {MODEL_NAME}")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    # Subsample the corpus so fine-tuning stays fast
    MAX_TRAIN = 6000
    MAX_TEST = 1500
    idx_tr = np.random.RandomState(42).choice(len(X_train_orig), min(MAX_TRAIN, len(X_train_orig)), replace=False)
    idx_te = np.random.RandomState(42).choice(len(X_test_orig), min(MAX_TEST, len(X_test_orig)), replace=False)
    class ScamDataset(TorchDataset):
        # Tokenizes the whole split once up front; __getitem__ only slices tensors.
        def __init__(self, texts, labels, tok, maxlen=128):
            self.enc = tok(list(texts), truncation=True, padding=True,
                           max_length=maxlen, return_tensors='pt')
            self.labels = torch.tensor(labels)
        def __len__(self): return len(self.labels)
        def __getitem__(self, i):
            return {k: v[i] for k, v in self.enc.items()} | {'labels': self.labels[i]}
    ds_train = ScamDataset(X_train_orig[idx_tr], y_train[idx_tr], tokenizer)
    ds_test = ScamDataset(X_test_orig[idx_te], y_test[idx_te], tokenizer)
    bert_model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME, num_labels=2
    )
    def compute_metrics(p):
        # Trainer callback: argmax over logits β†’ accuracy + weighted F1.
        preds = np.argmax(p.predictions, axis=1)
        return {'accuracy': accuracy_score(p.label_ids, preds),
                'f1': f1_score(p.label_ids, preds, average='weighted')}
    training_args = TrainingArguments(
        output_dir='./bert_scam',
        num_train_epochs=3,
        per_device_train_batch_size=32,
        per_device_eval_batch_size=64,
        warmup_steps=100,
        weight_decay=0.01,
        evaluation_strategy='epoch',
        save_strategy='epoch',
        load_best_model_at_end=True,
        metric_for_best_model='f1',
        fp16=True,  # mixed precision; this branch only runs when CUDA exists
        report_to='none',
        logging_steps=50
    )
    trainer = Trainer(
        model=bert_model,
        args=training_args,
        train_dataset=ds_train,
        eval_dataset=ds_test,
        compute_metrics=compute_metrics
    )
    print("⏳ Fine-tuning BERT (Β±5–10 menit dengan T4 GPU)...")
    trainer.train()
    bert_results = trainer.evaluate()
    print(f"\nβœ… BERT Accuracy : {bert_results['eval_accuracy']*100:.2f}%")
    print(f" BERT F1 : {bert_results['eval_f1']*100:.2f}%")
    BERT_AVAILABLE = True
    bert_model.eval()
# ## πŸ“Š CELL 10 β€” SHAP Explainability
# ============================================================
# CELL 10 β€” SHAP: lihat kata mana paling pengaruhi prediksi
# ============================================================
print("πŸ” Menghitung SHAP values (Β±1–2 menit)...")
# Gunakan sampel kecil supaya cepat
SHAP_SAMPLE = 500
idx_shap = np.random.RandomState(42).choice(X_tr_sm.shape[0], SHAP_SAMPLE, replace=False)
X_shap_bg = X_tr_sm[idx_shap] # background untuk SHAP
# SHAP TreeExplainer untuk XGBoost
explainer = shap.TreeExplainer(clf_xgb, X_shap_bg)
# Hitung SHAP values pada test set
idx_test_shap = np.random.RandomState(0).choice(X_te.shape[0], min(200, X_te.shape[0]), replace=False)
shap_values = explainer.shap_values(X_te[idx_test_shap])
# Feature names: TF-IDF word + char + manual
fn_word = tfidf_word.get_feature_names_out()
fn_char = [f"char:{c}" for c in tfidf_char.get_feature_names_out()]
fn_all = list(fn_word) + fn_char + FEAT_NAMES
print("\nπŸ“Š Plot SHAP Summary (kata-kata paling berpengaruh):")
if hasattr(shap_values, '__len__') and len(shap_values) == 2:
sv_use = shap_values[1] # untuk kelas SCAM (label=1)
else:
sv_use = shap_values
shap.summary_plot(
sv_use[:, :len(fn_word)], # hanya TF-IDF word agar plot tidak penuh
X_te[idx_test_shap][:, :len(fn_word)].toarray(),
feature_names=list(fn_word),
max_display=20,
plot_type='bar',
show=True
)
print("\nβœ… Semakin panjang bar merah = semakin kuat sinyal SCAM")
print(" Semakin panjang bar biru = semakin kuat sinyal AMAN")
# ## πŸ” CELL 11 β€” Fungsi Deteksi Lengkap
# ============================================================
# CELL 11 β€” detect_scam() dengan laporan lengkap + BERT option
# ============================================================
def detect_scam(pesan, threshold=0.45, use_bert=False):
    """Detect scam/phishing and print a full report (v3.0).

    Parameters:
        pesan     : str   β€” message to check
        threshold : float β€” decision threshold (default 0.45)
        use_bert  : bool  β€” blend in the fine-tuned BERT model when available

    Returns a dict with the decision, probabilities, risk level and signals.
    """
    pesan_clean = preprocess(pesan)
    Xw = tfidf_word.transform([pesan_clean])
    Xc = tfidf_char.transform([pesan_clean])
    # NOTE(review): numeric features here use the cleaned text, matching how
    # feat_ext was applied at training time (CELL 4); the signal report below
    # uses the raw text instead β€” confirm this mismatch is intended.
    Xf = csr_matrix(feat_ext.transform([pesan_clean]))
    X = hstack([Xw, Xc, Xf])
    # Probability from the TF-IDF ensemble
    proba = ensemble.predict_proba(X)[0]
    p_aman = float(proba[0])
    p_scam_e = float(proba[1])
    # Blend with BERT when requested and the fine-tuned model exists
    p_scam_final = p_scam_e
    bert_info = ""
    if use_bert and 'BERT_AVAILABLE' in globals() and BERT_AVAILABLE:
        enc = tokenizer([pesan], truncation=True, padding=True,
                        max_length=128, return_tensors='pt')
        with torch.no_grad():
            logits = bert_model(**enc).logits
        bert_prob = torch.softmax(logits, dim=1)[0][1].item()
        p_scam_final = 0.5 * p_scam_e + 0.5 * bert_prob # equal-weight average
        bert_info = f" BERT Prob (SCAM) : {bert_prob*100:>6.2f}%"
    p_aman_final = 1 - p_scam_final
    is_scam = p_scam_final >= threshold
    # Hand-crafted danger signals (raw text), hiding generic length statistics
    raw_f = feat_ext.transform([pesan])[0]
    sinyal = [(n, v) for n, v in zip(FEAT_NAMES, raw_f)
              if v > 0 and n not in ['text_length','word_count','avg_word_len',
                                     'unique_ratio','caps_ratio']]
    # Risk level buckets
    if p_scam_final >= 0.85: risk = "πŸ”΄ SANGAT TINGGI"
    elif p_scam_final >= 0.65: risk = "🟠 TINGGI"
    elif p_scam_final >= 0.45: risk = "🟑 SEDANG"
    elif p_scam_final >= 0.25: risk = "πŸ”΅ RENDAH"
    else: risk = "🟒 SANGAT RENDAH"
    # Probability bars, capped at 30 characters
    bar_s = 'β–ˆ' * min(int(p_scam_final * 30), 30)
    bar_a = 'β–ˆ' * min(int(p_aman_final * 30), 30)
    print("\n" + "═"*62)
    print(" πŸ›‘οΈ LAPORAN DETEKSI SCAM/PHISHING β€” v3.0")
    print("═"*62)
    print(f" Pesan : {pesan[:95]}{'...' if len(pesan)>95 else ''}")
    print(f" Panjang : {len(pesan)} karakter | {len(pesan.split())} kata")
    print("─"*62)
    print(f" KEPUTUSAN : {'⚠️ SCAM / SPAM / PHISHING' if is_scam else 'βœ… PESAN AMAN'}")
    print(f" Risk Level: {risk}")
    print("─"*62)
    print(f" Ensemble (TF-IDF) SCAM : {p_scam_e*100:>6.2f}%")
    if bert_info: print(bert_info)
    print(f" Probabilitas SCAM FINAL: {p_scam_final*100:>6.2f}% {bar_s}")
    print(f" Probabilitas AMAN FINAL: {p_aman_final*100:>6.2f}% {bar_a}")
    print(f" Threshold : {threshold*100:.0f}%")
    print("─"*62)
    if sinyal:
        print(" ⚠️ SINYAL BAHAYA TERDETEKSI:")
        for n, v in sinyal:
            print(f" β€’ {n.replace('_',' ').upper():<26}: {v:.2f}")
    else:
        print(" βœ… Tidak ada sinyal bahaya yang terdeteksi")
    print("═"*62)
    return {
        'is_scam': bool(is_scam),
        'probability_scam': round(p_scam_final, 4),
        'probability_safe': round(p_aman_final, 4),
        'risk_level': risk,
        'signals': sinyal
    }
print("βœ… detect_scam() v3.0 siap!")
print(" Cara pakai: detect_scam('pesan kamu di sini')")
# ## πŸ§ͺ CELL 12 β€” Uji Coba Pesan
# ============================================================
# CELL 12 β€” Uji dengan berbagai contoh pesan
# ============================================================
# Test battery: five scam samples followed by three benign controls.
pesan_uji = [
    # --- SCAM CASES ---
    ("Remote Access Scam",
     "Install Anydesk for me to access remotely. Pass video/ID verification to avoid suspension."),
    ("Phishing BCA",
     "Halo, kami dari BCA. Konfirmasi nomor rekening dan PIN Anda untuk menghindari pemblokiran."),
    ("Typosquatting",
     "Your PayPal account limited. Login now at http://paypa1-secure-verify.xyz to restore access."),
    ("Lottery Scam",
     "CONGRATULATIONS! You WON $50,000 PRIZE! Click http://bit.ly/claim-now to claim IMMEDIATELY!"),
    ("Phishing Link",
     "Akun Anda telah diblokir. Segera verifikasi di http://bca.co.id.login-verify.net/konfirmasi"),
    # --- SAFE CASES ---
    ("Pesan biasa",
     "Hey! Besok ada rapat tim jam 9 pagi. Tolong siapkan laporan mingguan kamu ya."),
    ("Order confirmation",
     "Thank you for your order #98765. Your package will arrive within 3-5 business days."),
    ("Dentist reminder",
     "Reminder: Your appointment is scheduled for next Tuesday at 3:00 PM. Please confirm."),
]
for label, pesan in pesan_uji:
    print(f"\n{'#'*62}")
    print(f" KATEGORI: {label}")
    detect_scam(pesan)
# ## πŸ”„ CELL 13 β€” Feedback Loop (Active Learning)
# ============================================================
# CELL 13 β€” Feedback Loop: koreksi + retrain otomatis
#
# Cara kerja:
# 1. Pesan yang probabilitas-nya di zona abu-abu (30–70%)
# disimpan ke feedback_queue.jsonl
# 2. User bisa koreksi label via add_feedback()
# 3. retrain_with_feedback() melatih ulang model dengan data baru
# ============================================================
FEEDBACK_FILE = '/content/feedback_queue.jsonl'
UNCERTAIN_LOW = 0.30
UNCERTAIN_HIGH = 0.70
def check_and_queue(pesan, threshold=0.45):
    """Classify a message and queue it for manual review when uncertain.

    The message is preprocessed, vectorized with the word/char TF-IDF
    vectorizers plus the handcrafted feature extractor, and scored by the
    global ensemble. When the scam probability falls inside the grey zone
    [UNCERTAIN_LOW, UNCERTAIN_HIGH], the raw message is appended to
    FEEDBACK_FILE for later labeling.

    Returns a tuple (is_scam, p_scam, is_uncertain).
    """
    cleaned = preprocess(pesan)
    features = hstack([
        tfidf_word.transform([cleaned]),
        tfidf_char.transform([cleaned]),
        csr_matrix(feat_ext.transform([cleaned])),
    ])
    p_scam = float(ensemble.predict_proba(features)[0][1])
    flagged = p_scam >= threshold
    uncertain = UNCERTAIN_LOW <= p_scam <= UNCERTAIN_HIGH
    if uncertain:
        # Grey-zone message: persist it with a null label for manual review.
        record = {'text': pesan, 'p_scam': round(p_scam, 4), 'label': None}
        with open(FEEDBACK_FILE, 'a', encoding='utf-8') as fh:
            fh.write(json.dumps(record, ensure_ascii=False) + '\n')
        print(f"⚠️ Pesan masuk antrian feedback (p_scam={p_scam:.2%}) — perlu koreksi manual")
    return flagged, p_scam, uncertain
def view_feedback_queue(path=None):
    """Show the feedback-queue entries that still need a manual label.

    Parameters
    ----------
    path : str, optional
        JSONL feedback file to read. Defaults to the global FEEDBACK_FILE.

    Returns
    -------
    list[dict]
        Entries whose 'label' is still None (i.e. unlabeled). Empty list
        when the file does not exist.
    """
    if path is None:
        path = FEEDBACK_FILE
    if not os.path.exists(path):
        print(" Antrian feedback kosong.")
        return []
    entries = []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:  # tolerate blank lines (e.g. from manual edits)
                entries.append(json.loads(line))
    # .get() tolerates entries written without a 'label' key
    unlabeled = [e for e in entries if e.get('label') is None]
    print(f"\nπŸ“‹ Antrian feedback: {len(unlabeled)} pesan belum dilabel")
    # Show at most the first 10 entries to keep console output short.
    for i, e in enumerate(unlabeled[:10]):
        print(f" [{i}] p_scam={e['p_scam']:.2%} | {e['text'][:80]}")
    return unlabeled
def add_feedback(pesan_text, label_scam: bool, path=None):
    """Record a manual label for a message in the feedback queue.

    If the message is already queued and unlabeled, its entry is updated
    in place; otherwise a new labeled entry is appended.

    Parameters
    ----------
    pesan_text : str
        The exact message text (matched verbatim against queued entries).
    label_scam : bool
        True → the message is SCAM, False → the message is SAFE.
    path : str, optional
        JSONL feedback file to update. Defaults to the global FEEDBACK_FILE.
    """
    if path is None:
        path = FEEDBACK_FILE
    entries = []
    if os.path.exists(path):
        with open(path, 'r', encoding='utf-8') as f:
            # Skip blank lines so a stray newline cannot crash json.loads
            entries = [json.loads(l) for l in f if l.strip()]
    updated = False
    for e in entries:
        # .get() tolerates entries written without a 'label' key
        if e['text'] == pesan_text and e.get('label') is None:
            e['label'] = int(label_scam)
            updated = True
            break
    if not updated:
        entries.append({'text': pesan_text, 'p_scam': None, 'label': int(label_scam)})
    # Rewrite the whole file so in-place label updates are persisted.
    with open(path, 'w', encoding='utf-8') as f:
        for e in entries:
            f.write(json.dumps(e, ensure_ascii=False) + '\n')
    print(f"βœ… Feedback disimpan: '{pesan_text[:60]}' → {'SCAM' if label_scam else 'AMAN'}")
def retrain_with_feedback(min_samples=10):
    """Retrain the XGBoost member of the ensemble with labeled feedback.

    Only runs when at least ``min_samples`` labeled entries exist in
    FEEDBACK_FILE. On success, rebinds the global ``clf_xgb`` and
    ``ensemble`` and reports hold-out accuracy before and after.

    Parameters
    ----------
    min_samples : int
        Minimum number of labeled feedback entries required to retrain.
    """
    from scipy.sparse import vstack  # row-wise stacking of sample matrices

    global clf_xgb, ensemble
    if not os.path.exists(FEEDBACK_FILE):
        print("❌ Belum ada data feedback.")
        return
    with open(FEEDBACK_FILE, 'r', encoding='utf-8') as f:
        # Skip blank lines; .get() tolerates entries without a 'label' key
        entries = [json.loads(l) for l in f if l.strip()]
    labeled = [e for e in entries if e.get('label') is not None]
    if len(labeled) < min_samples:
        print(f"⚠️ Baru {len(labeled)} data berlabel, butuh minimal {min_samples}.")
        return
    fb_texts = [preprocess(e['text']) for e in labeled]
    fb_labels = [e['label'] for e in labeled]
    Xw_fb = tfidf_word.transform(fb_texts)
    Xc_fb = tfidf_char.transform(fb_texts)
    Xf_fb = csr_matrix(feat_ext.transform(fb_texts))
    # Column-wise: word TF-IDF + char TF-IDF + handcrafted features
    X_fb = hstack([Xw_fb, Xc_fb, Xf_fb])
    # BUG FIX: the feedback samples are extra ROWS of the training matrix,
    # so they must be stacked vertically. The original hstack() appended
    # them as columns, which raises a dimension-mismatch error (or corrupts
    # the feature space) instead of growing the training set.
    X_combined = vstack([X_tr_sm, X_fb])
    y_combined = np.concatenate([y_tr_sm, fb_labels])
    print(f"πŸ”„ Retraining XGBoost dengan {len(labeled)} data feedback baru...")
    clf_xgb_new = xgb.XGBClassifier(**BEST_PARAMS)
    clf_xgb_new.fit(X_combined, y_combined)
    # Compare hold-out accuracy before/after swapping in the new model.
    acc_before = accuracy_score(y_test, clf_xgb.predict(X_te))
    acc_after = accuracy_score(y_test, clf_xgb_new.predict(X_te))
    clf_xgb = clf_xgb_new
    ensemble = EnsembleDetector(clf_lr, clf_svm, clf_xgb)
    print(f"βœ… Retrain selesai!")
    print(f" Akurasi sebelum: {acc_before*100:.2f}%")
    print(f" Akurasi sesudah: {acc_after*100:.2f}%")
    delta = (acc_after - acc_before) * 100
    print(f" Delta : {'+' if delta>=0 else ''}{delta:.2f}%")
print("βœ… Feedback Loop siap!")
print("""\nCara pakai:
1. check_and_queue('pesan') β€” deteksi + simpan jika abu-abu
2. view_feedback_queue() β€” lihat antrian yang perlu dilabel
3. add_feedback('pesan', True) β€” beri label SCAM
4. add_feedback('pesan', False) β€” beri label AMAN
5. retrain_with_feedback() β€” retrain model dengan data baru
""")
# ## πŸ’¬ CELL 14 — Interactive Mode
# ============================================================
# CELL 14 — interactive console for manual testing.
# Commands: 'feedback' shows the correction queue, 'retrain'
# retrains with labeled feedback, 'keluar'/'q'/empty exits.
# ============================================================
print("πŸ›‘οΈ SCAM DETECTOR v3.0 — Mode Interaktif")
print("Ketik pesan → Enter. Ketik 'keluar' untuk berhenti.")
print("Ketik 'feedback' untuk melihat antrian koreksi.\n")
while True:
    try:
        pesan = input("πŸ“© Pesan: ").strip()
        if not pesan or pesan.lower() in ['keluar','exit','quit','q']:
            print("πŸ‘‹ Sesi selesai.")
            break
        if pesan.lower() == 'feedback':
            view_feedback_queue()
            continue
        if pesan.lower() == 'retrain':
            retrain_with_feedback()
            continue
        # Detect + auto-queue when the probability is in the grey zone.
        is_s, p_s, uncertain = check_and_queue(pesan)
        # FIX: the original if/else called detect_scam() in both branches;
        # call it once and only append the hint for uncertain messages.
        detect_scam(pesan)
        if uncertain:
            print(" πŸ’‘ Pesan ini masuk antrian feedback (perlu konfirmasi manual)")
    except (KeyboardInterrupt, EOFError):
        print("\nπŸ‘‹ Sesi selesai.")
        break
# ## πŸ’Ύ CELL 15 — Save model + metadata
# ============================================================
# CELL 15 — Persist the trained artifacts to disk / Google Drive.
# Bundles the vectorizers, feature extractor, individual classifiers,
# the ensemble, tuned hyperparameters, and evaluation metadata into a
# single pickle. All names referenced here (tfidf_word, clf_lr, acc_ens,
# X_tr_sm, ...) are defined in earlier training cells — presumably the
# evaluation cell computed acc_ens/f1_ens/etc. on the test split; verify
# against those cells if metrics look off.
# ============================================================
# Uncomment to save to Google Drive instead of local Colab storage:
# from google.colab import drive
# drive.mount('/content/drive')
# SAVE_DIR = '/content/drive/MyDrive/scam_detector_v3/'
SAVE_DIR = '/content/scam_detector_v3/'
os.makedirs(SAVE_DIR, exist_ok=True)
bundle = {
'version' : '3.0',
'tfidf_word' : tfidf_word,
'tfidf_char' : tfidf_char,
'feat_ext' : feat_ext,
'clf_lr' : clf_lr,
'clf_svm' : clf_svm,
'clf_xgb' : clf_xgb,
'ensemble' : ensemble,
'optuna_params': BEST_PARAMS,
'metadata': {
'accuracy' : round(acc_ens, 4),
'f1' : round(f1_ens, 4),
'auc' : round(auc_ens, 4),
'precision' : round(prec_ens, 4),
'recall' : round(rec_ens, 4),
'train_size': int(X_tr_sm.shape[0]),
'test_size' : int(X_te.shape[0]),
'n_features': int(X_tr.shape[1])
}
}
path = SAVE_DIR + 'scam_detector_v3.pkl'
with open(path, 'wb') as f:
    pickle.dump(bundle, f)
# Report the saved file size and the headline metrics.
size_mb = os.path.getsize(path) / 1e6
print(f"βœ… Model disimpan: {path} ({size_mb:.1f} MB)")
print(f" Akurasi : {acc_ens*100:.2f}%")
print(f" F1-Score : {f1_ens*100:.2f}%")
print(f" AUC-ROC : {auc_ens*100:.2f}%")
print(f" Precision: {prec_ens*100:.2f}%")
print(f" Recall : {rec_ens*100:.2f}%")
print(f" Fitur : {X_tr.shape[1]:,}")
print(f" Train : {X_tr_sm.shape[0]:,} (setelah SMOTE)")
# HOW TO LOAD THE BUNDLE BACK (instructions also printed for the user):
print("""
─────────────────────────────────────
Cara load model yang sudah disimpan:
import pickle
with open('scam_detector_v3.pkl', 'rb') as f:
b = pickle.load(f)
# Gunakan kembali:
tfidf_word = b['tfidf_word']
tfidf_char = b['tfidf_char']
feat_ext = b['feat_ext']
ensemble = b['ensemble']
─────────────────────────────────────
""")
# IPython shell magic (Colab-only): installs the notebook's third-party deps.
!pip install datasets xgboost imbalanced-learn langdetect tldextract -q
# NOTE(review): the five bare names below are the gist's requirements.txt
# content concatenated into this file by the export; they are not meant to
# be executed as Python (each would raise NameError at runtime).
datasets
xgboost
imbalanced-learn
langdetect
tldextract
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment