Skip to content

Instantly share code, notes, and snippets.

@bogged-broker
Created December 30, 2025 18:33
Show Gist options
  • Select an option

  • Save bogged-broker/257ee5cd862768857a8e5efda506e892 to your computer and use it in GitHub Desktop.

Select an option

Save bogged-broker/257ee5cd862768857a8e5efda506e892 to your computer and use it in GitHub Desktop.
"""
audio_pattern_learner.py
Autonomous machine learning system for identifying and predicting viral audio patterns.
Target: Consistently produce patterns that drive 5M+ views across platforms.
Core Features:
- High-resolution audio feature ingestion & normalization
- Advanced feature engineering (beat, hook, emotion, spectral)
- Multi-model ML pipeline (XGBoost, LSTM, clustering)
- Pattern discovery & ranking by viral efficacy
- RL integration for continuous optimization
- Real-time API for TTS/voice-sync integration
- Cross-platform, cross-niche pattern learning
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime, timedelta
from collections import defaultdict, deque
import json
import pickle
from pathlib import Path
import logging
# ML/DL imports
try:
import xgboost as xgb
from sklearn.cluster import KMeans, HDBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.ensemble import IsolationForest
import torch
import torch.nn as nn
import torch.optim as optim
except ImportError:
print("Warning: Some ML libraries not installed. Install: xgboost, scikit-learn, torch")
# Audio processing
try:
import librosa
from scipy import signal, stats
except ImportError:
print("Warning: Audio libraries not installed. Install: librosa, scipy")
# ============================================================================
# DATA STRUCTURES
# ============================================================================
@dataclass
class AudioFeatures:
"""Comprehensive audio feature set for virality prediction"""
# Basic features
pace_wpm: float
pitch_mean: float
pitch_variance: float
energy_mean: float
energy_variance: float
tempo_bpm: float
# Hook & timing
hook_timing_seconds: List[float]
hook_emphasis_amplitude: List[float]
hook_pitch_jump: List[float]
pause_durations: List[float]
pause_positions: List[float]
beat_alignment_error: float
syllable_timing: List[float]
# Spectral & timbre
mfcc: np.ndarray
spectral_centroid: np.ndarray
spectral_rolloff: np.ndarray
zero_crossing_rate: np.ndarray
chroma: np.ndarray
harmonic_noise_ratio: float
# Emotion & dynamics
emotion_trajectory: List[str] # ['building', 'peak', 'sustain', 'release']
emotion_intensity: List[float]
voice_tone: str
phoneme_timing: Dict[str, float]
# Context
niche: str
platform: str
beat_type: str
voice_style: str
language: str
music_track: Optional[str]
is_trending_beat: bool
trend_timestamp: datetime
# Embeddings
audio_embedding: Optional[np.ndarray] = None
def to_feature_vector(self) -> np.ndarray:
"""Convert to flat feature vector for ML models"""
features = [
self.pace_wpm,
self.pitch_mean,
self.pitch_variance,
self.energy_mean,
self.energy_variance,
self.tempo_bpm,
np.mean(self.hook_emphasis_amplitude) if self.hook_emphasis_amplitude else 0,
np.mean(self.hook_pitch_jump) if self.hook_pitch_jump else 0,
np.mean(self.pause_durations) if self.pause_durations else 0,
self.beat_alignment_error,
self.harmonic_noise_ratio,
len(self.hook_timing_seconds),
len(self.pause_durations),
np.mean(self.emotion_intensity) if self.emotion_intensity else 0,
]
# Add aggregated spectral features
if self.mfcc is not None:
features.extend(np.mean(self.mfcc, axis=1).tolist()[:13])
else:
features.extend([0] * 13)
return np.array(features)
@dataclass
class PerformanceMetrics:
"""Video performance metrics for learning"""
video_id: str
views_total: int
retention_2s: float
retention_15s: float
completion_rate: float
replay_rate: float
velocity_per_hour: float
velocity_per_day: float
# Social engagement
likes: int
comments: int
shares: int
saves: int
# Platform
platform: str
upload_timestamp: datetime
# Derived metrics
viral_score: float = 0.0
velocity_score: float = 0.0
engagement_ratio: float = 0.0
def __post_init__(self):
"""Calculate derived metrics"""
self.viral_score = (
self.views_total / 1_000_000 * 0.3 +
self.completion_rate * 0.2 +
self.retention_2s * 0.15 +
self.replay_rate * 0.15 +
(self.shares / max(self.views_total, 1)) * 1000 * 0.2
)
self.velocity_score = (
self.velocity_per_hour * 0.4 +
self.velocity_per_day / 24 * 0.6
)
if self.views_total > 0:
self.engagement_ratio = (self.likes + self.comments * 2 + self.shares * 3) / self.views_total
@dataclass
class AudioPattern:
"""Discovered audio pattern with viral efficacy"""
pattern_id: str
niche: str
platform: str
# Pattern characteristics
optimal_pace: float
optimal_pitch_range: Tuple[float, float]
optimal_energy: float
hook_timings: List[float]
pause_pattern: List[Tuple[float, float]] # (position, duration)
beat_alignment_target: float
emotion_arc: List[str]
# Efficacy metrics
viral_efficacy_score: float
sample_count: int
avg_views: float
avg_completion: float
confidence: float
# Temporal
discovered_at: datetime
last_validated: datetime
trend_status: str # 'rising', 'peaked', 'declining', 'stable'
# Weights for RL
weight: float = 1.0
decay_rate: float = 0.95
@dataclass
class PatternRecommendation:
"""Recommendation for TTS/voice-sync engines"""
niche: str
platform: str
beat_type: str
# TTS parameters
pace_wpm: float
pitch_base: float
pitch_variance: float
energy_level: float
voice_style: str
# Timing patterns
hook_placements: List[float]
pause_placements: List[Tuple[float, float]]
emphasis_words: List[str]
# Beat sync
beat_alignment_rules: Dict[str, float]
syllable_timing_guide: Dict[str, float]
# Predicted performance
predicted_viral_score: float
confidence: float
# ============================================================================
# FEATURE ENGINEERING
# ============================================================================
class AudioFeatureEngineering:
"""Advanced feature engineering for virality prediction"""
@staticmethod
def extract_beat_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract beat-related viral signals"""
features = {}
# Beat alignment score
features['beat_alignment_score'] = 1.0 - audio_features.beat_alignment_error
# Off-beat ratio (syllables that don't align)
if audio_features.syllable_timing:
beat_interval = 60.0 / audio_features.tempo_bpm
off_beats = sum(1 for t in audio_features.syllable_timing
if (t % beat_interval) > beat_interval * 0.3)
features['off_beat_ratio'] = off_beats / len(audio_features.syllable_timing)
else:
features['off_beat_ratio'] = 0.0
# Hook-to-beat correlation
if audio_features.hook_timing_seconds:
features['hook_beat_sync'] = np.mean([
1.0 - (t % (60.0 / audio_features.tempo_bpm)) / (60.0 / audio_features.tempo_bpm)
for t in audio_features.hook_timing_seconds
])
else:
features['hook_beat_sync'] = 0.0
# Beat trend match (is this a trending beat pattern?)
features['beat_trend_match'] = 1.0 if audio_features.is_trending_beat else 0.0
# Beat innovation score (novelty detection)
features['beat_innovation'] = audio_features.beat_alignment_error * 0.5 # Placeholder
return features
@staticmethod
def extract_hook_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract hook-related viral signals"""
features = {}
if not audio_features.hook_timing_seconds:
return {k: 0.0 for k in ['hook_count', 'hook_early_placement', 'hook_amplitude_avg',
'hook_pitch_jump_avg', 'hook_spacing_variance']}
features['hook_count'] = len(audio_features.hook_timing_seconds)
features['hook_early_placement'] = 1.0 if audio_features.hook_timing_seconds[0] < 3.0 else 0.0
features['hook_amplitude_avg'] = np.mean(audio_features.hook_emphasis_amplitude)
features['hook_pitch_jump_avg'] = np.mean(audio_features.hook_pitch_jump)
# Hook spacing consistency
if len(audio_features.hook_timing_seconds) > 1:
spacings = np.diff(audio_features.hook_timing_seconds)
features['hook_spacing_variance'] = np.var(spacings)
else:
features['hook_spacing_variance'] = 0.0
return features
@staticmethod
def extract_emotion_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract emotion trajectory features"""
features = {}
if not audio_features.emotion_trajectory or not audio_features.emotion_intensity:
return {k: 0.0 for k in ['emotion_arc_score', 'emotion_peak_early',
'emotion_intensity_avg', 'emotion_variance']}
# Emotion arc score (building -> peak is viral)
arc_map = {'building': 0.3, 'peak': 1.0, 'sustain': 0.7, 'release': 0.5}
arc_scores = [arc_map.get(e, 0.5) for e in audio_features.emotion_trajectory]
features['emotion_arc_score'] = np.mean(arc_scores)
# Peak placement
if 'peak' in audio_features.emotion_trajectory:
peak_idx = audio_features.emotion_trajectory.index('peak')
features['emotion_peak_early'] = 1.0 if peak_idx < len(arc_scores) * 0.4 else 0.0
else:
features['emotion_peak_early'] = 0.0
features['emotion_intensity_avg'] = np.mean(audio_features.emotion_intensity)
features['emotion_variance'] = np.var(audio_features.emotion_intensity)
return features
@staticmethod
def extract_pause_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract pause placement patterns"""
features = {}
if not audio_features.pause_durations or not audio_features.pause_positions:
return {k: 0.0 for k in ['pause_count', 'pause_avg_duration',
'pause_placement_score', 'pause_rewatch_correlation']}
features['pause_count'] = len(audio_features.pause_durations)
features['pause_avg_duration'] = np.mean(audio_features.pause_durations)
# Strategic pause placement (after hooks, before reveals)
strategic_positions = [p for p in audio_features.pause_positions if 2 < p < 8]
features['pause_placement_score'] = len(strategic_positions) / max(len(audio_features.pause_positions), 1)
# Pause-rewatch correlation (placeholder - would need replay timestamp data)
features['pause_rewatch_correlation'] = performance.replay_rate * 0.5
return features
@staticmethod
def extract_spectral_features(audio_features: AudioFeatures) -> Dict[str, float]:
"""Extract spectral/timbre quality features"""
features = {}
if audio_features.spectral_centroid is not None:
features['spectral_centroid_mean'] = np.mean(audio_features.spectral_centroid)
features['spectral_centroid_var'] = np.var(audio_features.spectral_centroid)
else:
features['spectral_centroid_mean'] = 0.0
features['spectral_centroid_var'] = 0.0
if audio_features.spectral_rolloff is not None:
features['spectral_rolloff_mean'] = np.mean(audio_features.spectral_rolloff)
else:
features['spectral_rolloff_mean'] = 0.0
if audio_features.zero_crossing_rate is not None:
features['zero_crossing_mean'] = np.mean(audio_features.zero_crossing_rate)
else:
features['zero_crossing_mean'] = 0.0
features['harmonic_noise_ratio'] = audio_features.harmonic_noise_ratio
return features
@staticmethod
def extract_velocity_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract velocity and virality signals"""
features = {}
features['velocity_per_hour'] = performance.velocity_per_hour
features['velocity_per_day'] = performance.velocity_per_day
features['velocity_score'] = performance.velocity_score
# Retention correlation
features['retention_2s_rate'] = performance.retention_2s
features['retention_completion_ratio'] = performance.completion_rate / max(performance.retention_2s, 0.01)
# Hook to retention correlation (placeholder)
if audio_features.hook_timing_seconds:
first_hook = audio_features.hook_timing_seconds[0]
features['hook_retention_correlation'] = performance.retention_2s if first_hook < 2.0 else performance.retention_2s * 0.8
else:
features['hook_retention_correlation'] = 0.0
return features
@staticmethod
def compute_full_feature_set(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Compute complete engineered feature set"""
all_features = {}
# Base features
all_features['pace_wpm'] = audio_features.pace_wpm
all_features['pitch_mean'] = audio_features.pitch_mean
all_features['pitch_variance'] = audio_features.pitch_variance
all_features['energy_mean'] = audio_features.energy_mean
all_features['energy_variance'] = audio_features.energy_variance
all_features['tempo_bpm'] = audio_features.tempo_bpm
# Engineered features
all_features.update(AudioFeatureEngineering.extract_beat_features(audio_features, performance))
all_features.update(AudioFeatureEngineering.extract_hook_features(audio_features, performance))
all_features.update(AudioFeatureEngineering.extract_emotion_features(audio_features, performance))
all_features.update(AudioFeatureEngineering.extract_pause_features(audio_features, performance))
all_features.update(AudioFeatureEngineering.extract_spectral_features(audio_features))
all_features.update(AudioFeatureEngineering.extract_velocity_features(audio_features, performance))
# Target metrics
all_features['views_total'] = performance.views_total
all_features['viral_score'] = performance.viral_score
all_features['engagement_ratio'] = performance.engagement_ratio
return all_features
# ============================================================================
# MACHINE LEARNING MODELS
# ============================================================================
class ViralityPredictor:
"""Multi-model ensemble for predicting viral performance"""
def __init__(self):
self.xgb_model = None
self.scaler = StandardScaler()
self.feature_importance = {}
self.is_trained = False
def train(self, X: np.ndarray, y_views: np.ndarray, y_viral: np.ndarray):
"""Train XGBoost model on audio features -> views/viral score"""
# Scale features
X_scaled = self.scaler.fit_transform(X)
# Train XGBoost for views prediction
self.xgb_model = xgb.XGBRegressor(
n_estimators=200,
max_depth=8,
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8,
objective='reg:squarederror'
)
self.xgb_model.fit(X_scaled, y_views)
# Store feature importance
if hasattr(self.xgb_model, 'feature_importances_'):
self.feature_importance = dict(enumerate(self.xgb_model.feature_importances_))
self.is_trained = True
def predict_views(self, X: np.ndarray) -> np.ndarray:
"""Predict view count"""
if not self.is_trained:
return np.zeros(X.shape[0])
X_scaled = self.scaler.transform(X)
return self.xgb_model.predict(X_scaled)
def predict_viral_score(self, X: np.ndarray) -> np.ndarray:
"""Predict viral score (proxy: predicted views converted to score)"""
views = self.predict_views(X)
return views / 1_000_000 * 0.5 # Simplified scoring
def get_top_features(self, n: int = 10) -> List[Tuple[int, float]]:
"""Get top N most important features"""
if not self.feature_importance:
return []
sorted_features = sorted(self.feature_importance.items(), key=lambda x: x[1], reverse=True)
return sorted_features[:n]
class SequenceModel(nn.Module):
"""LSTM for temporal audio pattern learning"""
def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 2):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.3)
self.fc = nn.Linear(hidden_size, 1)
def forward(self, x):
lstm_out, _ = self.lstm(x)
return self.fc(lstm_out[:, -1, :])
class PatternClusterer:
"""Discover audio pattern clusters using embeddings"""
def __init__(self, n_clusters: int = 20):
self.n_clusters = n_clusters
self.kmeans = None
self.cluster_profiles = {}
def fit(self, embeddings: np.ndarray, performance_scores: np.ndarray):
"""Cluster audio patterns and compute cluster performance profiles"""
self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
labels = self.kmeans.fit_predict(embeddings)
# Compute cluster profiles
for cluster_id in range(self.n_clusters):
mask = labels == cluster_id
cluster_scores = performance_scores[mask]
self.cluster_profiles[cluster_id] = {
'count': np.sum(mask),
'avg_score': np.mean(cluster_scores),
'std_score': np.std(cluster_scores),
'viral_probability': np.mean(cluster_scores > 5.0), # >5M views threshold
'centroid': self.kmeans.cluster_centers_[cluster_id]
}
return labels
def get_high_viral_clusters(self, threshold: float = 0.3) -> List[int]:
"""Get cluster IDs with high viral probability"""
return [cid for cid, profile in self.cluster_profiles.items()
if profile['viral_probability'] > threshold]
def predict_cluster(self, embedding: np.ndarray) -> int:
"""Predict cluster for new audio pattern"""
if self.kmeans is None:
return -1
return self.kmeans.predict(embedding.reshape(1, -1))[0]
class AnomalyDetector:
"""Detect unusual audio patterns (overperformers and underperformers)"""
def __init__(self):
self.model = IsolationForest(contamination=0.1, random_state=42)
def fit(self, X: np.ndarray):
"""Train anomaly detector"""
self.model.fit(X)
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict anomalies (-1 = anomaly, 1 = normal)"""
return self.model.predict(X)
def score(self, X: np.ndarray) -> np.ndarray:
"""Get anomaly scores (more negative = more anomalous)"""
return self.model.score_samples(X)
# ============================================================================
# CORE PATTERN LEARNER
# ============================================================================
class AudioPatternLearner:
"""
Main orchestrator for autonomous viral audio pattern learning.
Continuously learns from video performance and adapts to trends.
"""
def __init__(self, storage_path: str = "./pattern_learner_data"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(exist_ok=True)
# ML models
self.virality_predictor = ViralityPredictor()
self.pattern_clusterer = PatternClusterer(n_clusters=25)
self.anomaly_detector = AnomalyDetector()
# Pattern storage
self.discovered_patterns: Dict[str, AudioPattern] = {}
self.pattern_history: deque = deque(maxlen=10000)
# Feature tracking
self.feature_names: List[str] = []
self.niche_performance: Dict[str, Dict] = defaultdict(lambda: {
'total_videos': 0,
'avg_views': 0.0,
'top_patterns': []
})
# Reinforcement learning components
self.pattern_weights: Dict[str, float] = {}
self.replay_buffer: deque = deque(maxlen=5000)
# Caching
self.embedding_cache: Dict[str, np.ndarray] = {}
# Configuration
self.config = {
'viral_threshold': 5_000_000,
'min_sample_size': 10,
'pattern_decay_rate': 0.95,
'trend_window_days': 30,
'confidence_threshold': 0.7,
'update_frequency_hours': 6
}
# Logging
self.setup_logging()
def setup_logging(self):
"""Setup logging system"""
log_file = self.storage_path / "pattern_learner.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('AudioPatternLearner')
def ingest_video_batch(self, audio_features_list: List[AudioFeatures],
performance_list: List[PerformanceMetrics]):
"""
Ingest batch of video audio records with performance metrics.
Main entry point for continuous learning.
"""
self.logger.info(f"Ingesting batch of {len(audio_features_list)} videos")
if len(audio_features_list) != len(performance_list):
raise ValueError("Audio features and performance lists must be same length")
# Process each video
for audio_feat, perf in zip(audio_features_list, performance_list):
# Compute full feature set
engineered_features = AudioFeatureEngineering.compute_full_feature_set(audio_feat, perf)
# Store in replay buffer
self.replay_buffer.append({
'audio_features': audio_feat,
'performance': perf,
'engineered_features': engineered_features,
'timestamp': datetime.now()
})
# Update niche stats
self._update_niche_stats(audio_feat, perf)
# Trigger learning pipeline
self._train_models()
self._discover_patterns()
self._update_pattern_weights()
self._detect_anomalies()
self.logger.info(f"Batch processing complete. Total patterns: {len(self.discovered_patterns)}")
def _train_models(self):
"""Train/update all ML models"""
if len(self.replay_buffer) < self.config['min_sample_size']:
self.logger.warning("Insufficient data for training")
return
self.logger.info("Training ML models...")
# Prepare training data
X_list = []
y_views = []
y_viral = []
for record in self.replay_buffer:
features = record['engineered_features']
feature_vector = [features.get(k, 0.0) for k in sorted(features.keys())
if k not in ['views_total', 'viral_score', 'engagement_ratio']]
X_list.append(feature_vector)
y_views.append(record['performance'].views_total)
y_viral.append(record['performance'].viral_score)
X = np.array(X_list)
y_views = np.array(y_views)
y_viral = np.array(y_viral)
# Store feature names
sample_features = self.replay_buffer[0]['engineered_features']
self.feature_names = sorted([k for k in sample_features.keys()
if k not in ['views_total', 'viral_score', 'engagement_ratio']])
# Train virality predictor
self.virality_predictor.train(X, y_views, y_viral)
# Train anomaly detector
self.anomaly_detector.fit(X)
self.logger.info("Model training complete")
def _discover_patterns(self):
"""Discover viral audio patterns through clustering and analysis"""
if len(self.replay_buffer) < self.config['min_sample_size']:
return
self.logger.info("Discovering audio patterns...")
# Group by niche + platform
niche_platform_groups = defaultdict(list)
for record in self.replay_buffer:
audio = record['audio_features']
key = f"{audio.niche}_{audio.platform}_{audio.beat_type}"
niche_platform_groups[key].append(record)
# Analyze each group
for group_key, records in niche_platform_groups.items():
if len(records) < 5:
continue
# Filter for high performers (>5M views)
high_performers = [r for r in records if r['performance'].views_total >= self.config['viral_threshold']]
if len(high_performers) < 3:
continue
# Extract common patterns
pattern = self._extract_pattern_from_group(group_key, high_performers, records)
if pattern:
self.discovered_patterns[pattern.pattern_id] = pattern
self.logger.info(f"Discovered pattern: {pattern.pattern_id} (efficacy: {pattern.viral_efficacy_score:.3f})")
def _extract_pattern_from_group(self, group_key: str, high_performers: List[Dict],
all_records: List[Dict]) -> Optional[AudioPattern]:
"""Extract common audio pattern from high-performing videos"""
if not high_performers:
return None
# Compute median/mean characteristics
pace_vals = [r['audio_features'].pace_wpm for r in high_performers]
pitch_vals = [r['audio_features'].pitch_mean for r in high_performers]
energy_vals = [r['audio_features'].energy_mean for r in high_performers]
# Hook timing patterns
hook_timings = []
for r in high_performers:
if r['audio_features'].hook_timing_seconds:
hook_timings.extend(r['audio_features'].hook_timing_seconds[:3])
# Pause patterns
pause_patterns = []
for r in high_performers:
audio = r['audio_features']
if audio.pause_positions and audio.pause_durations:
for pos, dur in zip(audio.pause_positions[:5], audio.pause_durations[:5]):
pause_patterns.append((pos, dur))
# Emotion arcs
emotion_arcs = [r['audio_features'].emotion_trajectory for r in high_performers
if r['audio_features'].emotion_trajectory]
# Compute efficacy score
avg_views = np.mean([r['performance'].views_total for r in high_performers])
avg_completion = np.mean([r['performance'].completion_rate for r in high_performers])
viral_efficacy = (
(avg_views / 10_000_000) * 0.4 + # Normalize to 10M
avg_completion * 0.3 +
(len(high_performers) / len(all_records)) * 0.3
)
# Extract niche/platform from group key
parts = group_key.split('_')
niche = parts[0] if len(parts) > 0 else 'unknown'
platform = parts[1] if len(parts) > 1 else 'unknown'
# Create pattern
pattern = AudioPattern(
pattern_id=f"pattern_{group_key}_{int(datetime.now().timestamp())}",
niche=niche,
platform=platform,
optimal_pace=float(np.median(pace_vals)),
optimal_pitch_range=(float(np.percentile(pitch_vals, 25)), float(np.percentile(pitch_vals, 75))),
optimal_energy=float(np.median(energy_vals)),
hook_timings=hook_timings[:5] if hook_timings else [],
pause_pattern=pause_patterns[:5] if pause_patterns else [],
beat_alignment_target=0.95,
emotion_arc=emotion_arcs[0] if emotion_arcs else ['building', 'peak'],
viral_efficacy_score=viral_efficacy,
sample_count=len(high_performers),
avg_views=avg_views,
avg_completion=avg_completion,
confidence=min(len(high_performers) / 20, 1.0),
discovered_at=datetime.now(),
last_validated=datetime.now(),
trend_status='stable',
weight=1.0,
decay_rate=self.config['pattern_decay_rate']
)
return pattern
def _update_pattern_weights(self):
"""Update pattern weights based on recent performance (RL mechanism)"""
current_time = datetime.now()
for pattern_id, pattern in list(self.discovered_patterns.items()):
# Apply temporal decay
days_old = (current_time - pattern.last_validated).days
decay_factor = pattern.decay_rate ** days_old
pattern.weight *= decay_factor
# Remove patterns with very low weight
if pattern.weight < 0.1:
del self.discovered_patterns[pattern_id]
self.logger.info(f"Removed low-weight pattern: {pattern_id}")
continue
# Boost patterns that are still performing
recent_matches = self._find_recent_pattern_matches(pattern)
if recent_matches:
avg_recent_views = np.mean([m['performance'].views_total for m in recent_matches])
if avg_recent_views >= self.config['viral_threshold']:
pattern.weight *= 1.1
pattern.last_validated = current_time
def _find_recent_pattern_matches(self, pattern: AudioPattern, window_days: int = 7) -> List[Dict]:
"""Find recent videos matching this pattern"""
cutoff = datetime.now() - timedelta(days=window_days)
matches = []
for record in self.replay_buffer:
if record['timestamp'] < cutoff:
continue
audio = record['audio_features']
if audio.niche != pattern.niche or audio.platform != pattern.platform:
continue
# Check similarity
if abs(audio.pace_wpm - pattern.optimal_pace) < 20:
if pattern.optimal_pitch_range[0] <= audio.pitch_mean <= pattern.optimal_pitch_range[1]:
matches.append(record)
return matches
def _detect_anomalies(self):
"""Detect anomalous patterns (novel viral strategies)"""
if not self.anomaly_detector.model:
return
X_list = []
records = []
for record in list(self.replay_buffer)[-1000:]:
features = record['engineered_features']
feature_vector = [features.get(k, 0.0) for k in self.feature_names]
X_list.append(feature_vector)
records.append(record)
if not X_list:
return
X = np.array(X_list)
anomaly_scores = self.anomaly_detector.score(X)
# Find overperforming anomalies
for i, (score, record) in enumerate(zip(anomaly_scores, records)):
if score < -0.5 and record['performance'].views_total >= self.config['viral_threshold']:
self.logger.info(f"Anomalous high performer detected: {record['performance'].video_id}")
# Log for further analysis
self._log_anomaly(record, score)
def _log_anomaly(self, record: Dict, anomaly_score: float):
"""Log anomalous pattern for analysis"""
anomaly_log = {
'video_id': record['performance'].video_id,
'views': record['performance'].views_total,
'anomaly_score': float(anomaly_score),
'niche': record['audio_features'].niche,
'platform': record['audio_features'].platform,
'timestamp': datetime.now().isoformat()
}
log_path = self.storage_path / "anomalies.jsonl"
with open(log_path, 'a') as f:
f.write(json.dumps(anomaly_log) + '\n')
def _update_niche_stats(self, audio: AudioFeatures, perf: PerformanceMetrics):
"""Update performance statistics per niche"""
key = f"{audio.niche}_{audio.platform}"
stats = self.niche_performance[key]
stats['total_videos'] += 1
# Running average
n = stats['total_videos']
stats['avg_views'] = ((n - 1) * stats['avg_views'] + perf.views_total) / n
# ========================================================================
# PUBLIC API FOR TTS/VOICE-SYNC INTEGRATION
# ========================================================================
def get_recommended_audio_profile(self, niche: str, platform: str,
beat_type: str) -> Optional[PatternRecommendation]:
"""
Get recommended audio profile for TTS/voice-sync engines.
Returns optimal parameters for generating viral audio.
"""
# Find matching patterns
matching_patterns = [
p for p in self.discovered_patterns.values()
if p.niche == niche and p.platform == platform
]
if not matching_patterns:
self.logger.warning(f"No patterns found for {niche}/{platform}")
return None
# Get highest efficacy pattern
best_pattern = max(matching_patterns, key=lambda p: p.viral_efficacy_score * p.weight)
# Build recommendation
recommendation = PatternRecommendation(
niche=niche,
platform=platform,
beat_type=beat_type,
pace_wpm=best_pattern.optimal_pace,
pitch_base=best_pattern.optimal_pitch_range[0],
pitch_variance=(best_pattern.optimal_pitch_range[1] - best_pattern.optimal_pitch_range[0]) / 2,
energy_level=best_pattern.optimal_energy,
voice_style='dynamic',
hook_placements=best_pattern.hook_timings,
pause_placements=best_pattern.pause_pattern,
emphasis_words=[],
beat_alignment_rules={'target_error': best_pattern.beat_alignment_target},
syllable_timing_guide={},
predicted_viral_score=best_pattern.viral_efficacy_score,
confidence=best_pattern.confidence
)
self.logger.info(f"Generated recommendation for {niche}/{platform}: score={recommendation.predicted_viral_score:.3f}")
return recommendation
def predict_viral_success(self, audio_features: AudioFeatures) -> float:
"""
Predict viral success score for given audio features.
Returns score 0-10+ (higher = more likely to hit 5M+ views).
"""
if not self.virality_predictor.is_trained:
self.logger.warning("Model not trained yet")
return 0.0
# Create dummy performance for feature engineering
dummy_perf = PerformanceMetrics(
video_id='prediction',
views_total=0,
retention_2s=0.8,
retention_15s=0.5,
completion_rate=0.3,
replay_rate=0.1,
velocity_per_hour=1000,
velocity_per_day=10000,
likes=0,
comments=0,
shares=0,
saves=0,
platform=audio_features.platform,
upload_timestamp=datetime.now()
)
# Compute features
engineered_features = AudioFeatureEngineering.compute_full_feature_set(audio_features, dummy_perf)
feature_vector = [engineered_features.get(k, 0.0) for k in self.feature_names]
X = np.array(feature_vector).reshape(1, -1)
predicted_views = self.virality_predictor.predict_views(X)[0]
# Convert to score (0-10+)
viral_score = predicted_views / 1_000_000
return float(viral_score)
def get_top_patterns(self, n: int = 10, niche: Optional[str] = None) -> List[AudioPattern]:
"""Get top N patterns by viral efficacy"""
patterns = list(self.discovered_patterns.values())
if niche:
patterns = [p for p in patterns if p.niche == niche]
patterns.sort(key=lambda p: p.viral_efficacy_score * p.weight, reverse=True)
return patterns[:n]
def get_feature_importance(self) -> Dict[str, float]:
"""Get feature importance rankings"""
if not self.virality_predictor.feature_importance:
return {}
importance_dict = {}
for idx, importance in self.virality_predictor.feature_importance.items():
if idx < len(self.feature_names):
importance_dict[self.feature_names[idx]] = float(importance)
return dict(sorted(importance_dict.items(), key=lambda x: x[1], reverse=True))
def get_niche_performance_summary(self) -> Dict[str, Dict]:
"""Get performance summary for all niches"""
return dict(self.niche_performance)
# ========================================================================
# PERSISTENCE & STATE MANAGEMENT
# ========================================================================
def save_state(self):
"""Save learner state to disk"""
self.logger.info("Saving learner state...")
state = {
'discovered_patterns': {k: self._pattern_to_dict(v) for k, v in self.discovered_patterns.items()},
'niche_performance': dict(self.niche_performance),
'feature_names': self.feature_names,
'config': self.config,
'timestamp': datetime.now().isoformat()
}
with open(self.storage_path / 'learner_state.json', 'w') as f:
json.dump(state, f, indent=2)
# Save models
if self.virality_predictor.is_trained:
with open(self.storage_path / 'virality_predictor.pkl', 'wb') as f:
pickle.dump(self.virality_predictor, f)
self.logger.info("State saved successfully")
def load_state(self):
"""Load learner state from disk"""
state_file = self.storage_path / 'learner_state.json'
if not state_file.exists():
self.logger.warning("No saved state found")
return
self.logger.info("Loading learner state...")
with open(state_file, 'r') as f:
state = json.load(f)
self.discovered_patterns = {k: self._dict_to_pattern(v) for k, v in state['discovered_patterns'].items()}
self.niche_performance = defaultdict(lambda: {'total_videos': 0, 'avg_views': 0.0, 'top_patterns': []},
state['niche_performance'])
self.feature_names = state['feature_names']
self.config.update(state['config'])
# Load models
model_file = self.storage_path / 'virality_predictor.pkl'
if model_file.exists():
with open(model_file, 'rb') as f:
self.virality_predictor = pickle.load(f)
self.logger.info(f"State loaded: {len(self.discovered_patterns)} patterns")
def _pattern_to_dict(self, pattern: AudioPattern) -> Dict:
"""Convert AudioPattern to JSON-serializable dict"""
return {
'pattern_id': pattern.pattern_id,
'niche': pattern.niche,
'platform': pattern.platform,
'optimal_pace': pattern.optimal_pace,
'optimal_pitch_range': pattern.optimal_pitch_range,
'optimal_energy': pattern.optimal_energy,
'hook_timings': pattern.hook_timings,
'pause_pattern': pattern.pause_pattern,
'beat_alignment_target': pattern.beat_alignment_target,
'emotion_arc': pattern.emotion_arc,
'viral_efficacy_score': pattern.viral_efficacy_score,
'sample_count': pattern.sample_count,
'avg_views': pattern.avg_views,
'avg_completion': pattern.avg_completion,
'confidence': pattern.confidence,
'discovered_at': pattern.discovered_at.isoformat(),
'last_validated': pattern.last_validated.isoformat(),
'trend_status': pattern.trend_status,
'weight': pattern.weight,
'decay_rate': pattern.decay_rate
}
def _dict_to_pattern(self, d: Dict) -> AudioPattern:
"""Convert dict back to AudioPattern"""
return AudioPattern(
pattern_id=d['pattern_id'],
niche=d['niche'],
platform=d['platform'],
optimal_pace=d['optimal_pace'],
optimal_pitch_range=tuple(d['optimal_pitch_range']),
optimal_energy=d['optimal_energy'],
hook_timings=d['hook_timings'],
pause_pattern=[tuple(p) for p in d['pause_pattern']],
beat_alignment_target=d['beat_alignment_target'],
emotion_arc=d['emotion_arc'],
viral_efficacy_score=d['viral_efficacy_score'],
sample_count=d['sample_count'],
avg_views=d['avg_views'],
avg_completion=d['avg_completion'],
confidence=d['confidence'],
discovered_at=datetime.fromisoformat(d['discovered_at']),
last_validated=datetime.fromisoformat(d['last_validated']),
trend_status=d['trend_status'],
weight=d['weight'],
decay_rate=d['decay_rate']
)
# ============================================================================
# EXAMPLE USAGE & INTEGRATION
# ============================================================================
if __name__ == "__main__":
# Initialize learner
learner = AudioPatternLearner(storage_path="./viral_audio_learner")
# Example: Load previous state
learner.load_state()
# Example: Ingest new video batch (would come from audio_performance_store.py)
sample_audio = AudioFeatures(
pace_wpm=165,
pitch_mean=220.0,
pitch_variance=50.0,
energy_mean=0.7,
energy_variance=0.15,
tempo_bpm=128,
hook_timing_seconds=[1.5, 8.2, 15.0],
hook_emphasis_amplitude=[0.9, 0.85, 0.8],
hook_pitch_jump=[50, 45, 40],
pause_durations=[0.3, 0.5, 0.4],
pause_positions=[5.0, 12.0, 20.0],
beat_alignment_error=0.05,
syllable_timing=[0.1, 0.3, 0.5, 0.7],
mfcc=np.random.randn(13, 100),
spectral_centroid=np.random.randn(100),
spectral_rolloff=np.random.randn(100),
zero_crossing_rate=np.random.randn(100),
chroma=np.random.randn(12, 100),
harmonic_noise_ratio=0.85,
emotion_trajectory=['building', 'peak', 'sustain'],
emotion_intensity=[0.6, 0.9, 0.7],
voice_tone='energetic',
phoneme_timing={'a': 0.1, 'e': 0.15},
niche='finance',
platform='tiktok',
beat_type='trap',
voice_style='male_young',
language='en',
music_track='trending_beat_001',
is_trending_beat=True,
trend_timestamp=datetime.now()
)
sample_performance = PerformanceMetrics(
video_id='vid_12345',
views_total=7_500_000,
retention_2s=0.85,
retention_15s=0.55,
completion_rate=0.35,
replay_rate=0.12,
velocity_per_hour=15000,
velocity_per_day=180000,
likes=450000,
comments=12000,
shares=85000,
saves=120000,
platform='tiktok',
upload_timestamp=datetime.now()
)
# Ingest batch
learner.ingest_video_batch([sample_audio], [sample_performance])
# Get recommendation for new video
recommendation = learner.get_recommended_audio_profile('finance', 'tiktok', 'trap')
if recommendation:
print(f"\nRecommendation for finance/tiktok/trap:")
print(f" Pace: {recommendation.pace_wpm} WPM")
print(f" Pitch: {recommendation.pitch_base} ± {recommendation.pitch_variance}")
print(f" Hook placements: {recommendation.hook_placements}")
print(f" Predicted viral score: {recommendation.predicted_viral_score:.2f}")
print(f" Confidence: {recommendation.confidence:.2%}")
# Predict viral success for new audio
viral_score = learner.predict_viral_success(sample_audio)
print(f"\nPredicted viral score: {viral_score:.2f}M views")
# Get top patterns
top_patterns = learner.get_top_patterns(n=5, niche='finance')
print(f"\nTop 5 patterns for finance:")
for i, pattern in enumerate(top_patterns, 1):
print(f" {i}. {pattern.pattern_id}: efficacy={pattern.viral_efficacy_score:.3f}, samples={pattern.sample_count}")
# Get feature importance
importance = learner.get_feature_importance()
print(f"\nTop 10 most important features:")
for feat, imp in list(importance.items())[:10]:
print(f" {feat}: {imp:.4f}")
# Save state
learner.save_state()
print("\n✅ Pattern learner ready for production use")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment