Test LLM Poisoning detection
#!/usr/bin/env python3
"""
llm_poisoning_scan.py
Author: Thomas Roccia (@fr0gger_)

Heuristic scanner for suspicious documents that may indicate LLM data poisoning risks
in training corpora or RAG knowledge bases.

Why use this
Run this on raw text corpora before ingestion, fine-tuning, indexing, or retrieval.
It flags documents that look like:
- Prompt or agent artifacts pasted into content (system blocks, jailbreak phrases, tool call traces)
- Backdoor-style triggers hidden in text (rare markers, instruction-like fragments)
- Low-quality or synthetic filler used at scale (low entropy, heavy repetition)
- Template reuse across many documents (shared suffixes, near duplicates)
- Obfuscation or ingestion anomalies (non-printable bytes, URL farms)

What it is not
- It does not prove intent.
- It does not detect semantic vulnerabilities in code.
- It does not guarantee a document is poisoned.
It gives you a ranked shortlist for triage.

Inputs
- folder of .txt files (one doc per file)
- jsonl file with objects that contain a "text" field, plus an optional "id"

Outputs
- prints the top suspicious docs with reasons
- optional CSV report with features and reasons
- optional clustering via scikit-learn (TF-IDF + DBSCAN)
- optional raw content previews and suspicious line extraction

Notes
If many legitimate documents rank high with shared_suffix(600) and near_duplicate_cluster(600),
your corpus contains many near-identical texts (templates). This is common in synthetic corpora
and boilerplate-heavy sources. Use --cluster-cap to keep huge template clusters from dominating
the ranking.
"""
from __future__ import annotations

import argparse
import csv
import hashlib
import json
import math
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple

WORD_RE = re.compile(r"[A-Za-z0-9_]+|[^\sA-Za-z0-9_]", re.UNICODE)

SUSPICIOUS_MARKERS = [
    r"<\s*sudo\s*>",
    r"<\s*system\s*>",
    r"<\s*assistant\s*>",
    r"<\s*tool\s*>",
    r"BEGIN\s+SYSTEM\s+PROMPT",
    r"IGNORE\s+ALL\s+PREVIOUS\s+INSTRUCTIONS",
    r"DO\s+ANYTHING\s+NOW",
    r"jailbreak",
    r"prompt\s*injection",
    r"developer\s+message",
    r"function\s+call",
    r"tool\s+call",
    r"base64",
    r"-----BEGIN",
    # Semantic instruction patterns (catch subtle prompt leaks)
    r"rewrite\s+(the\s+)?(above|this|following)",
    r"emphasize\s+\w+",
    r"maintain(ing)?\s+\w+\s+tone",
    r"as\s+requested",
    r"translated\s+(summary|version)",
    r"tone\s+as\s+requested",
]
SUSPICIOUS_MARKERS_RE = [re.compile(pat, re.IGNORECASE) for pat in SUSPICIOUS_MARKERS]

URL_RE = re.compile(r"https?://\S+", re.IGNORECASE)
NONPRINTABLE_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
# Invisible/zero-width Unicode chars used in evasion attacks
INVISIBLE_CHARS_RE = re.compile(r"[\u200B-\u200F\u202A-\u202E\uFEFF]")
# Instruction verbs that suggest prompt-like content
INSTRUCTION_VERBS_RE = re.compile(
    r"\b(rewrite|ignore|act\s+as|pretend|forget|override|translate|summarize)\b",
    re.IGNORECASE,
)
# Cyrillic chars that look like Latin (homoglyph attack detection)
CYRILLIC_LOOKALIKE_RE = re.compile(r"[\u0400-\u04FF]")
# Base64-like strings (40+ chars of base64 alphabet)
BASE64_FRAGMENT_RE = re.compile(r"[A-Za-z0-9+/]{40,}={0,2}")
REPEAT_CHAR_RE = re.compile(r"(.)\1{20,}")  # 21 same chars
REPEAT_TOKEN_RE = re.compile(r"\b(\w+)(?:\s+\1){15,}\b", re.IGNORECASE)  # 16 same tokens
LOW_ALPHA_RE = re.compile(r"[A-Za-z]")
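# Illustrative strings that the detectors above would flag (hypothetical examples, not used by the scan):
#   "IGNORE ALL PREVIOUS INSTRUCTIONS"          -> SUSPICIOUS_MARKERS_RE
#   "click\u200bhere" (zero-width space)        -> INVISIBLE_CHARS_RE
#   "rewrite the above and ignore the sources"  -> INSTRUCTION_VERBS_RE
#   "pаssword" (Cyrillic U+0430 'а')            -> CYRILLIC_LOOKALIKE_RE
#   any run of 40+ [A-Za-z0-9+/] characters     -> BASE64_FRAGMENT_RE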
@dataclass
class DocResult:
    doc_id: str
    path: str
    length_chars: int
    length_tokens: int
    entropy_bits: float
    unique_token_ratio: float
    max_token_freq_ratio: float
    url_count: int
    nonprintable_count: int
    invisible_char_count: int
    instruction_verb_count: int
    cyrillic_count: int
    base64_fragment_count: int
    repeat_char_hit: bool
    repeat_token_hit: bool
    marker_hits: List[str]
    suffix_fingerprint: str
    simhash64: int
    score: float
    reasons: List[str]


def tokenize(text: str) -> List[str]:
    return WORD_RE.findall(text)


def shannon_entropy_from_tokens(tokens: Sequence[str]) -> float:
    if not tokens:
        return 0.0
    freqs: Dict[str, int] = {}
    for t in tokens:
        freqs[t] = freqs.get(t, 0) + 1
    n = len(tokens)
    ent = 0.0
    for c in freqs.values():
        p = c / n
        ent -= p * math.log2(p)
    return ent
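# Worked example for the entropy feature (illustrative values, not executed by the scan):
#   shannon_entropy_from_tokens(["buy"] * 100)                 -> 0.0 bits (all tokens identical)
#   shannon_entropy_from_tokens([str(i) for i in range(100)])  -> log2(100) ≈ 6.64 bits (all distinct)
# compute_score() flags docs of 80+ tokens that fall below the default 3.2-bit threshold as low entropy.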
def unique_token_ratio(tokens: Sequence[str]) -> float:
    if not tokens:
        return 0.0
    return len(set(tokens)) / len(tokens)


def max_token_freq_ratio(tokens: Sequence[str]) -> float:
    if not tokens:
        return 0.0
    freqs: Dict[str, int] = {}
    for t in tokens:
        freqs[t] = freqs.get(t, 0) + 1
    return max(freqs.values()) / len(tokens)


def suffix_fingerprint(text: str, suffix_len: int = 500) -> str:
    tail = text[-suffix_len:] if len(text) > suffix_len else text
    tail = re.sub(r"\s+", " ", tail.strip())
    h = hashlib.sha256(tail.encode("utf-8", errors="ignore")).hexdigest()
    return h[:16]


def simhash64(text: str) -> int:
    """
    Lightweight simhash over tokens.
    Good enough to detect near duplicates at corpus scale.
    """
    tokens = tokenize(text.lower())
    if not tokens:
        return 0
    weights = [0] * 64
    for t in tokens[:5000]:
        h = int(hashlib.md5(t.encode("utf-8", errors="ignore")).hexdigest(), 16)
        for i in range(64):
            bit = (h >> i) & 1
            weights[i] += 1 if bit else -1
    out = 0
    for i, w in enumerate(weights):
        if w > 0:
            out |= (1 << i)
    return out


def hamming64(a: int, b: int) -> int:
    return (a ^ b).bit_count()
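# Intuition for the near-duplicate check (hedged, illustrative): documents that share most of
# their tokens tend to produce simhash values that differ in only a few of the 64 bits, while
# unrelated documents differ in roughly 32 bits on average. build_simhash_clusters() groups
# documents whose pairwise hamming64 distance is at most --simhash-hamming (default 6).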
def marker_hits(text: str) -> List[str]:
    hits: List[str] = []
    for rx in SUSPICIOUS_MARKERS_RE:
        if rx.search(text):
            hits.append(rx.pattern)
    return hits


def snippet(text: str, max_chars: int = 450) -> str:
    s = re.sub(r"\s+", " ", text.strip())
    if len(s) <= max_chars:
        return s
    return s[:max_chars].rstrip() + "..."


def extract_suspicious_lines(text: str, max_lines: int = 12) -> List[str]:
    """
    Return a list of lines that are likely relevant for triage
    """
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    hits: List[str] = []
    instr_rx = re.compile(
        r"^(rewrite|please|ignore|begin|system|tool|developer|act as|you are|follow|do not|new rule)\b",
        re.IGNORECASE,
    )
    role_tool_rx = re.compile(r"(<\s*(system|assistant|tool)\s*>|TOOL\s*CALL|FUNCTION\s*CALL)", re.IGNORECASE)
    for ln in lines:
        if any(rx.search(ln) for rx in SUSPICIOUS_MARKERS_RE):
            hits.append(ln)
        elif instr_rx.search(ln):
            hits.append(ln)
        elif role_tool_rx.search(ln):
            hits.append(ln)
        if len(hits) >= max_lines:
            break
    return hits[:max_lines]


def compute_score(
    length_tokens: int,
    entropy_bits: float,
    uniq_ratio: float,
    max_freq_ratio: float,
    url_count: int,
    nonprintable_count: int,
    invisible_char_count: int,
    instruction_verb_count: int,
    cyrillic_count: int,
    base64_fragment_count: int,
    repeat_char_hit: bool,
    repeat_token_hit: bool,
    marker_hits_list: List[str],
    entropy_threshold: float = 3.2,
    marker_base_score: float = 1.5,
) -> Tuple[float, List[str]]:
    """
    Score is additive and explainable.
    The goal is ranking, not perfect classification.
    """
    reasons: List[str] = []
    score = 0.0
    if length_tokens >= 80 and entropy_bits < entropy_threshold:
        score += 2.0
        reasons.append(f"low_entropy({entropy_bits:.2f})")
    if length_tokens >= 80 and uniq_ratio < 0.22:
        score += 1.5
        reasons.append(f"low_unique_ratio({uniq_ratio:.2f})")
    if length_tokens >= 80 and max_freq_ratio > 0.10:
        score += 1.0
        reasons.append(f"high_repeat_token_ratio({max_freq_ratio:.2f})")
    if repeat_char_hit:
        score += 1.0
        reasons.append("repeat_chars")
    if repeat_token_hit:
        score += 1.0
        reasons.append("repeat_tokens")
    if nonprintable_count > 0:
        score += 0.5
        reasons.append(f"nonprintable({nonprintable_count})")
    # Invisible Unicode chars (zero-width, bidi overrides) - strong signal
    if invisible_char_count > 0:
        score += 1.5
        reasons.append(f"invisible_chars({invisible_char_count})")
    # Instruction verbs suggest prompt-like content
    if instruction_verb_count > 0:
        verb_score = min(2.0, 0.5 * instruction_verb_count)
        score += verb_score
        reasons.append(f"instruction_verbs({instruction_verb_count})")
    # Cyrillic lookalikes (homoglyph attack) - strong signal
    if cyrillic_count > 0:
        score += 2.0
        reasons.append(f"cyrillic_homoglyph({cyrillic_count})")
    # Base64 fragments - moderate signal
    if base64_fragment_count > 0:
        score += 1.0
        reasons.append(f"base64_fragments({base64_fragment_count})")
    if url_count > 10:
        score += 0.5
        reasons.append(f"many_urls({url_count})")
    if marker_hits_list:
        score += marker_base_score + min(2.0, 0.25 * len(marker_hits_list))
        reasons.append(f"prompt_artifacts({len(marker_hits_list)})")
    # Small docs can still be suspicious, but often rank due to templates.
    if length_tokens < 40:
        score *= 0.6
        reasons.append("short_doc_penalty")
    return score, reasons
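# Quick reference for the additive weights above (defaults, taken directly from compute_score):
#   +2.0  low_entropy, cyrillic_homoglyph
#   +1.5  low_unique_ratio, invisible_chars; marker hits add marker_base_score (1.5) plus 0.25 per hit (capped at +2.0)
#   +1.0  high_repeat_token_ratio, repeat_chars, repeat_tokens, base64_fragments
#   +0.5  nonprintable, many_urls; instruction verbs add 0.5 each (capped at +2.0)
#   x0.6  final score for docs under 40 tokens (short_doc_penalty)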
def iter_docs_from_path(path: Path) -> Iterable[Tuple[str, str, str]]:
    """
    Yields (doc_id, source_path, text)
    Folder mode:
    - reads all *.txt recursively
    - doc_id is filename stem
    JSONL mode:
    - expects each line as JSON with a "text" field
    - optional "id", else row_N is used
    Single file mode:
    - reads entire file as one document
    """
    if path.is_dir():
        for p in sorted(path.rglob("*.txt")):
            try:
                text = p.read_text(encoding="utf-8", errors="ignore")
            except Exception:
                continue
            doc_id = p.stem
            yield doc_id, str(p), text
        return
    if path.suffix.lower() == ".jsonl":
        with path.open("r", encoding="utf-8", errors="ignore") as f:
            for idx, line in enumerate(f):
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except Exception:
                    continue
                text = obj.get("text")
                if not isinstance(text, str):
                    continue
                doc_id = str(obj.get("id", f"row_{idx}"))
                yield doc_id, str(path), text
        return
    text = path.read_text(encoding="utf-8", errors="ignore")
    yield path.stem, str(path), text
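# Example JSONL record accepted by the loader above (field values are placeholders):
#   {"id": "doc_0001", "text": "Full document text goes here."}
# Records without a string "text" field are skipped; a missing "id" becomes row_<line index>.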
def analyze_doc(
    doc_id: str,
    src: str,
    text: str,
    entropy_threshold: float = 3.2,
    marker_base_score: float = 1.5,
) -> DocResult:
    toks = tokenize(text)
    ent = shannon_entropy_from_tokens(toks)
    uniq = unique_token_ratio(toks)
    maxfreq = max_token_freq_ratio(toks)
    url_count = len(URL_RE.findall(text))
    nonprintable_count = len(NONPRINTABLE_RE.findall(text))
    invisible_char_count = len(INVISIBLE_CHARS_RE.findall(text))
    instruction_verb_count = len(INSTRUCTION_VERBS_RE.findall(text))
    cyrillic_count = len(CYRILLIC_LOOKALIKE_RE.findall(text))
    base64_fragment_count = len(BASE64_FRAGMENT_RE.findall(text))
    repeat_char_hit = bool(REPEAT_CHAR_RE.search(text))
    repeat_token_hit = bool(REPEAT_TOKEN_RE.search(text))
    m_hits = marker_hits(text)
    sfx = suffix_fingerprint(text)
    sh = simhash64(text)
    score, reasons = compute_score(
        length_tokens=len(toks),
        entropy_bits=ent,
        uniq_ratio=uniq,
        max_freq_ratio=maxfreq,
        url_count=url_count,
        nonprintable_count=nonprintable_count,
        invisible_char_count=invisible_char_count,
        instruction_verb_count=instruction_verb_count,
        cyrillic_count=cyrillic_count,
        base64_fragment_count=base64_fragment_count,
        repeat_char_hit=repeat_char_hit,
        repeat_token_hit=repeat_token_hit,
        marker_hits_list=m_hits,
        entropy_threshold=entropy_threshold,
        marker_base_score=marker_base_score,
    )
    return DocResult(
        doc_id=doc_id,
        path=src,
        length_chars=len(text),
        length_tokens=len(toks),
        entropy_bits=ent,
        unique_token_ratio=uniq,
        max_token_freq_ratio=maxfreq,
        url_count=url_count,
        nonprintable_count=nonprintable_count,
        invisible_char_count=invisible_char_count,
        instruction_verb_count=instruction_verb_count,
        cyrillic_count=cyrillic_count,
        base64_fragment_count=base64_fragment_count,
        repeat_char_hit=repeat_char_hit,
        repeat_token_hit=repeat_token_hit,
        marker_hits=m_hits,
        suffix_fingerprint=sfx,
        simhash64=sh,
        score=score,
        reasons=reasons,
    )


def build_suffix_clusters(results: List[DocResult], min_cluster: int = 5) -> Dict[str, List[int]]:
    buckets: Dict[str, List[int]] = {}
    for i, r in enumerate(results):
        buckets.setdefault(r.suffix_fingerprint, []).append(i)
    return {k: v for k, v in buckets.items() if len(v) >= min_cluster}


def build_simhash_clusters(
    results: List[DocResult],
    max_hamming: int = 6,
    min_cluster: int = 5,
) -> List[List[int]]:
    """
    O(n^2) clustering. Fine for small or medium corpora.
    For large corpora, use LSH or partitioning.
    """
    n = len(results)
    used = [False] * n
    clusters: List[List[int]] = []
    for i in range(n):
        if used[i]:
            continue
        base = results[i].simhash64
        cluster = [i]
        for j in range(i + 1, n):
            if used[j]:
                continue
            if hamming64(base, results[j].simhash64) <= max_hamming:
                cluster.append(j)
        if len(cluster) >= min_cluster:
            for idx in cluster:
                used[idx] = True
            clusters.append(cluster)
    return clusters


def try_sklearn_tfidf_dbscan(
    texts: List[str],
    eps: float = 0.25,
    min_samples: int = 5,
) -> Optional[List[int]]:
    """
    Optional clustering via scikit-learn using character n-grams.
    Useful when simhash misses some template variants.
    """
    try:
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.cluster import DBSCAN
    except Exception:
        return None
    vec = TfidfVectorizer(analyzer="char", ngram_range=(4, 6), max_features=200000)
    X = vec.fit_transform(texts)
    cl = DBSCAN(eps=eps, min_samples=min_samples, metric="cosine")
    labels = cl.fit_predict(X)
    return labels.tolist()
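# Note on the labels returned above: DBSCAN marks noise points as -1, and main() only applies
# the +0.5 tfidf_cluster boost to clusters whose size is at least --suffix-min and at most --cluster-cap.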
def write_csv(path: str, results: List[DocResult]) -> None:
    fields = [
        "doc_id",
        "path",
        "score",
        "reasons",
        "length_chars",
        "length_tokens",
        "entropy_bits",
        "unique_token_ratio",
        "max_token_freq_ratio",
        "url_count",
        "nonprintable_count",
        "invisible_char_count",
        "instruction_verb_count",
        "cyrillic_count",
        "base64_fragment_count",
        "repeat_char_hit",
        "repeat_token_hit",
        "marker_hits",
        "suffix_fingerprint",
        "simhash64",
    ]
    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()
        for r in results:
            w.writerow(
                {
                    "doc_id": r.doc_id,
                    "path": r.path,
                    "score": f"{r.score:.3f}",
                    "reasons": ";".join(r.reasons),
                    "length_chars": r.length_chars,
                    "length_tokens": r.length_tokens,
                    "entropy_bits": f"{r.entropy_bits:.3f}",
                    "unique_token_ratio": f"{r.unique_token_ratio:.3f}",
                    "max_token_freq_ratio": f"{r.max_token_freq_ratio:.3f}",
                    "url_count": r.url_count,
                    "nonprintable_count": r.nonprintable_count,
                    "invisible_char_count": r.invisible_char_count,
                    "instruction_verb_count": r.instruction_verb_count,
                    "cyrillic_count": r.cyrillic_count,
                    "base64_fragment_count": r.base64_fragment_count,
                    "repeat_char_hit": int(r.repeat_char_hit),
                    "repeat_token_hit": int(r.repeat_token_hit),
                    "marker_hits": "|".join(r.marker_hits),
                    "suffix_fingerprint": r.suffix_fingerprint,
                    "simhash64": str(r.simhash64),
                }
            )


def apply_cluster_boost(
    results: List[DocResult],
    suffix_clusters: Dict[str, List[int]],
    simhash_clusters: List[List[int]],
    *,
    boost: float,
    cap_cluster_size: int,
) -> None:
    """
    Cluster boosts help when an attacker pushes many near identical docs.
    """
    for _, idxs in suffix_clusters.items():
        cluster_size = len(idxs)
        if cluster_size > cap_cluster_size:
            continue
        for i in idxs:
            results[i].score += boost
            results[i].reasons.append(f"shared_suffix({cluster_size})")
    for cl in simhash_clusters:
        cluster_size = len(cl)
        if cluster_size > cap_cluster_size:
            continue
        for i in cl:
            results[i].score += boost
            results[i].reasons.append(f"near_duplicate_cluster({cluster_size})")


def print_raw_by_id(in_path: Path, target_id: str) -> int:
    """
    Convenience helper to dump raw text for one document by id.
    Works for jsonl and folder mode.
    """
    if in_path.is_dir():
        # folder mode: treat target_id as filename stem
        for p in sorted(in_path.rglob("*.txt")):
            if p.stem == target_id:
                print(p.read_text(encoding="utf-8", errors="ignore"))
                return 0
        print(f"ID not found in folder: {target_id}", file=sys.stderr)
        return 2
    if in_path.suffix.lower() == ".jsonl":
        with in_path.open("r", encoding="utf-8", errors="ignore") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except Exception:
                    continue
                if str(obj.get("id", "")) == target_id and isinstance(obj.get("text"), str):
                    print(obj["text"])
                    return 0
        print(f"ID not found in jsonl: {target_id}", file=sys.stderr)
        return 2
    # single file: only one doc
    print(in_path.read_text(encoding="utf-8", errors="ignore"))
    return 0


def main() -> int:
    ap = argparse.ArgumentParser(
        description="Heuristic scanner for suspicious LLM poisoning candidates in training or RAG corpora."
    )
    ap.add_argument("input", help="Folder of .txt, a .jsonl file, or a single .txt")
    ap.add_argument("--top", type=int, default=30, help="Show top N results")
    ap.add_argument("--csv", type=str, default="", help="Write CSV report")
    ap.add_argument("--suffix-min", type=int, default=5, help="Min size for shared suffix cluster")
    ap.add_argument("--simhash-hamming", type=int, default=6, help="Max hamming distance for simhash near duplicate")
    ap.add_argument("--simhash-min", type=int, default=5, help="Min size for simhash near duplicate cluster")
    ap.add_argument("--use-sklearn", action="store_true", help="Try scikit-learn TF-IDF DBSCAN clustering")
    ap.add_argument("--cluster-boost", type=float, default=0.75, help="Score boost for cluster membership")
    ap.add_argument("--cluster-cap", type=int, default=80, help="Ignore cluster boosts above this size")
    ap.add_argument("--show-snippet", action="store_true", help="Show a short content snippet for each top hit")
    ap.add_argument("--show-lines", action="store_true", help="Show suspicious lines and prompt-like lines")
    ap.add_argument("--lines-max", type=int, default=10, help="Max suspicious lines to show per doc")
    ap.add_argument("--entropy-threshold", type=float, default=3.2, help="Entropy threshold for low entropy flag")
    ap.add_argument("--marker-base-score", type=float, default=1.5, help="Base score for marker hits")
    ap.add_argument("--print-id", type=str, default="", help="Print raw text for a single doc id and exit")
    args = ap.parse_args()

    in_path = Path(args.input)
    if args.print_id:
        return print_raw_by_id(in_path, args.print_id)

    results: List[DocResult] = []
    texts_for_sklearn: List[str] = []
    doc_text: Dict[str, str] = {}
    for doc_id, src, text in iter_docs_from_path(in_path):
        if not isinstance(text, str):
            continue
        # Skip documents that look like pure binary or symbol dumps.
        if len(text) > 200 and not LOW_ALPHA_RE.search(text):
            continue
        r = analyze_doc(
            doc_id, src, text,
            entropy_threshold=args.entropy_threshold,
            marker_base_score=args.marker_base_score,
        )
        results.append(r)
        texts_for_sklearn.append(text)
        doc_text[r.doc_id] = text

    if not results:
        print("No documents found.", file=sys.stderr)
        return 2

    suffix_clusters = build_suffix_clusters(results, min_cluster=args.suffix_min)
    simhash_clusters = build_simhash_clusters(
        results,
        max_hamming=args.simhash_hamming,
        min_cluster=args.simhash_min,
    )
    apply_cluster_boost(
        results,
        suffix_clusters,
        simhash_clusters,
        boost=args.cluster_boost,
        cap_cluster_size=args.cluster_cap,
    )

    if args.use_sklearn:
        labels = try_sklearn_tfidf_dbscan(texts_for_sklearn)
        if labels is None:
            print("scikit-learn not available, skipping TF-IDF clustering.", file=sys.stderr)
        else:
            counts: Dict[int, int] = {}
            for lab in labels:
                counts[lab] = counts.get(lab, 0) + 1
            for i, lab in enumerate(labels):
                if lab == -1:
                    continue
                if counts.get(lab, 0) < args.suffix_min:
                    continue
                if counts[lab] > args.cluster_cap:
                    continue
                results[i].score += 0.5
                results[i].reasons.append(f"tfidf_cluster({counts[lab]})")

    results.sort(key=lambda r: r.score, reverse=True)

    # ANSI color codes
    RED = "\033[91m"
    YELLOW = "\033[93m"
    GREEN = "\033[92m"
    CYAN = "\033[96m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    RESET = "\033[0m"

    # Summary statistics
    total_docs = len(results)
    high_risk = sum(1 for r in results if r.score >= 2.0)
    medium_risk = sum(1 for r in results if 1.0 <= r.score < 2.0)
    low_risk = sum(1 for r in results if 0 < r.score < 1.0)
    clean = sum(1 for r in results if r.score == 0)

    print(f"\n{BOLD}{'=' * 70}")
    print(" LLM POISONING SCAN RESULTS")
    print(f"{'=' * 70}{RESET}")
    print(f"\n {BOLD}Corpus:{RESET} {in_path}")
    print(f" {BOLD}Documents scanned:{RESET} {total_docs}")
    print(f"\n {BOLD}Risk Distribution:{RESET}")
    print(f" {RED}[!!!] HIGH RISK (score >= 2.0): {high_risk:4d} documents{RESET}")
    print(f" {YELLOW}[!!] MEDIUM RISK (1.0 - 2.0): {medium_risk:4d} documents{RESET}")
    print(f" {CYAN}[!] LOW RISK (0.1 - 1.0): {low_risk:4d} documents{RESET}")
    print(f" {GREEN}[OK] CLEAN (score = 0): {clean:4d} documents{RESET}")
    print(f"\n{'-' * 70}")
    print(f" {BOLD}Top {min(args.top, total_docs)} Suspicious Documents{RESET}")
    print(f"{'-' * 70}\n")

    for idx, r in enumerate(results[: args.top], 1):
        # Risk level indicator with color
        if r.score >= 2.0:
            risk = f"{RED}{BOLD}[!!!] HIGH{RESET}"
            score_color = RED
        elif r.score >= 1.0:
            risk = f"{YELLOW}[!!] MED{RESET} "
            score_color = YELLOW
        elif r.score > 0:
            risk = f"{CYAN}[!] LOW{RESET} "
            score_color = CYAN
        else:
            risk = f"{GREEN}[OK] CLEAN{RESET}"
            score_color = GREEN

        # Clean up reasons for display
        clean_reasons = []
        for reason in r.reasons:
            # Make reasons more readable
            if "instruction_verbs" in reason:
                clean_reasons.append("instruction verbs detected")
            elif "prompt_artifacts" in reason:
                clean_reasons.append("prompt injection patterns")
            elif "invisible_chars" in reason:
                clean_reasons.append("hidden unicode chars")
            elif "cyrillic_homoglyph" in reason:
                clean_reasons.append("homoglyph attack (cyrillic)")
            elif "base64_fragments" in reason:
                clean_reasons.append("base64 encoded content")
            elif "low_entropy" in reason:
                clean_reasons.append("low entropy (synthetic)")
            elif "shared_suffix" in reason:
                clean_reasons.append("template cluster")
            elif "near_duplicate" in reason:
                clean_reasons.append("near-duplicate cluster")
            elif "repeat_chars" in reason:
                clean_reasons.append("repeated characters")
            elif "repeat_tokens" in reason:
                clean_reasons.append("repeated tokens")
            elif "nonprintable" in reason:
                clean_reasons.append("non-printable chars")
            elif "many_urls" in reason:
                clean_reasons.append("URL heavy")
            elif "short_doc_penalty" in reason:
                continue  # Skip penalty from display
            else:
                clean_reasons.append(reason)

        print(f" {BOLD}{idx:3d}.{RESET} {risk} Score: {score_color}{r.score:.2f}{RESET} ID: {BOLD}{r.doc_id}{RESET}")
        print(f" {DIM}Tokens: {r.length_tokens} Entropy: {r.entropy_bits:.2f}{RESET}")
        if clean_reasons:
            print(f" {YELLOW}Flags:{RESET} {', '.join(clean_reasons)}")

        # Always show document content preview
        raw_text = doc_text.get(r.doc_id, "")
        if raw_text:
            preview = snippet(raw_text, 400)
            print(f" {CYAN}Content:{RESET}")
            # Show content with indentation, wrap long lines
            for line in preview.split('\n')[:5]:
                if line.strip():
                    print(f" {DIM}{line[:100]}{'...' if len(line) > 100 else ''}{RESET}")
        if args.show_lines:
            lines = extract_suspicious_lines(raw_text, max_lines=args.lines_max)
            if lines:
                print(f" {RED}Suspicious lines:{RESET}")
                for ln in lines[:3]:
                    print(f" {RED}>{RESET} {ln[:80]}{'...' if len(ln) > 80 else ''}")
        print()

    if args.csv:
        write_csv(args.csv, results)
        print(f"Wrote CSV: {args.csv}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())