Created
December 30, 2025 18:33
-
-
Save bogged-broker/257ee5cd862768857a8e5efda506e892 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| audio_pattern_learner.py | |
| Autonomous machine learning system for identifying and predicting viral audio patterns. | |
| Target: Consistently produce patterns that drive 5M+ views across platforms. | |
| Core Features: | |
| - High-resolution audio feature ingestion & normalization | |
| - Advanced feature engineering (beat, hook, emotion, spectral) | |
| - Multi-model ML pipeline (XGBoost, LSTM, clustering) | |
| - Pattern discovery & ranking by viral efficacy | |
| - RL integration for continuous optimization | |
| - Real-time API for TTS/voice-sync integration | |
| - Cross-platform, cross-niche pattern learning | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from datetime import datetime, timedelta | |
| from collections import defaultdict, deque | |
| import json | |
| import pickle | |
| from pathlib import Path | |
| import logging | |
| # ML/DL imports | |
| try: | |
| import xgboost as xgb | |
| from sklearn.cluster import KMeans, HDBSCAN | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from sklearn.ensemble import IsolationForest | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| except ImportError: | |
| print("Warning: Some ML libraries not installed. Install: xgboost, scikit-learn, torch") | |
| # Audio processing | |
| try: | |
| import librosa | |
| from scipy import signal, stats | |
| except ImportError: | |
| print("Warning: Audio libraries not installed. Install: librosa, scipy") | |
| # ============================================================================ | |
| # DATA STRUCTURES | |
| # ============================================================================ | |
@dataclass
class AudioFeatures:
    """Comprehensive audio feature set for virality prediction.

    Holds the raw per-video audio descriptors plus context metadata, and can
    flatten itself into a fixed-length numeric vector for ML models.
    """
    # Basic features
    pace_wpm: float
    pitch_mean: float
    pitch_variance: float
    energy_mean: float
    energy_variance: float
    tempo_bpm: float
    # Hook & timing
    hook_timing_seconds: List[float]
    hook_emphasis_amplitude: List[float]
    hook_pitch_jump: List[float]
    pause_durations: List[float]
    pause_positions: List[float]
    beat_alignment_error: float
    syllable_timing: List[float]
    # Spectral & timbre
    mfcc: np.ndarray
    spectral_centroid: np.ndarray
    spectral_rolloff: np.ndarray
    zero_crossing_rate: np.ndarray
    chroma: np.ndarray
    harmonic_noise_ratio: float
    # Emotion & dynamics
    emotion_trajectory: List[str]  # ['building', 'peak', 'sustain', 'release']
    emotion_intensity: List[float]
    voice_tone: str
    phoneme_timing: Dict[str, float]
    # Context
    niche: str
    platform: str
    beat_type: str
    voice_style: str
    language: str
    music_track: Optional[str]
    is_trending_beat: bool
    trend_timestamp: datetime
    # Embeddings
    audio_embedding: Optional[np.ndarray] = None

    def to_feature_vector(self) -> np.ndarray:
        """Convert to a flat feature vector for ML models.

        Returns:
            A float array of exactly 27 entries: 14 scalar/aggregate features
            followed by 13 MFCC slots. Each MFCC slot is the mean of one MFCC
            row; missing rows are zero-filled and rows beyond the 13th are
            dropped, so the length never depends on the input.
        """
        n_mfcc = 13

        features = [
            self.pace_wpm,
            self.pitch_mean,
            self.pitch_variance,
            self.energy_mean,
            self.energy_variance,
            self.tempo_bpm,
            np.mean(self.hook_emphasis_amplitude) if self.hook_emphasis_amplitude else 0,
            np.mean(self.hook_pitch_jump) if self.hook_pitch_jump else 0,
            np.mean(self.pause_durations) if self.pause_durations else 0,
            self.beat_alignment_error,
            self.harmonic_noise_ratio,
            len(self.hook_timing_seconds),
            len(self.pause_durations),
            np.mean(self.emotion_intensity) if self.emotion_intensity else 0,
        ]

        # Aggregated spectral features, always padded/truncated to n_mfcc so
        # every sample produces the same number of columns.
        mfcc_means = np.zeros(n_mfcc)
        if self.mfcc is not None:
            mfcc = np.asarray(self.mfcc, dtype=float)
            if mfcc.size:
                if mfcc.ndim >= 2:
                    # One mean per leading-axis row.
                    coeffs = mfcc.reshape(mfcc.shape[0], -1).mean(axis=1)
                else:
                    # A 0-D/1-D input is taken as already-aggregated values.
                    coeffs = mfcc.ravel()
                used = min(n_mfcc, coeffs.shape[0])
                mfcc_means[:used] = coeffs[:used]
        features.extend(mfcc_means.tolist())

        return np.array(features, dtype=float)
@dataclass
class PerformanceMetrics:
    """Video performance metrics for learning.

    The three derived fields are recomputed from the raw metrics every time an
    instance is constructed, overriding any values passed for them.
    """
    video_id: str
    views_total: int
    retention_2s: float
    retention_15s: float
    completion_rate: float
    replay_rate: float
    velocity_per_hour: float
    velocity_per_day: float
    # Social engagement
    likes: int
    comments: int
    shares: int
    saves: int
    # Platform
    platform: str
    upload_timestamp: datetime
    # Derived metrics (filled in by __post_init__)
    viral_score: float = 0.0
    velocity_score: float = 0.0
    engagement_ratio: float = 0.0

    def __post_init__(self):
        """Calculate derived metrics from the raw counters."""
        views = self.views_total

        # Weighted blend: views (per million), completion, early retention,
        # replays and share rate (per thousand views).
        score = views / 1_000_000 * 0.3
        score = score + self.completion_rate * 0.2
        score = score + self.retention_2s * 0.15
        score = score + self.replay_rate * 0.15
        score = score + (self.shares / max(views, 1)) * 1000 * 0.2
        self.viral_score = score

        hourly_part = self.velocity_per_hour * 0.4
        daily_part = self.velocity_per_day / 24 * 0.6
        self.velocity_score = hourly_part + daily_part

        # Engagement ratio keeps its default when there are no views.
        if views > 0:
            weighted_interactions = self.likes + self.comments * 2 + self.shares * 3
            self.engagement_ratio = weighted_interactions / views
@dataclass
class AudioPattern:
    """Discovered audio pattern with viral efficacy.

    A plain record: it carries the pattern's target characteristics, the
    statistics it was derived from, and the weight/decay pair that the
    learner adjusts over time.
    """
    # Identity
    pattern_id: str
    niche: str
    platform: str

    # Pattern characteristics
    optimal_pace: float
    optimal_pitch_range: Tuple[float, float]
    optimal_energy: float
    hook_timings: List[float]
    pause_pattern: List[Tuple[float, float]]  # (position, duration)
    beat_alignment_target: float
    emotion_arc: List[str]

    # Efficacy metrics
    viral_efficacy_score: float
    sample_count: int
    avg_views: float
    avg_completion: float
    confidence: float

    # Temporal
    discovered_at: datetime
    last_validated: datetime
    trend_status: str  # 'rising', 'peaked', 'declining', 'stable'

    # Weights for RL
    weight: float = 1.0
    decay_rate: float = 0.95
@dataclass
class PatternRecommendation:
    """Recommendation for TTS/voice-sync engines.

    A plain record with no defaults: every field must be supplied by the
    code that builds the recommendation.
    """
    # Request context
    niche: str
    platform: str
    beat_type: str

    # TTS parameters
    pace_wpm: float
    pitch_base: float
    pitch_variance: float
    energy_level: float
    voice_style: str

    # Timing patterns
    hook_placements: List[float]
    pause_placements: List[Tuple[float, float]]
    emphasis_words: List[str]

    # Beat sync
    beat_alignment_rules: Dict[str, float]
    syllable_timing_guide: Dict[str, float]

    # Predicted performance
    predicted_viral_score: float
    confidence: float
| # ============================================================================ | |
| # FEATURE ENGINEERING | |
| # ============================================================================ | |
class AudioFeatureEngineering:
    """Advanced feature engineering for virality prediction.

    A namespace of stateless static helpers. Each ``extract_*`` method returns
    a flat ``{feature_name: value}`` dict with a fixed key set regardless of
    which inputs are empty, so the merged feature dict always has the same
    columns.
    """

    @staticmethod
    def _mean_or_zero(values) -> float:
        """Return the mean of *values*, or 0.0 when it is None or empty."""
        if values is None or len(values) == 0:
            return 0.0
        return float(np.mean(values))

    @staticmethod
    def extract_beat_features(audio_features: "AudioFeatures",
                              performance: "PerformanceMetrics") -> Dict[str, float]:
        """Extract beat-related viral signals.

        ``performance`` is accepted for signature symmetry and is not read.
        A non-positive tempo yields 0.0 for the tempo-dependent features
        instead of raising ZeroDivisionError.
        """
        features = {}

        # Beat alignment score
        features['beat_alignment_score'] = 1.0 - audio_features.beat_alignment_error

        tempo = audio_features.tempo_bpm
        beat_interval = 60.0 / tempo if tempo and tempo > 0 else None

        # Off-beat ratio: syllables landing more than 30% of a beat after
        # the preceding beat.
        syllables = audio_features.syllable_timing
        if syllables and beat_interval:
            off_beats = sum(1 for t in syllables
                            if (t % beat_interval) > beat_interval * 0.3)
            features['off_beat_ratio'] = off_beats / len(syllables)
        else:
            features['off_beat_ratio'] = 0.0

        # Hook-to-beat correlation: 1.0 when a hook lands exactly on a beat,
        # falling towards 0.0 as it drifts later within the beat interval.
        hooks = audio_features.hook_timing_seconds
        if hooks and beat_interval:
            features['hook_beat_sync'] = float(np.mean([
                1.0 - (t % beat_interval) / beat_interval for t in hooks
            ]))
        else:
            features['hook_beat_sync'] = 0.0

        # Beat trend match (is this a trending beat pattern?)
        features['beat_trend_match'] = 1.0 if audio_features.is_trending_beat else 0.0

        # Beat innovation score (placeholder for real novelty detection)
        features['beat_innovation'] = audio_features.beat_alignment_error * 0.5
        return features

    @staticmethod
    def extract_hook_features(audio_features: "AudioFeatures",
                              performance: "PerformanceMetrics") -> Dict[str, float]:
        """Extract hook-related viral signals.

        Returns all-zero features when there are no hooks. Empty amplitude or
        pitch-jump lists average to 0.0 rather than NaN.
        """
        hooks = audio_features.hook_timing_seconds
        if not hooks:
            return {k: 0.0 for k in ['hook_count', 'hook_early_placement', 'hook_amplitude_avg',
                                     'hook_pitch_jump_avg', 'hook_spacing_variance']}

        features = {}
        features['hook_count'] = len(hooks)
        # Only the first listed hook is checked against the 3-second mark.
        features['hook_early_placement'] = 1.0 if hooks[0] < 3.0 else 0.0
        features['hook_amplitude_avg'] = AudioFeatureEngineering._mean_or_zero(
            audio_features.hook_emphasis_amplitude)
        features['hook_pitch_jump_avg'] = AudioFeatureEngineering._mean_or_zero(
            audio_features.hook_pitch_jump)

        # Hook spacing consistency
        if len(hooks) > 1:
            features['hook_spacing_variance'] = np.var(np.diff(hooks))
        else:
            features['hook_spacing_variance'] = 0.0
        return features

    @staticmethod
    def extract_emotion_features(audio_features: "AudioFeatures",
                                 performance: "PerformanceMetrics") -> Dict[str, float]:
        """Extract emotion trajectory features.

        Returns all-zero features when either the trajectory or the intensity
        list is empty.
        """
        trajectory = audio_features.emotion_trajectory
        intensity = audio_features.emotion_intensity
        if not trajectory or not intensity:
            return {k: 0.0 for k in ['emotion_arc_score', 'emotion_peak_early',
                                     'emotion_intensity_avg', 'emotion_variance']}

        features = {}
        # Emotion arc score; labels outside the map score a neutral 0.5.
        arc_map = {'building': 0.3, 'peak': 1.0, 'sustain': 0.7, 'release': 0.5}
        arc_scores = [arc_map.get(e, 0.5) for e in trajectory]
        features['emotion_arc_score'] = np.mean(arc_scores)

        # Peak placement: is the first 'peak' within the first 40% of stages?
        if 'peak' in trajectory:
            peak_idx = trajectory.index('peak')
            features['emotion_peak_early'] = 1.0 if peak_idx < len(arc_scores) * 0.4 else 0.0
        else:
            features['emotion_peak_early'] = 0.0

        features['emotion_intensity_avg'] = np.mean(intensity)
        features['emotion_variance'] = np.var(intensity)
        return features

    @staticmethod
    def extract_pause_features(audio_features: "AudioFeatures",
                               performance: "PerformanceMetrics") -> Dict[str, float]:
        """Extract pause placement patterns.

        Returns all-zero features when either pause list is empty.
        """
        durations = audio_features.pause_durations
        positions = audio_features.pause_positions
        if not durations or not positions:
            return {k: 0.0 for k in ['pause_count', 'pause_avg_duration',
                                     'pause_placement_score', 'pause_rewatch_correlation']}

        features = {}
        features['pause_count'] = len(durations)
        features['pause_avg_duration'] = np.mean(durations)

        # Share of pauses positioned strictly between 2 and 8.
        strategic_positions = [p for p in positions if 2 < p < 8]
        features['pause_placement_score'] = len(strategic_positions) / max(len(positions), 1)

        # Pause-rewatch correlation (placeholder - would need replay timestamp data)
        features['pause_rewatch_correlation'] = performance.replay_rate * 0.5
        return features

    @staticmethod
    def extract_spectral_features(audio_features: "AudioFeatures") -> Dict[str, float]:
        """Extract spectral/timbre quality features.

        Each spectral array that is None contributes 0.0 for its features.
        """
        features = {}

        centroid = audio_features.spectral_centroid
        if centroid is not None:
            features['spectral_centroid_mean'] = np.mean(centroid)
            features['spectral_centroid_var'] = np.var(centroid)
        else:
            features['spectral_centroid_mean'] = 0.0
            features['spectral_centroid_var'] = 0.0

        rolloff = audio_features.spectral_rolloff
        features['spectral_rolloff_mean'] = np.mean(rolloff) if rolloff is not None else 0.0

        zcr = audio_features.zero_crossing_rate
        features['zero_crossing_mean'] = np.mean(zcr) if zcr is not None else 0.0

        features['harmonic_noise_ratio'] = audio_features.harmonic_noise_ratio
        return features

    @staticmethod
    def extract_velocity_features(audio_features: "AudioFeatures",
                                  performance: "PerformanceMetrics") -> Dict[str, float]:
        """Extract velocity and virality signals.

        NOTE(review): these values come from observed performance, so they are
        only available after publication - confirm they are meant to be used
        as model inputs.
        """
        features = {}
        features['velocity_per_hour'] = performance.velocity_per_hour
        features['velocity_per_day'] = performance.velocity_per_day
        features['velocity_score'] = performance.velocity_score

        # Retention correlation; the floor of 0.01 avoids dividing by zero.
        features['retention_2s_rate'] = performance.retention_2s
        features['retention_completion_ratio'] = (
            performance.completion_rate / max(performance.retention_2s, 0.01)
        )

        # Hook to retention correlation (placeholder)
        hooks = audio_features.hook_timing_seconds
        if hooks:
            early_hook = hooks[0] < 2.0
            features['hook_retention_correlation'] = (
                performance.retention_2s if early_hook else performance.retention_2s * 0.8
            )
        else:
            features['hook_retention_correlation'] = 0.0
        return features

    @staticmethod
    def compute_full_feature_set(audio_features: "AudioFeatures",
                                 performance: "PerformanceMetrics") -> Dict[str, float]:
        """Compute the complete engineered feature set.

        Merges the base scalars, every ``extract_*`` group and the three
        target metrics (``views_total``, ``viral_score``,
        ``engagement_ratio``) into one dict.
        """
        all_features = {}

        # Base features
        all_features['pace_wpm'] = audio_features.pace_wpm
        all_features['pitch_mean'] = audio_features.pitch_mean
        all_features['pitch_variance'] = audio_features.pitch_variance
        all_features['energy_mean'] = audio_features.energy_mean
        all_features['energy_variance'] = audio_features.energy_variance
        all_features['tempo_bpm'] = audio_features.tempo_bpm

        # Engineered features
        all_features.update(AudioFeatureEngineering.extract_beat_features(audio_features, performance))
        all_features.update(AudioFeatureEngineering.extract_hook_features(audio_features, performance))
        all_features.update(AudioFeatureEngineering.extract_emotion_features(audio_features, performance))
        all_features.update(AudioFeatureEngineering.extract_pause_features(audio_features, performance))
        all_features.update(AudioFeatureEngineering.extract_spectral_features(audio_features))
        all_features.update(AudioFeatureEngineering.extract_velocity_features(audio_features, performance))

        # Target metrics
        all_features['views_total'] = performance.views_total
        all_features['viral_score'] = performance.viral_score
        all_features['engagement_ratio'] = performance.engagement_ratio
        return all_features
| # ============================================================================ | |
| # MACHINE LEARNING MODELS | |
| # ============================================================================ | |
class ViralityPredictor:
    """Multi-model ensemble for predicting viral performance.

    Currently wraps a single XGBoost regressor on standardized features;
    the viral score is derived from the predicted view count.
    """

    def __init__(self):
        self.xgb_model = None
        self.scaler = StandardScaler()
        self.feature_importance = {}
        self.is_trained = False

    def train(self, X: np.ndarray, y_views: np.ndarray, y_viral: np.ndarray):
        """Train the XGBoost model on audio features -> view counts.

        Args:
            X: Feature matrix, one row per sample.
            y_views: View-count target, one entry per row of ``X``.
            y_viral: Accepted for interface compatibility; not used by the
                current model.
        """
        # Scale features
        X_scaled = self.scaler.fit_transform(X)

        # Train XGBoost for views prediction
        self.xgb_model = xgb.XGBRegressor(
            n_estimators=200,
            max_depth=8,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            objective='reg:squarederror'
        )
        self.xgb_model.fit(X_scaled, y_views)

        # Store feature importance keyed by column index. Values are converted
        # to plain floats so the dict matches its annotation and can be
        # JSON-serialized.
        if hasattr(self.xgb_model, 'feature_importances_'):
            self.feature_importance = {
                idx: float(value)
                for idx, value in enumerate(self.xgb_model.feature_importances_)
            }
        self.is_trained = True

    def predict_views(self, X: np.ndarray) -> np.ndarray:
        """Predict view counts, one per sample.

        A non-empty 1-D ``X`` is treated as a single sample. Returns zeros
        when the model has not been trained.
        """
        X = np.asarray(X)
        if X.ndim == 1 and X.size > 0:
            X = X.reshape(1, -1)
        if not self.is_trained:
            return np.zeros(X.shape[0])
        X_scaled = self.scaler.transform(X)
        return self.xgb_model.predict(X_scaled)

    def predict_viral_score(self, X: np.ndarray) -> np.ndarray:
        """Predict viral score (proxy: predicted views converted to score)."""
        views = self.predict_views(X)
        return views / 1_000_000 * 0.5  # Simplified scoring

    def get_top_features(self, n: int = 10) -> List[Tuple[int, float]]:
        """Return the ``n`` most important ``(column_index, importance)`` pairs.

        Returns an empty list before any importances have been stored.
        """
        if not self.feature_importance:
            return []
        ranked = sorted(self.feature_importance.items(), key=lambda item: item[1], reverse=True)
        return ranked[:n]
class SequenceModel(nn.Module):
    """LSTM for temporal audio pattern learning.

    Maps a batch-first sequence tensor to one scalar per sequence, read from
    the final time step.
    """

    def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 2):
        super().__init__()
        # nn.LSTM applies dropout only between stacked layers. Passing a
        # non-zero value with a single layer has no effect and makes PyTorch
        # emit a warning, so it is disabled in that case.
        dropout = 0.3 if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Return a ``(batch, 1)`` prediction from the last time step of ``x``."""
        lstm_out, _ = self.lstm(x)
        return self.fc(lstm_out[:, -1, :])
class PatternClusterer:
    """Discover audio pattern clusters using embeddings."""

    def __init__(self, n_clusters: int = 20):
        self.n_clusters = n_clusters
        self.kmeans = None
        self.cluster_profiles = {}

    def fit(self, embeddings: np.ndarray, performance_scores: np.ndarray):
        """Cluster audio patterns and compute cluster performance profiles.

        Args:
            embeddings: One embedding row per sample.
            performance_scores: One score per sample. Scores above 5.0 count
                as viral when computing ``viral_probability``.

        Returns:
            The cluster label assigned to each sample.

        Raises:
            ValueError: If ``embeddings`` is empty or the two inputs have a
                different number of samples.
        """
        embeddings = np.asarray(embeddings)
        performance_scores = np.asarray(performance_scores)

        n_samples = embeddings.shape[0] if embeddings.ndim else 0
        if n_samples == 0:
            raise ValueError("Cannot cluster an empty embedding set")
        if performance_scores.shape[:1] != (n_samples,):
            raise ValueError("embeddings and performance_scores must have the same length")

        # KMeans cannot fit more clusters than samples, so a small data set
        # gets fewer clusters instead of failing.
        n_clusters = min(self.n_clusters, n_samples)
        self.kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        labels = self.kmeans.fit_predict(embeddings)

        # Rebuild profiles from scratch so entries from an earlier fit with
        # more clusters do not linger.
        self.cluster_profiles = {}
        for cluster_id in range(n_clusters):
            mask = labels == cluster_id
            if not mask.any():
                continue
            cluster_scores = performance_scores[mask]
            self.cluster_profiles[cluster_id] = {
                'count': np.sum(mask),
                'avg_score': np.mean(cluster_scores),
                'std_score': np.std(cluster_scores),
                'viral_probability': np.mean(cluster_scores > 5.0),  # >5M views threshold
                'centroid': self.kmeans.cluster_centers_[cluster_id]
            }
        return labels

    def get_high_viral_clusters(self, threshold: float = 0.3) -> List[int]:
        """Get cluster IDs whose viral probability is strictly above ``threshold``."""
        return [cid for cid, profile in self.cluster_profiles.items()
                if profile['viral_probability'] > threshold]

    def predict_cluster(self, embedding: np.ndarray) -> int:
        """Predict the cluster for a new embedding; -1 before ``fit`` is called."""
        if self.kmeans is None:
            return -1
        return int(self.kmeans.predict(embedding.reshape(1, -1))[0])
class AnomalyDetector:
    """Detect unusual audio patterns (overperformers and underperformers).

    Thin wrapper that forwards to an IsolationForest configured with a 10%
    contamination rate and a fixed random seed.
    """

    def __init__(self):
        forest = IsolationForest(contamination=0.1, random_state=42)
        self.model = forest

    def fit(self, X: np.ndarray):
        """Train the anomaly detector on ``X``; returns nothing."""
        forest = self.model
        forest.fit(X)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Label each row of ``X`` (-1 = anomaly, 1 = normal)."""
        labels = self.model.predict(X)
        return labels

    def score(self, X: np.ndarray) -> np.ndarray:
        """Score each row of ``X`` (more negative = more anomalous)."""
        scores = self.model.score_samples(X)
        return scores
| # ============================================================================ | |
| # CORE PATTERN LEARNER | |
| # ============================================================================ | |
| class AudioPatternLearner: | |
| """ | |
| Main orchestrator for autonomous viral audio pattern learning. | |
| Continuously learns from video performance and adapts to trends. | |
| """ | |
def __init__(self, storage_path: str = "./pattern_learner_data"):
    """Create the learner and its on-disk storage directory.

    Args:
        storage_path: Directory for logs and persisted data. It is created
            together with any missing parent directories.
    """
    self.storage_path = Path(storage_path)
    # parents=True so a nested path such as "data/runs/learner" works on a
    # fresh machine instead of raising FileNotFoundError.
    self.storage_path.mkdir(parents=True, exist_ok=True)

    # ML models
    self.virality_predictor = ViralityPredictor()
    self.pattern_clusterer = PatternClusterer(n_clusters=25)
    self.anomaly_detector = AnomalyDetector()

    # Pattern storage
    self.discovered_patterns: Dict[str, AudioPattern] = {}
    self.pattern_history: deque = deque(maxlen=10000)

    # Feature tracking
    self.feature_names: List[str] = []
    self.niche_performance: Dict[str, Dict] = defaultdict(lambda: {
        'total_videos': 0,
        'avg_views': 0.0,
        'top_patterns': []
    })

    # Reinforcement learning components
    self.pattern_weights: Dict[str, float] = {}
    # Bounded buffer: the oldest records are dropped once 5000 are held.
    self.replay_buffer: deque = deque(maxlen=5000)

    # Caching
    self.embedding_cache: Dict[str, np.ndarray] = {}

    # Configuration
    self.config = {
        'viral_threshold': 5_000_000,
        'min_sample_size': 10,
        'pattern_decay_rate': 0.95,
        'trend_window_days': 30,
        'confidence_threshold': 0.7,
        'update_frequency_hours': 6
    }

    # Logging
    self.setup_logging()
def setup_logging(self):
    """Configure root logging and attach the learner's named logger.

    Log records are directed to ``pattern_learner.log`` inside the storage
    directory and to the default stream handler.
    """
    destinations = [
        logging.FileHandler(self.storage_path / "pattern_learner.log"),
        logging.StreamHandler(),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=destinations,
    )
    self.logger = logging.getLogger('AudioPatternLearner')
def ingest_video_batch(self, audio_features_list: List[AudioFeatures],
                       performance_list: List[PerformanceMetrics]):
    """
    Ingest a batch of video audio records with their performance metrics.

    Main entry point for continuous learning: every pair is feature-engineered,
    pushed into the replay buffer and counted in the niche statistics, then the
    full learning pipeline runs once for the batch.

    Raises:
        ValueError: If the two lists differ in length.
    """
    self.logger.info(f"Ingesting batch of {len(audio_features_list)} videos")

    if len(audio_features_list) != len(performance_list):
        raise ValueError("Audio features and performance lists must be same length")

    # Process each video
    for clip_audio, clip_perf in zip(audio_features_list, performance_list):
        record = {
            'audio_features': clip_audio,
            'performance': clip_perf,
            'engineered_features': AudioFeatureEngineering.compute_full_feature_set(clip_audio, clip_perf),
            'timestamp': datetime.now()
        }
        self.replay_buffer.append(record)
        self._update_niche_stats(clip_audio, clip_perf)

    # Trigger learning pipeline, in this fixed order
    pipeline = (
        self._train_models,
        self._discover_patterns,
        self._update_pattern_weights,
        self._detect_anomalies,
    )
    for stage in pipeline:
        stage()

    self.logger.info(f"Batch processing complete. Total patterns: {len(self.discovered_patterns)}")
def _train_models(self):
    """Train/update all ML models from the replay buffer.

    Does nothing (beyond a warning) until the buffer holds at least
    ``config['min_sample_size']`` records. Otherwise it builds one feature
    matrix, trains the virality predictor on it and fits the anomaly detector
    on the same matrix.
    """
    if len(self.replay_buffer) < self.config['min_sample_size']:
        self.logger.warning("Insufficient data for training")
        return

    self.logger.info("Training ML models...")

    # Targets live in the same dict as the features and must not be inputs.
    target_keys = {'views_total', 'viral_score', 'engagement_ratio'}

    # One canonical, sorted column list taken from the first record. Every row
    # is built from this list (missing keys become 0.0), so the matrix is
    # always rectangular and its columns always line up with feature_names.
    feature_names = sorted(
        key for key in self.replay_buffer[0]['engineered_features']
        if key not in target_keys
    )

    X = np.array(
        [[record['engineered_features'].get(name, 0.0) for name in feature_names]
         for record in self.replay_buffer],
        dtype=float,
    )
    y_views = np.array([record['performance'].views_total for record in self.replay_buffer])
    y_viral = np.array([record['performance'].viral_score for record in self.replay_buffer])

    # Store feature names
    self.feature_names = feature_names

    # Train virality predictor
    self.virality_predictor.train(X, y_views, y_viral)

    # Train anomaly detector
    self.anomaly_detector.fit(X)

    self.logger.info("Model training complete")
def _discover_patterns(self):
    """Discover viral audio patterns by grouping and filtering buffered records.

    Records are grouped by niche, platform and beat type. A group yields a
    pattern only if it has at least 5 records, of which at least 3 reach
    ``config['viral_threshold']`` views.
    """
    if len(self.replay_buffer) < self.config['min_sample_size']:
        return

    self.logger.info("Discovering audio patterns...")

    # Bucket records by "<niche>_<platform>_<beat_type>"
    grouped = defaultdict(list)
    for entry in self.replay_buffer:
        feats = entry['audio_features']
        grouped[f"{feats.niche}_{feats.platform}_{feats.beat_type}"].append(entry)

    for group_key, members in grouped.items():
        if len(members) < 5:
            continue

        # Keep only the records at or above the viral view threshold
        winners = [
            member for member in members
            if member['performance'].views_total >= self.config['viral_threshold']
        ]
        if len(winners) < 3:
            continue

        # Summarise what the winners have in common
        pattern = self._extract_pattern_from_group(group_key, winners, members)
        if pattern:
            self.discovered_patterns[pattern.pattern_id] = pattern
            self.logger.info(f"Discovered pattern: {pattern.pattern_id} (efficacy: {pattern.viral_efficacy_score:.3f})")
def _extract_pattern_from_group(self, group_key: str, high_performers: List[Dict],
                                all_records: List[Dict]) -> Optional[AudioPattern]:
    """Extract a common audio pattern from high-performing videos.

    Args:
        group_key: Group identifier; used only to build the pattern id.
        high_performers: Records of the group that reached the viral threshold.
        all_records: Every record of the group, used for the hit-rate term.

    Returns:
        The summarising pattern, or None when ``high_performers`` is empty.
    """
    if not high_performers:
        return None

    # Compute median/mean characteristics
    pace_vals = [r['audio_features'].pace_wpm for r in high_performers]
    pitch_vals = [r['audio_features'].pitch_mean for r in high_performers]
    energy_vals = [r['audio_features'].energy_mean for r in high_performers]

    # Hook timing patterns: up to the first three hooks of every performer
    hook_timings = []
    for r in high_performers:
        if r['audio_features'].hook_timing_seconds:
            hook_timings.extend(r['audio_features'].hook_timing_seconds[:3])

    # Pause patterns: up to the first five (position, duration) pairs each
    pause_patterns = []
    for r in high_performers:
        audio = r['audio_features']
        if audio.pause_positions and audio.pause_durations:
            pause_patterns.extend(zip(audio.pause_positions[:5], audio.pause_durations[:5]))

    # Emotion arcs
    emotion_arcs = [r['audio_features'].emotion_trajectory for r in high_performers
                    if r['audio_features'].emotion_trajectory]

    # Compute efficacy score
    avg_views = np.mean([r['performance'].views_total for r in high_performers])
    avg_completion = np.mean([r['performance'].completion_rate for r in high_performers])
    hit_rate = len(high_performers) / max(len(all_records), 1)
    viral_efficacy = (
        (avg_views / 10_000_000) * 0.4 +  # Normalize to 10M
        avg_completion * 0.3 +
        hit_rate * 0.3
    )

    # Read niche/platform from a record instead of splitting group_key on
    # '_': a niche such as "personal_finance" contains the separator itself
    # and would otherwise be cut into the wrong niche and platform.
    reference = high_performers[0]['audio_features']
    niche = reference.niche
    platform = reference.platform

    # Create pattern
    now = datetime.now()
    return AudioPattern(
        pattern_id=f"pattern_{group_key}_{int(now.timestamp())}",
        niche=niche,
        platform=platform,
        optimal_pace=float(np.median(pace_vals)),
        optimal_pitch_range=(float(np.percentile(pitch_vals, 25)), float(np.percentile(pitch_vals, 75))),
        optimal_energy=float(np.median(energy_vals)),
        hook_timings=hook_timings[:5] if hook_timings else [],
        pause_pattern=pause_patterns[:5] if pause_patterns else [],
        beat_alignment_target=0.95,
        emotion_arc=emotion_arcs[0] if emotion_arcs else ['building', 'peak'],
        viral_efficacy_score=viral_efficacy,
        sample_count=len(high_performers),
        avg_views=avg_views,
        avg_completion=avg_completion,
        # Confidence grows linearly with sample size and saturates at 20.
        confidence=min(len(high_performers) / 20, 1.0),
        discovered_at=now,
        last_validated=now,
        trend_status='stable',
        weight=1.0,
        decay_rate=self.config['pattern_decay_rate']
    )
def _update_pattern_weights(self):
    """Update pattern weights based on recent performance (RL mechanism).

    Each pattern is decayed by ``decay_rate`` raised to the whole days since
    it was last validated, dropped when its weight falls below 0.1, and
    otherwise boosted by 10% and re-validated when its recent matches average
    at least ``config['viral_threshold']`` views.
    """
    # NOTE(review): the decay is measured from last_validated on every call,
    # so repeated calls within the same day appear to compound it - confirm
    # this is intended.
    now = datetime.now()

    # Iterate over a snapshot because patterns may be deleted in the loop.
    for pid, pat in list(self.discovered_patterns.items()):
        age_in_days = (now - pat.last_validated).days
        pat.weight *= pat.decay_rate ** age_in_days

        if pat.weight < 0.1:
            del self.discovered_patterns[pid]
            self.logger.info(f"Removed low-weight pattern: {pid}")
            continue

        matches = self._find_recent_pattern_matches(pat)
        if not matches:
            continue

        mean_recent_views = np.mean([m['performance'].views_total for m in matches])
        if mean_recent_views >= self.config['viral_threshold']:
            pat.weight *= 1.1
            pat.last_validated = now
def _find_recent_pattern_matches(self, pattern: AudioPattern, window_days: int = 7) -> List[Dict]:
    """Find recent buffered videos matching this pattern.

    A record matches when it was ingested within ``window_days``, shares the
    pattern's niche and platform, has a pace within 20 wpm of the optimal pace
    and a mean pitch inside the optimal pitch range (bounds included).
    """
    oldest_allowed = datetime.now() - timedelta(days=window_days)

    def _is_match(entry: Dict) -> bool:
        if entry['timestamp'] < oldest_allowed:
            return False
        feats = entry['audio_features']
        same_context = feats.niche == pattern.niche and feats.platform == pattern.platform
        if not same_context:
            return False
        # Similarity check: pace first, then pitch
        if not abs(feats.pace_wpm - pattern.optimal_pace) < 20:
            return False
        return bool(pattern.optimal_pitch_range[0] <= feats.pitch_mean <= pattern.optimal_pitch_range[1])

    return [entry for entry in self.replay_buffer if _is_match(entry)]
def _detect_anomalies(self):
    """Detect anomalous patterns (novel viral strategies).

    Scores up to the 1000 most recent buffered records with the anomaly
    detector and logs every record that is both strongly anomalous
    (score below -0.5) and at or above ``config['viral_threshold']`` views.
    Returns quietly while the detector has not been fitted yet.
    """
    model = self.anomaly_detector.model
    # Do not truth-test the model itself: scikit-learn ensembles define
    # __len__ over their fitted estimators, so `not model` raises
    # AttributeError on an unfitted forest. `estimators_` exists only after
    # fit, and feature_names is only filled by training.
    if model is None or not hasattr(model, 'estimators_') or not self.feature_names:
        return

    recent_records = list(self.replay_buffer)[-1000:]
    if not recent_records:
        return

    X = np.array([
        [record['engineered_features'].get(name, 0.0) for name in self.feature_names]
        for record in recent_records
    ])
    anomaly_scores = self.anomaly_detector.score(X)

    # Find overperforming anomalies
    viral_threshold = self.config['viral_threshold']
    for score, record in zip(anomaly_scores, recent_records):
        if score < -0.5 and record['performance'].views_total >= viral_threshold:
            self.logger.info(f"Anomalous high performer detected: {record['performance'].video_id}")
            # Log for further analysis
            self._log_anomaly(record, score)
def _log_anomaly(self, record: Dict, anomaly_score: float):
    """Append one anomalous record to ``anomalies.jsonl`` in the storage directory.

    Each call writes a single JSON line with the video id, view count, anomaly
    score, niche, platform and the current time in ISO format.
    """
    perf = record['performance']
    audio = record['audio_features']
    entry = {
        'video_id': perf.video_id,
        'views': perf.views_total,
        # Cast so numpy scalars serialise as plain JSON numbers.
        'anomaly_score': float(anomaly_score),
        'niche': audio.niche,
        'platform': audio.platform,
        'timestamp': datetime.now().isoformat()
    }
    line = json.dumps(entry) + '\n'

    with open(self.storage_path / "anomalies.jsonl", 'a') as sink:
        sink.write(line)
def _update_niche_stats(self, audio: AudioFeatures, perf: PerformanceMetrics):
    """Fold one video into the running statistics of its niche/platform pair.

    Increments the video count and updates the incremental mean of views for
    the ``"<niche>_<platform>"`` bucket.
    """
    bucket = self.niche_performance[f"{audio.niche}_{audio.platform}"]
    bucket['total_videos'] += 1

    # Incremental mean: rebuild the previous total, add the new sample,
    # divide by the new count.
    count = bucket['total_videos']
    previous_total = (count - 1) * bucket['avg_views']
    bucket['avg_views'] = (previous_total + perf.views_total) / count
| # ======================================================================== | |
| # PUBLIC API FOR TTS/VOICE-SYNC INTEGRATION | |
| # ======================================================================== | |
def get_recommended_audio_profile(self, niche: str, platform: str,
                                  beat_type: str) -> Optional[PatternRecommendation]:
    """
    Build an audio recommendation for TTS/voice-sync engines.

    Picks, among the discovered patterns whose niche and platform both match,
    the one with the highest ``viral_efficacy_score * weight`` and converts it
    into a ``PatternRecommendation``.

    Args:
        niche: Niche the pattern must match exactly.
        platform: Platform the pattern must match exactly.
        beat_type: Copied into the recommendation as-is.

    Returns:
        The recommendation, or ``None`` (after a warning) when no pattern
        matches the niche/platform pair.
    """
    # NOTE(review): beat_type is not used to filter patterns, only echoed
    # back in the result - confirm that this is intended.
    candidates = (
        pattern for pattern in self.discovered_patterns.values()
        if pattern.niche == niche and pattern.platform == platform
    )
    best = max(
        candidates,
        key=lambda pattern: pattern.viral_efficacy_score * pattern.weight,
        default=None,
    )
    if best is None:
        self.logger.warning(f"No patterns found for {niche}/{platform}")
        return None

    # pitch_base is the lower bound of the optimal range and pitch_variance is
    # half of the range's width.
    pitch_low = best.optimal_pitch_range[0]
    pitch_half_width = (best.optimal_pitch_range[1] - best.optimal_pitch_range[0]) / 2

    result = PatternRecommendation(
        niche=niche,
        platform=platform,
        beat_type=beat_type,
        pace_wpm=best.optimal_pace,
        pitch_base=pitch_low,
        pitch_variance=pitch_half_width,
        energy_level=best.optimal_energy,
        voice_style='dynamic',
        hook_placements=best.hook_timings,
        pause_placements=best.pause_pattern,
        emphasis_words=[],
        beat_alignment_rules={'target_error': best.beat_alignment_target},
        syllable_timing_guide={},
        predicted_viral_score=best.viral_efficacy_score,
        confidence=best.confidence
    )

    self.logger.info(f"Generated recommendation for {niche}/{platform}: score={result.predicted_viral_score:.3f}")
    return result
def predict_viral_success(self, audio_features: AudioFeatures) -> float:
    """
    Score how viral the given audio is expected to be.

    The score is the predictor's estimated view count divided by one million
    (the original contract describes it as 0-10+, higher = more likely to hit
    5M+ views).

    Args:
        audio_features: Audio description to evaluate; its ``platform`` is
            reused for the placeholder performance record.

    Returns:
        The score as a float, or 0.0 (after a warning) when the virality
        predictor has not been trained yet.
    """
    if not self.virality_predictor.is_trained:
        self.logger.warning("Model not trained yet")
        return 0.0

    # Feature engineering takes a performance record as well, so feed it a
    # placeholder with fixed retention/velocity values and zero engagement.
    placeholder_perf = PerformanceMetrics(
        video_id='prediction',
        views_total=0,
        retention_2s=0.8,
        retention_15s=0.5,
        completion_rate=0.3,
        replay_rate=0.1,
        velocity_per_hour=1000,
        velocity_per_day=10000,
        likes=0,
        comments=0,
        shares=0,
        saves=0,
        platform=audio_features.platform,
        upload_timestamp=datetime.now()
    )

    engineered = AudioFeatureEngineering.compute_full_feature_set(audio_features, placeholder_perf)

    # Single-row matrix in the column order the predictor was trained on;
    # features the engineering step did not produce default to 0.0
    row = [engineered.get(name, 0.0) for name in self.feature_names]
    model_input = np.array(row).reshape(1, -1)

    estimated_views = self.virality_predictor.predict_views(model_input)[0]

    # Express the estimate in millions of views
    return float(estimated_views / 1_000_000)
def get_top_patterns(self, n: int = 10, niche: Optional[str] = None) -> List[AudioPattern]:
    """Get top N patterns by viral efficacy.

    Patterns are ranked by ``viral_efficacy_score * weight``, best first;
    ties keep their order in ``self.discovered_patterns``.

    Args:
        n: Maximum number of patterns to return. Zero or a negative value
            yields an empty list (a negative value used to slice patterns
            off the tail and return the rest).
        niche: When given and non-empty, only patterns of this niche are
            considered; ``None`` or ``""`` means no filtering.

    Returns:
        A new list with at most ``n`` patterns.
    """
    if n <= 0:
        return []

    candidates = self.discovered_patterns.values()
    if niche:
        candidates = (p for p in candidates if p.niche == niche)

    ranked = sorted(candidates, key=lambda p: p.viral_efficacy_score * p.weight, reverse=True)
    return ranked[:n]
def get_feature_importance(self) -> Dict[str, float]:
    """Map feature names to importance, most important first.

    Reads ``self.virality_predictor.feature_importance`` (keyed by feature
    index) and translates each index below ``len(self.feature_names)`` into
    its name; other indices are dropped.

    Returns:
        A dict ordered by descending importance, or an empty dict when the
        predictor has no importance data.
    """
    raw_importance = self.virality_predictor.feature_importance
    if not raw_importance:
        return {}

    known_count = len(self.feature_names)
    by_name = {
        self.feature_names[index]: float(value)
        for index, value in raw_importance.items()
        if index < known_count
    }

    ranked = sorted(by_name.items(), key=lambda pair: pair[1], reverse=True)
    return dict(ranked)
def get_niche_performance_summary(self) -> Dict[str, Dict]:
    """Return the per-niche performance stats as a plain dict.

    The result is a shallow copy of ``self.niche_performance``: the outer
    mapping is new, but the per-niche stat dicts are the same objects.
    """
    summary: Dict[str, Dict] = {}
    summary.update(self.niche_performance)
    return summary
| # ======================================================================== | |
| # PERSISTENCE & STATE MANAGEMENT | |
| # ======================================================================== | |
def save_state(self):
    """Save learner state to disk.

    Writes ``learner_state.json`` (discovered patterns, niche performance,
    feature names, config and a save timestamp) into ``self.storage_path``
    and, when the virality predictor is trained, pickles it to
    ``virality_predictor.pkl``. An untrained predictor is not written, so a
    pickle from an earlier save is left in place.

    The storage directory is created if it is missing. Each file is written
    to a ``.tmp`` sibling first and then moved over the real file, so a
    failure while serializing (for example a non-JSON-serializable config
    value) leaves the previously saved file intact instead of truncated.

    Raises:
        TypeError: If the state contains values ``json`` cannot serialize.
        OSError: If the files cannot be written.
    """
    def _write_atomically(target, mode, write, encoding=None):
        # Write to "<name>.tmp", then swap it in; always clean up leftovers
        tmp = target.with_name(target.name + '.tmp')
        try:
            with open(tmp, mode, encoding=encoding) as f:
                write(f)
            tmp.replace(target)
        finally:
            if tmp.exists():
                tmp.unlink()

    self.logger.info("Saving learner state...")

    state = {
        'discovered_patterns': {k: self._pattern_to_dict(v) for k, v in self.discovered_patterns.items()},
        'niche_performance': dict(self.niche_performance),
        'feature_names': self.feature_names,
        'config': self.config,
        'timestamp': datetime.now().isoformat()
    }

    # A fresh learner may not have its storage directory yet
    self.storage_path.mkdir(parents=True, exist_ok=True)

    _write_atomically(
        self.storage_path / 'learner_state.json',
        'w',
        lambda f: json.dump(state, f, indent=2),
        encoding='utf-8',
    )

    # Save models
    if self.virality_predictor.is_trained:
        _write_atomically(
            self.storage_path / 'virality_predictor.pkl',
            'wb',
            lambda f: pickle.dump(self.virality_predictor, f),
        )

    self.logger.info("State saved successfully")
def load_state(self):
    """Restore learner state previously written to ``self.storage_path``.

    Reads ``learner_state.json`` and replaces ``discovered_patterns``,
    ``niche_performance`` and ``feature_names`` with its contents; the saved
    config is merged into ``self.config``. When ``virality_predictor.pkl``
    exists, the predictor is replaced by the unpickled object.

    If the JSON state file is missing, a warning is logged and nothing is
    changed.
    """
    state_file = self.storage_path / 'learner_state.json'
    if not state_file.exists():
        self.logger.warning("No saved state found")
        return

    self.logger.info("Loading learner state...")

    with state_file.open('r') as handle:
        state = json.load(handle)

    restored_patterns = {}
    for pattern_id, payload in state['discovered_patterns'].items():
        restored_patterns[pattern_id] = self._dict_to_pattern(payload)
    self.discovered_patterns = restored_patterns

    def _empty_niche_stats():
        # Default entry for niches that have no recorded videos yet
        return {'total_videos': 0, 'avg_views': 0.0, 'top_patterns': []}

    self.niche_performance = defaultdict(_empty_niche_stats, state['niche_performance'])
    self.feature_names = state['feature_names']
    self.config.update(state['config'])

    # Load models
    model_file = self.storage_path / 'virality_predictor.pkl'
    if model_file.exists():
        # SECURITY NOTE: unpickling executes code embedded in the file; only
        # load predictor files from a storage directory you trust.
        with model_file.open('rb') as handle:
            self.virality_predictor = pickle.load(handle)

    self.logger.info(f"State loaded: {len(self.discovered_patterns)} patterns")
def _pattern_to_dict(self, pattern: AudioPattern) -> Dict:
    """Flatten an AudioPattern into a dict that ``json`` can serialize.

    Every listed attribute is copied under its own name; the two timestamp
    attributes (``discovered_at`` and ``last_validated``) are stored as ISO
    format strings.
    """
    timestamp_fields = ('discovered_at', 'last_validated')
    # Order here is the key order of the resulting dict
    field_names = (
        'pattern_id',
        'niche',
        'platform',
        'optimal_pace',
        'optimal_pitch_range',
        'optimal_energy',
        'hook_timings',
        'pause_pattern',
        'beat_alignment_target',
        'emotion_arc',
        'viral_efficacy_score',
        'sample_count',
        'avg_views',
        'avg_completion',
        'confidence',
        'discovered_at',
        'last_validated',
        'trend_status',
        'weight',
        'decay_rate',
    )

    serialized = {}
    for name in field_names:
        value = getattr(pattern, name)
        serialized[name] = value.isoformat() if name in timestamp_fields else value
    return serialized
def _dict_to_pattern(self, d: Dict) -> AudioPattern:
    """Rebuild an AudioPattern from a dict of its saved fields.

    Fields are passed through unchanged except for the ones JSON cannot
    represent natively: ``optimal_pitch_range`` becomes a tuple,
    ``pause_pattern`` a list of tuples, and the two timestamps are parsed
    from ISO format strings.

    Raises:
        KeyError: If ``d`` lacks one of the expected fields.
    """
    def _as_tuples(items):
        return [tuple(item) for item in items]

    # Fields that need converting back from their JSON-friendly form
    decoders = {
        'optimal_pitch_range': tuple,
        'pause_pattern': _as_tuples,
        'discovered_at': datetime.fromisoformat,
        'last_validated': datetime.fromisoformat,
    }
    field_names = (
        'pattern_id',
        'niche',
        'platform',
        'optimal_pace',
        'optimal_pitch_range',
        'optimal_energy',
        'hook_timings',
        'pause_pattern',
        'beat_alignment_target',
        'emotion_arc',
        'viral_efficacy_score',
        'sample_count',
        'avg_views',
        'avg_completion',
        'confidence',
        'discovered_at',
        'last_validated',
        'trend_status',
        'weight',
        'decay_rate',
    )

    kwargs = {}
    for name in field_names:
        raw = d[name]
        decode = decoders.get(name)
        kwargs[name] = raw if decode is None else decode(raw)
    return AudioPattern(**kwargs)
| # ============================================================================ | |
| # EXAMPLE USAGE & INTEGRATION | |
| # ============================================================================ | |
if __name__ == "__main__":
    # Demo walkthrough: restore state, ingest one sample video, query the
    # public API, then persist the learner again.
    learner = AudioPatternLearner(storage_path="./viral_audio_learner")

    # Pick up whatever a previous run saved (no-op on first run)
    learner.load_state()

    # One hand-built sample; in production these come from
    # audio_performance_store.py. Spectral arrays are random placeholders.
    audio_fields = {
        'pace_wpm': 165,
        'pitch_mean': 220.0,
        'pitch_variance': 50.0,
        'energy_mean': 0.7,
        'energy_variance': 0.15,
        'tempo_bpm': 128,
        'hook_timing_seconds': [1.5, 8.2, 15.0],
        'hook_emphasis_amplitude': [0.9, 0.85, 0.8],
        'hook_pitch_jump': [50, 45, 40],
        'pause_durations': [0.3, 0.5, 0.4],
        'pause_positions': [5.0, 12.0, 20.0],
        'beat_alignment_error': 0.05,
        'syllable_timing': [0.1, 0.3, 0.5, 0.7],
        'mfcc': np.random.randn(13, 100),
        'spectral_centroid': np.random.randn(100),
        'spectral_rolloff': np.random.randn(100),
        'zero_crossing_rate': np.random.randn(100),
        'chroma': np.random.randn(12, 100),
        'harmonic_noise_ratio': 0.85,
        'emotion_trajectory': ['building', 'peak', 'sustain'],
        'emotion_intensity': [0.6, 0.9, 0.7],
        'voice_tone': 'energetic',
        'phoneme_timing': {'a': 0.1, 'e': 0.15},
        'niche': 'finance',
        'platform': 'tiktok',
        'beat_type': 'trap',
        'voice_style': 'male_young',
        'language': 'en',
        'music_track': 'trending_beat_001',
        'is_trending_beat': True,
        'trend_timestamp': datetime.now(),
    }
    sample_audio = AudioFeatures(**audio_fields)

    performance_fields = {
        'video_id': 'vid_12345',
        'views_total': 7_500_000,
        'retention_2s': 0.85,
        'retention_15s': 0.55,
        'completion_rate': 0.35,
        'replay_rate': 0.12,
        'velocity_per_hour': 15000,
        'velocity_per_day': 180000,
        'likes': 450000,
        'comments': 12000,
        'shares': 85000,
        'saves': 120000,
        'platform': 'tiktok',
        'upload_timestamp': datetime.now(),
    }
    sample_performance = PerformanceMetrics(**performance_fields)

    # Feed the single-sample batch to the learner
    learner.ingest_video_batch([sample_audio], [sample_performance])

    # Ask for an audio profile for a new finance/tiktok/trap video
    recommendation = learner.get_recommended_audio_profile('finance', 'tiktok', 'trap')
    if recommendation:
        print("\nRecommendation for finance/tiktok/trap:")
        print(f"  Pace: {recommendation.pace_wpm} WPM")
        print(f"  Pitch: {recommendation.pitch_base} ± {recommendation.pitch_variance}")
        print(f"  Hook placements: {recommendation.hook_placements}")
        print(f"  Predicted viral score: {recommendation.predicted_viral_score:.2f}")
        print(f"  Confidence: {recommendation.confidence:.2%}")

    # Score the same sample with the trained predictor (0.00 if untrained)
    viral_score = learner.predict_viral_success(sample_audio)
    print(f"\nPredicted viral score: {viral_score:.2f}M views")

    # Best-ranked finance patterns
    top_patterns = learner.get_top_patterns(n=5, niche='finance')
    print("\nTop 5 patterns for finance:")
    for rank, top in enumerate(top_patterns, start=1):
        print(f"  {rank}. {top.pattern_id}: efficacy={top.viral_efficacy_score:.3f}, samples={top.sample_count}")

    # Feature importance, already sorted most-important first
    importance = learner.get_feature_importance()
    print("\nTop 10 most important features:")
    for feature_name, score in list(importance.items())[:10]:
        print(f"  {feature_name}: {score:.4f}")

    # Persist everything for the next run
    learner.save_state()
    print("\n✅ Pattern learner ready for production use")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment