Test LLM Poisoning detection
#!/usr/bin/env python3
"""
llm_poisoning_scan.py
Author: Thomas Roccia (@fr0gger_)
Heuristic scanner for suspicious documents that may indicate LLM data poisoning risks
in training corpora or RAG knowledge bases.
Why use this
You run this on raw text corpora before ingestion, fine-tuning, indexing, or retrieval.
It flags documents that look like:
- Prompt or agent artifacts pasted into content (system blocks, jailbreak phrases, tool call traces)
- Backdoor style triggers hidden in text (rare markers, instruction like fragments)
- Low quality or synthetic filler used at scale (low entropy, heavy repetition)
- Template reuse across many documents (shared suffixes, near duplicates)
- Obfuscation or ingestion anomalies (non printable bytes, URL farms)
What it is not
- It does not prove intent.
- It does not detect semantic vulnerabilities in code.
- It does not guarantee a document is poisoned.
It gives you a ranked shortlist for triage.
Inputs
- folder of .txt files (one doc per file)
- jsonl file with objects that contain a "text" field, plus optional "id"
Outputs
- prints top suspicious docs with reasons
- optional CSV report with features and reasons
- optional clustering via scikit-learn (TF-IDF + DBSCAN)
- optional raw content previews and suspicious line extraction
Notes
If many legitimate documents rank high with shared_suffix(600) and near_duplicate_cluster(600),
your corpus contains many near-identical texts (templates). This is common in synthetic corpora or
boilerplate-heavy sources. Use --cluster-cap to keep huge template clusters from dominating the ranking.
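Example usage (illustrative paths; adjust to your corpus):
    python llm_poisoning_scan.py ./corpus_txt --top 20 --csv report.csv
    python llm_poisoning_scan.py corpus.jsonl --show-lines --use-sklearn
    python llm_poisoning_scan.py ./corpus_txt --print-id some_doc_id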
"""
from __future__ import annotations
import argparse
import csv
import json
import math
import re
import sys
import hashlib
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
WORD_RE = re.compile(r"[A-Za-z0-9_]+|[^\sA-Za-z0-9_]", re.UNICODE)
SUSPICIOUS_MARKERS = [
r"<\s*sudo\s*>",
r"<\s*system\s*>",
r"<\s*assistant\s*>",
r"<\s*tool\s*>",
r"BEGIN\s+SYSTEM\s+PROMPT",
r"IGNORE\s+ALL\s+PREVIOUS\s+INSTRUCTIONS",
r"DO\s+ANYTHING\s+NOW",
r"jailbreak",
r"prompt\s*injection",
r"developer\s+message",
r"function\s+call",
r"tool\s+call",
r"base64",
r"-----BEGIN",
# Semantic instruction patterns (catch subtle prompt leaks)
r"rewrite\s+(the\s+)?(above|this|following)",
r"emphasize\s+\w+",
r"maintain(ing)?\s+\w+\s+tone",
r"as\s+requested",
r"translated\s+(summary|version)",
r"tone\s+as\s+requested",
]
SUSPICIOUS_MARKERS_RE = [re.compile(pat, re.IGNORECASE) for pat in SUSPICIOUS_MARKERS]
URL_RE = re.compile(r"https?://\S+", re.IGNORECASE)
NONPRINTABLE_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
# Invisible/zero-width Unicode chars used in evasion attacks
INVISIBLE_CHARS_RE = re.compile(r"[\u200B-\u200F\u202A-\u202E\uFEFF]")
# Instruction verbs that suggest prompt-like content
INSTRUCTION_VERBS_RE = re.compile(
r"\b(rewrite|ignore|act\s+as|pretend|forget|override|translate|summarize)\b",
re.IGNORECASE,
)
# Cyrillic chars that look like Latin (homoglyph attack detection)
CYRILLIC_LOOKALIKE_RE = re.compile(r"[\u0400-\u04FF]")
# Base64-like strings (40+ chars of base64 alphabet)
BASE64_FRAGMENT_RE = re.compile(r"[A-Za-z0-9+/]{40,}={0,2}")
REPEAT_CHAR_RE = re.compile(r"(.)\1{20,}") # 21+ identical consecutive chars
REPEAT_TOKEN_RE = re.compile(r"\b(\w+)(?:\s+\1){15,}\b", re.IGNORECASE) # 16+ repetitions of the same token
LOW_ALPHA_RE = re.compile(r"[A-Za-z]")
@dataclass
class DocResult:
doc_id: str
path: str
length_chars: int
length_tokens: int
entropy_bits: float
unique_token_ratio: float
max_token_freq_ratio: float
url_count: int
nonprintable_count: int
invisible_char_count: int
instruction_verb_count: int
cyrillic_count: int
base64_fragment_count: int
repeat_char_hit: bool
repeat_token_hit: bool
marker_hits: List[str]
suffix_fingerprint: str
simhash64: int
score: float
reasons: List[str]
def tokenize(text: str) -> List[str]:
return WORD_RE.findall(text)
def shannon_entropy_from_tokens(tokens: Sequence[str]) -> float:
if not tokens:
return 0.0
freqs: Dict[str, int] = {}
for t in tokens:
freqs[t] = freqs.get(t, 0) + 1
n = len(tokens)
ent = 0.0
for c in freqs.values():
p = c / n
ent -= p * math.log2(p)
return ent
def unique_token_ratio(tokens: Sequence[str]) -> float:
if not tokens:
return 0.0
return len(set(tokens)) / len(tokens)
def max_token_freq_ratio(tokens: Sequence[str]) -> float:
if not tokens:
return 0.0
freqs: Dict[str, int] = {}
for t in tokens:
freqs[t] = freqs.get(t, 0) + 1
return max(freqs.values()) / len(tokens)
def suffix_fingerprint(text: str, suffix_len: int = 500) -> str:
tail = text[-suffix_len:] if len(text) > suffix_len else text
tail = re.sub(r"\s+", " ", tail.strip())
h = hashlib.sha256(tail.encode("utf-8", errors="ignore")).hexdigest()
return h[:16]
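# Example intuition (illustrative, not from the original corpus): two documents that end with
# the same ~500-char boilerplate footer normalize to the same tail, share a suffix_fingerprint,
# and land in the same bucket in build_suffix_clusters below.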
def simhash64(text: str) -> int:
"""
Lightweight simhash over tokens.
Good enough to detect near duplicates at corpus scale.
"""
tokens = tokenize(text.lower())
if not tokens:
return 0
weights = [0] * 64
for t in tokens[:5000]:
h = int(hashlib.md5(t.encode("utf-8", errors="ignore")).hexdigest(), 16)
for i in range(64):
bit = (h >> i) & 1
weights[i] += 1 if bit else -1
out = 0
for i, w in enumerate(weights):
if w > 0:
out |= (1 << i)
return out
def hamming64(a: int, b: int) -> int:
return (a ^ b).bit_count()
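# Note: int.bit_count() requires Python 3.10+; bin(a ^ b).count("1") is an equivalent fallback
# on older interpreters.
# Rough intuition (illustrative, not a guarantee): near-duplicate documents usually differ in only
# a handful of the 64 bits, while unrelated documents average around 32 differing bits, which is
# why a small --simhash-hamming threshold (default 6) separates the two.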
def marker_hits(text: str) -> List[str]:
hits: List[str] = []
for rx in SUSPICIOUS_MARKERS_RE:
if rx.search(text):
hits.append(rx.pattern)
return hits
def snippet(text: str, max_chars: int = 450) -> str:
s = re.sub(r"\s+", " ", text.strip())
if len(s) <= max_chars:
return s
return s[:max_chars].rstrip() + "..."
def extract_suspicious_lines(text: str, max_lines: int = 12) -> List[str]:
"""
Return a list of lines that are likely relevant for triage
"""
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
hits: List[str] = []
instr_rx = re.compile(
r"^(rewrite|please|ignore|begin|system|tool|developer|act as|you are|follow|do not|new rule)\b",
re.IGNORECASE,
)
role_tool_rx = re.compile(r"(<\s*(system|assistant|tool)\s*>|TOOL\s*CALL|FUNCTION\s*CALL)", re.IGNORECASE)
for ln in lines:
if any(rx.search(ln) for rx in SUSPICIOUS_MARKERS_RE):
hits.append(ln)
elif instr_rx.search(ln):
hits.append(ln)
elif role_tool_rx.search(ln):
hits.append(ln)
if len(hits) >= max_lines:
break
return hits[:max_lines]
def compute_score(
length_tokens: int,
entropy_bits: float,
uniq_ratio: float,
max_freq_ratio: float,
url_count: int,
nonprintable_count: int,
invisible_char_count: int,
instruction_verb_count: int,
cyrillic_count: int,
base64_fragment_count: int,
repeat_char_hit: bool,
repeat_token_hit: bool,
marker_hits_list: List[str],
entropy_threshold: float = 3.2,
marker_base_score: float = 1.5,
) -> Tuple[float, List[str]]:
"""
Score is additive and explainable.
The goal is ranking, not perfect classification.
"""
reasons: List[str] = []
score = 0.0
if length_tokens >= 80 and entropy_bits < entropy_threshold:
score += 2.0
reasons.append(f"low_entropy({entropy_bits:.2f})")
if length_tokens >= 80 and uniq_ratio < 0.22:
score += 1.5
reasons.append(f"low_unique_ratio({uniq_ratio:.2f})")
if length_tokens >= 80 and max_freq_ratio > 0.10:
score += 1.0
reasons.append(f"high_repeat_token_ratio({max_freq_ratio:.2f})")
if repeat_char_hit:
score += 1.0
reasons.append("repeat_chars")
if repeat_token_hit:
score += 1.0
reasons.append("repeat_tokens")
if nonprintable_count > 0:
score += 0.5
reasons.append(f"nonprintable({nonprintable_count})")
# Invisible Unicode chars (zero-width, bidi overrides) - strong signal
if invisible_char_count > 0:
score += 1.5
reasons.append(f"invisible_chars({invisible_char_count})")
# Instruction verbs suggest prompt-like content
if instruction_verb_count > 0:
verb_score = min(2.0, 0.5 * instruction_verb_count)
score += verb_score
reasons.append(f"instruction_verbs({instruction_verb_count})")
# Cyrillic lookalikes (homoglyph attack) - strong signal
if cyrillic_count > 0:
score += 2.0
reasons.append(f"cyrillic_homoglyph({cyrillic_count})")
# Base64 fragments - moderate signal
if base64_fragment_count > 0:
score += 1.0
reasons.append(f"base64_fragments({base64_fragment_count})")
if url_count > 10:
score += 0.5
reasons.append(f"many_urls({url_count})")
if marker_hits_list:
score += marker_base_score + min(2.0, 0.25 * len(marker_hits_list))
reasons.append(f"prompt_artifacts({len(marker_hits_list)})")
# Small docs can still be suspicious, but often rank due to templates.
if length_tokens < 40:
score *= 0.6
reasons.append("short_doc_penalty")
return score, reasons
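# Worked example (illustrative numbers, not from a real corpus): a 200-token document with
# entropy 2.9 (< 3.2) that trips a single "<system>" marker and no other heuristics scores
# 2.0 (low_entropy) + 1.5 + min(2.0, 0.25 * 1) (prompt_artifacts) = 3.75, which lands in the
# HIGH risk band (score >= 2.0) of the report printed by main().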
def iter_docs_from_path(path: Path) -> Iterable[Tuple[str, str, str]]:
"""
Yields (doc_id, source_path, text)
Folder mode:
- reads all *.txt recursively
- doc_id is filename stem
JSONL mode:
- expects each line as JSON with a "text" field
- optional "id", else row_N is used
Single file mode:
- reads entire file as one document
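Example JSONL line (illustrative):
    {"id": "doc_0001", "text": "full document text goes here"}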
"""
if path.is_dir():
for p in sorted(path.rglob("*.txt")):
try:
text = p.read_text(encoding="utf-8", errors="ignore")
except Exception:
continue
doc_id = p.stem
yield doc_id, str(p), text
return
if path.suffix.lower() == ".jsonl":
with path.open("r", encoding="utf-8", errors="ignore") as f:
for idx, line in enumerate(f):
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except Exception:
continue
text = obj.get("text")
if not isinstance(text, str):
continue
doc_id = str(obj.get("id", f"row_{idx}"))
yield doc_id, str(path), text
return
text = path.read_text(encoding="utf-8", errors="ignore")
yield path.stem, str(path), text
def analyze_doc(
doc_id: str,
src: str,
text: str,
entropy_threshold: float = 3.2,
marker_base_score: float = 1.5,
) -> DocResult:
toks = tokenize(text)
ent = shannon_entropy_from_tokens(toks)
uniq = unique_token_ratio(toks)
maxfreq = max_token_freq_ratio(toks)
url_count = len(URL_RE.findall(text))
nonprintable_count = len(NONPRINTABLE_RE.findall(text))
invisible_char_count = len(INVISIBLE_CHARS_RE.findall(text))
instruction_verb_count = len(INSTRUCTION_VERBS_RE.findall(text))
cyrillic_count = len(CYRILLIC_LOOKALIKE_RE.findall(text))
base64_fragment_count = len(BASE64_FRAGMENT_RE.findall(text))
repeat_char_hit = bool(REPEAT_CHAR_RE.search(text))
repeat_token_hit = bool(REPEAT_TOKEN_RE.search(text))
m_hits = marker_hits(text)
sfx = suffix_fingerprint(text)
sh = simhash64(text)
score, reasons = compute_score(
length_tokens=len(toks),
entropy_bits=ent,
uniq_ratio=uniq,
max_freq_ratio=maxfreq,
url_count=url_count,
nonprintable_count=nonprintable_count,
invisible_char_count=invisible_char_count,
instruction_verb_count=instruction_verb_count,
cyrillic_count=cyrillic_count,
base64_fragment_count=base64_fragment_count,
repeat_char_hit=repeat_char_hit,
repeat_token_hit=repeat_token_hit,
marker_hits_list=m_hits,
entropy_threshold=entropy_threshold,
marker_base_score=marker_base_score,
)
return DocResult(
doc_id=doc_id,
path=src,
length_chars=len(text),
length_tokens=len(toks),
entropy_bits=ent,
unique_token_ratio=uniq,
max_token_freq_ratio=maxfreq,
url_count=url_count,
nonprintable_count=nonprintable_count,
invisible_char_count=invisible_char_count,
instruction_verb_count=instruction_verb_count,
cyrillic_count=cyrillic_count,
base64_fragment_count=base64_fragment_count,
repeat_char_hit=repeat_char_hit,
repeat_token_hit=repeat_token_hit,
marker_hits=m_hits,
suffix_fingerprint=sfx,
simhash64=sh,
score=score,
reasons=reasons,
)
def build_suffix_clusters(results: List[DocResult], min_cluster: int = 5) -> Dict[str, List[int]]:
buckets: Dict[str, List[int]] = {}
for i, r in enumerate(results):
buckets.setdefault(r.suffix_fingerprint, []).append(i)
return {k: v for k, v in buckets.items() if len(v) >= min_cluster}
def build_simhash_clusters(
results: List[DocResult],
max_hamming: int = 6,
min_cluster: int = 5,
) -> List[List[int]]:
"""
O(n^2) clustering. Fine for small or medium corpora.
For large corpora, use LSH or partitioning.
"""
n = len(results)
used = [False] * n
clusters: List[List[int]] = []
for i in range(n):
if used[i]:
continue
base = results[i].simhash64
cluster = [i]
for j in range(i + 1, n):
if used[j]:
continue
if hamming64(base, results[j].simhash64) <= max_hamming:
cluster.append(j)
if len(cluster) >= min_cluster:
for idx in cluster:
used[idx] = True
clusters.append(cluster)
return clusters
def try_sklearn_tfidf_dbscan(
texts: List[str],
eps: float = 0.25,
min_samples: int = 5,
) -> Optional[List[int]]:
"""
Optional clustering via scikit-learn using character n-grams.
Useful when simhash misses some template variants.
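Requires scikit-learn (e.g. pip install scikit-learn); if the import fails, this returns None
and the caller skips TF-IDF clustering.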
"""
try:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import DBSCAN
except Exception:
return None
vec = TfidfVectorizer(analyzer="char", ngram_range=(4, 6), max_features=200000)
X = vec.fit_transform(texts)
cl = DBSCAN(eps=eps, min_samples=min_samples, metric="cosine")
labels = cl.fit_predict(X)
return labels.tolist()
def write_csv(path: str, results: List[DocResult]) -> None:
fields = [
"doc_id",
"path",
"score",
"reasons",
"length_chars",
"length_tokens",
"entropy_bits",
"unique_token_ratio",
"max_token_freq_ratio",
"url_count",
"nonprintable_count",
"invisible_char_count",
"instruction_verb_count",
"cyrillic_count",
"base64_fragment_count",
"repeat_char_hit",
"repeat_token_hit",
"marker_hits",
"suffix_fingerprint",
"simhash64",
]
with open(path, "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=fields)
w.writeheader()
for r in results:
w.writerow(
{
"doc_id": r.doc_id,
"path": r.path,
"score": f"{r.score:.3f}",
"reasons": ";".join(r.reasons),
"length_chars": r.length_chars,
"length_tokens": r.length_tokens,
"entropy_bits": f"{r.entropy_bits:.3f}",
"unique_token_ratio": f"{r.unique_token_ratio:.3f}",
"max_token_freq_ratio": f"{r.max_token_freq_ratio:.3f}",
"url_count": r.url_count,
"nonprintable_count": r.nonprintable_count,
"invisible_char_count": r.invisible_char_count,
"instruction_verb_count": r.instruction_verb_count,
"cyrillic_count": r.cyrillic_count,
"base64_fragment_count": r.base64_fragment_count,
"repeat_char_hit": int(r.repeat_char_hit),
"repeat_token_hit": int(r.repeat_token_hit),
"marker_hits": "|".join(r.marker_hits),
"suffix_fingerprint": r.suffix_fingerprint,
"simhash64": str(r.simhash64),
}
)
def apply_cluster_boost(
results: List[DocResult],
suffix_clusters: Dict[str, List[int]],
simhash_clusters: List[List[int]],
*,
boost: float,
cap_cluster_size: int,
) -> None:
"""
Cluster boosts help when an attacker pushes many near identical docs.
"""
for _, idxs in suffix_clusters.items():
cluster_size = len(idxs)
if cluster_size > cap_cluster_size:
continue
for i in idxs:
results[i].score += boost
results[i].reasons.append(f"shared_suffix({cluster_size})")
for cl in simhash_clusters:
cluster_size = len(cl)
if cluster_size > cap_cluster_size:
continue
for i in cl:
results[i].score += boost
results[i].reasons.append(f"near_duplicate_cluster({cluster_size})")
def print_raw_by_id(in_path: Path, target_id: str) -> int:
"""
Convenience helper to dump raw text for one document by id.
Works for jsonl and folder mode.
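Example (illustrative): python llm_poisoning_scan.py corpus.jsonl --print-id doc_0001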
"""
if in_path.is_dir():
# folder mode: treat target_id as filename stem
for p in sorted(in_path.rglob("*.txt")):
if p.stem == target_id:
print(p.read_text(encoding="utf-8", errors="ignore"))
return 0
print(f"ID not found in folder: {target_id}", file=sys.stderr)
return 2
if in_path.suffix.lower() == ".jsonl":
with in_path.open("r", encoding="utf-8", errors="ignore") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except Exception:
continue
if str(obj.get("id", "")) == target_id and isinstance(obj.get("text"), str):
print(obj["text"])
return 0
print(f"ID not found in jsonl: {target_id}", file=sys.stderr)
return 2
# single file: only one doc
print(in_path.read_text(encoding="utf-8", errors="ignore"))
return 0
def main() -> int:
ap = argparse.ArgumentParser(
description="Heuristic scanner for suspicious LLM poisoning candidates in training or RAG corpora."
)
ap.add_argument("input", help="Folder of .txt, a .jsonl file, or a single .txt")
ap.add_argument("--top", type=int, default=30, help="Show top N results")
ap.add_argument("--csv", type=str, default="", help="Write CSV report")
ap.add_argument("--suffix-min", type=int, default=5, help="Min size for shared suffix cluster")
ap.add_argument("--simhash-hamming", type=int, default=6, help="Max hamming distance for simhash near duplicate")
ap.add_argument("--simhash-min", type=int, default=5, help="Min size for simhash near duplicate cluster")
ap.add_argument("--use-sklearn", action="store_true", help="Try scikit learn TF IDF DBSCAN clustering")
ap.add_argument("--cluster-boost", type=float, default=0.75, help="Score boost for cluster membership")
ap.add_argument("--cluster-cap", type=int, default=80, help="Ignore cluster boosts above this size")
ap.add_argument("--show-snippet", action="store_true", help="Show a short content snippet for each top hit")
ap.add_argument("--show-lines", action="store_true", help="Show suspicious lines and prompt-like lines")
ap.add_argument("--lines-max", type=int, default=10, help="Max suspicious lines to show per doc")
ap.add_argument("--entropy-threshold", type=float, default=3.2, help="Entropy threshold for low entropy flag")
ap.add_argument("--marker-base-score", type=float, default=1.5, help="Base score for marker hits")
ap.add_argument("--print-id", type=str, default="", help="Print raw text for a single doc id and exit")
args = ap.parse_args()
in_path = Path(args.input)
if args.print_id:
return print_raw_by_id(in_path, args.print_id)
results: List[DocResult] = []
texts_for_sklearn: List[str] = []
doc_text: Dict[str, str] = {}
for doc_id, src, text in iter_docs_from_path(in_path):
if not isinstance(text, str):
continue
# Skip documents that look like pure binary or symbol dumps.
if len(text) > 200 and not LOW_ALPHA_RE.search(text):
continue
r = analyze_doc(
doc_id, src, text,
entropy_threshold=args.entropy_threshold,
marker_base_score=args.marker_base_score,
)
results.append(r)
texts_for_sklearn.append(text)
doc_text[r.doc_id] = text
if not results:
print("No documents found.", file=sys.stderr)
return 2
suffix_clusters = build_suffix_clusters(results, min_cluster=args.suffix_min)
simhash_clusters = build_simhash_clusters(
results,
max_hamming=args.simhash_hamming,
min_cluster=args.simhash_min,
)
apply_cluster_boost(
results,
suffix_clusters,
simhash_clusters,
boost=args.cluster_boost,
cap_cluster_size=args.cluster_cap,
)
if args.use_sklearn:
labels = try_sklearn_tfidf_dbscan(texts_for_sklearn)
if labels is None:
print("scikit learn not available, skip TF IDF clustering.", file=sys.stderr)
else:
counts: Dict[int, int] = {}
for lab in labels:
counts[lab] = counts.get(lab, 0) + 1
for i, lab in enumerate(labels):
if lab == -1:
continue
if counts.get(lab, 0) < args.suffix_min:
continue
if counts[lab] > args.cluster_cap:
continue
results[i].score += 0.5
results[i].reasons.append(f"tfidf_cluster({counts[lab]})")
results.sort(key=lambda r: r.score, reverse=True)
# ANSI color codes
RED = "\033[91m"
YELLOW = "\033[93m"
GREEN = "\033[92m"
CYAN = "\033[96m"
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"
# Summary statistics
total_docs = len(results)
high_risk = sum(1 for r in results if r.score >= 2.0)
medium_risk = sum(1 for r in results if 1.0 <= r.score < 2.0)
low_risk = sum(1 for r in results if 0 < r.score < 1.0)
clean = sum(1 for r in results if r.score == 0)
print(f"\n{BOLD}{'=' * 70}")
print(f" LLM POISONING SCAN RESULTS")
print(f"{'=' * 70}{RESET}")
print(f"\n {BOLD}Corpus:{RESET} {in_path}")
print(f" {BOLD}Documents scanned:{RESET} {total_docs}")
print(f"\n {BOLD}Risk Distribution:{RESET}")
print(f" {RED}[!!!] HIGH RISK (score >= 2.0): {high_risk:4d} documents{RESET}")
print(f" {YELLOW}[!!] MEDIUM RISK (1.0 - 2.0): {medium_risk:4d} documents{RESET}")
print(f" {CYAN}[!] LOW RISK (0.1 - 1.0): {low_risk:4d} documents{RESET}")
print(f" {GREEN}[OK] CLEAN (score = 0): {clean:4d} documents{RESET}")
print(f"\n{'-' * 70}")
print(f" {BOLD}Top {min(args.top, total_docs)} Suspicious Documents{RESET}")
print(f"{'-' * 70}\n")
for idx, r in enumerate(results[: args.top], 1):
# Risk level indicator with color
if r.score >= 2.0:
risk = f"{RED}{BOLD}[!!!] HIGH{RESET}"
score_color = RED
elif r.score >= 1.0:
risk = f"{YELLOW}[!!] MED{RESET} "
score_color = YELLOW
elif r.score > 0:
risk = f"{CYAN}[!] LOW{RESET} "
score_color = CYAN
else:
risk = f"{GREEN}[OK] CLEAN{RESET}"
score_color = GREEN
# Clean up reasons for display
clean_reasons = []
for reason in r.reasons:
# Make reasons more readable
if "instruction_verbs" in reason:
clean_reasons.append("instruction verbs detected")
elif "prompt_artifacts" in reason:
clean_reasons.append("prompt injection patterns")
elif "invisible_chars" in reason:
clean_reasons.append("hidden unicode chars")
elif "cyrillic_homoglyph" in reason:
clean_reasons.append("homoglyph attack (cyrillic)")
elif "base64_fragments" in reason:
clean_reasons.append("base64 encoded content")
elif "low_entropy" in reason:
clean_reasons.append("low entropy (synthetic)")
elif "shared_suffix" in reason:
clean_reasons.append("template cluster")
elif "near_duplicate" in reason:
clean_reasons.append("near-duplicate cluster")
elif "repeat_chars" in reason:
clean_reasons.append("repeated characters")
elif "repeat_tokens" in reason:
clean_reasons.append("repeated tokens")
elif "nonprintable" in reason:
clean_reasons.append("non-printable chars")
elif "many_urls" in reason:
clean_reasons.append("URL heavy")
elif "short_doc_penalty" in reason:
continue # Skip penalty from display
else:
clean_reasons.append(reason)
print(f" {BOLD}{idx:3d}.{RESET} {risk} Score: {score_color}{r.score:.2f}{RESET} ID: {BOLD}{r.doc_id}{RESET}")
print(f" {DIM}Tokens: {r.length_tokens} Entropy: {r.entropy_bits:.2f}{RESET}")
if clean_reasons:
print(f" {YELLOW}Flags:{RESET} {', '.join(clean_reasons)}")
# Always show document content preview
raw_text = doc_text.get(r.doc_id, "")
if raw_text:
preview = snippet(raw_text, 400)
print(f" {CYAN}Content:{RESET}")
# Show the collapsed content preview indented, truncating long lines
for line in preview.split('\n')[:5]:
if line.strip():
print(f" {DIM}{line[:100]}{'...' if len(line) > 100 else ''}{RESET}")
if args.show_lines:
lines = extract_suspicious_lines(raw_text, max_lines=args.lines_max)
if lines:
print(f" {RED}Suspicious lines:{RESET}")
for ln in lines[:3]:
print(f" {RED}>{RESET} {ln[:80]}{'...' if len(ln) > 80 else ''}")
print()
if args.csv:
write_csv(args.csv, results)
print(f"Wrote CSV: {args.csv}")
return 0
if __name__ == "__main__":
raise SystemExit(main())