Repetition Detection Suite

A comprehensive Python library for detecting repetitive text patterns in real-time streams, designed to catch runaway AI generation, infinite loops, and other repetitive content scenarios.

🚀 Quick Start

from repetition_detector import UnifiedRepetitionDetector

# Create detector
detector = UnifiedRepetitionDetector()

# Add text chunks (simulating streaming)
chunks = [
    "This is normal text. ",
    "More varied content here. ",
    "Now we start repeating: same phrase ",
    "same phrase same phrase same phrase ",
    "same phrase same phrase same phrase "
]

for i, chunk in enumerate(chunks):
    if detector.add_chunk(chunk):
        print(f"Repetition detected at chunk {i+1}!")
        break

🔧 Detection Methods

1. Compression-Based Detection

Uses zlib compression ratios to detect repetitive patterns. When text becomes repetitive, it compresses much better.

detector = CompressionRepetitionDetector(
    ratio_threshold=0.4,    # Trigger when compression ratio drops below 40%
    sensitivity=2.0,        # Relative change sensitivity
    min_chunk_size=50       # Minimum text before detection starts
)

Best for: General-purpose repetition detection, catches subtle patterns
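
As a minimal standalone sketch (not part of the library), the effect is easy to see with zlib directly: short varied prose barely compresses, while a looping phrase compresses sharply, which is the signal ratio_threshold keys on.

import zlib

def compression_ratio(text: str) -> float:
    """Compressed size divided by original size; lower means more redundancy."""
    data = text.encode("utf-8")
    return len(zlib.compress(data)) / len(data)

varied = "The quick brown fox jumps over the lazy dog near the river at dawn."
repetitive = "same phrase " * 6

print(compression_ratio(varied))      # close to 1.0: little redundancy to exploit
print(compression_ratio(repetitive))  # much lower: zlib encodes the repeats cheaply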

2. Hash-Based Loop Detection

Hashes fixed-size text segments and detects when segments repeat exactly.

detector = HashBasedLoopDetector(
    segment_size=25,        # Size of text segments to hash
    history_size=15         # Number of recent segments to remember
)

Best for: Exact repetition loops, infinite generation detection
Performance: Extremely fast, minimal memory usage
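
For example, fed an exact loop, the detector reports the repeated segment and how far back it last appeared via detection_stats (see HashBasedLoopDetector in the code below):

detector = HashBasedLoopDetector(segment_size=25, history_size=15)

stream = ["Intro text that varies. "] + ["This line repeats forever"] * 10
for i, chunk in enumerate(stream):
    if detector.add_chunk(chunk):
        stats = detector.detection_stats
        print(f"Loop at chunk {i+1}: {stats['repeated_segment']!r} "
              f"(distance {stats['loop_distance']})")
        break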

3. Pattern-Based Detection

Analyzes substrings to find repeating patterns of various lengths.

detector = PatternBasedDetector(
    min_pattern_length=8,   # Shortest pattern to detect
    max_pattern_length=150  # Longest pattern to check
)

Best for: Structured repetition, formatted output, lists
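
A short usage sketch; on detection, detection_stats (populated by PatternBasedDetector in the code below) reports the repeating pattern and how many times it occurred:

detector = PatternBasedDetector(min_pattern_length=8, max_pattern_length=150)

text = "A normal opening sentence. " + "Step 1: do the thing. " * 6
if detector.add_chunk(text):
    stats = detector.detection_stats
    print(f"Pattern {stats['pattern']!r} repeated {stats['repetitions']} times")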

4. Entropy-Based Detection

Measures Shannon entropy (information content) of text. Low entropy = repetitive content.

detector = EntropyBasedDetector(
    entropy_threshold=2.5,  # Trigger below this entropy level
    window_size=300,        # Text window for analysis
    min_samples=50          # Minimum characters before detection
)

Best for: Low-information content, character-level repetition
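
For intuition, here is a small standalone sketch mirroring the detector's entropy calculation: varied English prose sits around 4 bits per character, while heavy character-level repetition falls well below a 2.5-bit threshold.

import math
from collections import Counter

def shannon_entropy(text: str) -> float:
    """Bits per character over the observed character distribution."""
    counts = Counter(text)
    total = len(text)
    return -sum((c / total) * math.log2(c / total) for c in counts.values())

print(shannon_entropy("The quick brown fox jumps over the lazy dog."))  # varied prose: roughly 4 bits
print(shannon_entropy("ha ha ha ha ha ha ha ha ha ha"))                 # three distinct characters: about 1.6 bits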

5. Unified Detection (Recommended)

Combines the compression, hash, and entropy detectors behind a voting mechanism for robust detection.

detector = UnifiedRepetitionDetector(
    required_votes=1,       # Number of methods that must agree
    compression_params={'ratio_threshold': 0.4},
    hash_params={'segment_size': 25},
    entropy_params={'entropy_threshold': 2.5}
)

Best for: Production use and maximum coverage; raise required_votes above 1 to trade sensitivity for fewer false positives
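
After a detection, detection_stats records how many methods voted and which ones agreed (see UnifiedRepetitionDetector in the code below), which is useful when tuning required_votes:

detector = UnifiedRepetitionDetector(required_votes=1)

chunks = ["Varied opening text. "] + ["buy now buy now buy now "] * 20
for i, chunk in enumerate(chunks):
    if detector.add_chunk(chunk):
        stats = detector.detection_stats
        print(f"Detected at chunk {i+1} with {stats['votes']} vote(s): "
              f"{', '.join(stats['methods'])}")
        break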

📁 File Testing

Test your detection on multiple files automatically:

# Place files named result_001.txt through result_010.txt in directory
python repetition_detector.py

The system will:

  • Test all available result files (skips missing ones gracefully)
  • Split files into chunks to simulate streaming
  • Show which detectors trigger and when
  • Display debug information for analysis

🎯 Use Cases

AI Generation Monitoring

detector = UnifiedRepetitionDetector(required_votes=1)

def generate_with_monitoring(model, prompt):
    detector.reset()
    generated_text = ""
    
    for chunk in model.generate_stream(prompt):
        generated_text += chunk
        
        if detector.add_chunk(chunk):
            print("⚠️  Repetitive generation detected, stopping...")
            break
            
    return generated_text

Log Analysis

detector = HashBasedLoopDetector(segment_size=50)

with open('application.log', 'r') as f:
    for line in f:
        if detector.add_chunk(line):
            print(f"Repetitive log pattern detected: {line[:100]}...")
            break

Content Quality Control

detector = EntropyBasedDetector(entropy_threshold=3.0)

def validate_content(text):
    detector.reset()
    if detector.add_chunk(text):
        return False, "Content appears repetitive or low-quality"
    return True, "Content looks good"

⚙️ Performance Characteristics

Method      | Speed  | Memory | Best For
Hash-based  | ⚡⚡⚡  | 💾     | Exact loops
Compression | ⚡⚡    | 💾💾   | General repetition
Entropy     | ⚡⚡    | 💾     | Low-diversity content
Pattern     |        | 💾     | Structured repetition
Unified     |        | 💾💾   | Maximum accuracy

🔧 Configuration Tips

High Sensitivity (Catch subtle repetition)

detector = UnifiedRepetitionDetector(
    required_votes=1,
    compression_params={'ratio_threshold': 0.5, 'sensitivity': 1.5},
    entropy_params={'entropy_threshold': 3.0}
)

Low False Positives (Production safe)

detector = UnifiedRepetitionDetector(
    required_votes=2,  # Require multiple methods to agree
    compression_params={'ratio_threshold': 0.3, 'sensitivity': 2.5},
    entropy_params={'entropy_threshold': 2.0}
)

Speed Optimized (Real-time processing)

detector = HashBasedLoopDetector(
    segment_size=20,
    history_size=10
)

🧪 Testing

The system includes comprehensive testing:

# Test individual detectors
detector = CompressionRepetitionDetector()
detector.reset()  # Always reset between tests

# Test on file batches
python repetition_detector.py  # Automatically tests result_*.txt files

📋 Requirements

  • Python 3.6+
  • No external dependencies (uses only stdlib: zlib, hashlib, math, collections)

🤝 Contributing

This is a gist for sharing and experimentation. Feel free to:

  • Fork and modify for your use case
  • Share improvements in the comments
  • Report issues or edge cases you discover

📜 License

Public domain / MIT - Use freely in your projects!


Built for detecting runaway AI generation, infinite loops, and repetitive content in real-time text streams.

import zlib
import hashlib
import math
import os
from collections import deque, defaultdict
from typing import Deque, Optional, List, Dict, Any
from abc import ABC, abstractmethod

# Base class for all detectors
class BaseRepetitionDetector(ABC):
    """Abstract base class for repetition detectors."""

    def __init__(self):
        self.repetition_detected = False
        self.detection_stats = {}

    @abstractmethod
    def add_chunk(self, text: str) -> bool:
        """Add text chunk and return True if repetition detected."""
        pass

    def reset(self):
        """Reset detector to initial state."""
        self.repetition_detected = False
        self.detection_stats = {}

# Original compression-based detector
class CompressionRepetitionDetector(BaseRepetitionDetector):
    """Original incremental compression-based repetition detector."""

    def __init__(self,
                 window_size: int = 5,
                 ratio_threshold: float = 0.3,
                 min_chunk_size: int = 50,
                 sensitivity: float = 2.0):
        super().__init__()
        self.window_size = window_size
        self.ratio_threshold = ratio_threshold
        self.min_chunk_size = min_chunk_size
        self.sensitivity = sensitivity
        self.accumulated_text = ""
        self.compression_ratios: deque = deque(maxlen=window_size)
        self.chunk_count = 0

    def reset(self):
        super().reset()
        self.accumulated_text = ""
        self.compression_ratios.clear()
        self.chunk_count = 0

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True
        self.accumulated_text += text
        self.chunk_count += 1
        if len(self.accumulated_text) < self.min_chunk_size:
            return False

        # Calculate compression ratio
        original_size = len(self.accumulated_text.encode('utf-8'))
        compressed_data = zlib.compress(self.accumulated_text.encode('utf-8'))
        compressed_size = len(compressed_data)
        current_ratio = compressed_size / original_size if original_size > 0 else 1.0
        self.compression_ratios.append(current_ratio)

        if len(self.compression_ratios) < 2:
            return False

        self.repetition_detected = self._detect_repetition(current_ratio)
        if self.repetition_detected:
            self.detection_stats = {
                'detection_chunk': self.chunk_count,
                'final_ratio': current_ratio,
                'baseline_avg': sum(list(self.compression_ratios)[:-1]) / (len(self.compression_ratios) - 1)
            }
        return self.repetition_detected

    def _detect_repetition(self, current_ratio: float) -> bool:
        # Need at least 3 measurements for meaningful comparison
        if len(self.compression_ratios) < 3:
            return False

        # Absolute threshold - only if ratio is very low
        if current_ratio < self.ratio_threshold:
            return True

        # Relative change from baseline - need stable baseline first
        if len(self.compression_ratios) >= 4:
            baseline_ratios = list(self.compression_ratios)[:-1]
            baseline_avg = sum(baseline_ratios) / len(baseline_ratios)
            if current_ratio < baseline_avg / self.sensitivity:
                return True

        # Steep downward trend
        if len(self.compression_ratios) >= 3:
            recent_ratios = list(self.compression_ratios)[-3:]
            if all(recent_ratios[i] < recent_ratios[i-1] * 0.85 for i in range(1, len(recent_ratios))):
                return True
        return False

# N-gram frequency detector
class NGramRepetitionDetector(BaseRepetitionDetector):
    """Detects repetition using n-gram frequency analysis."""

    def __init__(self, n: int = 3, threshold: float = 0.3, window_size: int = 1000):
        super().__init__()
        self.n = n
        self.threshold = threshold
        self.window_size = window_size
        self.text_buffer = deque(maxlen=window_size)
        self.ngram_counts = defaultdict(int)
        self.total_ngrams = 0

    def reset(self):
        super().reset()
        self.text_buffer.clear()
        self.ngram_counts.clear()
        self.total_ngrams = 0

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True

        # Process each character incrementally
        for char in text:
            # Handle sliding window - remove old n-grams if buffer is full
            if len(self.text_buffer) == self.window_size:
                # Remove the oldest n-gram
                if len(self.text_buffer) >= self.n:
                    old_ngram = ''.join(list(self.text_buffer)[:self.n])
                    if old_ngram in self.ngram_counts:
                        self.ngram_counts[old_ngram] -= 1
                        if self.ngram_counts[old_ngram] <= 0:
                            del self.ngram_counts[old_ngram]
                    self.total_ngrams -= 1

            # Add new character to buffer
            self.text_buffer.append(char)

            # Add new n-gram if we have enough characters
            if len(self.text_buffer) >= self.n:
                new_ngram = ''.join(list(self.text_buffer)[-self.n:])
                self.ngram_counts[new_ngram] += 1
                self.total_ngrams += 1

        # Check for repetition after processing the chunk
        if self.total_ngrams > self.n * 5:
            max_freq = max(self.ngram_counts.values()) if self.ngram_counts else 0
            repetition_ratio = max_freq / self.total_ngrams
            if repetition_ratio > self.threshold:
                self.repetition_detected = True
                self.detection_stats = {
                    'repetition_ratio': repetition_ratio,
                    'max_frequency': max_freq,
                    'total_ngrams': self.total_ngrams,
                    'most_common_ngram': max(self.ngram_counts, key=self.ngram_counts.get) if self.ngram_counts else None
                }
        return self.repetition_detected

# Hash-based loop detector
class HashBasedLoopDetector(BaseRepetitionDetector):
    """Detects exact repetition loops using segment hashing."""

    def __init__(self, segment_size: int = 50, history_size: int = 20):
        super().__init__()
        self.segment_size = segment_size
        self.history_size = history_size
        self.segment_hashes = deque(maxlen=history_size)
        self.text_buffer = ""

    def reset(self):
        super().reset()
        self.segment_hashes.clear()
        self.text_buffer = ""

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True
        self.text_buffer += text
        while len(self.text_buffer) >= self.segment_size:
            segment = self.text_buffer[:self.segment_size]
            segment_hash = hashlib.md5(segment.encode()).hexdigest()
            if segment_hash in self.segment_hashes:
                self.repetition_detected = True
                self.detection_stats = {
                    'repeated_segment': segment[:50] + "..." if len(segment) > 50 else segment,
                    'loop_distance': len(self.segment_hashes) - list(self.segment_hashes).index(segment_hash)
                }
                return True
            self.segment_hashes.append(segment_hash)
            self.text_buffer = self.text_buffer[self.segment_size:]
        return False

# Pattern-based detector
class PatternBasedDetector(BaseRepetitionDetector):
    """Detects repetition using substring pattern analysis."""

    def __init__(self, min_pattern_length: int = 10, max_pattern_length: int = 200):
        super().__init__()
        self.min_pattern_length = min_pattern_length
        self.max_pattern_length = max_pattern_length
        self.text_buffer = ""
        self.detected_pattern = None

    def reset(self):
        super().reset()
        self.text_buffer = ""
        self.detected_pattern = None

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True
        self.text_buffer += text
        if len(self.text_buffer) > 2000:
            self.text_buffer = self.text_buffer[-1500:]
        self.repetition_detected = self._find_repeating_pattern()
        return self.repetition_detected

    def _find_repeating_pattern(self) -> bool:
        text = self.text_buffer
        for pattern_len in range(self.min_pattern_length,
                                 min(self.max_pattern_length, len(text) // 3)):
            if len(text) >= pattern_len * 2:
                pattern = text[-pattern_len:]
                preceding_text = text[:-pattern_len]
                repetitions = 1
                pos = len(preceding_text)
                while pos >= pattern_len:
                    if preceding_text[pos-pattern_len:pos] == pattern:
                        repetitions += 1
                        pos -= pattern_len
                    else:
                        break
                if repetitions >= 3:
                    self.detected_pattern = pattern
                    self.detection_stats = {
                        'pattern': pattern[:100] + "..." if len(pattern) > 100 else pattern,
                        'repetitions': repetitions,
                        'pattern_length': pattern_len
                    }
                    return True
        return False

# Fixed entropy-based detector
class EntropyBasedDetector(BaseRepetitionDetector):
    """Detects repetition using Shannon entropy analysis."""

    def __init__(self, window_size: int = 500, entropy_threshold: float = 2.0, min_samples: int = 100):
        super().__init__()
        self.window_size = window_size
        self.entropy_threshold = entropy_threshold
        self.min_samples = min_samples
        self.text_buffer = deque(maxlen=window_size)
        self.entropy_history = deque(maxlen=10)

    def reset(self):
        super().reset()
        self.text_buffer.clear()
        self.entropy_history.clear()

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True
        for char in text:
            self.text_buffer.append(char)

        if len(self.text_buffer) >= self.min_samples:
            entropy = self._calculate_entropy()
            self.entropy_history.append(entropy)
            # Check if entropy is consistently low
            if len(self.entropy_history) >= 2:
                recent_entropy = self.entropy_history[-1]
                if recent_entropy < self.entropy_threshold:
                    self.repetition_detected = True
                    self.detection_stats = {
                        'current_entropy': recent_entropy,
                        'threshold': self.entropy_threshold,
                        'buffer_size': len(self.text_buffer)
                    }
        return self.repetition_detected

    def _calculate_entropy(self) -> float:
        """Calculate Shannon entropy of the current text buffer."""
        if not self.text_buffer:
            return 0.0
        text = ''.join(self.text_buffer)
        char_counts = defaultdict(int)
        for char in text:
            char_counts[char] += 1
        total_chars = len(text)
        entropy = 0.0
        for count in char_counts.values():
            if count > 0:
                prob = count / total_chars
                entropy -= prob * math.log2(prob)
        return entropy

# Unified detector that ensures it triggers when any other detector does
class UnifiedRepetitionDetector(BaseRepetitionDetector):
    """
    Unified detector that triggers when any individual method triggers.
    Ensures unified always works when component detectors work.
    """

    def __init__(self,
                 compression_params: dict = None,
                 ngram_params: dict = None,
                 hash_params: dict = None,
                 entropy_params: dict = None,
                 required_votes: int = 1):
        super().__init__()
        # Initialize individual detectors with custom params
        self.compression_detector = CompressionRepetitionDetector(
            **(compression_params or {'ratio_threshold': 0.4, 'sensitivity': 2.0})
        )
        self.hash_detector = HashBasedLoopDetector(
            **(hash_params or {'segment_size': 25})
        )
        self.entropy_detector = EntropyBasedDetector(
            **(entropy_params or {'entropy_threshold': 2.5})
        )
        self.required_votes = required_votes
        self.detection_votes = 0
        self.method_results = {}
        self.detectors = {
            'compression': self.compression_detector,
            'hash': self.hash_detector,
            'entropy': self.entropy_detector
        }

    def reset(self):
        super().reset()
        for detector in self.detectors.values():
            detector.reset()
        self.detection_votes = 0
        self.method_results = {}

    def add_chunk(self, text: str) -> bool:
        if self.repetition_detected:
            return True
        votes = 0
        self.method_results = {}
        # Test each detector
        for name, detector in self.detectors.items():
            if detector.add_chunk(text):
                votes += 1
                self.method_results[name] = True
            else:
                self.method_results[name] = False
        self.detection_votes = votes
        # Check if we have enough votes
        if votes >= self.required_votes:
            self.repetition_detected = True
            self.detection_stats = {
                'votes': votes,
                'methods': {k: v for k, v in self.method_results.items() if v},
                'required_votes': self.required_votes
            }
        return self.repetition_detected

# File-based testing system
def test_files():
    """Test repetition detection on result files."""
    print("File-Based Repetition Detection Tests")
    print("=" * 60)

    # Create detector instances with balanced settings
    detectors = {
        "Compression": CompressionRepetitionDetector(
            ratio_threshold=0.4,
            sensitivity=2.0,
            min_chunk_size=50
        ),
        "Hash Loop": HashBasedLoopDetector(
            segment_size=25,
            history_size=15
        ),
        "Pattern": PatternBasedDetector(
            min_pattern_length=8,
            max_pattern_length=150
        ),
        "Entropy": EntropyBasedDetector(
            entropy_threshold=2.5,
            window_size=300,
            min_samples=50
        ),
        "Unified": UnifiedRepetitionDetector(
            required_votes=1  # Trigger when any method triggers
        )
    }

    # Test files result_001.txt through result_010.txt
    for file_num in range(1, 11):
        filename = f"result_{file_num:03d}.txt"
        if not os.path.exists(filename):
            continue  # Skip missing files without error

        print(f"\nTesting {filename}")
        print("-" * 40)
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                content = f.read()
            if not content.strip():
                print("File is empty, skipping...")
                continue

            # Reset all detectors
            for detector in detectors.values():
                detector.reset()

            # Split content into chunks (simulate streaming)
            chunk_size = 100
            chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]

            results = {}
            # Test each detector
            for detector_name, detector in detectors.items():
                detected_at = None
                for i, chunk in enumerate(chunks):
                    if detector.add_chunk(chunk):
                        detected_at = i + 1
                        break
                results[detector_name] = detected_at

            # Display results
            for detector_name, detected_at in results.items():
                result = f"Chunk {detected_at}" if detected_at else "Not detected"
                # Add debug info for entropy detector
                if detector_name == "Entropy" and detected_at:
                    detector = detectors[detector_name]
                    if detector.detection_stats:
                        entropy_val = detector.detection_stats.get('current_entropy', 0)
                        result += f" (entropy: {entropy_val:.2f})"
                print(f"{detector_name:12}: {result}")

            # Show unified detector active methods
            unified = detectors["Unified"]
            if results["Unified"] and hasattr(unified, 'detection_stats'):
                active_methods = unified.detection_stats.get('methods', {})
                if active_methods:
                    method_names = list(active_methods.keys())
                    print(f" Active: {', '.join(method_names)}")
        except Exception as e:
            print(f"Error reading {filename}: {e}")

    print("\nTesting complete! Processed files result_001.txt through result_010.txt")
    print("Note: Missing files were skipped automatically.")

def create_sample_files():
    """Create sample test files for demonstration."""
    samples = {
        "result_001.txt": "This is normal varied text with different content each time. No repetition here at all.",
        "result_002.txt": "Normal text initially. " + "repeat repeat repeat " * 20,
        "result_003.txt": "Starting normal. " + "LOOP_PATTERN " * 15,
        "result_004.txt": "Diverse beginning text. " + "a" * 200,
        "result_005.txt": "Normal start. Pattern ABC end. " + "Pattern ABC end. " * 10
    }
    print("Creating sample test files...")
    for filename, content in samples.items():
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"Created {filename}")
    print("\nSample files created! Run the test again to see results.")
if __name__ == "__main__":
# Check if any result files exist
existing_files = [f"result_{i:03d}.txt" for i in range(1, 11) if os.path.exists(f"result_{i:03d}.txt")]
if existing_files:
test_files()
else:
print("No result files found. Creating sample files for demonstration...")
create_sample_files()
print("\nNow running tests on sample files:")
test_files()