Created
December 31, 2025 01:06
-
-
Save bogged-broker/8ad475a28c5c729b293c0f1f1707abcf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # ======================================================================== | |
| # CRITICAL: CONTINUOUS FEEDBACK & CALIBRATION | |
| # ======================================================================== | |
| def update_pattern_performance(self, pattern_id: str, actual_metrics: PerformanceMetrics): | |
| """ | |
| Update pattern with actual performance metrics (CLOSES THE FEEDBACK LOOP). | |
| This is called AFTER video posts to calibrate predictions. | |
| """ | |
| if pattern_id not in self.discovered_patterns: | |
| self.logger.warning(f"Pattern {pattern_id} not found for update") | |
| return | |
| pattern = self.discovered_patterns[pattern_id] | |
| # Update pattern statistics with actual performance | |
| n = pattern.sample_count | |
| pattern.sample_count += 1 | |
| # Running average update | |
| pattern.avg_views = ((n * pattern.avg_views) + actual_metrics.views_total) / (n + 1) | |
| pattern.avg_completion = ((n * pattern.avg_completion) + actual_metrics.completion_rate) / (n + 1) | |
| # Recalculate viral efficacy with new data | |
| actual_viral_score = actual_metrics.viral_score | |
| pattern.viral_efficacy_score = ( | |
| (pattern.avg_views / 10_000_000) * 0.4 + | |
| pattern.avg_completion * 0.3 + | |
| (pattern.sample_count / (pattern.sample_count + 10)) * 0.3 # Sample size confidence | |
| ) | |
| # Update confidence based on prediction accuracy | |
| if hasattr(pattern, 'recent_predictions'): | |
| pattern.recent_predictions.append(actual_viral_score) | |
| if len(pattern.recent_predictions) > 10: | |
| pattern.recent_predictions.pop(0) | |
| # Compute prediction variance (lower = more confident) | |
| variance = np.var(pattern.recent_predictions) if len(pattern.recent_predictions) > 2 else 1.0 | |
| pattern.confidence = np.clip(1.0 - (variance / 10.0), 0.3, 0.95) | |
| else: | |
| pattern.recent_predictions = [actual_viral_score] | |
| # Boost weight if exceeded 5M threshold | |
| if actual_metrics.views_total >= self.config['viral_threshold']: | |
| pattern.weight *= 1.15 | |
| pattern.trend_status = 'rising' | |
| self.logger.info(f"โ Pattern {pattern_id} reinforced: {actual_metrics.views_total:,} views") | |
| else: | |
| # Penalize if underperformed | |
| pattern.weight *= 0.90 | |
| if pattern.weight < 0.5: | |
| pattern.trend_status = 'declining' | |
| pattern.last_validated = datetime.now() | |
| # Store in performance history | |
| self.pattern_history.append({ | |
| 'pattern_id': pattern_id, | |
| 'predicted': pattern.viral_efficacy_score, | |
| 'actual': actual_viral_score, | |
| 'error': abs(pattern.viral_efficacy_score - actual_viral_score), | |
| 'timestamp': datetime.now() | |
| }) | |
| self.logger.info(f"Pattern {pattern_id} updated: efficacy={pattern.viral_efficacy_score:.3f}, " | |
| f"confidence={pattern.confidence:.2%}, weight={pattern.weight:.2f}") | |
| def reinforce_pattern(self, pattern_id: str, reward_value: float): | |
| """ | |
| RL-style pattern reinforcement based on reward signal. | |
| Reward > 0 = exceeded expectations, < 0 = underperformed. | |
| """ | |
| if pattern_id not in self.discovered_patterns: | |
| return | |
| pattern = self.discovered_patterns[pattern_id] | |
| # Apply reward to weight (exponential moving average) | |
| learning_rate = 0.1 | |
| pattern.weight = pattern.weight * (1 - learning_rate) + reward_value * learning_rate | |
| pattern.weight = np.clip(pattern.weight, 0.1, 3.0) | |
| # Update trend status based on reward | |
| if reward_value > 1.5: | |
| pattern.trend_status = 'rising' | |
| elif reward_value > 0.8: | |
| pattern.trend_status = 'stable' | |
| else: | |
| pattern.trend_status = 'declining' | |
| # Store reward in RL history | |
| self.rl_reward_history.append({ | |
| 'pattern_id': pattern_id, | |
| 'reward': reward_value, | |
| 'weight_after': pattern.weight, | |
| 'timestamp': datetime.now() | |
| }) | |
| self.logger.info(f"Pattern {pattern_id} reinforced: reward={reward_value:.2f}, new_weight={pattern.weight:.2f}") | |
| def predict_viral_probability(self, audio_features: AudioFeatures, | |
| context: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| CRITICAL: Pre-post viral prediction with decision recommendation. | |
| Returns: predicted_views, confidence, recommendation (POST/REVISE/HOLD). | |
| """ | |
| # Get base prediction | |
| viral_score, confidence, breakdown = self.predict_viral_success(audio_features, return_confidence=True) | |
| # Get recommended profile for comparison | |
| recommended = self.get_recommended_audio_profile( | |
| audio_features.niche, | |
| audio_features.platform, | |
| audio_features.beat_type, | |
| use_rl=True | |
| ) | |
| # Compute alignment score with optimal pattern | |
| alignment_score = 0.5 | |
| if recommended: | |
| pace_alignment = 1.0 - abs(audio_features.pace_wpm - recommended.pace_wpm) / 50.0 | |
| pitch_alignment = 1.0 - abs(audio_features.pitch_mean - recommended.pitch_base) / 100.0 | |
| energy_alignment = 1.0 - abs(audio_features.energy_mean - recommended.energy_level) / 0.3 | |
| alignment_score = np.clip((pace_alignment + pitch_alignment + energy_alignment) / 3, 0, 1) | |
| # Factor in trend context | |
| trend_boost = 1.0 | |
| if context.get('trending_beat', False) or audio_features.is_trending_beat: | |
| trend_boost = 1.2 | |
| if context.get('viral_window', False): # Peak posting hours | |
| trend_boost *= 1.1 | |
| # Final adjusted prediction | |
| adjusted_score = viral_score * trend_boost * (0.7 + 0.3 * alignment_score) | |
| # Decision logic | |
| if adjusted_score >= 5.0 and confidence >= 0.7: | |
| decision = "POST" | |
| reason = "High viral probability with strong confidence" | |
| elif adjusted_score >= 3.5 and confidence >= 0.6: | |
| decision = "POST" | |
| reason = "Good viral potential, acceptable confidence" | |
| elif adjusted_score >= 2.0 and alignment_score < 0.6: | |
| decision = "REVISE" | |
| reason = "Moderate potential but poor alignment with optimal pattern" | |
| elif confidence < 0.5: | |
| decision = "REVISE" | |
| reason = "Low confidence in prediction" | |
| else: | |
| decision = "HOLD" | |
| reason = "Insufficient viral probability" | |
| return { | |
| 'predicted_views': breakdown['predicted_views'], | |
| 'predicted_viral_score': adjusted_score, | |
| 'confidence': confidence, | |
| 'alignment_score': alignment_score, | |
| 'trend_boost': trend_boost, | |
| 'decision': decision, | |
| 'reason': reason, | |
| 'breakdown': breakdown, | |
| 'recommended_profile': recommended | |
| } | |
| # ======================================================================== | |
| # CRITICAL: TREND-AWARE MULTIMODAL CONTEXT | |
| # ======================================================================== | |
| def update_trend_context(self, pattern_id: str, trend_data: Dict[str, Any]): | |
| """ | |
| Update pattern with current platform trends and temporal signals. | |
| Trend data includes: trending_sounds, viral_spike_windows, cultural_signals. | |
| """ | |
| if pattern_id not in self.discovered_patterns: | |
| return | |
| pattern = self.discovered_patterns[pattern_id] | |
| # Update trend status based on data | |
| if trend_data.get('is_trending', False): | |
| pattern.weight *= 1.2 | |
| pattern.trend_status = 'rising' | |
| if trend_data.get('sound_trending', False): | |
| pattern.weight *= 1.15 | |
| # Temporal boost (hour of day, day of week) | |
| hour_boost = trend_data.get('hour_boost', 1.0) | |
| day_boost = trend_data.get('day_boost', 1.0) | |
| pattern.weight *= (hour_boost * day_boost) | |
| # Store trend context in pattern metadata | |
| if not hasattr(pattern, 'trend_metadata'): | |
| pattern.trend_metadata = {} | |
| pattern.trend_metadata.update({ | |
| 'last_trend_update': datetime.now(), | |
| 'trending_score': trend_data.get('trending_score', 0.0), | |
| 'platform_saturation': trend_data.get('saturation', 0.0), | |
| 'viral_window_active': trend_data.get('viral_window', False) | |
| }) | |
| self.logger.info(f"Pattern {pattern_id} trend context updated: weight={pattern.weight:.2f}") | |
| def get_similar_patterns(self, audio_features: AudioFeatures, niche: str, | |
| platform: str, top_k: int = 5) -> List[Dict[str, Any]]: | |
| """ | |
| CRITICAL: Cross-video pattern learning via similarity search. | |
| Returns top-K similar high-performing patterns. | |
| """ | |
| # Filter patterns by niche/platform | |
| candidate_patterns = [ | |
| p for p in self.discovered_patterns.values() | |
| if p.niche == niche and p.platform == platform and p.avg_views >= 1_000_000 | |
| ] | |
| if not candidate_patterns: | |
| return [] | |
| # Compute similarity scores | |
| similarities = [] | |
| for pattern in candidate_patterns: | |
| # Simple similarity based on key features | |
| pace_sim = 1.0 - abs(audio_features.pace_wpm - pattern.optimal_pace) / 100.0 | |
| pitch_sim = 1.0 - abs(audio_features.pitch_mean - pattern.optimal_pitch_range[0]) / 150.0 | |
| energy_sim = 1.0 - abs(audio_features.energy_mean - pattern.optimal_energy) / 0.5 | |
| # Weighted similarity | |
| similarity = np.clip( | |
| pace_sim * 0.35 + pitch_sim * 0.35 + energy_sim * 0.30, | |
| 0, 1 | |
| ) | |
| # Boost by viral efficacy and weight | |
| weighted_similarity = similarity * pattern.viral_efficacy_score * pattern.weight | |
| """ | |
| audio_pattern_learner.py | |
| Autonomous machine learning system for identifying and predicting viral audio patterns. | |
| Target: Consistently produce patterns that drive 5M+ views across platforms. | |
| Core Features: | |
| - High-resolution audio feature ingestion & normalization | |
| - Advanced feature engineering (beat, hook, emotion, spectral) | |
| - Multi-model ML pipeline (XGBoost, LSTM, clustering) | |
| - Pattern discovery & ranking by viral efficacy | |
| - RL integration for continuous optimization | |
| - Real-time API for TTS/voice-sync integration | |
| - Cross-platform, cross-niche pattern learning | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from datetime import datetime, timedelta | |
| from collections import defaultdict, deque | |
| import json | |
| import pickle | |
| from pathlib import Path | |
| import logging | |
| # ML/DL imports | |
| try: | |
| import xgboost as xgb | |
| from sklearn.cluster import KMeans, HDBSCAN | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from sklearn.ensemble import IsolationForest | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| except ImportError: | |
| print("Warning: Some ML libraries not installed. Install: xgboost, scikit-learn, torch") | |
| # Audio processing | |
| try: | |
| import librosa | |
| from scipy import signal, stats | |
| except ImportError: | |
| print("Warning: Audio libraries not installed. Install: librosa, scipy") | |
| # ============================================================================ | |
| # DATA STRUCTURES | |
| # ============================================================================ | |
| @dataclass | |
| class AudioFeatures: | |
| """Comprehensive audio feature set for virality prediction""" | |
| # Basic features | |
| pace_wpm: float | |
| pitch_mean: float | |
| pitch_variance: float | |
| energy_mean: float | |
| energy_variance: float | |
| tempo_bpm: float | |
| # Hook & timing | |
| hook_timing_seconds: List[float] | |
| hook_emphasis_amplitude: List[float] | |
| hook_pitch_jump: List[float] | |
| pause_durations: List[float] | |
| pause_positions: List[float] | |
| beat_alignment_error: float | |
| syllable_timing: List[float] | |
| # Spectral & timbre | |
| mfcc: np.ndarray | |
| spectral_centroid: np.ndarray | |
| spectral_rolloff: np.ndarray | |
| zero_crossing_rate: np.ndarray | |
| chroma: np.ndarray | |
| harmonic_noise_ratio: float | |
| # Emotion & dynamics | |
| emotion_trajectory: List[str] # ['building', 'peak', 'sustain', 'release'] | |
| emotion_intensity: List[float] | |
| voice_tone: str | |
| phoneme_timing: Dict[str, float] | |
| # Context | |
| niche: str | |
| platform: str | |
| beat_type: str | |
| voice_style: str | |
| language: str | |
| music_track: Optional[str] | |
| is_trending_beat: bool | |
| trend_timestamp: datetime | |
| # Embeddings | |
| audio_embedding: Optional[np.ndarray] = None | |
| def to_feature_vector(self) -> np.ndarray: | |
| """Convert to flat feature vector for ML models""" | |
| features = [ | |
| self.pace_wpm, | |
| self.pitch_mean, | |
| self.pitch_variance, | |
| self.energy_mean, | |
| self.energy_variance, | |
| self.tempo_bpm, | |
| np.mean(self.hook_emphasis_amplitude) if self.hook_emphasis_amplitude else 0, | |
| np.mean(self.hook_pitch_jump) if self.hook_pitch_jump else 0, | |
| np.mean(self.pause_durations) if self.pause_durations else 0, | |
| self.beat_alignment_error, | |
| self.harmonic_noise_ratio, | |
| len(self.hook_timing_seconds), | |
| len(self.pause_durations), | |
| np.mean(self.emotion_intensity) if self.emotion_intensity else 0, | |
| ] | |
| # Add aggregated spectral features | |
| if self.mfcc is not None: | |
| features.extend(np.mean(self.mfcc, axis=1).tolist()[:13]) | |
| else: | |
| features.extend([0] * 13) | |
| return np.array(features) | |
| @dataclass | |
| class PerformanceMetrics: | |
| """Video performance metrics for learning""" | |
| video_id: str | |
| views_total: int | |
| retention_2s: float | |
| retention_15s: float | |
| completion_rate: float | |
| replay_rate: float | |
| velocity_per_hour: float | |
| velocity_per_day: float | |
| # Social engagement | |
| likes: int | |
| comments: int | |
| shares: int | |
| saves: int | |
| # Platform | |
| platform: str | |
| upload_timestamp: datetime | |
| # Derived metrics | |
| viral_score: float = 0.0 | |
| velocity_score: float = 0.0 | |
| engagement_ratio: float = 0.0 | |
| def __post_init__(self): | |
| """Calculate derived metrics""" | |
| self.viral_score = ( | |
| self.views_total / 1_000_000 * 0.3 + | |
| self.completion_rate * 0.2 + | |
| self.retention_2s * 0.15 + | |
| self.replay_rate * 0.15 + | |
| (self.shares / max(self.views_total, 1)) * 1000 * 0.2 | |
| ) | |
| self.velocity_score = ( | |
| self.velocity_per_hour * 0.4 + | |
| self.velocity_per_day / 24 * 0.6 | |
| ) | |
| if self.views_total > 0: | |
| self.engagement_ratio = (self.likes + self.comments * 2 + self.shares * 3) / self.views_total | |
| @dataclass | |
| class AudioPattern: | |
| """Discovered audio pattern with viral efficacy""" | |
| pattern_id: str | |
| niche: str | |
| platform: str | |
| # Pattern characteristics | |
| optimal_pace: float | |
| optimal_pitch_range: Tuple[float, float] | |
| optimal_energy: float | |
| hook_timings: List[float] | |
| pause_pattern: List[Tuple[float, float]] # (position, duration) | |
| beat_alignment_target: float | |
| emotion_arc: List[str] | |
| # Efficacy metrics | |
| viral_efficacy_score: float | |
| sample_count: int | |
| avg_views: float | |
| avg_completion: float | |
| confidence: float | |
| # Temporal | |
| discovered_at: datetime | |
| last_validated: datetime | |
| trend_status: str # 'rising', 'peaked', 'declining', 'stable' | |
| # Weights for RL | |
| weight: float = 1.0 | |
| decay_rate: float = 0.95 | |
| @dataclass | |
| class PatternRecommendation: | |
| """Recommendation for TTS/voice-sync engines""" | |
| niche: str | |
| platform: str | |
| beat_type: str | |
| # TTS parameters | |
| pace_wpm: float | |
| pitch_base: float | |
| pitch_variance: float | |
| energy_level: float | |
| voice_style: str | |
| # Timing patterns | |
| hook_placements: List[float] | |
| pause_placements: List[Tuple[float, float]] | |
| emphasis_words: List[str] | |
| # Beat sync | |
| beat_alignment_rules: Dict[str, float] | |
| syllable_timing_guide: Dict[str, float] | |
| # Predicted performance | |
| predicted_viral_score: float | |
| confidence: float | |
| # ============================================================================ | |
| # FEATURE ENGINEERING | |
| # ============================================================================ | |
| class AudioFeatureEngineering: | |
| """Advanced feature engineering for virality prediction""" | |
| @staticmethod | |
| def extract_beat_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]: | |
| """Extract beat-related viral signals""" | |
| features = {} | |
| # Beat alignment score | |
| features['beat_alignment_score'] = 1.0 - audio_features.beat_alignment_error | |
| # Off-beat ratio (syllables that don't align) | |
| if audio_features.syllable_timing: | |
| beat_interval = 60.0 / audio_features.tempo_bpm | |
| off_beats = sum(1 for t in audio_features.syllable_timing | |
| if (t % beat_interval) > beat_interval * 0.3) | |
| features['off_beat_ratio'] = off_beats / len(audio_features.syllable_timing) | |
| else: | |
| features['off_beat_ratio'] = 0.0 | |
| # Hook-to-beat correlation | |
| if audio_features.hook_timing_seconds: | |
| features['hook_beat_sync'] = np.mean([ | |
| 1.0 - (t % (60.0 / audio_features.tempo_bpm)) / (60.0 / audio_features.tempo_bpm) | |
| for t in audio_features.hook_timing_seconds | |
| ]) | |
| else: | |
| features['hook_beat_sync'] = 0.0 | |
| # Beat trend match (is this a trending beat pattern?) | |
| features['beat_trend_match'] = 1.0 if audio_features.is_trending_beat else 0.0 | |
| # Beat innovation score (novelty detection) | |
| features['beat_innovation'] = audio_features.beat_alignment_error * 0.5 # Placeholder | |
| return features | |
| @staticmethod | |
| def extract_hook_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]: | |
| """Extract hook-related viral signals""" | |
| features = {} | |
| if not audio_features.hook_timing_seconds: | |
| return {k: 0.0 for k in ['hook_count', 'hook_early_placement', 'hook_amplitude_avg', | |
| 'hook_pitch_jump_avg', 'hook_spacing_variance']} | |
| features['hook_count'] = len(audio_features.hook_timing_seconds) | |
| features['hook_early_placement'] = 1.0 if audio_features.hook_timing_seconds[0] < 3.0 else 0.0 | |
| features['hook_amplitude_avg'] = np.mean(audio_features.hook_emphasis_amplitude) | |
| features['hook_pitch_jump_avg'] = np.mean(audio_features.hook_pitch_jump) | |
| # Hook spacing consistency | |
| if len(audio_features.hook_timing_seconds) > 1: | |
| spacings = np.diff(audio_features.hook_timing_seconds) | |
| features['hook_spacing_variance'] = np.var(spacings) | |
| else: | |
| features['hook_spacing_variance'] = 0.0 | |
| return features | |
| @staticmethod | |
| def extract_emotion_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]: | |
| """Extract emotion trajectory features""" | |
| features = {} | |
| if not audio_features.emotion_trajectory or not audio_features.emotion_intensity: | |
| return {k: 0.0 for k in ['emotion_arc_score', 'emotion_peak_early', | |
| 'emotion_intensity_avg', 'emotion_variance']} | |
| # Emotion arc score (building -> peak is viral) | |
| arc_map = {'building': 0.3, 'peak': 1.0, 'sustain': 0.7, 'release': 0.5} | |
| arc_scores = [arc_map.get(e, 0.5) for e in audio_features.emotion_trajectory] | |
| features['emotion_arc_score'] = np.mean(arc_scores) | |
| # Peak placement | |
| if 'peak' in audio_features.emotion_trajectory: | |
| peak_idx = audio_features.emotion_trajectory.index('peak') | |
| features['emotion_peak_early'] = 1.0 if peak_idx < len(arc_scores) * 0.4 else 0.0 | |
| else: | |
| features['emotion_peak_early'] = 0.0 | |
| features['emotion_intensity_avg'] = np.mean(audio_features.emotion_intensity) | |
| features['emotion_variance'] = np.var(audio_features.emotion_intensity) | |
| return features | |
| @staticmethod | |
| def extract_pause_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]: | |
| """Extract pause placement patterns""" | |
| features = {} | |
| if not audio_features.pause_durations or not audio_features.pause_positions: | |
| return {k: 0.0 for k in ['pause_count', 'pause_avg_duration', | |
| 'pause_placement_score', 'pause_rewatch_correlation']} | |
| features['pause_count'] = len(audio_features.pause_durations) | |
| features['pause_avg_duration'] = np.mean(audio_features.pause_durations) | |
| # Strategic pause placement (after hooks, before reveals) | |
| strategic_positions = [p for p in audio_features.pause_positions if 2 < p < 8] | |
| features['pause_placement_score'] = len(strategic_positions) / max(len(audio_features.pause_positions), 1) | |
| # Pause-rewatch correlation (placeholder - would need replay timestamp data) | |
| features['pause_rewatch_correlation'] = performance.replay_rate * 0.5 | |
| return features | |
| @staticmethod | |
| def extract_spectral_features(audio_features: AudioFeatures) -> Dict[str, float]: | |
| """Extract spectral/timbre quality features""" | |
| features = {} | |
| if audio_features.spectral_centroid is not None: | |
| features['spectral_centroid_mean'] = np.mean(audio_features.spectral_centroid) | |
| features['spectral_centroid_var'] = np.var(audio_features.spectral_centroid) | |
| else: | |
| features['spectral_centroid_mean'] = 0.0 | |
| features['spectral_centroid_var'] = 0.0 | |
| if audio_features.spectral_rolloff is not None: | |
| features['spectral_rolloff_mean'] = np.mean(audio_features.spectral_rolloff) | |
| else: | |
| features['spectral_rolloff_mean'] = 0.0 | |
| if audio_features.zero_crossing_rate is not None: | |
| features['zero_crossing_mean'] = np.mean(audio_features.zero_crossing_rate) | |
| else: | |
| features['zero_crossing_mean'] = 0.0 | |
| features['harmonic_noise_ratio'] = audio_features.harmonic_noise_ratio | |
| return features | |
| @staticmethod | |
| def extract_velocity_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]: | |
| """Extract velocity and virality signals""" | |
| features = {} | |
| features['velocity_per_hour'] = performance.velocity_per_hour | |
| features['velocity_per_day'] = performance.velocity_per_day | |
| features['velocity_score'] = performance.velocity_score | |
| # Retention correlation | |
| features['retention_2s_rate'] = performance.retention_2s | |
| features['retention_completion_ratio'] = performance.completion_rate / max(performance.retention_2s, 0.01) | |
| # Hook to retention correlation (placeholder) | |
| if audio_features.hook_timing_seconds: | |
| first_hook = audio_features.hook_timing_seconds[0] | |
| features['hook_retention_correlation'] = performance.retention_2s if first_hook < 2.0 else performance.retention_2s * 0.8 | |
| else: | |
| features['hook_retention_correlation'] = 0.0 | |
| return features | |
| @staticmethod | |
| def compute_full_feature_set(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]: | |
| """Compute complete engineered feature set""" | |
| all_features = {} | |
| # Base features | |
| all_features['pace_wpm'] = audio_features.pace_wpm | |
| all_features['pitch_mean'] = audio_features.pitch_mean | |
| all_features['pitch_variance'] = audio_features.pitch_variance | |
| all_features['energy_mean'] = audio_features.energy_mean | |
| all_features['energy_variance'] = audio_features.energy_variance | |
| all_features['tempo_bpm'] = audio_features.tempo_bpm | |
| # Engineered features | |
| all_features.update(AudioFeatureEngineering.extract_beat_features(audio_features, performance)) | |
| all_features.update(AudioFeatureEngineering.extract_hook_features(audio_features, performance)) | |
| all_features.update(AudioFeatureEngineering.extract_emotion_features(audio_features, performance)) | |
| all_features.update(AudioFeatureEngineering.extract_pause_features(audio_features, performance)) | |
| all_features.update(AudioFeatureEngineering.extract_spectral_features(audio_features)) | |
| all_features.update(AudioFeatureEngineering.extract_velocity_features(audio_features, performance)) | |
| # Target metrics | |
| all_features['views_total'] = performance.views_total | |
| all_features['viral_score'] = performance.viral_score | |
| all_features['engagement_ratio'] = performance.engagement_ratio | |
| return all_features | |
| # ============================================================================ | |
| # MACHINE LEARNING MODELS | |
| # ============================================================================ | |
| class ViralityPredictor: | |
| """Multi-model ensemble for predicting viral performance with full training""" | |
| def __init__(self): | |
| self.xgb_views_model = None | |
| self.xgb_retention_model = None | |
| self.xgb_velocity_model = None | |
| self.xgb_engagement_model = None | |
| self.scaler = StandardScaler() | |
| self.feature_importance = {} | |
| self.is_trained = False | |
| self.evaluation_metrics = {} | |
| def train(self, X: np.ndarray, y_views: np.ndarray, y_viral: np.ndarray, | |
| y_retention: np.ndarray, y_velocity: np.ndarray, y_engagement: np.ndarray): | |
| """Train XGBoost models on audio features with full multi-target prediction""" | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error | |
| # Scale features | |
| X_scaled = self.scaler.fit_transform(X) | |
| # Split for validation | |
| X_train, X_val, y_v_train, y_v_val = train_test_split( | |
| X_scaled, y_views, test_size=0.2, random_state=42 | |
| ) | |
| _, _, y_r_train, y_r_val = train_test_split( | |
| X_scaled, y_retention, test_size=0.2, random_state=42 | |
| ) | |
| _, _, y_vel_train, y_vel_val = train_test_split( | |
| X_scaled, y_velocity, test_size=0.2, random_state=42 | |
| ) | |
| _, _, y_eng_train, y_eng_val = train_test_split( | |
| X_scaled, y_engagement, test_size=0.2, random_state=42 | |
| ) | |
| # Train view predictor | |
| self.xgb_views_model = xgb.XGBRegressor( | |
| n_estimators=300, | |
| max_depth=10, | |
| learning_rate=0.03, | |
| subsample=0.8, | |
| colsample_bytree=0.8, | |
| objective='reg:squarederror', | |
| tree_method='hist', | |
| early_stopping_rounds=20 | |
| ) | |
| self.xgb_views_model.fit( | |
| X_train, y_v_train, | |
| eval_set=[(X_val, y_v_val)], | |
| verbose=False | |
| ) | |
| # Train retention predictor | |
| self.xgb_retention_model = xgb.XGBRegressor( | |
| n_estimators=200, | |
| max_depth=8, | |
| learning_rate=0.05, | |
| subsample=0.8, | |
| colsample_bytree=0.8 | |
| ) | |
| self.xgb_retention_model.fit(X_train, y_r_train) | |
| # Train velocity predictor | |
| self.xgb_velocity_model = xgb.XGBRegressor( | |
| n_estimators=200, | |
| max_depth=8, | |
| learning_rate=0.05 | |
| ) | |
| self.xgb_velocity_model.fit(X_train, y_vel_train) | |
| # Train engagement predictor | |
| self.xgb_engagement_model = xgb.XGBRegressor( | |
| n_estimators=200, | |
| max_depth=8, | |
| learning_rate=0.05 | |
| ) | |
| self.xgb_engagement_model.fit(X_train, y_eng_train) | |
| # Store feature importance | |
| if hasattr(self.xgb_views_model, 'feature_importances_'): | |
| self.feature_importance = dict(enumerate(self.xgb_views_model.feature_importances_)) | |
| # Compute evaluation metrics | |
| y_v_pred = self.xgb_views_model.predict(X_val) | |
| y_r_pred = self.xgb_retention_model.predict(X_val) | |
| self.evaluation_metrics = { | |
| 'views_rmse': np.sqrt(mean_squared_error(y_v_val, y_v_pred)), | |
| 'views_mae': mean_absolute_error(y_v_val, y_v_pred), | |
| 'views_r2': r2_score(y_v_val, y_v_pred), | |
| 'retention_rmse': np.sqrt(mean_squared_error(y_r_val, y_r_pred)), | |
| 'retention_r2': r2_score(y_r_val, y_r_pred) | |
| } | |
| self.is_trained = True | |
| def predict_views(self, X: np.ndarray) -> np.ndarray: | |
| """Predict view count""" | |
| if not self.is_trained: | |
| return np.zeros(X.shape[0]) | |
| X_scaled = self.scaler.transform(X) | |
| return self.xgb_views_model.predict(X_scaled) | |
| def predict_retention(self, X: np.ndarray) -> np.ndarray: | |
| """Predict completion rate""" | |
| if not self.is_trained: | |
| return np.zeros(X.shape[0]) | |
| X_scaled = self.scaler.transform(X) | |
| return self.xgb_retention_model.predict(X_scaled) | |
| def predict_velocity(self, X: np.ndarray) -> np.ndarray: | |
| """Predict velocity score""" | |
| if not self.is_trained: | |
| return np.zeros(X.shape[0]) | |
| X_scaled = self.scaler.transform(X) | |
| return self.xgb_velocity_model.predict(X_scaled) | |
| def predict_engagement(self, X: np.ndarray) -> np.ndarray: | |
| """Predict engagement ratio""" | |
| if not self.is_trained: | |
| return np.zeros(X.shape[0]) | |
| X_scaled = self.scaler.transform(X) | |
| return self.xgb_engagement_model.predict(X_scaled) | |
| def predict_viral_score(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: | |
| """Predict comprehensive viral score with confidence intervals""" | |
| views = self.predict_views(X) | |
| retention = self.predict_retention(X) | |
| velocity = self.predict_velocity(X) | |
| engagement = self.predict_engagement(X) | |
| # Composite viral score | |
| viral_score = ( | |
| (views / 1_000_000) * 0.35 + | |
| retention * 20 * 0.25 + | |
| velocity / 1000 * 0.20 + | |
| engagement * 1000 * 0.20 | |
| ) | |
| # Confidence based on model agreement and prediction variance | |
| # Use bootstrap-style confidence estimation | |
| confidence = np.clip( | |
| 1.0 - (np.abs(views - np.median(views)) / (np.std(views) + 1e-6)) * 0.1, | |
| 0.3, 1.0 | |
| ) | |
| return viral_score, confidence | |
| def get_top_features(self, n: int = 10) -> List[Tuple[int, float]]: | |
| """Get top N most important features""" | |
| if not self.feature_importance: | |
| return [] | |
| sorted_features = sorted(self.feature_importance.items(), key=lambda x: x[1], reverse=True) | |
| return sorted_features[:n] | |
| def get_evaluation_metrics(self) -> Dict[str, float]: | |
| """Get model performance metrics""" | |
| return self.evaluation_metrics | |
| class SequenceModel(nn.Module): | |
| """LSTM for temporal audio pattern learning""" | |
| def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 2): | |
| super().__init__() | |
| self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.3) | |
| self.fc = nn.Linear(hidden_size, 1) | |
| def forward(self, x): | |
| lstm_out, _ = self.lstm(x) | |
| return self.fc(lstm_out[:, -1, :]) | |
| class PatternClusterer: | |
| """Discover audio pattern clusters using embeddings""" | |
| def __init__(self, n_clusters: int = 20): | |
| self.n_clusters = n_clusters | |
| self.kmeans = None | |
| self.cluster_profiles = {} | |
| def fit(self, embeddings: np.ndarray, performance_scores: np.ndarray): | |
| """Cluster audio patterns and compute cluster performance profiles""" | |
| self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42) | |
| labels = self.kmeans.fit_predict(embeddings) | |
| # Compute cluster profiles | |
| for cluster_id in range(self.n_clusters): | |
| mask = labels == cluster_id | |
| cluster_scores = performance_scores[mask] | |
| self.cluster_profiles[cluster_id] = { | |
| 'count': np.sum(mask), | |
| 'avg_score': np.mean(cluster_scores), | |
| 'std_score': np.std(cluster_scores), | |
| 'viral_probability': np.mean(cluster_scores > 5.0), # >5M views threshold | |
| 'centroid': self.kmeans.cluster_centers_[cluster_id] | |
| } | |
| return labels | |
| def get_high_viral_clusters(self, threshold: float = 0.3) -> List[int]: | |
| """Get cluster IDs with high viral probability""" | |
| return [cid for cid, profile in self.cluster_profiles.items() | |
| if profile['viral_probability'] > threshold] | |
| def predict_cluster(self, embedding: np.ndarray) -> int: | |
| """Predict cluster for new audio pattern""" | |
| if self.kmeans is None: | |
| return -1 | |
| return self.kmeans.predict(embedding.reshape(1, -1))[0] | |
| class AnomalyDetector: | |
| """Detect unusual audio patterns (overperformers and underperformers)""" | |
| def __init__(self): | |
| self.model = IsolationForest(contamination=0.1, random_state=42) | |
| def fit(self, X: np.ndarray): | |
| """Train anomaly detector""" | |
| self.model.fit(X) | |
| def predict(self, X: np.ndarray) -> np.ndarray: | |
| """Predict anomalies (-1 = anomaly, 1 = normal)""" | |
| return self.model.predict(X) | |
| def score(self, X: np.ndarray) -> np.ndarray: | |
| """Get anomaly scores (more negative = more anomalous)""" | |
| return self.model.score_samples(X) | |
| # ============================================================================ | |
| # REINFORCEMENT LEARNING COMPONENTS | |
| # ============================================================================ | |
| class RLAudioPolicy: | |
| """ | |
| Reinforcement Learning policy for audio parameter optimization. | |
| Maps audio features -> parameter adjustments that maximize viral score. | |
| """ | |
| def __init__(self, state_dim: int = 30, action_dim: int = 20, hidden_dim: int = 128): | |
| self.state_dim = state_dim | |
| self.action_dim = action_dim | |
| # Simple feedforward policy network | |
| self.policy_net = nn.Sequential( | |
| nn.Linear(state_dim, hidden_dim), | |
| nn.ReLU(), | |
| nn.Dropout(0.2), | |
| nn.Linear(hidden_dim, hidden_dim), | |
| nn.ReLU(), | |
| nn.Dropout(0.2), | |
| nn.Linear(hidden_dim, action_dim), | |
| nn.Tanh() # Actions in [-1, 1] range | |
| ) | |
| self.optimizer = optim.Adam(self.policy_net.parameters(), lr=0.001) | |
| self.action_history = [] | |
| self.reward_history = [] | |
| def select_action(self, state: np.ndarray, explore: bool = True) -> np.ndarray: | |
| """Select audio parameter adjustments""" | |
| state_tensor = torch.FloatTensor(state).unsqueeze(0) | |
| with torch.no_grad(): | |
| action = self.policy_net(state_tensor).squeeze(0).numpy() | |
| # Add exploration noise | |
| if explore: | |
| action += np.random.normal(0, 0.1, size=action.shape) | |
| action = np.clip(action, -1, 1) | |
| return action | |
| def update_policy(self, states: np.ndarray, actions: np.ndarray, rewards: np.ndarray): | |
| """Update policy using REINFORCE-style gradient""" | |
| states_tensor = torch.FloatTensor(states) | |
| actions_tensor = torch.FloatTensor(actions) | |
| rewards_tensor = torch.FloatTensor(rewards) | |
| # Normalize rewards | |
| rewards_tensor = (rewards_tensor - rewards_tensor.mean()) / (rewards_tensor.std() + 1e-8) | |
| # Forward pass | |
| predicted_actions = self.policy_net(states_tensor) | |
| # Policy gradient loss | |
| loss = -torch.mean(rewards_tensor.unsqueeze(1) * torch.log( | |
| 1 - torch.abs(predicted_actions - actions_tensor) + 1e-8 | |
| )) | |
| # Backward pass | |
| self.optimizer.zero_grad() | |
| loss.backward() | |
| torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 1.0) | |
| self.optimizer.step() | |
| return loss.item() | |
| def decode_action_to_params(self, action: np.ndarray) -> Dict[str, float]: | |
| """Convert action vector to TTS/voice-sync parameters""" | |
| return { | |
| 'pace_adjustment': action[0] * 30, # ยฑ30 WPM | |
| 'pitch_adjustment': action[1] * 50, # ยฑ50 Hz | |
| 'energy_adjustment': action[2] * 0.2, # ยฑ0.2 | |
| 'hook_timing_shift': action[3] * 2.0, # ยฑ2 seconds | |
| 'pause_duration_mult': 1.0 + action[4] * 0.3, # 0.7x - 1.3x | |
| 'emphasis_strength': 0.5 + action[5] * 0.5, # 0-1.0 | |
| 'beat_alignment_target': 0.9 + action[6] * 0.1, # 0.9-1.0 | |
| 'emotion_intensity': 0.5 + action[7] * 0.5, # 0-1.0 | |
| } | |
| class EmbeddingNetwork(nn.Module): | |
| """Neural network for learning audio pattern embeddings""" | |
| def __init__(self, input_dim: int, embedding_dim: int = 64): | |
| super().__init__() | |
| self.encoder = nn.Sequential( | |
| nn.Linear(input_dim, 128), | |
| nn.ReLU(), | |
| nn.BatchNorm1d(128), | |
| nn.Dropout(0.3), | |
| nn.Linear(128, embedding_dim), | |
| ) | |
| def forward(self, x): | |
| return self.encoder(x) | |
| # ============================================================================ | |
| # CORE PATTERN LEARNER (ENHANCED WITH MISSING FEATURES) | |
| # ============================================================================ | |
| class AudioPatternLearner: | |
| """ | |
| Main orchestrator for autonomous viral audio pattern learning. | |
| Continuously learns from video performance and adapts to trends. | |
| """ | |
| def __init__(self, storage_path: str = "./pattern_learner_data"): | |
| self.storage_path = Path(storage_path) | |
| self.storage_path.mkdir(exist_ok=True) | |
| # ML models | |
| self.virality_predictor = ViralityPredictor() | |
| self.pattern_clusterer = PatternClusterer(n_clusters=25) | |
| self.anomaly_detector = AnomalyDetector() | |
| # Stratified models (per platform/niche) | |
| self.stratified_models: Dict[str, ViralityPredictor] = {} | |
| # Embedding network for similarity search | |
| self.embedding_model = None | |
| self.embedding_dim = 64 | |
| # RL components | |
| self.rl_policy = RLAudioPolicy(action_dim=20) | |
| self.rl_reward_history: deque = deque(maxlen=1000) | |
| # Pattern storage | |
| self.discovered_patterns: Dict[str, AudioPattern] = {} | |
| self.pattern_history: deque = deque(maxlen=10000) | |
| # Feature tracking | |
| self.feature_names: List[str] = [] | |
| self.niche_performance: Dict[str, Dict] = defaultdict(lambda: { | |
| 'total_videos': 0, | |
| 'avg_views': 0.0, | |
| 'top_patterns': [], | |
| 'model': None | |
| }) | |
| # Reinforcement learning components | |
| self.pattern_weights: Dict[str, float] = {} | |
| self.replay_buffer: deque = deque(maxlen=5000) | |
| # Caching | |
| self.embedding_cache: Dict[str, np.ndarray] = {} | |
| # Configuration | |
| self.config = { | |
| 'viral_threshold': 5_000_000, | |
| 'min_sample_size': 10, | |
| 'pattern_decay_rate': 0.95, | |
| 'trend_window_days': 30, | |
| 'confidence_threshold': 0.7, | |
| 'update_frequency_hours': 6 | |
| } | |
| # Logging | |
| self.setup_logging() | |
| def setup_logging(self): | |
| """Setup logging system""" | |
| log_file = self.storage_path / "pattern_learner.log" | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler(log_file), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| self.logger = logging.getLogger('AudioPatternLearner') | |
| def ingest_video_batch(self, audio_features_list: List[AudioFeatures], | |
| performance_list: List[PerformanceMetrics]): | |
| """ | |
| Ingest batch of video audio records with performance metrics. | |
| Main entry point for continuous learning. | |
| """ | |
| self.logger.info(f"Ingesting batch of {len(audio_features_list)} videos") | |
| if len(audio_features_list) != len(performance_list): | |
| raise ValueError("Audio features and performance lists must be same length") | |
| # Process each video | |
| for audio_feat, perf in zip(audio_features_list, performance_list): | |
| # Compute full feature set | |
| engineered_features = AudioFeatureEngineering.compute_full_feature_set(audio_feat, perf) | |
| # Store in replay buffer | |
| self.replay_buffer.append({ | |
| 'audio_features': audio_feat, | |
| 'performance': perf, | |
| 'engineered_features': engineered_features, | |
| 'timestamp': datetime.now() | |
| }) | |
| # Update niche stats | |
| self._update_niche_stats(audio_feat, perf) | |
| # Trigger learning pipeline | |
| self._train_models() | |
| self._discover_patterns() | |
| self._update_pattern_weights() | |
| self._update_rl_policy() | |
| self._detect_anomalies() | |
| self.logger.info(f"Batch processing complete. Total patterns: {len(self.discovered_patterns)}") | |
| def _train_models(self): | |
| """Train/update all ML models with full implementation""" | |
| if len(self.replay_buffer) < self.config['min_sample_size']: | |
| self.logger.warning("Insufficient data for training") | |
| return | |
| self.logger.info(f"Training ML models on {len(self.replay_buffer)} samples...") | |
| # Prepare training data with platform/niche stratification | |
| X_list = [] | |
| y_views = [] | |
| y_viral = [] | |
| y_retention = [] | |
| y_velocity = [] | |
| y_engagement = [] | |
| platform_niche_encodings = [] | |
| for record in self.replay_buffer: | |
| features = record['engineered_features'] | |
| feature_vector = [features.get(k, 0.0) for k in sorted(features.keys()) | |
| if k not in ['views_total', 'viral_score', 'engagement_ratio']] | |
| X_list.append(feature_vector) | |
| y_views.append(record['performance'].views_total) | |
| y_viral.append(record['performance'].viral_score) | |
| y_retention.append(record['performance'].completion_rate) | |
| y_velocity.append(record['performance'].velocity_score) | |
| y_engagement.append(record['performance'].engagement_ratio) | |
| # Encode platform + niche for stratified learning | |
| pn_key = f"{record['audio_features'].platform}_{record['audio_features'].niche}" | |
| platform_niche_encodings.append(pn_key) | |
| X = np.array(X_list) | |
| y_views = np.array(y_views) | |
| y_viral = np.array(y_viral) | |
| y_retention = np.array(y_retention) | |
| y_velocity = np.array(y_velocity) | |
| y_engagement = np.array(y_engagement) | |
| # Store feature names | |
| sample_features = self.replay_buffer[0]['engineered_features'] | |
| self.feature_names = sorted([k for k in sample_features.keys() | |
| if k not in ['views_total', 'viral_score', 'engagement_ratio']]) | |
| # Train virality predictor with multi-target | |
| self.virality_predictor.train(X, y_views, y_viral, y_retention, y_velocity, y_engagement) | |
| # Train platform/niche-specific models | |
| self._train_stratified_models(X, y_views, platform_niche_encodings) | |
| # Train embeddings for similarity search | |
| self._train_embeddings(X, y_views) | |
| # Train pattern clustering | |
| if len(X) >= 50: | |
| embeddings = self._compute_embeddings(X) | |
| self.pattern_clusterer.fit(embeddings, y_views / 1_000_000) # Normalize to millions | |
| self.logger.info(f"Discovered {len(self.pattern_clusterer.cluster_profiles)} clusters") | |
| # Train anomaly detector | |
| self.anomaly_detector.fit(X) | |
| # Evaluate model performance | |
| self._evaluate_models(X, y_views, y_viral) | |
| self.logger.info("โ Model training complete") | |
| def _train_stratified_models(self, X: np.ndarray, y_views: np.ndarray, | |
| platform_niche_keys: List[str]): | |
| """Train separate models for each platform/niche combination""" | |
| from collections import Counter | |
| # Group samples by platform/niche | |
| key_counts = Counter(platform_niche_keys) | |
| for pn_key, count in key_counts.items(): | |
| if count < 20: # Need minimum samples | |
| continue | |
| # Extract samples for this platform/niche | |
| indices = [i for i, k in enumerate(platform_niche_keys) if k == pn_key] | |
| X_subset = X[indices] | |
| y_subset = y_views[indices] | |
| # Train specialized model | |
| model = ViralityPredictor() | |
| model.train( | |
| X_subset, y_subset, y_subset / 1_000_000, | |
| np.zeros(len(y_subset)), np.zeros(len(y_subset)), np.zeros(len(y_subset)) | |
| ) | |
| self.stratified_models[pn_key] = model | |
| # Update niche stats | |
| parts = pn_key.split('_') | |
| if len(parts) >= 2: | |
| niche_key = f"{parts[1]}_{parts[0]}" # niche_platform | |
| self.niche_performance[niche_key]['model'] = model | |
| self.logger.info(f"Trained {len(self.stratified_models)} stratified models") | |
| def _train_embeddings(self, X: np.ndarray, y_views: np.ndarray): | |
| """Train embedding network for similarity search""" | |
| if self.embedding_model is None: | |
| self.embedding_model = EmbeddingNetwork(X.shape[1], self.embedding_dim) | |
| # Prepare triplet training data (anchor, positive, negative) | |
| optimizer = optim.Adam(self.embedding_model.parameters(), lr=0.001) | |
| criterion = nn.TripletMarginLoss(margin=1.0) | |
| # Simple training: high views = similar embeddings | |
| threshold_5m = 5_000_000 | |
| high_performers = X[y_views >= threshold_5m] | |
| low_performers = X[y_views < threshold_5m] | |
| if len(high_performers) > 2 and len(low_performers) > 2: | |
| for epoch in range(50): | |
| # Sample triplets | |
| n_triplets = min(32, len(high_performers) - 1) | |
| anchor_idx = np.random.choice(len(high_performers), n_triplets) | |
| positive_idx = np.random.choice(len(high_performers), n_triplets) | |
| negative_idx = np.random.choice(len(low_performers), n_triplets) | |
| anchors = torch.FloatTensor(high_performers[anchor_idx]) | |
| positives = torch.FloatTensor(high_performers[positive_idx]) | |
| negatives = torch.FloatTensor(low_performers[negative_idx]) | |
| optimizer.zero_grad() | |
| anchor_emb = self.embedding_model(anchors) | |
| positive_emb = self.embedding_model(positives) | |
| negative_emb = self.embedding_model(negatives) | |
| loss = criterion(anchor_emb, positive_emb, negative_emb) | |
| loss.backward() | |
| optimizer.step() | |
| self.logger.info("Embedding model training complete") | |
| def _compute_embeddings(self, X: np.ndarray) -> np.ndarray: | |
| """Compute embeddings for audio features""" | |
| if self.embedding_model is None: | |
| return X # Fallback to raw features | |
| with torch.no_grad(): | |
| X_tensor = torch.FloatTensor(X) | |
| embeddings = self.embedding_model(X_tensor).numpy() | |
| return embeddings | |
| def _evaluate_models(self, X: np.ndarray, y_views: np.ndarray, y_viral: np.ndarray): | |
| """Evaluate model performance and log metrics""" | |
| predictions = self.virality_predictor.predict_views(X) | |
| # Compute metrics | |
| from sklearn.metrics import mean_absolute_error, r2_score | |
| mae = mean_absolute_error(y_views, predictions) | |
| r2 = r2_score(y_views, predictions) | |
| # Check 5M+ prediction accuracy | |
| threshold = 5_000_000 | |
| true_viral = y_views >= threshold | |
| pred_viral = predictions >= threshold | |
| accuracy = np.mean(true_viral == pred_viral) | |
| metrics = { | |
| 'mae': float(mae), | |
| 'r2': float(r2), | |
| 'viral_accuracy': float(accuracy), | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| self.logger.info(f"Model Evaluation: MAE={mae:.0f}, Rยฒ={r2:.3f}, Viral Acc={accuracy:.2%}") | |
| # Log to file | |
| metrics_file = self.storage_path / "model_metrics.jsonl" | |
| with open(metrics_file, 'a') as f: | |
| f.write(json.dumps(metrics) + '\n') | |
| def _update_rl_policy(self): | |
| """Update RL policy based on recent performance feedback""" | |
| if len(self.replay_buffer) < 50: | |
| return | |
| # Collect recent experiences | |
| recent = list(self.replay_buffer)[-100:] | |
| states = [] | |
| actions = [] | |
| rewards = [] | |
| for record in recent: | |
| # State = audio features | |
| features = record['engineered_features'] | |
| state = [features.get(k, 0.0) for k in self.feature_names] | |
| states.append(state) | |
| # Action = deviation from optimal pattern | |
| audio = record['audio_features'] | |
| optimal_pattern = self.get_recommended_audio_profile( | |
| audio.niche, audio.platform, audio.beat_type | |
| ) | |
| if optimal_pattern: | |
| action = [ | |
| (audio.pace_wpm - optimal_pattern.pace_wpm) / 30.0, | |
| (audio.pitch_mean - optimal_pattern.pitch_base) / 50.0, | |
| (audio.energy_mean - optimal_pattern.energy_level) / 0.2, | |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 | |
| ] | |
| actions.append(action[:20]) | |
| else: | |
| actions.append([0] * 20) | |
| # Reward = viral score (normalized to 0-1) | |
| reward = min(record['performance'].viral_score / 10.0, 1.0) | |
| rewards.append(reward) | |
| # Store for history | |
| self.rl_reward_history.append(reward) | |
| if len(states) > 10: | |
| states = np.array(states[:, :self.rl_policy.state_dim] if states else states) | |
| actions = np.array(actions) | |
| rewards = np.array(rewards) | |
| # Update policy | |
| loss = self.rl_policy.update_policy(states, actions, rewards) | |
| self.logger.info(f"RL policy updated: loss={loss:.4f}, avg_reward={np.mean(rewards):.3f}") | |
| def _discover_patterns(self): | |
| """Discover viral audio patterns through clustering and analysis""" | |
| if len(self.replay_buffer) < self.config['min_sample_size']: | |
| return | |
| self.logger.info("Discovering audio patterns...") | |
| # Group by niche + platform | |
| niche_platform_groups = defaultdict(list) | |
| for record in self.replay_buffer: | |
| audio = record['audio_features'] | |
| key = f"{audio.niche}_{audio.platform}_{audio.beat_type}" | |
| niche_platform_groups[key].append(record) | |
| # Analyze each group | |
| for group_key, records in niche_platform_groups.items(): | |
| if len(records) < 5: | |
| continue | |
| # Filter for high performers (>5M views) | |
| high_performers = [r for r in records if r['performance'].views_total >= self.config['viral_threshold']] | |
| if len(high_performers) < 3: | |
| continue | |
| # Extract common patterns | |
| pattern = self._extract_pattern_from_group(group_key, high_performers, records) | |
| if pattern: | |
| self.discovered_patterns[pattern.pattern_id] = pattern | |
| self.logger.info(f"Discovered pattern: {pattern.pattern_id} (efficacy: {pattern.viral_efficacy_score:.3f})") | |
| def _extract_pattern_from_group(self, group_key: str, high_performers: List[Dict], | |
| all_records: List[Dict]) -> Optional[AudioPattern]: | |
| """Extract common audio pattern from high-performing videos""" | |
| if not high_performers: | |
| return None | |
| # Compute median/mean characteristics | |
| pace_vals = [r['audio_features'].pace_wpm for r in high_performers] | |
| pitch_vals = [r['audio_features'].pitch_mean for r in high_performers] | |
| energy_vals = [r['audio_features'].energy_mean for r in high_performers] | |
| # Hook timing patterns | |
| hook_timings = [] | |
| for r in high_performers: | |
| if r['audio_features'].hook_timing_seconds: | |
| hook_timings.extend(r['audio_features'].hook_timing_seconds[:3]) | |
| # Pause patterns | |
| pause_patterns = [] | |
| for r in high_performers: | |
| audio = r['audio_features'] | |
| if audio.pause_positions and audio.pause_durations: | |
| for pos, dur in zip(audio.pause_positions[:5], audio.pause_durations[:5]): | |
| pause_patterns.append((pos, dur)) | |
| # Emotion arcs | |
| emotion_arcs = [r['audio_features'].emotion_trajectory for r in high_performers | |
| if r['audio_features'].emotion_trajectory] | |
| # Compute efficacy score | |
| avg_views = np.mean([r['performance'].views_total for r in high_performers]) | |
| avg_completion = np.mean([r['performance'].completion_rate for r in high_performers]) | |
| viral_efficacy = ( | |
| (avg_views / 10_000_000) * 0.4 + # Normalize to 10M | |
| avg_completion * 0.3 + | |
| (len(high_performers) / len(all_records)) * 0.3 | |
| ) | |
| # Extract niche/platform from group key | |
| parts = group_key.split('_') | |
| niche = parts[0] if len(parts) > 0 else 'unknown' | |
| platform = parts[1] if len(parts) > 1 else 'unknown' | |
| # Create pattern | |
| pattern = AudioPattern( | |
| pattern_id=f"pattern_{group_key}_{int(datetime.now().timestamp())}", | |
| niche=niche, | |
| platform=platform, | |
| optimal_pace=float(np.median(pace_vals)), | |
| optimal_pitch_range=(float(np.percentile(pitch_vals, 25)), float(np.percentile(pitch_vals, 75))), | |
| optimal_energy=float(np.median(energy_vals)), | |
| hook_timings=hook_timings[:5] if hook_timings else [], | |
| pause_pattern=pause_patterns[:5] if pause_patterns else [], | |
| beat_alignment_target=0.95, | |
| emotion_arc=emotion_arcs[0] if emotion_arcs else ['building', 'peak'], | |
| viral_efficacy_score=viral_efficacy, | |
| sample_count=len(high_performers), | |
| avg_views=avg_views, | |
| avg_completion=avg_completion, | |
| confidence=min(len(high_performers) / 20, 1.0), | |
| discovered_at=datetime.now(), | |
| last_validated=datetime.now(), | |
| trend_status='stable', | |
| weight=1.0, | |
| decay_rate=self.config['pattern_decay_rate'] | |
| ) | |
| return pattern | |
| def _update_pattern_weights(self): | |
| """Update pattern weights based on recent performance (RL mechanism)""" | |
| current_time = datetime.now() | |
| for pattern_id, pattern in list(self.discovered_patterns.items()): | |
| # Apply temporal decay | |
| days_old = (current_time - pattern.last_validated).days | |
| decay_factor = pattern.decay_rate ** days_old | |
| pattern.weight *= decay_factor | |
| # Remove patterns with very low weight | |
| if pattern.weight < 0.1: | |
| del self.discovered_patterns[pattern_id] | |
| self.logger.info(f"Removed low-weight pattern: {pattern_id}") | |
| continue | |
| # Boost patterns that are still performing | |
| recent_matches = self._find_recent_pattern_matches(pattern) | |
| if recent_matches: | |
| avg_recent_views = np.mean([m['performance'].views_total for m in recent_matches]) | |
| if avg_recent_views >= self.config['viral_threshold']: | |
| pattern.weight *= 1.1 | |
| pattern.last_validated = current_time | |
| def _find_recent_pattern_matches(self, pattern: AudioPattern, window_days: int = 7) -> List[Dict]: | |
| """Find recent videos matching this pattern""" | |
| cutoff = datetime.now() - timedelta(days=window_days) | |
| matches = [] | |
| for record in self.replay_buffer: | |
| if record['timestamp'] < cutoff: | |
| continue | |
| audio = record['audio_features'] | |
| if audio.niche != pattern.niche or audio.platform != pattern.platform: | |
| continue | |
| # Check similarity | |
| if abs(audio.pace_wpm - pattern.optimal_pace) < 20: | |
| if pattern.optimal_pitch_range[0] <= audio.pitch_mean <= pattern.optimal_pitch_range[1]: | |
| matches.append(record) | |
| return matches | |
| def _detect_anomalies(self): | |
| """Detect anomalous patterns (novel viral strategies)""" | |
| if not self.anomaly_detector.model: | |
| return | |
| X_list = [] | |
| records = [] | |
| for record in list(self.replay_buffer)[-1000:]: | |
| features = record['engineered_features'] | |
| feature_vector = [features.get(k, 0.0) for k in self.feature_names] | |
| X_list.append(feature_vector) | |
| records.append(record) | |
| if not X_list: | |
| return | |
| X = np.array(X_list) | |
| anomaly_scores = self.anomaly_detector.score(X) | |
| # Find overperforming anomalies | |
| for i, (score, record) in enumerate(zip(anomaly_scores, records)): | |
| if score < -0.5 and record['performance'].views_total >= self.config['viral_threshold']: | |
| self.logger.info(f"Anomalous high performer detected: {record['performance'].video_id}") | |
| # Log for further analysis | |
| self._log_anomaly(record, score) | |
| def _log_anomaly(self, record: Dict, anomaly_score: float): | |
| """Log anomalous pattern for analysis""" | |
| anomaly_log = { | |
| 'video_id': record['performance'].video_id, | |
| 'views': record['performance'].views_total, | |
| 'anomaly_score': float(anomaly_score), | |
| 'niche': record['audio_features'].niche, | |
| 'platform': record['audio_features'].platform, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| log_path = self.storage_path / "anomalies.jsonl" | |
| with open(log_path, 'a') as f: | |
| f.write(json.dumps(anomaly_log) + '\n') | |
| def _update_niche_stats(self, audio: AudioFeatures, perf: PerformanceMetrics): | |
| """Update performance statistics per niche""" | |
| key = f"{audio.niche}_{audio.platform}" | |
| stats = self.niche_performance[key] | |
| stats['total_videos'] += 1 | |
| # Running average | |
| n = stats['total_videos'] | |
| stats['avg_views'] = ((n - 1) * stats['avg_views'] + perf.views_total) / n | |
| # ======================================================================== | |
| # PUBLIC API FOR TTS/VOICE-SYNC INTEGRATION | |
| # ======================================================================== | |
| def get_recommended_audio_profile(self, niche: str, platform: str, | |
| beat_type: str, use_rl: bool = True) -> Optional[PatternRecommendation]: | |
| """ | |
| Get recommended audio profile for TTS/voice-sync engines WITH RL optimization. | |
| Returns optimal parameters for generating viral audio. | |
| """ | |
| # Find matching patterns | |
| matching_patterns = [ | |
| p for p in self.discovered_patterns.values() | |
| if p.niche == niche and p.platform == platform | |
| ] | |
| if not matching_patterns: | |
| self.logger.warning(f"No patterns found for {niche}/{platform}, using global patterns") | |
| # Fallback to any patterns from this platform | |
| matching_patterns = [p for p in self.discovered_patterns.values() if p.platform == platform] | |
| if not matching_patterns: | |
| return None | |
| # Get highest efficacy pattern (weighted by recency) | |
| best_pattern = max(matching_patterns, key=lambda p: p.viral_efficacy_score * p.weight * p.confidence) | |
| # Base recommendation from pattern | |
| base_recommendation = PatternRecommendation( | |
| niche=niche, | |
| platform=platform, | |
| beat_type=beat_type, | |
| pace_wpm=best_pattern.optimal_pace, | |
| pitch_base=best_pattern.optimal_pitch_range[0], | |
| pitch_variance=(best_pattern.optimal_pitch_range[1] - best_pattern.optimal_pitch_range[0]) / 2, | |
| energy_level=best_pattern.optimal_energy, | |
| voice_style='dynamic', | |
| hook_placements=best_pattern.hook_timings[:5], | |
| pause_placements=best_pattern.pause_pattern[:5], | |
| emphasis_words=[], | |
| beat_alignment_rules={'target_error': best_pattern.beat_alignment_target}, | |
| syllable_timing_guide={}, | |
| predicted_viral_score=best_pattern.viral_efficacy_score, | |
| confidence=best_pattern.confidence | |
| ) | |
| # Apply RL policy adjustments | |
| if use_rl and self.rl_policy and len(self.rl_reward_history) > 20: | |
| # Build state from pattern features | |
| state = np.array([ | |
| best_pattern.optimal_pace / 200.0, | |
| best_pattern.optimal_pitch_range[0] / 300.0, | |
| best_pattern.optimal_energy, | |
| len(best_pattern.hook_timings) / 10.0, | |
| best_pattern.viral_efficacy_score / 10.0, | |
| best_pattern.confidence, | |
| best_pattern.weight, | |
| *([0] * (self.rl_policy.state_dim - 7)) # Pad to state_dim | |
| ])[:self.rl_policy.state_dim] | |
| # Get RL action | |
| action = self.rl_policy.select_action(state, explore=False) | |
| adjustments = self.rl_policy.decode_action_to_params(action) | |
| # Apply adjustments | |
| base_recommendation.pace_wpm += adjustments['pace_adjustment'] | |
| base_recommendation.pitch_base += adjustments['pitch_adjustment'] | |
| base_recommendation.energy_level += adjustments['energy_adjustment'] | |
| # Adjust hook timings | |
| if base_recommendation.hook_placements: | |
| base_recommendation.hook_placements = [ | |
| max(0, h + adjustments['hook_timing_shift']) | |
| for h in base_recommendation.hook_placements | |
| ] | |
| # Adjust pause durations | |
| if base_recommendation.pause_placements: | |
| base_recommendation.pause_placements = [ | |
| (pos, dur * adjustments['pause_duration_mult']) | |
| for pos, dur in base_recommendation.pause_placements | |
| ] | |
| # Update beat alignment target | |
| base_recommendation.beat_alignment_rules['target_error'] = adjustments['beat_alignment_target'] | |
| self.logger.info(f"Applied RL adjustments: pace={adjustments['pace_adjustment']:.1f}, " | |
| f"pitch={adjustments['pitch_adjustment']:.1f}") | |
| # Clamp to reasonable ranges | |
| base_recommendation.pace_wpm = np.clip(base_recommendation.pace_wpm, 120, 200) | |
| base_recommendation.pitch_base = np.clip(base_recommendation.pitch_base, 100, 400) | |
| base_recommendation.energy_level = np.clip(base_recommendation.energy_level, 0.4, 0.9) | |
| self.logger.info(f"Generated recommendation for {niche}/{platform}: " | |
| f"score={base_recommendation.predicted_viral_score:.3f}, " | |
| f"confidence={base_recommendation.confidence:.2%}") | |
| return base_recommendation | |
| def find_similar_viral_patterns(self, audio_features: AudioFeatures, top_k: int = 5) -> List[Tuple[str, float]]: | |
| """ | |
| Find top K most similar historical viral patterns using embeddings. | |
| Returns list of (pattern_id, similarity_score) tuples. | |
| """ | |
| if not self.discovered_patterns: | |
| return [] | |
| # Compute embedding for query audio | |
| dummy_perf = PerformanceMetrics( | |
| video_id='query', | |
| views_total=0, | |
| retention_2s=0, | |
| retention_15s=0, | |
| completion_rate=0, | |
| replay_rate=0, | |
| velocity_per_hour=0, | |
| velocity_per_day=0, | |
| likes=0, | |
| comments=0, | |
| shares=0, | |
| saves=0, | |
| platform=audio_features.platform, | |
| upload_timestamp=datetime.now() | |
| ) | |
| query_features = AudioFeatureEngineering.compute_full_feature_set(audio_features, dummy_perf) | |
| query_vector = np.array([query_features.get(k, 0.0) for k in self.feature_names]).reshape(1, -1) | |
| query_embedding = self._compute_embeddings(query_vector)[0] | |
| # Compute similarities to all patterns | |
| similarities = [] | |
| for pattern_id, pattern in self.discovered_patterns.items(): | |
| # Build pattern feature vector | |
| pattern_features = { | |
| 'pace_wpm': pattern.optimal_pace, | |
| 'pitch_mean': pattern.optimal_pitch_range[0], | |
| 'pitch_variance': (pattern.optimal_pitch_range[1] - pattern.optimal_pitch_range[0]) / 2, | |
| 'energy_mean': pattern.optimal_energy, | |
| 'beat_alignment_score': pattern.beat_alignment_target, | |
| **{k: 0.0 for k in self.feature_names if k not in ['pace_wpm', 'pitch_mean', 'pitch_variance', 'energy_mean']} | |
| } | |
| pattern_vector = np.array([pattern_features.get(k, 0.0) for k in self.feature_names]).reshape(1, -1) | |
| pattern_embedding = self._compute_embeddings(pattern_vector)[0] | |
| # Cosine similarity | |
| similarity = np.dot(query_embedding, pattern_embedding) / ( | |
| np.linalg.norm(query_embedding) * np.linalg.norm(pattern_embedding) + 1e-8 | |
| ) | |
| similarities.append((pattern_id, float(similarity))) | |
| # Sort by similarity | |
| similarities.sort(key=lambda x: x[1], reverse=True) | |
| return similarities[:top_k] | |
| def get_optimization_suggestions(self, audio_features: AudioFeatures) -> Dict[str, Any]: | |
| """ | |
| Analyze audio features and provide specific optimization suggestions. | |
| Returns actionable recommendations to improve viral potential. | |
| """ | |
| suggestions = { | |
| 'current_score': 0.0, | |
| 'potential_score': 0.0, | |
| 'improvements': [], | |
| 'warnings': [], | |
| 'similar_successful': [] | |
| } | |
| # Get current prediction | |
| current_score, confidence, breakdown = self.predict_viral_success(audio_features, return_confidence=True) | |
| suggestions['current_score'] = current_score | |
| suggestions['confidence'] = confidence | |
| # Find similar successful patterns | |
| similar = self.find_similar_viral_patterns(audio_features, top_k=3) | |
| suggestions['similar_successful'] = [ | |
| { | |
| 'pattern_id': pid, | |
| 'similarity': sim, | |
| 'efficacy': self.discovered_patterns[pid].viral_efficacy_score | |
| } | |
| for pid, sim in similar if pid in self.discovered_patterns | |
| ] | |
| # Get recommended profile | |
| recommended = self.get_recommended_audio_profile( | |
| audio_features.niche, | |
| audio_features.platform, | |
| audio_features.beat_type | |
| ) | |
| if recommended: | |
| # Compare and suggest improvements | |
| if abs(audio_features.pace_wpm - recommended.pace_wpm) > 15: | |
| suggestions['improvements'].append({ | |
| 'parameter': 'pace', | |
| 'current': audio_features.pace_wpm, | |
| 'recommended': recommended.pace_wpm, | |
| 'impact': 'high', | |
| 'reason': f"Optimal pace for {audio_features.niche} is {recommended.pace_wpm:.0f} WPM" | |
| }) | |
| if abs(audio_features.pitch_mean - recommended.pitch_base) > 30: | |
| suggestions['improvements'].append({ | |
| 'parameter': 'pitch', | |
| 'current': audio_features.pitch_mean, | |
| 'recommended': recommended.pitch_base, | |
| 'impact': 'medium', | |
| 'reason': f"Pitch should be around {recommended.pitch_base:.0f} Hz for better retention" | |
| }) | |
| if abs(audio_features.energy_mean - recommended.energy_level) > 0.15: | |
| suggestions['improvements'].append({ | |
| 'parameter': 'energy', | |
| 'current': audio_features.energy_mean, | |
| 'recommended': recommended.energy_level, | |
| 'impact': 'medium', | |
| 'reason': f"Energy level should be {recommended.energy_level:.2f} for maximum engagement" | |
| }) | |
| # Check hook timing | |
| if audio_features.hook_timing_seconds and audio_features.hook_timing_seconds[0] > 3.0: | |
| suggestions['warnings'].append({ | |
| 'issue': 'late_first_hook', | |
| 'severity': 'high', | |
| 'message': f"First hook at {audio_features.hook_timing_seconds[0]:.1f}s - should be <2s for viral potential" | |
| }) | |
| # Estimate potential score with improvements | |
| suggestions['potential_score'] = current_score * 1.3 # Rough estimate | |
| return suggestions | |
| def predict_viral_success(self, audio_features: AudioFeatures, | |
| return_confidence: bool = True) -> Union[float, Tuple[float, float, Dict]]: | |
| """ | |
| Predict viral success score for given audio features with confidence and breakdown. | |
| Returns score 0-10+ (higher = more likely to hit 5M+ views). | |
| """ | |
| if not self.virality_predictor.is_trained: | |
| self.logger.warning("Model not trained yet") | |
| return 0.0 if not return_confidence else (0.0, 0.0, {}) | |
| # Create dummy performance for feature engineering | |
| dummy_perf = PerformanceMetrics( | |
| video_id='prediction', | |
| views_total=0, | |
| retention_2s=0.8, | |
| retention_15s=0.5, | |
| completion_rate=0.3, | |
| replay_rate=0.1, | |
| velocity_per_hour=1000, | |
| velocity_per_day=10000, | |
| likes=0, | |
| comments=0, | |
| shares=0, | |
| saves=0, | |
| platform=audio_features.platform, | |
| upload_timestamp=datetime.now() | |
| ) | |
| # Compute features | |
| engineered_features = AudioFeatureEngineering.compute_full_feature_set(audio_features, dummy_perf) | |
| feature_vector = [engineered_features.get(k, 0.0) for k in self.feature_names] | |
| X = np.array(feature_vector).reshape(1, -1) | |
| # Try stratified model first | |
| pn_key = f"{audio_features.platform}_{audio_features.niche}" | |
| if pn_key in self.stratified_models: | |
| predicted_views = self.stratified_models[pn_key].predict_views(X)[0] | |
| predicted_retention = self.stratified_models[pn_key].predict_retention(X)[0] | |
| else: | |
| predicted_views = self.virality_predictor.predict_views(X)[0] | |
| predicted_retention = self.virality_predictor.predict_retention(X)[0] | |
| predicted_velocity = self.virality_predictor.predict_velocity(X)[0] | |
| predicted_engagement = self.virality_predictor.predict_engagement(X)[0] | |
| # Compute composite viral score | |
| viral_score = ( | |
| (predicted_views / 1_000_000) * 0.40 + | |
| predicted_retention * 15 * 0.30 + | |
| predicted_velocity / 500 * 0.15 + | |
| predicted_engagement * 500 * 0.15 | |
| ) | |
| if not return_confidence: | |
| return float(viral_score) | |
| # Compute confidence based on pattern similarity | |
| confidence = 0.5 # Default | |
| # Check if similar patterns exist | |
| matching_patterns = [ | |
| p for p in self.discovered_patterns.values() | |
| if p.niche == audio_features.niche and p.platform == audio_features.platform | |
| ] | |
| if matching_patterns: | |
| # High confidence if features match known patterns | |
| best_pattern = max(matching_patterns, key=lambda p: p.viral_efficacy_score) | |
| pace_match = 1.0 - abs(audio_features.pace_wpm - best_pattern.optimal_pace) / 50.0 | |
| pitch_match = 1.0 if best_pattern.optimal_pitch_range[0] <= audio_features.pitch_mean <= best_pattern.optimal_pitch_range[1] else 0.5 | |
| confidence = np.clip((pace_match + pitch_match) / 2, 0.3, 0.95) | |
| # Build detailed breakdown | |
| breakdown = { | |
| 'predicted_views': float(predicted_views), | |
| 'predicted_retention': float(predicted_retention), | |
| 'predicted_velocity': float(predicted_velocity), | |
| 'predicted_engagement': float(predicted_engagement), | |
| 'model_used': 'stratified' if pn_key in self.stratified_models else 'global', | |
| 'similar_patterns_found': len(matching_patterns), | |
| 'confidence_level': 'high' if confidence > 0.7 else 'medium' if confidence > 0.5 else 'low' | |
| } | |
| return float(viral_score), float(confidence), breakdown | |
| def get_top_patterns(self, n: int = 10, niche: Optional[str] = None) -> List[AudioPattern]: | |
| """Get top N patterns by viral efficacy""" | |
| patterns = list(self.discovered_patterns.values()) | |
| if niche: | |
| patterns = [p for p in patterns if p.niche == niche] | |
| patterns.sort(key=lambda p: p.viral_efficacy_score * p.weight, reverse=True) | |
| return patterns[:n] | |
| def get_feature_importance(self) -> Dict[str, float]: | |
| """Get feature importance rankings""" | |
| if not self.virality_predictor.feature_importance: | |
| return {} | |
| importance_dict = {} | |
| for idx, importance in self.virality_predictor.feature_importance.items(): | |
| if idx < len(self.feature_names): | |
| importance_dict[self.feature_names[idx]] = float(importance) | |
| return dict(sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)) | |
| def get_niche_performance_summary(self) -> Dict[str, Dict]: | |
| """Get performance summary for all niches""" | |
| return dict(self.niche_performance) | |
| def schedule_continuous_learning(self, check_interval_hours: int = 6): | |
| """ | |
| Schedule continuous learning updates. | |
| In production, this would be called by a scheduler (cron, Airflow, etc.) | |
| """ | |
| from datetime import timedelta | |
| last_update = datetime.now() | |
| def check_and_update(): | |
| nonlocal last_update | |
| current_time = datetime.now() | |
| if (current_time - last_update).total_hours() >= check_interval_hours: | |
| self.logger.info("๐ Running scheduled model update...") | |
| # Retrain models with latest data | |
| if len(self.replay_buffer) >= self.config['min_sample_size']: | |
| self._train_models() | |
| self._discover_patterns() | |
| self._update_pattern_weights() | |
| self._update_rl_policy() | |
| # Save updated state | |
| self.save_state() | |
| last_update = current_time | |
| self.logger.info("โ Scheduled update complete") | |
| return check_and_update | |
| def evaluate_recent_predictions(self, hours_back: int = 24) -> Dict[str, float]: | |
| """ | |
| Evaluate how well recent predictions matched actual performance. | |
| Critical for maintaining 5M+ baseline accuracy. | |
| """ | |
| cutoff = datetime.now() - timedelta(hours=hours_back) | |
| recent_records = [r for r in self.replay_buffer if r['timestamp'] >= cutoff] | |
| if not recent_records: | |
| return {'error': 'No recent data'} | |
| # Compare predictions vs actuals | |
| predictions = [] | |
| actuals = [] | |
| for record in recent_records: | |
| audio = record['audio_features'] | |
| perf = record['performance'] | |
| # Get prediction | |
| predicted_score, confidence, _ = self.predict_viral_success(audio, return_confidence=True) | |
| actual_score = perf.views_total / 1_000_000 | |
| predictions.append(predicted_score) | |
| actuals.append(actual_score) | |
| predictions = np.array(predictions) | |
| actuals = np.array(actuals) | |
| # Compute metrics | |
| from sklearn.metrics import mean_absolute_error, r2_score | |
| mae = mean_absolute_error(actuals, predictions) | |
| r2 = r2_score(actuals, predictions) | |
| # 5M+ classification accuracy | |
| threshold = 5.0 | |
| pred_viral = predictions >= threshold | |
| actual_viral = actuals >= threshold | |
| viral_accuracy = np.mean(pred_viral == actual_viral) | |
| metrics = { | |
| 'samples': len(recent_records), | |
| 'mae_millions': float(mae), | |
| 'r2_score': float(r2), | |
| 'viral_5m_accuracy': float(viral_accuracy), | |
| 'false_positives': int(np.sum((predictions >= threshold) & (actuals < threshold))), | |
| 'false_negatives': int(np.sum((predictions < threshold) & (actuals >= threshold))), | |
| 'time_window_hours': hours_back | |
| } | |
| self.logger.info(f"Prediction Evaluation ({hours_back}h): MAE={mae:.2f}M, Rยฒ={r2:.3f}, " | |
| f"5M+ Acc={viral_accuracy:.2%}") | |
| return metrics | |
| # ======================================================================== | |
| # PERSISTENCE & STATE MANAGEMENT | |
| # ======================================================================== | |
| def save_state(self): | |
| """Save learner state to disk""" | |
| self.logger.info("Saving learner state...") | |
| state = { | |
| 'discovered_patterns': {k: self._pattern_to_dict(v) for k, v in self.discovered_patterns.items()}, | |
| 'niche_performance': dict(self.niche_performance), | |
| 'feature_names': self.feature_names, | |
| 'config': self.config, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| with open(self.storage_path / 'learner_state.json', 'w') as f: | |
| json.dump(state, f, indent=2) | |
| # Save models | |
| if self.virality_predictor.is_trained: | |
| with open(self.storage_path / 'virality_predictor.pkl', 'wb') as f: | |
| pickle.dump(self.virality_predictor, f) | |
| self.logger.info("State saved successfully") | |
| def load_state(self): | |
| """Load learner state from disk""" | |
| state_file = self.storage_path / 'learner_state.json' | |
| if not state_file.exists(): | |
| self.logger.warning("No saved state found") | |
| return | |
| self.logger.info("Loading learner state...") | |
| with open(state_file, 'r') as f: | |
| state = json.load(f) | |
| self.discovered_patterns = {k: self._dict_to_pattern(v) for k, v in state['discovered_patterns'].items()} | |
| self.niche_performance = defaultdict(lambda: {'total_videos': 0, 'avg_views': 0.0, 'top_patterns': []}, | |
| state['niche_performance']) | |
| self.feature_names = state['feature_names'] | |
| self.config.update(state['config']) | |
| # Load models | |
| model_file = self.storage_path / 'virality_predictor.pkl' | |
| if model_file.exists(): | |
| with open(model_file, 'rb') as f: | |
| self.virality_predictor = pickle.load(f) | |
| self.logger.info(f"State loaded: {len(self.discovered_patterns)} patterns") | |
| def _pattern_to_dict(self, pattern: AudioPattern) -> Dict: | |
| """Convert AudioPattern to JSON-serializable dict""" | |
| return { | |
| 'pattern_id': pattern.pattern_id, | |
| 'niche': pattern.niche, | |
| 'platform': pattern.platform, | |
| 'optimal_pace': pattern.optimal_pace, | |
| 'optimal_pitch_range': pattern.optimal_pitch_range, | |
| 'optimal_energy': pattern.optimal_energy, | |
| 'hook_timings': pattern.hook_timings, | |
| 'pause_pattern': pattern.pause_pattern, | |
| 'beat_alignment_target': pattern.beat_alignment_target, | |
| 'emotion_arc': pattern.emotion_arc, | |
| 'viral_efficacy_score': pattern.viral_efficacy_score, | |
| 'sample_count': pattern.sample_count, | |
| 'avg_views': pattern.avg_views, | |
| 'avg_completion': pattern.avg_completion, | |
| 'confidence': pattern.confidence, | |
| 'discovered_at': pattern.discovered_at.isoformat(), | |
| 'last_validated': pattern.last_validated.isoformat(), | |
| 'trend_status': pattern.trend_status, | |
| 'weight': pattern.weight, | |
| 'decay_rate': pattern.decay_rate | |
| } | |
| def _dict_to_pattern(self, d: Dict) -> AudioPattern: | |
| """Convert dict back to AudioPattern""" | |
| return AudioPattern( | |
| pattern_id=d['pattern_id'], | |
| niche=d['niche'], | |
| platform=d['platform'], | |
| optimal_pace=d['optimal_pace'], | |
| optimal_pitch_range=tuple(d['optimal_pitch_range']), | |
| optimal_energy=d['optimal_energy'], | |
| hook_timings=d['hook_timings'], | |
| pause_pattern=[tuple(p) for p in d['pause_pattern']], | |
| beat_alignment_target=d['beat_alignment_target'], | |
| emotion_arc=d['emotion_arc'], | |
| viral_efficacy_score=d['viral_efficacy_score'], | |
| sample_count=d['sample_count'], | |
| avg_views=d['avg_views'], | |
| avg_completion=d['avg_completion'], | |
| confidence=d['confidence'], | |
| discovered_at=datetime.fromisoformat(d['discovered_at']), | |
| last_validated=datetime.fromisoformat(d['last_validated']), | |
| trend_status=d['trend_status'], | |
| weight=d['weight'], | |
| decay_rate=d['decay_rate'] | |
| ) | |
| # ============================================================================ | |
| # EXAMPLE USAGE & INTEGRATION | |
| # ============================================================================ | |
| if __name__ == "__main__": | |
| print("=" * 80) | |
| print("๐ฏ AUDIO PATTERN LEARNER - Production Ready for 5M+ Views") | |
| print("=" * 80) | |
| # Initialize learner | |
| learner = AudioPatternLearner(storage_path="./viral_audio_learner") | |
| # Load previous state if exists | |
| learner.load_state() | |
| # === SIMULATION: Ingest batch of videos === | |
| print("\n๐ Simulating video batch ingestion...") | |
| # Create sample high-performer (7.5M views) | |
| sample_audio_viral = AudioFeatures( | |
| pace_wpm=165, | |
| pitch_mean=220.0, | |
| pitch_variance=50.0, | |
| energy_mean=0.75, | |
| energy_variance=0.15, | |
| tempo_bpm=128, | |
| hook_timing_seconds=[1.2, 7.8, 14.5], | |
| hook_emphasis_amplitude=[0.95, 0.88, 0.82], | |
| hook_pitch_jump=[55, 48, 42], | |
| pause_durations=[0.35, 0.5, 0.4], | |
| pause_positions=[4.8, 11.5, 19.2], | |
| beat_alignment_error=0.03, | |
| syllable_timing=[0.12, 0.28, 0.45, 0.62, 0.78], | |
| mfcc=np.random.randn(13, 100), | |
| spectral_centroid=np.random.randn(100) + 2000, | |
| spectral_rolloff=np.random.randn(100) + 4000, | |
| zero_crossing_rate=np.random.randn(100) * 0.1 + 0.15, | |
| chroma=np.random.randn(12, 100), | |
| harmonic_noise_ratio=0.88, | |
| emotion_trajectory=['building', 'peak', 'sustain', 'peak'], | |
| emotion_intensity=[0.65, 0.92, 0.78, 0.88], | |
| voice_tone='energetic', | |
| phoneme_timing={'a': 0.1, 'e': 0.15, 'i': 0.12}, | |
| niche='finance', | |
| platform='tiktok', | |
| beat_type='trap', | |
| voice_style='male_young', | |
| language='en', | |
| music_track='trending_beat_001', | |
| is_trending_beat=True, | |
| trend_timestamp=datetime.now() | |
| ) | |
| sample_perf_viral = PerformanceMetrics( | |
| video_id='vid_viral_001', | |
| views_total=7_500_000, | |
| retention_2s=0.87, | |
| retention_15s=0.58, | |
| completion_rate=0.38, | |
| replay_rate=0.14, | |
| velocity_per_hour=18500, | |
| velocity_per_day=225000, | |
| likes=520000, | |
| comments=15000, | |
| shares=92000, | |
| saves=135000, | |
| platform='tiktok', | |
| upload_timestamp=datetime.now() | |
| ) | |
| # Create sample low-performer (100K views) | |
| sample_audio_low = AudioFeatures( | |
| pace_wpm=140, | |
| pitch_mean=180.0, | |
| pitch_variance=30.0, | |
| energy_mean=0.55, | |
| energy_variance=0.08, | |
| tempo_bpm=100, | |
| hook_timing_seconds=[5.2, 15.8], | |
| hook_emphasis_amplitude=[0.65, 0.60], | |
| hook_pitch_jump=[25, 20], | |
| pause_durations=[0.2], | |
| pause_positions=[10.0], | |
| beat_alignment_error=0.15, | |
| syllable_timing=[0.2, 0.4, 0.6], | |
| mfcc=np.random.randn(13, 100), | |
| spectral_centroid=np.random.randn(100) + 1500, | |
| spectral_rolloff=np.random.randn(100) + 3000, | |
| zero_crossing_rate=np.random.randn(100) * 0.1 + 0.12, | |
| chroma=np.random.randn(12, 100), | |
| harmonic_noise_ratio=0.72, | |
| emotion_trajectory=['steady', 'building'], | |
| emotion_intensity=[0.5, 0.6], | |
| voice_tone='calm', | |
| phoneme_timing={'a': 0.15, 'e': 0.18}, | |
| niche='finance', | |
| platform='tiktok', | |
| beat_type='lofi', | |
| voice_style='male_mature', | |
| language='en', | |
| music_track=None, | |
| is_trending_beat=False, | |
| trend_timestamp=datetime.now() | |
| ) | |
| sample_perf_low = PerformanceMetrics( | |
| video_id='vid_low_001', | |
| views_total=120_000, | |
| retention_2s=0.62, | |
| retention_15s=0.35, | |
| completion_rate=0.18, | |
| replay_rate=0.04, | |
| velocity_per_hour=250, | |
| velocity_per_day=3500, | |
| likes=3200, | |
| comments=150, | |
| shares=280, | |
| saves=420, | |
| platform='tiktok', | |
| upload_timestamp=datetime.now() | |
| ) | |
| # Ingest batch (simulate 50 videos) | |
| audio_batch = [sample_audio_viral] * 30 + [sample_audio_low] * 20 | |
| perf_batch = [sample_perf_viral] * 30 + [sample_perf_low] * 20 | |
| learner.ingest_video_batch(audio_batch, perf_batch) | |
| # === TEST 1: Get recommendation === | |
| print("\n" + "=" * 80) | |
| print("๐ค TEST 1: Get Recommended Audio Profile") | |
| print("=" * 80) | |
| recommendation = learner.get_recommended_audio_profile('finance', 'tiktok', 'trap', use_rl=True) | |
| if recommendation: | |
| print(f"\nโ Recommendation for finance/tiktok/trap:") | |
| print(f" ๐ Predicted Viral Score: {recommendation.predicted_viral_score:.2f}/10") | |
| print(f" ๐ฏ Confidence: {recommendation.confidence:.1%}") | |
| print(f" ๐ฃ๏ธ Pace: {recommendation.pace_wpm:.0f} WPM") | |
| print(f" ๐ต Pitch: {recommendation.pitch_base:.0f} Hz ยฑ {recommendation.pitch_variance:.0f}") | |
| print(f" โก Energy: {recommendation.energy_level:.2f}") | |
| print(f" ๐ฃ Hook Placements: {[f'{h:.1f}s' for h in recommendation.hook_placements[:3]]}") | |
| print(f" โธ๏ธ Pause Placements: {[(f'{p:.1f}s', f'{d:.2f}s') for p, d in recommendation.pause_placements[:2]]}") | |
| # === TEST 2: Predict viral success === | |
| print("\n" + "=" * 80) | |
| print("๐ฎ TEST 2: Predict Viral Success for New Audio") | |
| print("=" * 80) | |
| test_audio = sample_audio_viral | |
| viral_score, confidence, breakdown = learner.predict_viral_success(test_audio, return_confidence=True) | |
| print(f"\nโ Prediction Results:") | |
| print(f" ๐ Viral Score: {viral_score:.2f}/10 (Target: 5+ for 5M+ views)") | |
| print(f" ๐ฏ Confidence: {confidence:.1%} ({breakdown['confidence_level']})") | |
| print(f" ๐๏ธ Predicted Views: {breakdown['predicted_views']:,.0f}") | |
| print(f" โฑ๏ธ Predicted Retention: {breakdown['predicted_retention']:.1%}") | |
| print(f" ๐ Predicted Velocity: {breakdown['predicted_velocity']:.0f}/hour") | |
| print(f" ๐ฌ Predicted Engagement: {breakdown['predicted_engagement']:.4f}") | |
| print(f" ๐ค Model Used: {breakdown['model_used']}") | |
| print(f" ๐ Similar Patterns: {breakdown['similar_patterns_found']}") | |
| # === TEST 3: Get optimization suggestions === | |
| print("\n" + "=" * 80) | |
| print("๐ก TEST 3: Get Optimization Suggestions") | |
| print("=" * 80) | |
| suggestions = learner.get_optimization_suggestions(sample_audio_low) | |
| print(f"\n๐ Current Score: {suggestions['current_score']:.2f}/10") | |
| print(f"๐ Potential Score: {suggestions['potential_score']:.2f}/10 (with improvements)") | |
| print(f"๐ฏ Confidence: {suggestions['confidence']:.1%}") | |
| if suggestions['improvements']: | |
| print(f"\n๐ง Recommended Improvements:") | |
| for imp in suggestions['improvements']: | |
| print(f" โข {imp['parameter'].upper()}: {imp['current']:.1f} โ {imp['recommended']:.1f}") | |
| print(f" Impact: {imp['impact']} | {imp['reason']}") | |
| if suggestions['warnings']: | |
| print(f"\nโ ๏ธ Warnings:") | |
| for warn in suggestions['warnings']: | |
| print(f" โข [{warn['severity'].upper()}] {warn['message']}") | |
| if suggestions['similar_successful']: | |
| print(f"\n๐ฏ Similar Successful Patterns:") | |
| for sim in suggestions['similar_successful']: | |
| print(f" โข Pattern {sim['pattern_id']}: similarity={sim['similarity']:.2f}, efficacy={sim['efficacy']:.2f}") | |
| # === TEST 4: Top patterns === | |
| print("\n" + "=" * 80) | |
| print("๐ TEST 4: Top Viral Patterns") | |
| print("=" * 80) | |
| top_patterns = learner.get_top_patterns(n=5, niche='finance') | |
| print(f"\n๐ Top 5 Patterns for Finance:") | |
| for i, pattern in enumerate(top_patterns, 1): | |
| print(f" {i}. {pattern.pattern_id}") | |
| print(f" Efficacy: {pattern.viral_efficacy_score:.3f} | Samples: {pattern.sample_count}") | |
| print(f" Pace: {pattern.optimal_pace:.0f} WPM | Pitch: {pattern.optimal_pitch_range}") | |
| print(f" Weight: {pattern.weight:.2f} | Confidence: {pattern.confidence:.1%}") | |
| # === TEST 5: Feature importance === | |
| print("\n" + "=" * 80) | |
| print("๐ TEST 5: Feature Importance Analysis") | |
| print("=" * 80) | |
| importance = learner.get_feature_importance() | |
| print(f"\n๐ฏ Top 10 Features Driving Virality:") | |
| for i, (feat, imp) in enumerate(list(importance.items())[:10], 1): | |
| print(f" {i}. {feat}: {imp:.4f}") | |
| # === TEST 6: Model evaluation === | |
| print("\n" + "=" * 80) | |
| print("๐ TEST 6: Model Performance Metrics") | |
| print("=" * 80) | |
| metrics = learner.virality_predictor.get_evaluation_metrics() | |
| if metrics: | |
| print(f"\nโ Model Performance:") | |
| print(f" Views RMSE: {metrics['views_rmse']:,.0f}") | |
| print(f" Views MAE: {metrics['views_mae']:,.0f}") | |
| print(f" Views Rยฒ: {metrics['views_r2']:.3f}") | |
| print(f" Retention RMSE: {metrics['retention_rmse']:.3f}") | |
| print(f" Retention Rยฒ: {metrics['retention_r2']:.3f}") | |
| # === Save state === | |
| learner.save_state() | |
| print("\n" + "=" * 80) | |
| print("โ ALL TESTS COMPLETE - Pattern Learner Ready for Production") | |
| print("=" * 80) | |
| print("\n๐ Integration Ready:") | |
| print(" โข Pull data from: audio_performance_store.py") | |
| print(" โข Push recommendations to: tts_engine.py, voice_sync.py") | |
| print(" โข Log anomalies to: audio_anomaly_logger.py") | |
| print(" โข Schedule continuous learning every 6 hours") | |
| print("\n๐ก Key Features:") | |
| print(" โ Multi-target ML prediction (views, retention, velocity, engagement)") | |
| print(" โ Stratified learning per platform/niche") | |
| print(" โ RL policy for continuous optimization") | |
| print(" โ Embedding-based similarity search") | |
| print(" โ Confidence scoring & uncertainty quantification") | |
| print(" โ Real-time API for TTS/voice-sync") | |
| print(" โ Continuous pattern decay & reinforcement") | |
| print(" โ Anomaly detection for novel strategies") | |
| print(" โ Cross-platform normalization") | |
| print("\n๐ฏ Target: Consistently predict and generate 5M+ view patterns") | |
| print("=" * 80) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment