import zlib
import hashlib
import math
import os
from collections import deque, defaultdict
from typing import Deque
from abc import ABC, abstractmethod

# Base class for all detectors
class BaseRepetitionDetector(ABC):
    """Abstract base class for repetition detectors."""

    def __init__(self):
        self.repetition_detected = False
        self.detection_stats = {}

    @abstractmethod
    def add_chunk(self, text: str) -> bool:
        """Add a text chunk and return True if repetition has been detected."""

    def reset(self):
        """Reset the detector to its initial state."""
        self.repetition_detected = False
        self.detection_stats = {}

# Compression-based detector
class CompressionRepetitionDetector(BaseRepetitionDetector):
    """Detects repetition by tracking the zlib compression ratio of the accumulated text."""

    def __init__(self,
                 window_size: int = 5,
                 ratio_threshold: float = 0.3,
                 min_chunk_size: int = 50,
                 sensitivity: float = 2.0):
        super().__init__()
        self.window_size = window_size
        self.ratio_threshold = ratio_threshold
        self.min_chunk_size = min_chunk_size
        self.sensitivity = sensitivity

        self.accumulated_text = ""
        self.compression_ratios: Deque[float] = deque(maxlen=window_size)
        self.chunk_count = 0

    def reset(self):
        super().reset()
        self.accumulated_text = ""
        self.compression_ratios.clear()
        self.chunk_count = 0

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True

        self.accumulated_text += text
        self.chunk_count += 1

        if len(self.accumulated_text) < self.min_chunk_size:
            return False

        # Calculate the compression ratio of everything seen so far
        original_size = len(self.accumulated_text.encode('utf-8'))
        compressed_data = zlib.compress(self.accumulated_text.encode('utf-8'))
        compressed_size = len(compressed_data)

        current_ratio = compressed_size / original_size if original_size > 0 else 1.0
        self.compression_ratios.append(current_ratio)

        if len(self.compression_ratios) < 2:
            return False

        self.repetition_detected = self._detect_repetition(current_ratio)

        if self.repetition_detected:
            self.detection_stats = {
                'detection_chunk': self.chunk_count,
                'final_ratio': current_ratio,
                'baseline_avg': sum(list(self.compression_ratios)[:-1]) / (len(self.compression_ratios) - 1)
            }

        return self.repetition_detected

    def _detect_repetition(self, current_ratio: float) -> bool:
        # Need at least 3 measurements for a meaningful comparison
        if len(self.compression_ratios) < 3:
            return False

        # Absolute threshold: trigger only if the ratio is very low
        if current_ratio < self.ratio_threshold:
            return True

        # Relative drop from the baseline: requires a stable baseline first
        if len(self.compression_ratios) >= 4:
            baseline_ratios = list(self.compression_ratios)[:-1]
            baseline_avg = sum(baseline_ratios) / len(baseline_ratios)
            if current_ratio < baseline_avg / self.sensitivity:
                return True

        # Steep downward trend across the last three measurements
        if len(self.compression_ratios) >= 3:
            recent_ratios = list(self.compression_ratios)[-3:]
            if all(recent_ratios[i] < recent_ratios[i - 1] * 0.85 for i in range(1, len(recent_ratios))):
                return True

        return False

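# Illustrative sketch (not part of the detector above): the signal this detector
# relies on is that looping text compresses far better than varied text, so the
# zlib ratio (compressed size / original size) collapses once a loop develops.
# _demo_compression_ratio is a hypothetical, uncalled helper added for clarity.
def _demo_compression_ratio():
    varied = ("Streaming language models occasionally drift into loops, "
              "so several lightweight detectors watch the output as it arrives.")
    looping = "loop loop loop " * 12
    for label, sample in [("varied", varied), ("looping", looping)]:
        data = sample.encode('utf-8')
        ratio = len(zlib.compress(data)) / len(data)
        # The looping sample should show a markedly smaller ratio.
        print(f"{label}: compression ratio = {ratio:.2f}")
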
# N-gram frequency detector
class NGramRepetitionDetector(BaseRepetitionDetector):
    """Detects repetition using character n-gram frequency analysis."""

    def __init__(self, n: int = 3, threshold: float = 0.3, window_size: int = 1000):
        super().__init__()
        self.n = n
        self.threshold = threshold
        self.window_size = window_size
        self.text_buffer = deque(maxlen=window_size)
        self.ngram_counts = defaultdict(int)
        self.total_ngrams = 0

    def reset(self):
        super().reset()
        self.text_buffer.clear()
        self.ngram_counts.clear()
        self.total_ngrams = 0

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True

        # Process each character incrementally
        for char in text:
            # Handle the sliding window: when the buffer is full, the next append
            # evicts the oldest character, so drop the oldest n-gram first
            if len(self.text_buffer) == self.window_size:
                if len(self.text_buffer) >= self.n:
                    old_ngram = ''.join(list(self.text_buffer)[:self.n])
                    if old_ngram in self.ngram_counts:
                        self.ngram_counts[old_ngram] -= 1
                        if self.ngram_counts[old_ngram] <= 0:
                            del self.ngram_counts[old_ngram]
                        self.total_ngrams -= 1

            # Add the new character to the buffer
            self.text_buffer.append(char)

            # Count the new n-gram once we have enough characters
            if len(self.text_buffer) >= self.n:
                new_ngram = ''.join(list(self.text_buffer)[-self.n:])
                self.ngram_counts[new_ngram] += 1
                self.total_ngrams += 1

        # Check for repetition after processing the chunk
        if self.total_ngrams > self.n * 5:
            max_freq = max(self.ngram_counts.values()) if self.ngram_counts else 0
            repetition_ratio = max_freq / self.total_ngrams

            if repetition_ratio > self.threshold:
                self.repetition_detected = True
                self.detection_stats = {
                    'repetition_ratio': repetition_ratio,
                    'max_frequency': max_freq,
                    'total_ngrams': self.total_ngrams,
                    'most_common_ngram': max(self.ngram_counts, key=self.ngram_counts.get) if self.ngram_counts else None
                }

        return self.repetition_detected

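# Illustrative sketch of the n-gram statistic above: in looping text a single
# n-gram dominates the counts, so max_frequency / total_ngrams climbs past the
# threshold. _demo_ngram_ratio is a hypothetical, uncalled helper.
def _demo_ngram_ratio():
    detector = NGramRepetitionDetector(n=3, threshold=0.3, window_size=200)
    # "abc" repeated: the dominant 3-gram accounts for roughly a third of all
    # n-grams, which should exceed the 0.3 threshold.
    detector.add_chunk("abcabcabc" * 20)
    print("triggered:", detector.repetition_detected, detector.detection_stats)
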
# Hash-based loop detector
class HashBasedLoopDetector(BaseRepetitionDetector):
    """Detects exact repetition loops using segment hashing."""

    def __init__(self, segment_size: int = 50, history_size: int = 20):
        super().__init__()
        self.segment_size = segment_size
        self.history_size = history_size
        self.segment_hashes = deque(maxlen=history_size)
        self.text_buffer = ""

    def reset(self):
        super().reset()
        self.segment_hashes.clear()
        self.text_buffer = ""

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True

        self.text_buffer += text

        while len(self.text_buffer) >= self.segment_size:
            segment = self.text_buffer[:self.segment_size]
            segment_hash = hashlib.md5(segment.encode()).hexdigest()

            if segment_hash in self.segment_hashes:
                self.repetition_detected = True
                self.detection_stats = {
                    'repeated_segment': segment[:50] + "..." if len(segment) > 50 else segment,
                    'loop_distance': len(self.segment_hashes) - list(self.segment_hashes).index(segment_hash)
                }
                return True

            self.segment_hashes.append(segment_hash)
            self.text_buffer = self.text_buffer[self.segment_size:]

        return False

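# Illustrative sketch of the hashing scheme above: the stream is cut into
# fixed-size segments, each segment is hashed, and a hash that reappears within
# the recent history indicates an exact loop. _demo_hash_segments is a
# hypothetical, uncalled helper.
def _demo_hash_segments():
    detector = HashBasedLoopDetector(segment_size=10, history_size=20)
    # Identical 10-character segments repeat, so the second segment's hash is
    # already in the history and detection fires immediately.
    detector.add_chunk("0123456789" * 30)
    print("triggered:", detector.repetition_detected, detector.detection_stats)
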
# Pattern-based detector
class PatternBasedDetector(BaseRepetitionDetector):
    """Detects repetition by searching for a suffix that repeats back-to-back."""

    def __init__(self, min_pattern_length: int = 10, max_pattern_length: int = 200):
        super().__init__()
        self.min_pattern_length = min_pattern_length
        self.max_pattern_length = max_pattern_length
        self.text_buffer = ""
        self.detected_pattern = None

    def reset(self):
        super().reset()
        self.text_buffer = ""
        self.detected_pattern = None

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True

        self.text_buffer += text

        # Keep the buffer bounded; only the recent tail matters for suffix matching
        if len(self.text_buffer) > 2000:
            self.text_buffer = self.text_buffer[-1500:]

        self.repetition_detected = self._find_repeating_pattern()
        return self.repetition_detected

    def _find_repeating_pattern(self) -> bool:
        text = self.text_buffer

        for pattern_len in range(self.min_pattern_length,
                                 min(self.max_pattern_length, len(text) // 3)):

            if len(text) >= pattern_len * 2:
                # Candidate pattern is the current suffix; count how many copies
                # of it sit immediately before it
                pattern = text[-pattern_len:]
                preceding_text = text[:-pattern_len]

                repetitions = 1
                pos = len(preceding_text)

                while pos >= pattern_len:
                    if preceding_text[pos - pattern_len:pos] == pattern:
                        repetitions += 1
                        pos -= pattern_len
                    else:
                        break

                if repetitions >= 3:
                    self.detected_pattern = pattern
                    self.detection_stats = {
                        'pattern': pattern[:100] + "..." if len(pattern) > 100 else pattern,
                        'repetitions': repetitions,
                        'pattern_length': pattern_len
                    }
                    return True

        return False

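# Illustrative sketch of the suffix search above: the detector takes the current
# suffix as a candidate pattern and walks backwards counting back-to-back copies;
# three or more copies count as a loop. _demo_pattern_search is a hypothetical,
# uncalled helper; with this input the 17-character unit "Pattern ABC end. "
# should be found repeating at the end of the buffer.
def _demo_pattern_search():
    detector = PatternBasedDetector(min_pattern_length=8, max_pattern_length=50)
    detector.add_chunk("A normal sentence to start with. " + "Pattern ABC end. " * 5)
    print("triggered:", detector.repetition_detected, detector.detection_stats)
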
# Entropy-based detector
class EntropyBasedDetector(BaseRepetitionDetector):
    """Detects repetition using Shannon entropy of the recent character distribution."""

    def __init__(self, window_size: int = 500, entropy_threshold: float = 2.0, min_samples: int = 100):
        super().__init__()
        self.window_size = window_size
        self.entropy_threshold = entropy_threshold
        self.min_samples = min_samples
        self.text_buffer = deque(maxlen=window_size)
        self.entropy_history = deque(maxlen=10)

    def reset(self):
        super().reset()
        self.text_buffer.clear()
        self.entropy_history.clear()

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True

        for char in text:
            self.text_buffer.append(char)

            if len(self.text_buffer) >= self.min_samples:
                entropy = self._calculate_entropy()
                self.entropy_history.append(entropy)

                # Trigger once at least two measurements exist and the latest
                # entropy has dropped below the threshold
                if len(self.entropy_history) >= 2:
                    recent_entropy = self.entropy_history[-1]
                    if recent_entropy < self.entropy_threshold:
                        self.repetition_detected = True
                        self.detection_stats = {
                            'current_entropy': recent_entropy,
                            'threshold': self.entropy_threshold,
                            'buffer_size': len(self.text_buffer)
                        }

        return self.repetition_detected

    def _calculate_entropy(self) -> float:
        """Calculate the Shannon entropy (in bits) of the current text buffer."""
        if not self.text_buffer:
            return 0.0

        text = ''.join(self.text_buffer)
        char_counts = defaultdict(int)

        for char in text:
            char_counts[char] += 1

        total_chars = len(text)
        entropy = 0.0

        for count in char_counts.values():
            if count > 0:
                prob = count / total_chars
                entropy -= prob * math.log2(prob)

        return entropy

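# Worked example of the entropy signal above (illustrative, uncalled): a buffer
# made of a single repeated character has entropy 0 bits, while text drawn evenly
# from k distinct characters reaches log2(k) bits, e.g. log2(16) = 4 bits. Looping
# output drags the windowed entropy down toward the 2.0-bit threshold.
def _demo_entropy_values():
    detector = EntropyBasedDetector(window_size=200, entropy_threshold=2.0, min_samples=50)
    detector.text_buffer.extend("a" * 100)                    # degenerate distribution
    print("single char:", detector._calculate_entropy())      # 0.0 bits
    detector.reset()
    detector.text_buffer.extend("abcdefghijklmnop" * 10)      # 16 symbols, uniform
    print("16 symbols :", detector._calculate_entropy())      # 4.0 bits
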
# Unified detector that triggers whenever any component detector does
class UnifiedRepetitionDetector(BaseRepetitionDetector):
    """
    Unified detector that combines several component detectors.
    With required_votes=1 (the default) it triggers whenever any individual
    method triggers, so it fires at least as early as its components.
    """

    def __init__(self,
                 compression_params: dict = None,
                 hash_params: dict = None,
                 entropy_params: dict = None,
                 required_votes: int = 1):
        super().__init__()

        # Initialize the component detectors with custom parameters
        self.compression_detector = CompressionRepetitionDetector(
            **(compression_params or {'ratio_threshold': 0.4, 'sensitivity': 2.0})
        )
        self.hash_detector = HashBasedLoopDetector(
            **(hash_params or {'segment_size': 25})
        )
        self.entropy_detector = EntropyBasedDetector(
            **(entropy_params or {'entropy_threshold': 2.5})
        )

        self.required_votes = required_votes
        self.detection_votes = 0
        self.method_results = {}
        self.detectors = {
            'compression': self.compression_detector,
            'hash': self.hash_detector,
            'entropy': self.entropy_detector
        }

    def reset(self):
        super().reset()
        for detector in self.detectors.values():
            detector.reset()
        self.detection_votes = 0
        self.method_results = {}

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True

        votes = 0
        self.method_results = {}

        # Run the chunk through each component detector
        for name, detector in self.detectors.items():
            if detector.add_chunk(text):
                votes += 1
                self.method_results[name] = True
            else:
                self.method_results[name] = False

        self.detection_votes = votes

        # Check whether enough detectors agree
        if votes >= self.required_votes:
            self.repetition_detected = True
            self.detection_stats = {
                'votes': votes,
                'methods': {k: v for k, v in self.method_results.items() if v},
                'required_votes': self.required_votes
            }

        return self.repetition_detected

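# Illustrative usage sketch for the unified detector (hypothetical, uncalled):
# feed streamed chunks in as they arrive and stop generation once any component
# detector votes for repetition (required_votes=1).
def _demo_unified_streaming(chunks):
    detector = UnifiedRepetitionDetector(required_votes=1)
    for index, chunk in enumerate(chunks, start=1):
        if detector.add_chunk(chunk):
            print(f"Repetition detected at chunk {index}: {detector.detection_stats}")
            return index
    print("No repetition detected.")
    return None
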
# File-based testing system
def test_files():
    """Test repetition detection on result files."""

    print("File-Based Repetition Detection Tests")
    print("=" * 60)

    # Create detector instances with balanced settings
    detectors = {
        "Compression": CompressionRepetitionDetector(
            ratio_threshold=0.4,
            sensitivity=2.0,
            min_chunk_size=50
        ),
        "Hash Loop": HashBasedLoopDetector(
            segment_size=25,
            history_size=15
        ),
        "Pattern": PatternBasedDetector(
            min_pattern_length=8,
            max_pattern_length=150
        ),
        "Entropy": EntropyBasedDetector(
            entropy_threshold=2.5,
            window_size=300,
            min_samples=50
        ),
        "Unified": UnifiedRepetitionDetector(
            required_votes=1  # Trigger when any method triggers
        )
    }

    # Test files result_001.txt through result_010.txt
    for file_num in range(1, 11):
        filename = f"result_{file_num:03d}.txt"

        if not os.path.exists(filename):
            continue  # Skip missing files without error

        print(f"\nTesting {filename}")
        print("-" * 40)

        try:
            with open(filename, 'r', encoding='utf-8') as f:
                content = f.read()

            if not content.strip():
                print("File is empty, skipping...")
                continue

            # Reset all detectors
            for detector in detectors.values():
                detector.reset()

            # Split content into chunks (simulate streaming)
            chunk_size = 100
            chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

            results = {}

            # Run each detector over the chunk stream
            for detector_name, detector in detectors.items():
                detected_at = None

                for i, chunk in enumerate(chunks):
                    if detector.add_chunk(chunk):
                        detected_at = i + 1
                        break

                results[detector_name] = detected_at

            # Display results
            for detector_name, detected_at in results.items():
                result = f"Chunk {detected_at}" if detected_at else "Not detected"

                # Add debug info for the entropy detector
                if detector_name == "Entropy" and detected_at:
                    detector = detectors[detector_name]
                    if detector.detection_stats:
                        entropy_val = detector.detection_stats.get('current_entropy', 0)
                        result += f" (entropy: {entropy_val:.2f})"

                print(f"{detector_name:12}: {result}")

            # Show which methods fired inside the unified detector
            unified = detectors["Unified"]
            if results["Unified"] and unified.detection_stats:
                active_methods = unified.detection_stats.get('methods', {})
                if active_methods:
                    method_names = list(active_methods.keys())
                    print(f" Active: {', '.join(method_names)}")

        except Exception as e:
            print(f"Error reading {filename}: {e}")

    print("\nTesting complete! Processed files result_001.txt through result_010.txt")
    print("Note: Missing files were skipped automatically.")

def create_sample_files():
    """Create sample test files for demonstration."""

    samples = {
        "result_001.txt": "This is normal varied text with different content each time. No repetition here at all.",
        "result_002.txt": "Normal text initially. " + "repeat repeat repeat " * 20,
        "result_003.txt": "Starting normal. " + "LOOP_PATTERN " * 15,
        "result_004.txt": "Diverse beginning text. " + "a" * 200,
        "result_005.txt": "Normal start. Pattern ABC end. " + "Pattern ABC end. " * 10
    }

    print("Creating sample test files...")
    for filename, content in samples.items():
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"Created {filename}")

    print("\nSample files created! Run the test again to see results.")

if __name__ == "__main__":
    # Check whether any result files exist
    existing_files = [f"result_{i:03d}.txt" for i in range(1, 11) if os.path.exists(f"result_{i:03d}.txt")]

    if existing_files:
        test_files()
    else:
        print("No result files found. Creating sample files for demonstration...")
        create_sample_files()
        print("\nNow running tests on sample files:")
        test_files()