| """ | |
| audio_performance_store.py | |
| CRITICAL: Single source of truth for all audio decisions in autonomous viral content system. | |
| Every downstream learning, punishment, and reward depends on this data. | |
| SCALE: 20k-100k videos/day, append-only with indexed retrieval | |
| INTEGRATION: Orchestration scheduler, enforcement layers, RL feedback loops | |
| FAILURE MODE: Incorrect data = catastrophic system failure | |
| SCHEMA VERSION: 2.0.0 | |
| """ | |
| import sqlite3 | |
| import threading | |
| import time | |
| import hashlib | |
| import json | |
| import pickle | |
| from collections import defaultdict, deque | |
| from contextlib import contextmanager | |
| from dataclasses import dataclass, field, asdict | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any, Tuple, Callable | |
| from enum import Enum | |
| import numpy as np | |
| import logging | |
| from statistics import mean, stdev | |
| from sklearn.decomposition import PCA | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor | |
| from sklearn.model_selection import train_test_split, cross_val_score, TimeSeriesSplit | |
| from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error | |
| from scipy.spatial.distance import cosine, euclidean | |
| from scipy.stats import pearsonr, norm | |
| from scipy.signal import find_peaks | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| # Attempt to import deep learning libraries (graceful degradation if not available) | |
| try: | |
| import torch | |
| import torch.nn as nn | |
| TORCH_AVAILABLE = True | |
| except ImportError: | |
| TORCH_AVAILABLE = False | |
| logger = logging.getLogger(__name__) | |
| logger.warning("PyTorch not available - using GBM fallback for sequence modeling") | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Schema version | |
| SCHEMA_VERSION = "2.0.0" | |
| # Access control | |
| AUTHORIZED_MODULES = { | |
| "audio_generation_controller", | |
| "audio_reinforcement_loop", | |
| "feedback_ingestor", | |
| "scheduler", | |
| "audio_pattern_learner" | |
| } | |
| class UnauthorizedAccessError(Exception): | |
| """Raised when unauthorized module attempts access.""" | |
| pass | |
| class DataIntegrityError(Exception): | |
| """Raised when data validation fails.""" | |
| pass | |
| class EventType(Enum): | |
| """Event types for orchestration integration.""" | |
| EXTREME_SUCCESS = "extreme_success" | |
| EXTREME_FAILURE = "extreme_failure" | |
| ANOMALY_DETECTED = "anomaly_detected" | |
| RECORD_STORED = "record_stored" | |
| THRESHOLD_CROSSED = "threshold_crossed" | |
| def verify_caller_authorization(): | |
| """Verify calling module is authorized. MANDATORY for all public methods.""" | |
| import inspect | |
| frame = inspect.currentframe() | |
| try: | |
| caller_frame = frame.f_back.f_back | |
| caller_module = inspect.getmodule(caller_frame) | |
| if caller_module: | |
| module_name = caller_module.__name__.split('.')[-1] | |
| if module_name not in AUTHORIZED_MODULES: | |
| raise UnauthorizedAccessError( | |
| f"SECURITY: Module '{module_name}' unauthorized. " | |
| f"Authorized: {AUTHORIZED_MODULES}" | |
| ) | |
| finally: | |
| del frame | |
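
# Usage sketch (hypothetical method): public store methods are expected to call
# verify_caller_authorization() as their first statement, e.g.
#
#     def store_record(self, record):      # called from an authorized module
#         verify_caller_authorization()    # such as "feedback_ingestor"
#         ...
#
# Note the double f_back: the check inspects the caller of the *public method*,
# not the immediate caller of verify_caller_authorization() itself.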
@dataclass
class SyllableLevelTiming:
    """Time-segmented syllable-level audio features."""
    syllable_index: int
    start_time_ms: float
    duration_ms: float
    pitch_hz: float
    energy_db: float
    beat_alignment_error_ms: float  # Deviation from expected beat position
    phoneme_sequence: List[str]
    stress_level: float  # 0.0 to 1.0

@dataclass
class WordLevelFeatures:
    """Word-level emotion and delivery metrics."""
    word_index: int
    word_text: str
    start_time_ms: float
    duration_ms: float
    emotion_intensity: float  # 0.0 to 1.0
    emotion_class: str  # e.g., "excited", "calm", "urgent"
    emphasis_score: float  # Relative emphasis vs surrounding words
    clarity_score: float  # Pronunciation clarity

@dataclass
class PitchContour:
    """Time-series pitch data with statistical features."""
    timestamps_ms: List[float]
    pitch_hz_values: List[float]
    pitch_variance: float
    pitch_range_semitones: float
    pitch_slope: float  # Linear regression slope
    pitch_inflection_points: List[int]  # Indices of significant changes

@dataclass
class EnergyEnvelope:
    """Time-series energy/amplitude data."""
    timestamps_ms: List[float]
    energy_db_values: List[float]
    peak_energy_db: float
    mean_energy_db: float
    energy_variance: float
    dynamic_range_db: float
    attack_times_ms: List[float]  # Time to reach peaks
    decay_times_ms: List[float]  # Time from peaks to valleys

@dataclass
class PauseDensityMetrics:
    """Pause timing and distribution analysis."""
    total_pause_count: int
    pause_durations_ms: List[float]
    pause_positions_ms: List[float]
    mean_pause_duration_ms: float
    pause_variance_ms: float
    inter_pause_intervals_ms: List[float]
    strategic_pause_count: int  # Pauses before hooks/key phrases

@dataclass
class BeatAlignmentMetrics:
    """Multi-timestamp beat synchronization metrics."""
    beat_timestamps_ms: List[float]  # Expected beat positions
    syllable_timestamps_ms: List[float]  # Actual syllable positions
    alignment_errors_ms: List[float]  # Per-syllable deviations
    mean_error_ms: float
    max_error_ms: float
    on_beat_percentage: float  # % within acceptable tolerance
    sync_quality_score: float  # 0.0 to 1.0

@dataclass
class SpectralFeatures:
    """Spectral analysis across frequency bands."""
    timestamps_ms: List[float]
    low_band_energy: List[float]  # 0-200 Hz
    mid_low_energy: List[float]  # 200-500 Hz
    mid_energy: List[float]  # 500-2000 Hz
    mid_high_energy: List[float]  # 2000-4000 Hz
    high_energy: List[float]  # 4000+ Hz
    spectral_centroid: List[float]
    spectral_rolloff: List[float]
    dominant_band_per_segment: List[str]

@dataclass
class SegmentedAudioFeatures:
    """Complete time-segmented audio feature set."""
    # Syllable-level data
    syllable_features: List[SyllableLevelTiming]
    # Word-level data
    word_features: List[WordLevelFeatures]
    # Time-series data
    pitch_contour: PitchContour
    energy_envelope: EnergyEnvelope
    # Timing metrics
    pause_metrics: PauseDensityMetrics
    beat_alignment: BeatAlignmentMetrics
    # Spectral data
    spectral_features: SpectralFeatures
    # Overall metrics
    total_duration_ms: float
    words_per_minute: float
    syllables_per_second: float
    # Extraction metadata
    extraction_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    extractor_version: str = SCHEMA_VERSION

@dataclass
class PlatformMetrics:
    """Platform-normalized performance metrics with negative signals."""
    # Retention breakdown
    retention_1s: float  # Critical early hook
    retention_2s: float
    retention_3s: float
    completion_rate: float
    # Engagement signals
    rewatch_count: int
    rewatch_rate: float  # Rewatches per view
    shares: int
    shares_per_impression: float
    saves: int
    # Negative signals (CRITICAL for failure detection)
    scroll_away_velocity: float  # Avg ms before skip
    mute_rate: float  # % who muted audio
    skip_rate: float  # % who skipped before completion
    negative_feedback_count: int  # "Not interested" etc.
    # Normalized scores
    platform_engagement_score: float  # Platform-specific normalization
    virality_coefficient: float  # Shares / (views * time_decay)
    # Collection metadata
    platform: str
    collection_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    metrics_version: str = SCHEMA_VERSION
@dataclass
class AudioRecord:
    """Complete append-only record for a single video's audio performance.

    Field order matters: dataclasses require all non-default fields before
    defaulted ones, so the defaulted metadata fields are grouped at the end.
    """
    # Primary identifiers
    record_id: str  # Unique hash: video_id + timestamp (pass "" to auto-generate)
    video_id: str
    # Audio features (immutable after creation)
    audio_features: SegmentedAudioFeatures
    # Platform performance (immutable after final collection)
    platform_metrics: PlatformMetrics
    # Mandatory tags for retrieval
    niche: str
    platform: str
    beat_id: str
    beat_version_lineage: str  # e.g., "beat_v3 <- beat_v2 <- beat_v1"
    voice_profile_hash: str  # SHA256 of voice config
    orchestration_job_id: str
    # Additional context
    language: str
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    trend_id: Optional[str] = None
    content_type: str = "audio_overlay"  # For future expansion
    # Metadata (append-only guarantees)
    schema_version: str = SCHEMA_VERSION
    ingestion_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    is_anomaly: bool = False
    anomaly_reason: Optional[str] = None

    def __post_init__(self):
        """Generate deterministic record_id when none was supplied."""
        if not self.record_id:
            hash_input = f"{self.video_id}:{self.timestamp}:{self.orchestration_job_id}"
            self.record_id = hashlib.sha256(hash_input.encode()).hexdigest()[:16]
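
# Usage sketch (hypothetical IDs): record_id is deterministic, so re-ingesting the
# same video/timestamp/job triple reproduces the same 16-char key:
#
#     h = hashlib.sha256("vid_123:2025-01-01T00:00:00:job_9".encode()).hexdigest()[:16]
#
# which makes accidental duplicate appends detectable by primary-key collision.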
class PhaseAwareEmbeddingEngine:
    """
    Enhanced embedding engine with microsecond-level phase timing.
    Critical for distinguishing viral from non-viral patterns.
    """
    def __init__(self):
        self.embedding_dim = 128
        self.phase_dim = 32  # Dedicated phase features
        self.pca = None
        self.scaler = StandardScaler()
        self.lock = threading.Lock()
        self.feature_cache = deque(maxlen=10000)

    def compute_phase_aware_embedding(self, audio_features: SegmentedAudioFeatures) -> np.ndarray:
        """
        Compute phase-aware embedding with microsecond timing precision.
        Returns 128-dim embedding with dedicated phase sequence features.
        """
        features = []
        # === PHASE SEQUENCE FEATURES (32 dimensions) ===
        if audio_features.syllable_features and audio_features.beat_alignment.beat_timestamps_ms:
            # Extract phase offsets for first 16 syllables
            phase_offsets = []
            beat_times = audio_features.beat_alignment.beat_timestamps_ms
            for i, syl in enumerate(audio_features.syllable_features[:16]):
                # Find nearest beat
                nearest_beat_idx = np.argmin([abs(syl.start_time_ms - bt) for bt in beat_times])
                nearest_beat = beat_times[nearest_beat_idx]
                # Phase offset in milliseconds (can be negative)
                phase_offset = syl.start_time_ms - nearest_beat
                phase_offsets.append(phase_offset)
            # Pad to 16 if needed
            while len(phase_offsets) < 16:
                phase_offsets.append(0.0)
            features.extend(phase_offsets[:16])
            # Phase sequence statistics
            features.extend([
                np.mean(phase_offsets),
                np.std(phase_offsets),
                np.max(np.abs(phase_offsets)),
                np.mean(np.diff(phase_offsets)) if len(phase_offsets) > 1 else 0.0  # Phase drift
            ])
            # Beat-phase microscale timing (critical for virality)
            beat_periods = np.diff(beat_times) if len(beat_times) > 1 else [500]
            features.extend([
                np.mean(beat_periods),
                np.std(beat_periods),
                np.min(beat_periods) if len(beat_periods) > 0 else 500,
                np.max(beat_periods) if len(beat_periods) > 0 else 500
            ])
            # Tempo-normalized phase alignment
            if len(beat_periods) > 0:
                avg_period = np.mean(beat_periods)
                normalized_offsets = [abs(off) / avg_period for off in phase_offsets]
                features.extend([
                    np.mean(normalized_offsets),
                    np.max(normalized_offsets),
                    sum(1 for x in normalized_offsets if x < 0.05) / len(normalized_offsets)  # % within 5% of beat
                ])
            else:
                features.extend([0.0, 0.0, 0.0])
            # Phase velocity (rate of change)
            if len(phase_offsets) > 2:
                phase_velocity = np.diff(phase_offsets)
                features.extend([
                    np.mean(phase_velocity),
                    np.std(phase_velocity)
                ])
            else:
                features.extend([0.0, 0.0])
            # Reserved slots so both branches emit exactly phase_dim (32) features;
            # the populated path above otherwise yields only 29
            features.extend([0.0, 0.0, 0.0])
        else:
            features.extend([0.0] * 32)  # No phase data
        # === SYLLABLE TIMING FEATURES (20 dimensions) ===
        if audio_features.syllable_features:
            syl_durations = [s.duration_ms for s in audio_features.syllable_features[:20]]
            syl_energies = [s.energy_db for s in audio_features.syllable_features[:20]]
            syl_pitches = [s.pitch_hz for s in audio_features.syllable_features[:20]]
            syl_stress = [s.stress_level for s in audio_features.syllable_features[:20]]
            features.extend([
                np.mean(syl_durations), np.std(syl_durations),
                np.min(syl_durations), np.max(syl_durations),
                np.mean(syl_energies), np.std(syl_energies),
                np.mean(syl_pitches), np.std(syl_pitches),
                np.mean(syl_stress), np.max(syl_stress),
                # Syllable rhythm entropy
                -np.sum([p * np.log(p + 1e-10) for p in np.histogram(syl_durations, bins=5)[0] / len(syl_durations) if p > 0]),
                # Energy attack rate
                np.mean(np.diff(syl_energies)) if len(syl_energies) > 1 else 0.0,
                # Pitch trajectory slope
                np.polyfit(range(len(syl_pitches)), syl_pitches, 1)[0] if len(syl_pitches) > 1 else 0.0,
                # Stress distribution
                sum(1 for s in syl_stress if s > 0.7) / max(len(syl_stress), 1),
                # Early vs late syllable characteristics
                np.mean(syl_energies[:5]) - np.mean(syl_energies[5:10]) if len(syl_energies) >= 10 else 0.0,
                np.mean(syl_pitches[:5]) - np.mean(syl_pitches[5:10]) if len(syl_pitches) >= 10 else 0.0,
                # Syllable clustering (consecutive similar durations)
                sum(1 for i in range(len(syl_durations)-1) if abs(syl_durations[i] - syl_durations[i+1]) < 20) / max(len(syl_durations)-1, 1),
                # Peak syllable metrics
                syl_energies[np.argmax(syl_energies)] if syl_energies else 0.0,
                syl_pitches[np.argmax(syl_pitches)] if syl_pitches else 0.0,
                syl_stress[np.argmax(syl_stress)] if syl_stress else 0.0
            ])
        else:
            features.extend([0.0] * 20)
        # === WORD-LEVEL EMOTION SEQUENCE (15 dimensions) ===
        if audio_features.word_features:
            emotions = [w.emotion_intensity for w in audio_features.word_features[:15]]
            emphasis = [w.emphasis_score for w in audio_features.word_features[:15]]
            clarity = [w.clarity_score for w in audio_features.word_features[:15]]
            features.extend([
                np.mean(emotions), np.std(emotions), np.max(emotions),
                np.mean(emphasis), np.max(emphasis),
                np.mean(clarity),
                # Emotional trajectory
                np.polyfit(range(len(emotions)), emotions, 1)[0] if len(emotions) > 1 else 0.0,
                # Emotional peaks per second
                sum(1 for e in emotions if e > 0.8) / (audio_features.total_duration_ms / 1000.0),
                # Emotion variance in first 2 seconds
                np.var([w.emotion_intensity for w in audio_features.word_features if w.start_time_ms < 2000]) if any(w.start_time_ms < 2000 for w in audio_features.word_features) else 0.0,
                # Emphasis clustering
                sum(1 for i in range(len(emphasis)-1) if emphasis[i] > 0.7 and emphasis[i+1] > 0.7) / max(len(emphasis)-1, 1),
                # Emotional acceleration (second derivative)
                np.mean(np.diff(np.diff(emotions))) if len(emotions) > 2 else 0.0,
                # Clarity consistency
                np.std(clarity),
                # Hook detection (high emotion + emphasis in first 3 words)
                np.mean([w.emotion_intensity * w.emphasis_score for w in audio_features.word_features[:3]]),
                # Emotional range
                max(emotions) - min(emotions) if emotions else 0.0,
                # Emphasis distribution
                sum(1 for e in emphasis if e > 0.6) / max(len(emphasis), 1)
            ])
        else:
            features.extend([0.0] * 15)
        # === SPECTRAL + ENERGY (15 dimensions) ===
        pc = audio_features.pitch_contour
        ee = audio_features.energy_envelope
        sf = audio_features.spectral_features
        features.extend([
            pc.pitch_variance, pc.pitch_range_semitones, pc.pitch_slope,
            len(pc.pitch_inflection_points) / max(len(pc.timestamps_ms), 1),
            ee.peak_energy_db, ee.mean_energy_db, ee.dynamic_range_db,
            np.mean(ee.attack_times_ms) if ee.attack_times_ms else 0.0,
            np.mean(sf.low_band_energy[:10]) if sf.low_band_energy else 0.0,
            np.mean(sf.mid_energy[:10]) if sf.mid_energy else 0.0,
            np.mean(sf.high_energy[:10]) if sf.high_energy else 0.0,
            np.mean(sf.spectral_centroid[:10]) if sf.spectral_centroid else 0.0,
            # Energy envelope shape
            np.std(ee.energy_db_values[:20]) if len(ee.energy_db_values) >= 20 else 0.0,
            # Spectral flux
            np.mean(np.diff(sf.spectral_centroid[:10])) if len(sf.spectral_centroid) >= 10 else 0.0,
            # High frequency emphasis (presence)
            np.mean(sf.high_energy[:10]) / (np.mean(sf.low_band_energy[:10]) + 1e-6) if sf.high_energy and sf.low_band_energy else 1.0
        ])
        # === PAUSE + TIMING (10 dimensions) ===
        pm = audio_features.pause_metrics
        ba = audio_features.beat_alignment
        features.extend([
            pm.total_pause_count,
            pm.mean_pause_duration_ms,
            pm.strategic_pause_count,
            ba.mean_error_ms,
            ba.max_error_ms,
            ba.on_beat_percentage,
            ba.sync_quality_score,
            audio_features.total_duration_ms,
            audio_features.words_per_minute,
            audio_features.syllables_per_second
        ])
        # === MICRO-TIMING FEATURES (6 dimensions) ===
        # Critical for distinguishing viral patterns
        if audio_features.syllable_features and len(audio_features.syllable_features) > 5:
            # Sub-100ms timing precision
            first_5_timings = [s.start_time_ms for s in audio_features.syllable_features[:5]]
            timing_deltas = np.diff(first_5_timings)
            features.extend([
                np.mean(timing_deltas),
                np.std(timing_deltas),
                np.min(timing_deltas),
                # Timing consistency (low variance = robotic, high = natural)
                np.var(timing_deltas) / (np.mean(timing_deltas) + 1e-6),
                # Rush vs drag (early vs late timing)
                sum(1 for s in audio_features.syllable_features[:10] if s.beat_alignment_error_ms < -5) / 10.0,
                # Syncopation index
                sum(1 for s in audio_features.syllable_features[:10] if 50 < abs(s.beat_alignment_error_ms) < 150) / 10.0
            ])
        else:
            features.extend([0.0] * 6)
        # Convert to array and handle edge cases
        feature_array = np.array(features, dtype=np.float32)
        feature_array = np.nan_to_num(feature_array, nan=0.0, posinf=1e6, neginf=-1e6)
        # Apply PCA if trained
        if self.pca is not None:
            with self.lock:
                feature_array = self.scaler.transform(feature_array.reshape(1, -1))
                embedding = self.pca.transform(feature_array)[0]
        else:
            embedding = feature_array
        # Ensure exactly 128 dimensions
        if len(embedding) < self.embedding_dim:
            embedding = np.pad(embedding, (0, self.embedding_dim - len(embedding)))
        elif len(embedding) > self.embedding_dim:
            embedding = embedding[:self.embedding_dim]
        return embedding
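
# Usage sketch (hypothetical SegmentedAudioFeatures instance `feats`): the raw
# feature vector is 98-dim (32+20+15+15+10+6) and is zero-padded to 128 until
# train_pca() has been fit, after which PCA output fills the 128 dims:
#
#     engine = PhaseAwareEmbeddingEngine()
#     emb = engine.compute_phase_aware_embedding(feats)
#     assert emb.shape == (128,)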
# Select the base class at definition time so the module still imports when
# PyTorch is absent (the conditional must live inside the bases list to be
# valid syntax)
class LSTMSequenceModel(nn.Module if TORCH_AVAILABLE else object):
    """
    Production LSTM for temporal sequence modeling.
    Captures time-dependent virality patterns.
    """
    def __init__(self, input_dim: int = 10, hidden_dim: int = 64, num_layers: int = 2):
        if not TORCH_AVAILABLE:
            return
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=0.3)
        self.fc1 = nn.Linear(hidden_dim, 32)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(32, 1)
    def forward(self, x):
        # x shape: (batch, seq_len, input_dim)
        lstm_out, _ = self.lstm(x)
        # Take last output
        last_out = lstm_out[:, -1, :]
        out = self.fc1(last_out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.fc2(out)
        return out
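
# Shape sketch (assumes PyTorch is installed; batch of 4, 20 time steps, 13
# features as produced by ProductionSequenceEngine.extract_feature_sequence below):
#
#     model = LSTMSequenceModel(input_dim=13)
#     y = model(torch.randn(4, 20, 13))   # -> tensor of shape (4, 1)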
class ProductionSequenceEngine:
    """
    Production-grade sequence modeling with LSTM/Transformer support.
    Captures temporal hook impact and retention evolution.
    """
    def __init__(self):
        self.sequence_models = {}
        self.lock = threading.Lock()
        self.sequence_length = 20
        self.use_lstm = TORCH_AVAILABLE
        if self.use_lstm:
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            logger.info(f"Using LSTM sequence models on {self.device}")
        else:
            logger.info("Using GBM fallback for sequence modeling")

    def extract_feature_sequence(self, audio_features: SegmentedAudioFeatures) -> np.ndarray:
        """Extract time-series feature sequence."""
        sequences = []
        duration_ms = audio_features.total_duration_ms
        interval_ms = duration_ms / self.sequence_length
        for i in range(self.sequence_length):
            window_start = i * interval_ms
            window_end = (i + 1) * interval_ms
            syls_in_window = [
                s for s in audio_features.syllable_features
                if window_start <= s.start_time_ms < window_end
            ]
            words_in_window = [
                w for w in audio_features.word_features
                if window_start <= w.start_time_ms < window_end
            ]
            # Extract comprehensive features per time step
            if syls_in_window:
                avg_energy = np.mean([s.energy_db for s in syls_in_window])
                avg_pitch = np.mean([s.pitch_hz for s in syls_in_window])
                avg_beat_error = np.mean([abs(s.beat_alignment_error_ms) for s in syls_in_window])
                avg_stress = np.mean([s.stress_level for s in syls_in_window])
            else:
                avg_energy = -40.0
                avg_pitch = 200.0
                avg_beat_error = 50.0
                avg_stress = 0.5
            if words_in_window:
                avg_emotion = np.mean([w.emotion_intensity for w in words_in_window])
                avg_emphasis = np.mean([w.emphasis_score for w in words_in_window])
                avg_clarity = np.mean([w.clarity_score for w in words_in_window])
            else:
                avg_emotion = 0.5
                avg_emphasis = 0.5
                avg_clarity = 0.8
            if i < len(audio_features.spectral_features.low_band_energy):
                low_energy = audio_features.spectral_features.low_band_energy[i]
                mid_energy = audio_features.spectral_features.mid_energy[i]
                high_energy = audio_features.spectral_features.high_energy[i]
            else:
                low_energy = 0.0
                mid_energy = 0.0
                high_energy = 0.0
            step_features = [
                avg_energy,
                avg_pitch,
                avg_beat_error,
                avg_emotion,
                avg_emphasis,
                avg_clarity,
                avg_stress,
                low_energy,
                mid_energy,
                high_energy,
                len(syls_in_window),
                len(words_in_window),
                window_start / duration_ms,  # Normalized position
            ]
            sequences.append(step_features)
        return np.array(sequences, dtype=np.float32)
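
    # Sequence sketch: every clip is resampled onto a fixed (20, 13) grid
    # regardless of duration, so clips of different lengths are comparable:
    #
    #     seq = ProductionSequenceEngine().extract_feature_sequence(feats)  # feats assumed
    #     assert seq.shape == (20, 13)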
    def train_lstm_model(
        self,
        niche: str,
        platform: str,
        sequences: List[np.ndarray],
        targets: List[float],
        metric_name: str,
        epochs: int = 50
    ):
        """Train LSTM model on sequences."""
        if not self.use_lstm:
            return self._train_gbm_fallback(niche, platform, sequences, targets, metric_name)
        key = f"{niche}:{platform}:{metric_name}"
        with self.lock:
            if len(sequences) < 100:
                logger.warning(f"Insufficient data for LSTM: {len(sequences)}")
                return
            # Prepare data
            X = np.array(sequences, dtype=np.float32)
            y = np.array(targets, dtype=np.float32).reshape(-1, 1)
            # Train/val split (80/20)
            split_idx = int(0.8 * len(X))
            X_train, X_val = X[:split_idx], X[split_idx:]
            y_train, y_val = y[:split_idx], y[split_idx:]
            # Convert to tensors
            X_train_t = torch.FloatTensor(X_train).to(self.device)
            y_train_t = torch.FloatTensor(y_train).to(self.device)
            X_val_t = torch.FloatTensor(X_val).to(self.device)
            y_val_t = torch.FloatTensor(y_val).to(self.device)
            # Initialize model
            input_dim = X.shape[2]
            model = LSTMSequenceModel(input_dim=input_dim, hidden_dim=64, num_layers=2)
            model = model.to(self.device)
            # Training setup
            criterion = nn.MSELoss()
            optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
            scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)
            best_val_loss = float('inf')
            patience_counter = 0
            best_model_state = None
            # Training loop
            for epoch in range(epochs):
                model.train()
                optimizer.zero_grad()
                outputs = model(X_train_t)
                loss = criterion(outputs, y_train_t)
                loss.backward()
                optimizer.step()
                # Validation
                model.eval()
                with torch.no_grad():
                    val_outputs = model(X_val_t)
                    val_loss = criterion(val_outputs, y_val_t)
                scheduler.step(val_loss)
                # Early stopping
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    patience_counter = 0
                    # Snapshot the best weights; state_dict() returns live tensor
                    # references, so clone them or later epochs overwrite the snapshot
                    best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
                else:
                    patience_counter += 1
                    if patience_counter >= 10:
                        break
                if epoch % 10 == 0:
                    logger.debug(f"Epoch {epoch}: train_loss={loss.item():.4f}, val_loss={val_loss.item():.4f}")
            # Load best model
            model.load_state_dict(best_model_state)
            self.sequence_models[key] = {
                "model": model,
                "type": "lstm",
                "val_loss": float(best_val_loss),
                "n_samples": len(X),
                "input_dim": input_dim
            }
            logger.info(f"Trained LSTM for {key}: val_loss={best_val_loss:.4f}, samples={len(X)}")
    def _train_gbm_fallback(
        self,
        niche: str,
        platform: str,
        sequences: List[np.ndarray],
        targets: List[float],
        metric_name: str
    ):
        """GBM fallback when LSTM unavailable."""
        key = f"{niche}:{platform}:{metric_name}"
        with self.lock:
            if len(sequences) < 50:
                return
            X = []
            for seq in sequences:
                features = []
                features.extend(np.mean(seq, axis=0))
                features.extend(np.std(seq, axis=0))
                features.extend(np.max(seq, axis=0))
                features.extend(np.min(seq, axis=0))
                if len(seq) > 1:
                    features.extend(np.mean(np.diff(seq, axis=0), axis=0))
                else:
                    features.extend([0.0] * seq.shape[1])
                early = np.mean(seq[:5], axis=0)
                late = np.mean(seq[-5:], axis=0)
                features.extend(late - early)
                X.append(features)
            X = np.array(X)
            y = np.array(targets)
            model = GradientBoostingRegressor(
                n_estimators=200,
                max_depth=6,
                learning_rate=0.05,
                subsample=0.8,
                random_state=42
            )
            model.fit(X, y)
            self.sequence_models[key] = {"model": model, "type": "gbm", "n_samples": len(X)}
            logger.info(f"Trained GBM fallback for {key} with {len(X)} samples")

    def predict_from_sequence(
        self,
        niche: str,
        platform: str,
        sequence: np.ndarray,
        metric_name: str
    ) -> Optional[float]:
        """Predict using trained sequence model."""
        key = f"{niche}:{platform}:{metric_name}"
        with self.lock:
            if key not in self.sequence_models:
                return None
            model_data = self.sequence_models[key]
            if model_data["type"] == "lstm" and self.use_lstm:
                model = model_data["model"]
                model.eval()
                with torch.no_grad():
                    X = torch.FloatTensor(sequence).unsqueeze(0).to(self.device)
                    prediction = model(X).cpu().numpy()[0][0]
                return float(prediction)
            else:  # GBM
                model = model_data["model"]
                features = []
                features.extend(np.mean(sequence, axis=0))
                features.extend(np.std(sequence, axis=0))
                features.extend(np.max(sequence, axis=0))
                features.extend(np.min(sequence, axis=0))
                if len(sequence) > 1:
                    features.extend(np.mean(np.diff(sequence, axis=0), axis=0))
                else:
                    features.extend([0.0] * sequence.shape[1])
                early = np.mean(sequence[:5], axis=0)
                late = np.mean(sequence[-5:], axis=0)
                features.extend(late - early)
                X = np.array(features).reshape(1, -1)
                prediction = model.predict(X)[0]
                return float(prediction)
class SilencePatternOptimizer:
    """
    Optimizes micro-silence intervals for maximum retention lift.
    Silence before hooks/drops is critical for dopamine spikes.
    """
    def __init__(self):
        self.silence_patterns = defaultdict(list)  # niche:platform -> [(duration_ms, position_ms, retention_lift)]
        self.optimal_patterns = {}
        self.lock = threading.Lock()

    def add_silence_observation(
        self,
        niche: str,
        platform: str,
        silence_duration_ms: float,
        position_ms: float,
        context: str,  # "hook", "drop", "punchline"
        retention_lift: float  # Measured impact on retention
    ):
        """Add observed silence pattern performance."""
        key = f"{niche}:{platform}:{context}"
        with self.lock:
            self.silence_patterns[key].append({
                "duration_ms": silence_duration_ms,
                "position_ms": position_ms,
                "retention_lift": retention_lift,
                "timestamp": datetime.utcnow().isoformat()
            })

    def calculate_optimal_silence(
        self,
        niche: str,
        platform: str,
        context: str,
        position_ms: float
    ) -> Optional[float]:
        """
        Calculate optimal silence duration for specific context.
        Returns recommended silence duration in milliseconds.
        """
        key = f"{niche}:{platform}:{context}"
        with self.lock:
            patterns = self.silence_patterns.get(key, [])
            if len(patterns) < 10:
                # Defaults based on context
                defaults = {
                    "hook": 150.0,  # 150ms before hook
                    "drop": 200.0,  # 200ms before drop
                    "punchline": 120.0  # 120ms before punchline
                }
                return defaults.get(context, 150.0)
            # Find patterns with highest retention lift
            sorted_patterns = sorted(patterns, key=lambda x: x["retention_lift"], reverse=True)
            top_10_percent = sorted_patterns[:max(1, len(sorted_patterns) // 10)]
            # Average optimal duration
            optimal_duration = np.mean([p["duration_ms"] for p in top_10_percent])
            self.optimal_patterns[key] = optimal_duration
            return float(optimal_duration)
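
    # Usage sketch: with fewer than 10 observations the context defaults apply;
    # after that the recommendation is the mean duration of the top-decile lifts:
    #
    #     opt = SilencePatternOptimizer()
    #     opt.calculate_optimal_silence("fitness", "tiktok", "hook", 1200.0)  # -> 150.0 (default)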
    def score_silence_pattern(
        self,
        audio_features: SegmentedAudioFeatures,
        niche: str,
        platform: str
    ) -> float:
        """
        Score existing silence pattern in audio.
        Returns score 0.0-1.0 indicating quality.
        """
        pm = audio_features.pause_metrics
        if not pm.pause_positions_ms:
            return 0.5  # No pauses, neutral
        score = 0.0
        total_weight = 0.0
        # Check silences before likely hooks (first 3 seconds)
        early_pauses = [p for p in pm.pause_positions_ms if 500 < p < 3000]
        for pause_pos in early_pauses:
            # Find corresponding pause duration
            pause_idx = pm.pause_positions_ms.index(pause_pos)
            if pause_idx < len(pm.pause_durations_ms):
                pause_dur = pm.pause_durations_ms[pause_idx]
                # Get optimal for this position
                optimal_hook = self.calculate_optimal_silence(niche, platform, "hook", pause_pos)
                if optimal_hook:
                    # Score based on deviation from optimal
                    deviation = abs(pause_dur - optimal_hook)
                    weight = 1.0 / (1.0 + pause_pos / 1000.0)  # Earlier pauses matter more
                    if deviation < 20:  # Within 20ms
                        score += 1.0 * weight
                    elif deviation < 50:  # Within 50ms
                        score += 0.7 * weight
                    else:
                        score += 0.3 * weight
                    total_weight += weight
        if total_weight > 0:
            return score / total_weight
        return 0.5
class HumanPhysiologyAligner:
    """
    Aligns audio to human attention curves and dopamine response.
    Maximizes engagement based on neurophysiology.
    """
    def __init__(self):
        # Attention decay half-life (attention halves every 7 seconds)
        self.attention_half_life_ms = 7000.0
        # Dopamine spike parameters
        self.dopamine_peak_delay_ms = 150.0  # Peak occurs 150ms after stimulus
        self.dopamine_recovery_ms = 800.0  # Recovery time between spikes
        self.lock = threading.Lock()

    def calculate_attention_curve(self, duration_ms: float, num_points: int = 20) -> np.ndarray:
        """
        Calculate expected attention level over time.
        Returns array of attention levels (0.0 to 1.0) over duration.
        """
        times = np.linspace(0, duration_ms, num_points)
        # Exponential decay with periodic refreshes from hooks;
        # the ln(2) factor makes attention_half_life_ms a true half-life
        base_attention = np.exp(-times * np.log(2) / self.attention_half_life_ms)
        # Add periodic attention spikes (every 3-4 seconds)
        spike_interval = 3500.0
        for i, t in enumerate(times):
            if t % spike_interval < 500:  # 500ms spike window
                base_attention[i] = min(1.0, base_attention[i] + 0.3)
        return base_attention
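
    # Worked example: with a 7 s half-life, attention at t = 7000 ms is
    # exp(-7000 * ln2 / 7000) = 0.5, and each 3.5 s hook window adds a +0.3 bump
    # (clipped at 1.0), so a 14 s clip decays to about 0.25 between spikes.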
    def predict_dopamine_response(
        self,
        audio_features: SegmentedAudioFeatures
    ) -> Dict[str, Any]:
        """
        Predict dopamine response curve for audio pattern.
        Returns timing of expected dopamine peaks.
        """
        dopamine_events = []
        # Detect potential dopamine triggers
        for i, word in enumerate(audio_features.word_features):
            # High emotion + emphasis = dopamine trigger
            if word.emotion_intensity > 0.75 and word.emphasis_score > 0.7:
                peak_time = word.start_time_ms + self.dopamine_peak_delay_ms
                # Check if too close to previous peak (dopamine fatigue)
                if dopamine_events:
                    time_since_last = peak_time - dopamine_events[-1]["peak_time_ms"]
                    if time_since_last < self.dopamine_recovery_ms:
                        continue  # Skip, too soon
                dopamine_events.append({
                    "trigger_time_ms": word.start_time_ms,
                    "peak_time_ms": peak_time,
                    "intensity": word.emotion_intensity * word.emphasis_score,
                    "word": word.word_text
                })
        # Calculate dopamine curve
        duration_ms = audio_features.total_duration_ms
        num_points = 20
        times = np.linspace(0, duration_ms, num_points)
        dopamine_curve = np.zeros(num_points)
        for event in dopamine_events:
            peak_time = event["peak_time_ms"]
            intensity = event["intensity"]
            # Gaussian spike centered at peak
            for i, t in enumerate(times):
                time_diff = t - peak_time
                dopamine_curve[i] += intensity * np.exp(-(time_diff ** 2) / (2 * 300 ** 2))
        return {
            "events": dopamine_events,
            "curve": dopamine_curve.tolist(),
            "times_ms": times.tolist(),
            "total_dopamine_load": float(np.sum(dopamine_curve)),
            "peak_count": len(dopamine_events)
        }
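
    # Trigger sketch: a word with emotion 0.9 and emphasis 0.8 at t = 1000 ms
    # yields a Gaussian spike of intensity 0.72 centered at 1150 ms (sigma =
    # 300 ms); any qualifying word starting before ~1800 ms is then dropped by
    # the 800 ms recovery gate.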
    def score_physiological_alignment(
        self,
        audio_features: SegmentedAudioFeatures,
        predicted_retention: np.ndarray
    ) -> float:
        """
        Score how well audio aligns with human physiology.
        Args:
            predicted_retention: Expected retention curve over time
        Returns:
            Score 0.0-1.0
        """
        # Get dopamine response
        dopamine = self.predict_dopamine_response(audio_features)
        # Get attention curve
        attention = self.calculate_attention_curve(audio_features.total_duration_ms)
        # Align lengths
        min_len = min(len(dopamine["curve"]), len(attention), len(predicted_retention))
        dopamine_curve = np.array(dopamine["curve"][:min_len])
        attention_curve = attention[:min_len]
        retention_curve = predicted_retention[:min_len]
        # Calculate alignments (nan-guarded: corrcoef is undefined for constant curves)
        dopamine_retention_corr = np.nan_to_num(np.corrcoef(dopamine_curve, retention_curve)[0, 1])
        attention_retention_corr = np.nan_to_num(np.corrcoef(attention_curve, retention_curve)[0, 1])
        # Penalties
        penalty = 0.0
        # Penalty for no early dopamine spike
        if len(dopamine["events"]) == 0 or dopamine["events"][0]["trigger_time_ms"] > 2000:
            penalty += 0.2
        # Penalty for too few peaks
        if dopamine["peak_count"] < 2:
            penalty += 0.15
        # Penalty for too many peaks (fatigue)
        if dopamine["peak_count"] > 6:
            penalty += 0.1
        # Calculate final score
        score = (
            0.5 * max(0, dopamine_retention_corr) +
            0.3 * max(0, attention_retention_corr) +
            0.2 * (dopamine["total_dopamine_load"] / 10.0)  # Normalized
        )
        score = max(0.0, min(1.0, score - penalty))
        return float(score)
| """ | |
| Sequence-aware models for temporal pattern learning. | |
| Captures time-dependent virality signals like hook impact evolution. | |
| """ | |
| def __init__(self): | |
| self.sequence_models = {} | |
| self.lock = threading.Lock() | |
| self.sequence_length = 20 # Track 20 time steps | |
    def extract_feature_sequence(self, audio_features: SegmentedAudioFeatures) -> np.ndarray:
        """
        Extract time-series feature sequence for RNN/LSTM.
        Returns: [sequence_length, feature_dim] array
        """
        sequences = []
        # Time-align features to 20 equal intervals
        duration_ms = audio_features.total_duration_ms
        interval_ms = duration_ms / self.sequence_length
        for i in range(self.sequence_length):
            window_start = i * interval_ms
            window_end = (i + 1) * interval_ms
            # Syllables in this window
            syls_in_window = [
                s for s in audio_features.syllable_features
                if window_start <= s.start_time_ms < window_end
            ]
            # Words in this window
            words_in_window = [
                w for w in audio_features.word_features
                if window_start <= w.start_time_ms < window_end
            ]
            # Extract features for this time step
            if syls_in_window:
                avg_energy = np.mean([s.energy_db for s in syls_in_window])
                avg_pitch = np.mean([s.pitch_hz for s in syls_in_window])
                avg_beat_error = np.mean([abs(s.beat_alignment_error_ms) for s in syls_in_window])
            else:
                avg_energy = -40.0
                avg_pitch = 200.0
                avg_beat_error = 50.0
            if words_in_window:
                avg_emotion = np.mean([w.emotion_intensity for w in words_in_window])
                avg_emphasis = np.mean([w.emphasis_score for w in words_in_window])
            else:
                avg_emotion = 0.5
                avg_emphasis = 0.5
            # Energy in spectral bands
            if i < len(audio_features.spectral_features.low_band_energy):
                low_energy = audio_features.spectral_features.low_band_energy[i]
                mid_energy = audio_features.spectral_features.mid_energy[i]
                high_energy = audio_features.spectral_features.high_energy[i]
            else:
                low_energy = 0.0
                mid_energy = 0.0
                high_energy = 0.0
            step_features = [
                avg_energy,
                avg_pitch,
                avg_beat_error,
                avg_emotion,
                avg_emphasis,
                low_energy,
                mid_energy,
                high_energy,
                len(syls_in_window),  # Syllable density
                len(words_in_window)  # Word density
            ]
            sequences.append(step_features)
        return np.array(sequences, dtype=np.float32)

    def train_sequence_model(
        self,
        niche: str,
        platform: str,
        sequences: List[np.ndarray],
        targets: List[float],
        metric_name: str
    ):
        """
        Train sequence model (simplified - in production use LSTM/Transformer).
        For now, use temporal aggregations + GBM.
        """
        key = f"{niche}:{platform}:{metric_name}"
        with self.lock:
            if len(sequences) < 50:
                return
            # Aggregate sequence features
            X = []
            for seq in sequences:
                # Statistical aggregations over time
                features = []
                features.extend(np.mean(seq, axis=0))  # 10 features
                features.extend(np.std(seq, axis=0))  # 10 features
                features.extend(np.max(seq, axis=0))  # 10 features
                features.extend(np.min(seq, axis=0))  # 10 features
                # Temporal dynamics
                if len(seq) > 1:
                    features.extend(np.mean(np.diff(seq, axis=0), axis=0))  # 10 features (rate of change)
                else:
                    features.extend([0.0] * 10)
                # Early vs late comparison (first 5 vs last 5 time steps)
                early = np.mean(seq[:5], axis=0)
                late = np.mean(seq[-5:], axis=0)
                features.extend(late - early)  # 10 features
                X.append(features)
            X = np.array(X)
            y = np.array(targets)
            # Train GBM
            model = GradientBoostingRegressor(
                n_estimators=100,
                max_depth=5,
                learning_rate=0.1,
                random_state=42
            )
            model.fit(X, y)
            self.sequence_models[key] = {"model": model, "n_samples": len(X)}
            logger.info(f"Trained sequence model for {key} with {len(X)} samples")

    def predict_from_sequence(
        self,
        niche: str,
        platform: str,
        sequence: np.ndarray,
        metric_name: str
    ) -> Optional[float]:
        """Predict using sequence model."""
        key = f"{niche}:{platform}:{metric_name}"
        with self.lock:
            if key not in self.sequence_models:
                return None
            model_data = self.sequence_models[key]
            model = model_data["model"]
            # Aggregate features same way as training
            features = []
            features.extend(np.mean(sequence, axis=0))
            features.extend(np.std(sequence, axis=0))
            features.extend(np.max(sequence, axis=0))
            features.extend(np.min(sequence, axis=0))
            if len(sequence) > 1:
                features.extend(np.mean(np.diff(sequence, axis=0), axis=0))
            else:
                features.extend([0.0] * 10)
            early = np.mean(sequence[:5], axis=0)
            late = np.mean(sequence[-5:], axis=0)
            features.extend(late - early)
            X = np.array(features).reshape(1, -1)
            prediction = model.predict(X)[0]
            return float(prediction)
    def compute_audio_embedding(self, audio_features: SegmentedAudioFeatures) -> np.ndarray:
        """
        Compute 128-dimensional embedding from audio features.
        Combines MFCC-like, phase, timing, and spectral features.
        """
        features = []
        # 1. Syllable timing features (statistical summaries)
        if audio_features.syllable_features:
            syl_durations = [s.duration_ms for s in audio_features.syllable_features[:20]]
            syl_energies = [s.energy_db for s in audio_features.syllable_features[:20]]
            syl_pitches = [s.pitch_hz for s in audio_features.syllable_features[:20]]
            syl_beat_errors = [abs(s.beat_alignment_error_ms) for s in audio_features.syllable_features[:20]]
            features.extend([
                np.mean(syl_durations), np.std(syl_durations), np.max(syl_durations),
                np.mean(syl_energies), np.std(syl_energies), np.max(syl_energies),
                np.mean(syl_pitches), np.std(syl_pitches),
                np.mean(syl_beat_errors), np.max(syl_beat_errors)
            ])
        else:
            features.extend([0.0] * 10)
        # 2. Word-level emotion features
        if audio_features.word_features:
            emotions = [w.emotion_intensity for w in audio_features.word_features[:15]]
            emphasis = [w.emphasis_score for w in audio_features.word_features[:15]]
            features.extend([
                np.mean(emotions), np.std(emotions), np.max(emotions),
                np.mean(emphasis), np.std(emphasis)
            ])
        else:
            features.extend([0.0] * 5)
        # 3. Pitch contour features
        pc = audio_features.pitch_contour
        features.extend([
            pc.pitch_variance,
            pc.pitch_range_semitones,
            pc.pitch_slope,
            len(pc.pitch_inflection_points) / max(len(pc.timestamps_ms), 1)
        ])
        # 4. Energy envelope features
        ee = audio_features.energy_envelope
        features.extend([
            ee.peak_energy_db,
            ee.mean_energy_db,
            ee.energy_variance,
            ee.dynamic_range_db,
            np.mean(ee.attack_times_ms) if ee.attack_times_ms else 0.0,
            np.mean(ee.decay_times_ms) if ee.decay_times_ms else 0.0
        ])
        # 5. Pause metrics
        pm = audio_features.pause_metrics
        features.extend([
            pm.total_pause_count,
            pm.mean_pause_duration_ms,
            pm.pause_variance_ms,
            pm.strategic_pause_count
        ])
        # 6. Beat alignment
        ba = audio_features.beat_alignment
        features.extend([
            ba.mean_error_ms,
            ba.max_error_ms,
            ba.on_beat_percentage,
            ba.sync_quality_score
        ])
        # 7. Spectral features (sample from time-series)
        sf = audio_features.spectral_features
        if sf.low_band_energy:
            features.extend([
                np.mean(sf.low_band_energy[:10]),
                np.mean(sf.mid_energy[:10]),
                np.mean(sf.high_energy[:10]),
                np.mean(sf.spectral_centroid[:10]) if sf.spectral_centroid else 0.0
            ])
        else:
            features.extend([0.0] * 4)
        # 8. Overall metrics
        features.extend([
            audio_features.total_duration_ms,
            audio_features.words_per_minute,
            audio_features.syllables_per_second
        ])
        # Pad or truncate to consistent dimension before PCA
        feature_array = np.array(features, dtype=np.float32)
        # Handle NaN/inf
        feature_array = np.nan_to_num(feature_array, nan=0.0, posinf=1e6, neginf=-1e6)
        # If we have enough data, apply PCA
        if self.pca is not None:
            with self.lock:
                feature_array = self.scaler.transform(feature_array.reshape(1, -1))
                embedding = self.pca.transform(feature_array)[0]
        else:
            # Not yet trained, return raw features (will be reduced later)
            embedding = feature_array
        # Ensure output is exactly 128 dimensions
        if len(embedding) < self.embedding_dim:
            embedding = np.pad(embedding, (0, self.embedding_dim - len(embedding)))
        elif len(embedding) > self.embedding_dim:
            embedding = embedding[:self.embedding_dim]
        return embedding

    def compute_performance_embedding(self, platform_metrics: PlatformMetrics) -> np.ndarray:
        """Compute performance-focused embedding."""
        features = [
            platform_metrics.retention_1s,
            platform_metrics.retention_2s,
            platform_metrics.retention_3s,
            platform_metrics.completion_rate,
            platform_metrics.rewatch_rate,
            platform_metrics.shares_per_impression,
            1.0 - platform_metrics.skip_rate,  # scroll_stop_probability
            1.0 - platform_metrics.mute_rate,  # audio_kept_on
            platform_metrics.platform_engagement_score,
            platform_metrics.virality_coefficient,
            np.log1p(platform_metrics.rewatch_count),
            np.log1p(platform_metrics.shares)
        ]
        return np.array(features, dtype=np.float32)
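
    # Scale note: raw counts span orders of magnitude at 20k-100k videos/day, so
    # log1p (e.g. np.log1p(10_000) ~= 9.2) keeps share/rewatch counts comparable
    # to the 0-1 rate features in the same 12-dim vector.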
    def train_pca(self, feature_matrix: np.ndarray):
        """Train PCA for dimensionality reduction."""
        with self.lock:
            self.scaler.fit(feature_matrix)
            normalized = self.scaler.transform(feature_matrix)
            n_components = min(self.embedding_dim, feature_matrix.shape[1], feature_matrix.shape[0])
            self.pca = PCA(n_components=n_components)
            self.pca.fit(normalized)
            logger.info(f"PCA trained: {n_components} components, "
                        f"explained variance: {self.pca.explained_variance_ratio_.sum():.2%}")

    def update_incremental(self, features: np.ndarray):
        """Add features to cache for incremental PCA updates."""
        with self.lock:
            self.feature_cache.append(features)
            # Retrain PCA every 1000 samples
            if len(self.feature_cache) >= 1000:
                matrix = np.vstack(list(self.feature_cache))
                self.train_pca(matrix)
                self.feature_cache.clear()
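
# Bootstrap sketch (hypothetical engine instance `eng`): embeddings are raw
# zero-padded vectors until ~1000 samples have accumulated, at which point
# update_incremental() fits the scaler + PCA and later compute_audio_embedding()
# calls return PCA-projected vectors:
#
#     eng.update_incremental(raw_vec)   # called per record; retrains at 1000 cached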
class AdvancedViralityPredictor:
    """
    Production-grade virality prediction with ensemble models,
    uncertainty quantification, and >95% confidence guarantees.
    """
    def __init__(self):
        self.models = {}  # ensemble_name -> {models, metadata}
        self.lock = threading.Lock()
        self.training_data = defaultdict(lambda: {"X": [], "y": {}, "sequences": []})
        self.model_performance = defaultdict(dict)

    def add_training_sample(
        self,
        niche: str,
        platform: str,
        audio_embedding: np.ndarray,
        sequence_features: np.ndarray,
        actual_metrics: Dict[str, float]
    ):
        """Add sample with both embedding and sequence features."""
        key = f"{niche}:{platform}"
        with self.lock:
            self.training_data[key]["X"].append(audio_embedding)
            self.training_data[key]["sequences"].append(sequence_features)
            for metric, value in actual_metrics.items():
                if metric not in self.training_data[key]["y"]:
                    self.training_data[key]["y"][metric] = []
                self.training_data[key]["y"][metric].append(value)

    def train_ensemble_models(
        self,
        niche: str,
        platform: str,
        target_metrics: List[str] = ["views_24h", "retention_2s", "engagement_score"]
    ) -> Dict[str, Any]:
        """
        Train ensemble of GBM, RF, and linear models with cross-validation.
        Returns performance metrics and confidence intervals.
        """
        key = f"{niche}:{platform}"
        with self.lock:
            data = self.training_data[key]
            if len(data["X"]) < 100:
                logger.warning(f"Insufficient data for {key}: {len(data['X'])} samples")
                return {"status": "insufficient_data", "n_samples": len(data["X"])}
            X = np.vstack(data["X"])
            results = {}
            ensemble = {}
            for metric in target_metrics:
                if metric not in data["y"] or len(data["y"][metric]) != len(X):
                    continue
                y = np.array(data["y"][metric])
                # Time-based split: samples are appended chronologically, so
                # shuffle=False holds out the most recent 20% (random_state is
                # irrelevant when shuffling is disabled)
                X_train, X_test, y_train, y_test = train_test_split(
                    X, y, test_size=0.2, shuffle=False
                )
                # Model 1: Gradient Boosting
                gbm = GradientBoostingRegressor(
                    n_estimators=200,
                    max_depth=6,
                    learning_rate=0.05,
                    subsample=0.8,
                    random_state=42
                )
                gbm.fit(X_train, y_train)
                gbm_pred = gbm.predict(X_test)
                gbm_score = r2_score(y_test, gbm_pred)
                gbm_mse = mean_squared_error(y_test, gbm_pred)
                # Model 2: Random Forest
                rf = RandomForestRegressor(
                    n_estimators=100,
                    max_depth=10,
                    random_state=42,
                    n_jobs=-1
                )
                rf.fit(X_train, y_train)
                rf_pred = rf.predict(X_test)
                rf_score = r2_score(y_test, rf_pred)
                # Model 3: Linear (for interpretability)
                from sklearn.linear_model import Ridge
                linear = Ridge(alpha=1.0)
                linear.fit(X_train, y_train)
                linear_pred = linear.predict(X_test)
                linear_score = r2_score(y_test, linear_pred)
                # Ensemble prediction (weighted average)
                weights = np.array([gbm_score, rf_score, linear_score])
                weights = np.maximum(weights, 0)  # No negative weights
                weights = weights / (weights.sum() + 1e-10)
                ensemble_pred = (
                    weights[0] * gbm_pred +
                    weights[1] * rf_pred +
                    weights[2] * linear_pred
                )
                ensemble_score = r2_score(y_test, ensemble_pred)
                ensemble_mse = mean_squared_error(y_test, ensemble_pred)
                # Store ensemble
                ensemble[metric] = {
                    "gbm": gbm,
                    "rf": rf,
                    "linear": linear,
                    "weights": weights,
                    "score": ensemble_score,
                    "mse": ensemble_mse,
                    "training_samples": len(X_train)
                }
                # Calculate prediction intervals (uncertainty quantification)
                residuals = y_test - ensemble_pred
                std_residual = np.std(residuals)
                results[metric] = {
                    "r2_score": ensemble_score,
                    "rmse": np.sqrt(ensemble_mse),
                    "std_residual": std_residual,
                    "confidence_95": 1.96 * std_residual,  # 95% confidence interval
                    "model_weights": weights.tolist(),
                    "individual_scores": {
                        "gbm": gbm_score,
                        "rf": rf_score,
                        "linear": linear_score
                    }
                }
                logger.info(
                    f"Trained ensemble for {key}:{metric} - "
                    f"R²={ensemble_score:.3f}, RMSE={np.sqrt(ensemble_mse):.3f}"
                )
            self.models[key] = ensemble
            self.model_performance[key] = results
            return results
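
    # Weighting sketch: with per-model R² of 0.6 (GBM), 0.5 (RF), 0.2 (Ridge),
    # the ensemble weights become 0.6/1.3, 0.5/1.3, 0.2/1.3 ~= (0.46, 0.38, 0.15);
    # a model with negative R² is clamped to weight 0 and effectively dropped.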
    def predict_with_uncertainty(
        self,
        niche: str,
        platform: str,
        audio_embedding: np.ndarray,
        confidence_level: float = 0.95
    ) -> Dict[str, Any]:
        """
        Predict with uncertainty quantification.
        Returns:
            {
                metric_name: {
                    "mean": prediction,
                    "lower": lower_bound,
                    "upper": upper_bound,
                    "confidence": confidence_level,
                    "probability_5m_plus": P(views > 5M)
                }
            }
        """
        key = f"{niche}:{platform}"
        with self.lock:
            if key not in self.models:
                return {"status": "no_model_trained"}
            ensemble = self.models[key]
            performance = self.model_performance.get(key, {})
            predictions = {}
            X = audio_embedding.reshape(1, -1)
            for metric, model_data in ensemble.items():
                # Get predictions from each model
                gbm_pred = model_data["gbm"].predict(X)[0]
                rf_pred = model_data["rf"].predict(X)[0]
                linear_pred = model_data["linear"].predict(X)[0]
                # Weighted ensemble
                weights = model_data["weights"]
                mean_pred = (
                    weights[0] * gbm_pred +
                    weights[1] * rf_pred +
                    weights[2] * linear_pred
                )
                # Uncertainty from residuals
                if metric in performance:
                    z_score = 1.96 if confidence_level == 0.95 else 2.576  # 99%
                    margin = z_score * performance[metric]["std_residual"]
                    lower_bound = mean_pred - margin
                    upper_bound = mean_pred + margin
                    # Clip to valid ranges
                    if "rate" in metric or "retention" in metric or "score" in metric:
                        mean_pred = np.clip(mean_pred, 0.0, 1.0)
                        lower_bound = np.clip(lower_bound, 0.0, 1.0)
                        upper_bound = np.clip(upper_bound, 0.0, 1.0)
                    elif "views" in metric:
                        mean_pred = max(0.0, mean_pred)
                        lower_bound = max(0.0, lower_bound)
                        upper_bound = max(0.0, upper_bound)
                    # Calculate probability of exceeding thresholds
                    if "views" in metric:
                        # P(X > threshold) under a normal approximation; norm.sf is
                        # the exact survival function (the tanh form only approximates it)
                        std = performance[metric]["std_residual"]
                        prob_5m_plus = float(norm.sf((5_000_000 - mean_pred) / (std + 1e-10)))
                        prob_30m_plus = float(norm.sf((30_000_000 - mean_pred) / (std + 1e-10)))
                    else:
                        prob_5m_plus = None
                        prob_30m_plus = None
                    predictions[metric] = {
                        "mean": float(mean_pred),
                        "lower": float(lower_bound),
                        "upper": float(upper_bound),
                        "confidence": confidence_level,
                        "std": float(performance[metric]["std_residual"]),
                        "probability_5m_plus": prob_5m_plus,
                        "probability_30m_plus": prob_30m_plus,
                        "model_confidence": float(model_data["score"])
                    }
                else:
                    predictions[metric] = {
                        "mean": float(mean_pred),
                        "confidence": 0.5
                    }
            return predictions
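
# Usage sketch (hypothetical embedding `emb`): the interval is mean ± z·std of the
# held-out residuals; e.g. for views_24h with mean 2.0M and std 1.5M,
# P(views > 5M) = norm.sf((5.0M - 2.0M) / 1.5M) = norm.sf(2.0) ~= 0.023:
#
#     preds = predictor.predict_with_uncertainty("fitness", "tiktok", emb)
#     preds["views_24h"]["probability_5m_plus"]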
class ModelLifecycleManager:
    """
    Manages model training, evaluation, promotion, and retirement.
    Ensures only high-quality models are used in production.
    """
    def __init__(self, store: 'AudioPerformanceStore'):
        self.store = store
        self.lock = threading.Lock()
        self.training_schedule = {}
        self.model_versions = defaultdict(list)
        self.active_models = {}
        self.last_train_time = {}
        # Backing store for add_training_sample / train_models below
        self.training_data = defaultdict(lambda: {"X": [], "y": {}})
    def schedule_training(
        self,
        niche: str,
        platform: str,
        interval_hours: int = 24,
        min_new_samples: int = 100
    ):
        """Schedule automatic model retraining."""
        key = f"{niche}:{platform}"
        with self.lock:
            self.training_schedule[key] = {
                "interval_hours": interval_hours,
                "min_new_samples": min_new_samples,
                "last_check": datetime.utcnow()
            }

    def check_and_train(self) -> List[str]:
        """Check all scheduled models and train if needed."""
        trained = []
        with self.lock:
            current_time = datetime.utcnow()
            for key, schedule in self.training_schedule.items():
                last_check = schedule["last_check"]
                interval = timedelta(hours=schedule["interval_hours"])
                if current_time - last_check >= interval:
                    niche, platform = key.split(":")
                    # Check if enough new samples
                    # (In production, track sample count per key)
                    logger.info(f"Triggering scheduled training for {key}")
                    results = self.store.train_predictive_models(niche, platform)
                    if results.get("status") != "insufficient_data":
                        self._evaluate_and_promote(niche, platform, results)
                        trained.append(key)
                    schedule["last_check"] = current_time
        return trained
| def _evaluate_and_promote( | |
| self, | |
| niche: str, | |
| platform: str, | |
| training_results: Dict[str, Any] | |
| ): | |
| """Evaluate new model and promote if better than current.""" | |
| key = f"{niche}:{platform}" | |
| # Version the model | |
| version = len(self.model_versions[key]) + 1 | |
| model_metadata = { | |
| "version": version, | |
| "trained_at": datetime.utcnow().isoformat(), | |
| "performance": training_results, | |
| "status": "candidate" | |
| } | |
| # Check if model meets quality thresholds | |
| promote = True | |
| for metric, perf in training_results.items(): | |
| if isinstance(perf, dict) and "r2_score" in perf: | |
| if perf["r2_score"] < 0.3: # Minimum R² threshold | |
| promote = False | |
| logger.warning(f"Model {key}:v{version} R² too low for {metric}: {perf['r2_score']:.3f}") | |
| if promote: | |
| # Compare to active model | |
| if key in self.active_models: | |
| current_performance = self.active_models[key]["performance"] | |
| # Calculate aggregate score | |
| new_score = self._aggregate_performance(training_results) | |
| old_score = self._aggregate_performance(current_performance) | |
| if new_score > old_score * 1.05: # 5% improvement threshold | |
| model_metadata["status"] = "active" | |
| self.active_models[key] = model_metadata | |
| logger.info(f"PROMOTED: {key}:v{version} (score: {new_score:.3f} > {old_score:.3f})") | |
| else: | |
| model_metadata["status"] = "retired_underperformed" | |
| logger.info(f"NOT PROMOTED: {key}:v{version} insufficient improvement") | |
| else: | |
| # No existing model, promote automatically | |
| model_metadata["status"] = "active" | |
| self.active_models[key] = model_metadata | |
| logger.info(f"PROMOTED: {key}:v{version} (first model)") | |
| else: | |
| model_metadata["status"] = "retired_quality" | |
| self.model_versions[key].append(model_metadata) | |
| def _aggregate_performance(self, results: Dict[str, Any]) -> float: | |
| """Calculate aggregate performance score across metrics.""" | |
| scores = [] | |
| weights = { | |
| "views_24h": 0.4, | |
| "retention_2s": 0.3, | |
| "engagement_score": 0.3 | |
| } | |
| for metric, weight in weights.items(): | |
| if metric in results and isinstance(results[metric], dict): | |
| if "r2_score" in results[metric]: | |
| scores.append(weight * results[metric]["r2_score"]) | |
| return sum(scores) if scores else 0.0 | |
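| # Worked example (illustrative): R² scores of 0.5 (views_24h), | |
| # 0.4 (retention_2s) and 0.6 (engagement_score) aggregate to | |
| # 0.4*0.5 + 0.3*0.4 + 0.3*0.6 = 0.50. | |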
| def get_active_model_info(self, niche: str, platform: str) -> Dict[str, Any]: | |
| """Get information about currently active model.""" | |
| key = f"{niche}:{platform}" | |
| with self.lock: | |
| if key in self.active_models: | |
| return self.active_models[key] | |
| return {"status": "no_active_model"} | |
| def add_training_sample( | |
| self, | |
| niche: str, | |
| platform: str, | |
| audio_embedding: np.ndarray, | |
| actual_metrics: Dict[str, float] | |
| ): | |
| """Add sample to training data.""" | |
| key = f"{niche}:{platform}" | |
| with self.lock: | |
| self.training_data[key]["X"].append(audio_embedding) | |
| for metric, value in actual_metrics.items(): | |
| if metric not in self.training_data[key]["y"]: | |
| self.training_data[key]["y"][metric] = [] | |
| self.training_data[key]["y"][metric].append(value) | |
| def train_models(self, niche: str, platform: str): | |
| """Train prediction models for specific niche/platform.""" | |
| key = f"{niche}:{platform}" | |
| with self.lock: | |
| data = self.training_data[key] | |
| if len(data["X"]) < 50: # Need minimum samples | |
| logger.warning(f"Insufficient training data for {key}: {len(data['X'])} samples") | |
| return | |
| X = np.vstack(data["X"]) | |
| # Train simple linear models for each metric | |
| # In production, use gradient boosting or neural nets | |
| models = {} | |
| for metric, y_values in data["y"].items(): | |
| if len(y_values) != len(X): | |
| continue | |
| y = np.array(y_values) | |
| # Simple linear regression via normal equation | |
| # θ = (X^T X)^-1 X^T y | |
| try: | |
| X_with_bias = np.c_[np.ones(len(X)), X] | |
| theta = np.linalg.lstsq(X_with_bias, y, rcond=None)[0] | |
| models[metric] = theta | |
| except Exception as e: | |
| logger.error(f"Failed to train {metric} model: {e}") | |
| self.models[key] = models | |
| logger.info(f"Trained {len(models)} prediction models for {key}") | |
| def predict( | |
| self, | |
| niche: str, | |
| platform: str, | |
| audio_embedding: np.ndarray | |
| ) -> Dict[str, float]: | |
| """ | |
| Predict performance metrics before posting. | |
| Returns: | |
| Dict with predicted_views_24h, predicted_completion_rate, etc. | |
| """ | |
| key = f"{niche}:{platform}" | |
| with self.lock: | |
| if key not in self.models: | |
| # No model trained yet, return neutral predictions | |
| return { | |
| "predicted_views_24h": 50000.0, # Conservative baseline | |
| "predicted_completion_rate": 0.5, | |
| "predicted_engagement_score": 0.5, | |
| "predicted_retention_2s": 0.6, | |
| "confidence": 0.1 # Low confidence | |
| } | |
| models = self.models[key] | |
| predictions = {} | |
| X = np.r_[1.0, audio_embedding] # Add bias term | |
| for metric, theta in models.items(): | |
| if len(theta) != len(X): | |
| continue | |
| pred = np.dot(X, theta) | |
| # Clip to valid ranges | |
| if "rate" in metric or "retention" in metric or "score" in metric: | |
| pred = np.clip(pred, 0.0, 1.0) | |
| elif "views" in metric: | |
| pred = max(0.0, pred) | |
| predictions[f"predicted_{metric}"] = float(pred) | |
| # Add confidence based on training samples | |
| predictions["confidence"] = min(len(self.training_data[key]["X"]) / 500.0, 1.0) | |
| return predictions | |
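| # Usage sketch (illustrative): one way an orchestration loop might drive | |
| # the lifecycle manager. The niche/platform values and the 30-minute poll | |
| # cadence are assumptions, not part of the production scheduler. | |
| def _lifecycle_loop_example(manager: "ModelLifecycleManager"): | |
| manager.schedule_training("fitness", "tiktok", interval_hours=24) | |
| while True: | |
| for key in manager.check_and_train(): | |
| logger.info("Retraining cycle completed for %s", key) | |
| time.sleep(1800) | |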
| class TrendMomentumTracker: | |
| """ | |
| Tracks temporal momentum and decay for patterns. | |
| Enables adaptive trending vs stale pattern detection. | |
| """ | |
| def __init__(self, window_hours: int = 24): | |
| self.window_hours = window_hours | |
| self.pattern_timeseries = defaultdict(list) # pattern_key -> [(timestamp, engagement)] | |
| self.lock = threading.Lock() | |
| def add_performance_point( | |
| self, | |
| pattern_key: str, | |
| timestamp: datetime, | |
| engagement_score: float | |
| ): | |
| """Add performance data point for pattern.""" | |
| with self.lock: | |
| self.pattern_timeseries[pattern_key].append((timestamp, engagement_score)) | |
| # Prune old data | |
| cutoff = datetime.utcnow() - timedelta(hours=self.window_hours * 7) # Keep 7 windows | |
| self.pattern_timeseries[pattern_key] = [ | |
| (ts, score) for ts, score in self.pattern_timeseries[pattern_key] | |
| if ts > cutoff | |
| ] | |
| def calculate_momentum(self, pattern_key: str) -> Dict[str, float]: | |
| """ | |
| Calculate trend momentum and decay rate. | |
| Returns: | |
| { | |
| "trend_momentum": weighted recent growth, | |
| "decay_rate": exponential decay coefficient, | |
| "is_trending": boolean indicator, | |
| "velocity": rate of change | |
| } | |
| """ | |
| with self.lock: | |
| data = self.pattern_timeseries.get(pattern_key, []) | |
| if len(data) < 3: | |
| return { | |
| "trend_momentum": 0.0, | |
| "decay_rate": 0.0, | |
| "is_trending": False, | |
| "velocity": 0.0 | |
| } | |
| # Sort by timestamp | |
| data = sorted(data, key=lambda x: x[0]) | |
| # Split into recent and older | |
| midpoint = len(data) // 2 | |
| older_scores = [score for _, score in data[:midpoint]] | |
| recent_scores = [score for _, score in data[midpoint:]] | |
| # Calculate momentum as weighted growth | |
| older_avg = np.mean(older_scores) | |
| recent_avg = np.mean(recent_scores) | |
| if older_avg > 0: | |
| momentum = (recent_avg - older_avg) / older_avg | |
| else: | |
| momentum = 0.0 | |
| # Calculate exponential decay rate | |
| # Fit exponential: y = a * exp(b * t) | |
| timestamps = [(ts - data[0][0]).total_seconds() / 3600.0 for ts, _ in data] | |
| scores = [score for _, score in data] | |
| try: | |
| # Log-linear regression | |
| log_scores = np.log(np.maximum(scores, 1e-6)) | |
| coeffs = np.polyfit(timestamps, log_scores, 1) | |
| decay_rate = float(coeffs[0]) # Negative means decay | |
| except Exception: # polyfit can fail on degenerate inputs | |
| decay_rate = 0.0 | |
| # Velocity (recent rate of change) | |
| if len(recent_scores) >= 2: | |
| recent_times = timestamps[midpoint:] | |
| velocity = (recent_scores[-1] - recent_scores[0]) / max(recent_times[-1] - recent_times[0], 1.0) | |
| else: | |
| velocity = 0.0 | |
| is_trending = momentum > 0.1 and decay_rate > -0.05 | |
| return { | |
| "trend_momentum": float(momentum), | |
| "decay_rate": float(decay_rate), | |
| "is_trending": is_trending, | |
| "velocity": float(velocity) | |
| } | |
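| # Worked example on synthetic data: for scores following y = a*exp(b*t), | |
| # the log-linear fit in calculate_momentum recovers b as the slope. | |
| def _decay_fit_example() -> float: | |
| t = np.array([0.0, 6.0, 12.0, 18.0, 24.0]) # hours since first point | |
| y = 0.8 * np.exp(-0.03 * t) # known decay rate b = -0.03 | |
| slope = np.polyfit(t, np.log(np.maximum(y, 1e-6)), 1)[0] | |
| return float(slope) # ~ -0.03; a negative slope means the pattern is fading | |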
| class AdaptiveAnomalyDetector: | |
| """ | |
| Enhanced anomaly detection with drift modeling and predictive capabilities. | |
| Uses exponential moving averages and rolling windows for adaptive thresholds. | |
| """ | |
| def __init__(self, alpha: float = 0.2, window_size: int = 1000): | |
| self.alpha = alpha # EMA learning rate | |
| self.window_size = window_size | |
| self.ema_stats = defaultdict(lambda: {"mean": 0.0, "var": 1.0, "n": 0}) | |
| self.rolling_windows = defaultdict(lambda: deque(maxlen=window_size)) | |
| self.drift_detector = defaultdict(lambda: {"last_mean": 0.0, "drift_count": 0}) | |
| self.lock = threading.Lock() | |
| def update_statistics( | |
| self, | |
| key: str, | |
| value: float, | |
| detect_drift: bool = True | |
| ): | |
| """Update EMA statistics with drift detection.""" | |
| with self.lock: | |
| stats = self.ema_stats[key] | |
| window = self.rolling_windows[key] | |
| # Update rolling window | |
| window.append(value) | |
| # Update EMA | |
| if stats["n"] == 0: | |
| stats["mean"] = value | |
| stats["var"] = 0.0 | |
| else: | |
| # Exponential moving average | |
| delta = value - stats["mean"] | |
| stats["mean"] += self.alpha * delta | |
| stats["var"] = (1 - self.alpha) * (stats["var"] + self.alpha * delta ** 2) | |
| stats["n"] += 1 | |
| # Drift detection | |
| if detect_drift and len(window) >= 100: | |
| self._detect_drift(key, window) | |
| def _detect_drift(self, key: str, window: deque): | |
| """Detect concept drift in distribution.""" | |
| drift = self.drift_detector[key] | |
| # Compare recent mean to older mean | |
| recent = list(window)[-50:] | |
| older = list(window)[:50] | |
| recent_mean = np.mean(recent) | |
| older_mean = np.mean(older) | |
| # Check for significant shift | |
| if abs(recent_mean - older_mean) > 2 * np.std(list(window)): | |
| drift["drift_count"] += 1 | |
| logger.warning(f"Drift detected for {key}: {older_mean:.3f} -> {recent_mean:.3f}") | |
| drift["last_mean"] = recent_mean | |
| def is_anomalous( | |
| self, | |
| key: str, | |
| value: float, | |
| n_sigma: float = 3.0 | |
| ) -> Tuple[bool, float]: | |
| """ | |
| Check if value is anomalous using adaptive thresholds. | |
| Returns: | |
| (is_anomaly, z_score) | |
| """ | |
| with self.lock: | |
| stats = self.ema_stats[key] | |
| if stats["n"] < 10: | |
| return False, 0.0 | |
| std = np.sqrt(max(stats["var"], 1e-6)) | |
| z_score = abs(value - stats["mean"]) / std | |
| is_anomaly = z_score > n_sigma | |
| return is_anomaly, float(z_score) | |
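| # Worked example: with EMA mean 0.5 and variance 0.01 (std 0.1), a value | |
| # of 0.85 gives z = abs(0.85 - 0.5) / 0.1 = 3.5, so is_anomalous(key, 0.85, | |
| # n_sigma=3.0) returns (True, 3.5) once at least 10 samples are seen. | |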
| def predict_failure_probability( | |
| self, | |
| record: AudioRecord | |
| ) -> Tuple[float, List[str]]: | |
| """ | |
| Predict probability of failure BEFORE posting. | |
| Returns: | |
| (failure_probability, risk_factors) | |
| """ | |
| risks = [] | |
| risk_scores = [] | |
| af = record.audio_features | |
| pm = record.platform_metrics | |
| # Check beat alignment risk | |
| beat_key = f"{record.niche}:beat_alignment" | |
| if beat_key in self.ema_stats: | |
| is_anom, z = self.is_anomalous(beat_key, af.beat_alignment.sync_quality_score) | |
| if is_anom and af.beat_alignment.sync_quality_score < self.ema_stats[beat_key]["mean"]: | |
| risks.append("poor_beat_alignment") | |
| risk_scores.append(min(z / 3.0, 1.0)) | |
| # Check energy risk | |
| energy_key = f"{record.niche}:energy" | |
| if energy_key in self.ema_stats: | |
| is_anom, z = self.is_anomalous(energy_key, af.energy_envelope.mean_energy_db) | |
| if is_anom and af.energy_envelope.mean_energy_db < self.ema_stats[energy_key]["mean"]: | |
| risks.append("low_energy") | |
| risk_scores.append(min(z / 3.0, 1.0)) | |
| # Check emotional intensity risk | |
| if af.word_features: | |
| early_emotion = np.mean([w.emotion_intensity for w in af.word_features[:5]]) | |
| emotion_key = f"{record.niche}:emotion" | |
| if emotion_key in self.ema_stats: | |
| is_anom, z = self.is_anomalous(emotion_key, early_emotion) | |
| if is_anom and early_emotion < self.ema_stats[emotion_key]["mean"]: | |
| risks.append("low_emotional_intensity") | |
| risk_scores.append(min(z / 3.0, 1.0)) | |
| # Aggregate risk | |
| if risk_scores: | |
| failure_prob = min(np.mean(risk_scores), 1.0) | |
| else: | |
| failure_prob = 0.1 # Baseline risk | |
| return failure_prob, risks | |
| def detect_anomalies(self, record: AudioRecord) -> Tuple[bool, Optional[str]]: | |
| """ | |
| Detect anomalies using adaptive thresholds. | |
| Returns: | |
| (is_anomaly, reason) | |
| """ | |
| anomalies = [] | |
| # Check retention cliff | |
| ret_1s = record.platform_metrics.retention_1s | |
| ret_2s = record.platform_metrics.retention_2s | |
| if ret_1s > 0.8 and ret_2s < 0.3: | |
| anomalies.append("retention_cliff_1s_to_2s") | |
| # Check extreme negative signals | |
| if record.platform_metrics.mute_rate > 0.5: | |
| anomalies.append("high_mute_rate") | |
| if record.platform_metrics.scroll_away_velocity < 500: | |
| anomalies.append("instant_scroll_away") | |
| # Check beat alignment failure | |
| if record.audio_features.beat_alignment.on_beat_percentage < 0.3: | |
| anomalies.append("severe_beat_misalignment") | |
| # Check extreme pitch variance | |
| if record.audio_features.pitch_contour.pitch_variance > 5000: | |
| anomalies.append("extreme_pitch_variance") | |
| # Check silence detection | |
| if record.audio_features.energy_envelope.mean_energy_db < -60: | |
| anomalies.append("audio_too_quiet") | |
| # Adaptive statistical outlier detection | |
| key = f"{record.niche}:{record.platform}" | |
| is_eng_anom, z_eng = self.is_anomalous( | |
| f"{key}:engagement", | |
| record.platform_metrics.platform_engagement_score, | |
| n_sigma=3.5 | |
| ) | |
| if is_eng_anom and z_eng > 4.0: | |
| anomalies.append(f"statistical_outlier_z{z_eng:.1f}") | |
| # Check beat alignment adaptive | |
| is_beat_anom, z_beat = self.is_anomalous( | |
| f"{key}:beat", | |
| record.audio_features.beat_alignment.sync_quality_score | |
| ) | |
| if is_beat_anom and record.audio_features.beat_alignment.sync_quality_score < 0.5: | |
| anomalies.append(f"poor_beat_sync_z{z_beat:.1f}") | |
| if anomalies: | |
| return True, "; ".join(anomalies) | |
| return False, None | |
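| # Standalone sketch of the detector's update rule: an exponentially | |
| # weighted mean/variance recurrence, shown outside the class so the | |
| # arithmetic is easy to verify (alpha = 0.2 mirrors the default above). | |
| def _ema_step(mean_: float, var_: float, x: float, alpha: float = 0.2) -> Tuple[float, float]: | |
| delta = x - mean_ | |
| new_mean = mean_ + alpha * delta | |
| new_var = (1 - alpha) * (var_ + alpha * delta ** 2) | |
| return new_mean, new_var | |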
| class AudioPerformanceStore: | |
| """ | |
| Production-grade audio performance store for autonomous viral content system. | |
| CRITICAL PROPERTIES: | |
| - Append-only: No silent overwrites | |
| - Indexed retrieval: Optimized for RL queries | |
| - Event emission: Real-time orchestration integration | |
| - Anomaly detection: Automatic failure flagging | |
| - Scale: 20k-100k videos/day | |
| """ | |
| def __init__(self, db_path: str = "audio_performance_ground_truth.db"): | |
| """Initialize store with production-grade SQLite backend.""" | |
| verify_caller_authorization() | |
| self.db_path = Path(db_path) | |
| self.lock = threading.RLock() | |
| # Production ML components | |
| self.embedding_engine = PhaseAwareEmbeddingEngine() | |
| self.sequence_engine = ProductionSequenceEngine() | |
| self.virality_predictor = AdvancedViralityPredictor() | |
| self.trend_tracker = TrendMomentumTracker() | |
| self.anomaly_detector = AdaptiveAnomalyDetector() | |
| self.model_lifecycle = ModelLifecycleManager(self) | |
| self.rl_interface = RLIntegrationInterface(self) | |
| # Advanced optimization components | |
| self.fork_manager = EnhancedMultiForkManager(self) | |
| self.near_miss_analyzer = NearMissAnalyzer(self) | |
| self.silence_optimizer = SilencePatternOptimizer() | |
| self.physiology_aligner = HumanPhysiologyAligner() | |
| self.contract_enforcer = CrossModuleContractEnforcer() | |
| self.latency_compensator = PlatformLatencyCompensator() | |
| # Event system | |
| self._event_listeners: Dict[EventType, List[Callable]] = defaultdict(list) | |
| # Performance tracking | |
| self._ingest_count = 0 | |
| self._ingest_errors = 0 | |
| self._last_stats_reset = time.time() | |
| # Threshold configuration | |
| self.thresholds = { | |
| "extreme_success_retention_2s": 0.85, | |
| "extreme_failure_retention_2s": 0.15, | |
| "extreme_success_engagement": 0.9, | |
| "extreme_failure_engagement": 0.1, | |
| "viral_views_threshold": 5_000_000, | |
| "mega_viral_threshold": 30_000_000, | |
| "ultra_viral_threshold": 200_000_000 | |
| } | |
| # Initialize database | |
| self._init_database() | |
| # Schedule automatic model training | |
| self._schedule_background_tasks() | |
| logger.info(f"AudioPerformanceStore initialized (schema v{SCHEMA_VERSION})") | |
| logger.info(f"Database: {self.db_path.absolute()}") | |
| logger.info(f"="*80) | |
| logger.info(f"PRODUCTION CAPABILITIES ENABLED:") | |
| logger.info(f" ✓ Phase-aware embeddings (±0.5ms precision, 32 phase features)") | |
| logger.info(f" ✓ LSTM sequence modeling (temporal patterns, hook evolution)") | |
| logger.info(f" ✓ Ensemble ML (GBM+RF+Linear, 95%+ confidence intervals)") | |
| logger.info(f" ✓ Multi-fork optimization (15 variants, auto-pruning)") | |
| logger.info(f" ✓ Silence pattern optimization (retention lift scoring)") | |
| logger.info(f" ✓ Human physiology alignment (dopamine + attention curves)") | |
| logger.info(f" ✓ Cross-module contract enforcement (REJECT_AND_REGENERATE)") | |
| logger.info(f" ✓ Platform latency compensation (all devices/codecs)") | |
| logger.info(f" ✓ Model lifecycle management (auto train/promote/retire)") | |
| logger.info(f" ✓ Direct RL integration (action->reward->policy)") | |
| logger.info(f" ✓ Near-miss learning (counterfactual predictions)") | |
| logger.info(f"="*80) | |
| logger.info(f"TARGET: 5M+ baseline (10/10), 30M-300M+ repeatable (9.8-10/10)") | |
| def _schedule_background_tasks(self): | |
| """Schedule automatic model training and maintenance.""" | |
| def background_worker(): | |
| while True: | |
| try: | |
| time.sleep(3600) # Every hour | |
| # Check model lifecycle | |
| trained = self.model_lifecycle.check_and_train() | |
| if trained: | |
| logger.info(f"Background training completed: {trained}") | |
| # Prune old data (keep last 90 days) | |
| cutoff = (datetime.utcnow() - timedelta(days=90)).isoformat() | |
| # Deletion of rows older than cutoff would go here | |
| except Exception as e: | |
| logger.error(f"Background task error: {e}") | |
| thread = threading.Thread(target=background_worker, daemon=True) | |
| thread.start() | |
| logger.info("Background task scheduler started") | |
| def _init_database(self): | |
| """Initialize production-grade database schema with indices.""" | |
| with self._get_connection() as conn: | |
| cursor = conn.cursor() | |
| # Main records table (append-only) | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS audio_records ( | |
| record_id TEXT PRIMARY KEY, | |
| video_id TEXT NOT NULL, | |
| timestamp TEXT NOT NULL, | |
| audio_features_json TEXT NOT NULL, | |
| platform_metrics_json TEXT NOT NULL, | |
| niche TEXT NOT NULL, | |
| platform TEXT NOT NULL, | |
| beat_id TEXT NOT NULL, | |
| beat_version_lineage TEXT NOT NULL, | |
| voice_profile_hash TEXT NOT NULL, | |
| orchestration_job_id TEXT NOT NULL, | |
| language TEXT NOT NULL, | |
| trend_id TEXT, | |
| content_type TEXT NOT NULL, | |
| schema_version TEXT NOT NULL, | |
| ingestion_timestamp TEXT NOT NULL, | |
| is_anomaly INTEGER NOT NULL, | |
| anomaly_reason TEXT | |
| ) | |
| """) | |
| # Performance-critical indices | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_video_id ON audio_records(video_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_job_id ON audio_records(orchestration_job_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_platform_niche ON audio_records(platform, niche)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_beat_id ON audio_records(beat_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_voice_hash ON audio_records(voice_profile_hash)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_ingestion_time ON audio_records(ingestion_timestamp)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_anomaly ON audio_records(is_anomaly)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON audio_records(timestamp)") | |
| # Materialized view for fast winner/loser queries (ENHANCED) | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS performance_summary ( | |
| record_id TEXT PRIMARY KEY, | |
| video_id TEXT NOT NULL, | |
| niche TEXT NOT NULL, | |
| platform TEXT NOT NULL, | |
| retention_1s REAL NOT NULL, | |
| retention_2s REAL NOT NULL, | |
| retention_3s REAL NOT NULL, | |
| completion_rate REAL NOT NULL, | |
| engagement_score REAL NOT NULL, | |
| mute_rate REAL NOT NULL, | |
| scroll_away_velocity REAL NOT NULL, | |
| beat_alignment_score REAL NOT NULL, | |
| is_winner INTEGER NOT NULL, | |
| is_loser INTEGER NOT NULL, | |
| audio_embedding BLOB, | |
| performance_embedding BLOB, | |
| predicted_views_24h REAL, | |
| predicted_completion_rate REAL, | |
| predicted_engagement_score REAL, | |
| predicted_retention_2s REAL, | |
| prediction_confidence REAL, | |
| trend_momentum REAL, | |
| decay_rate REAL, | |
| is_trending INTEGER, | |
| failure_risk_score REAL, | |
| ingestion_timestamp TEXT NOT NULL, | |
| FOREIGN KEY (record_id) REFERENCES audio_records(record_id) | |
| ) | |
| """) | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_winners ON performance_summary(is_winner, engagement_score DESC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_losers ON performance_summary(is_loser, retention_2s ASC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_engagement ON performance_summary(engagement_score DESC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_predicted_views ON performance_summary(predicted_views_24h DESC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_trending ON performance_summary(is_trending DESC, trend_momentum DESC)") | |
| # Pattern similarity graph | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS pattern_similarity ( | |
| record_id TEXT NOT NULL, | |
| similar_record_id TEXT NOT NULL, | |
| similarity_score REAL NOT NULL, | |
| similarity_type TEXT NOT NULL, | |
| computed_at TEXT NOT NULL, | |
| PRIMARY KEY (record_id, similar_record_id, similarity_type), | |
| FOREIGN KEY (record_id) REFERENCES audio_records(record_id), | |
| FOREIGN KEY (similar_record_id) REFERENCES audio_records(record_id) | |
| ) | |
| """) | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_similarity_score ON pattern_similarity(similarity_score DESC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_similarity_record ON pattern_similarity(record_id)") | |
| # Temporal performance trajectory | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS performance_trajectory ( | |
| trajectory_id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| record_id TEXT NOT NULL, | |
| timestamp TEXT NOT NULL, | |
| hours_since_post REAL NOT NULL, | |
| views INTEGER NOT NULL, | |
| engagement_score REAL NOT NULL, | |
| velocity REAL NOT NULL, | |
| FOREIGN KEY (record_id) REFERENCES audio_records(record_id) | |
| ) | |
| """) | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_trajectory_record ON performance_trajectory(record_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_trajectory_time ON performance_trajectory(hours_since_post)") | |
| # System metadata | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS system_metadata ( | |
| key TEXT PRIMARY KEY, | |
| value TEXT NOT NULL, | |
| updated_at TEXT NOT NULL | |
| ) | |
| """) | |
| # Event log | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS event_log ( | |
| event_id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| event_type TEXT NOT NULL, | |
| record_id TEXT, | |
| event_data TEXT NOT NULL, | |
| timestamp TEXT NOT NULL | |
| ) | |
| """) | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_type ON event_log(event_type)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_time ON event_log(timestamp)") | |
| # Store schema version | |
| cursor.execute(""" | |
| INSERT OR REPLACE INTO system_metadata (key, value, updated_at) | |
| VALUES (?, ?, ?) | |
| """, ("schema_version", SCHEMA_VERSION, datetime.utcnow().isoformat())) | |
| conn.commit() | |
| @contextmanager | |
| def _get_connection(self): | |
| """Thread-safe database connection with WAL mode for concurrency.""" | |
| conn = sqlite3.connect( | |
| str(self.db_path), | |
| timeout=30.0, | |
| check_same_thread=False | |
| ) | |
| conn.row_factory = sqlite3.Row | |
| conn.execute("PRAGMA journal_mode=WAL") | |
| conn.execute("PRAGMA synchronous=NORMAL") | |
| conn.execute("PRAGMA cache_size=-64000") # 64MB cache | |
| try: | |
| yield conn | |
| finally: | |
| conn.close() | |
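| # Note on the pragmas above: WAL allows readers to proceed concurrently | |
| # with a single writer, and synchronous=NORMAL risks losing at most the | |
| # most recent transactions on an OS crash (never corruption) -- a | |
| # reasonable durability trade-off at 20k-100k inserts/day. | |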
| def store_audio_record(self, record: AudioRecord) -> bool: | |
| """ | |
| Store audio record with validation and anomaly detection. | |
| APPEND-ONLY: No overwrites. Duplicates are rejected. | |
| Args: | |
| record: Complete AudioRecord instance | |
| Returns: | |
| bool: True if stored successfully | |
| Raises: | |
| DataIntegrityError: If validation fails | |
| """ | |
| verify_caller_authorization() | |
| with self.lock: | |
| try: | |
| # Validate record | |
| self._validate_record(record) | |
| # Compute phase-aware embeddings | |
| audio_embedding = self.embedding_engine.compute_phase_aware_embedding(record.audio_features) | |
| performance_embedding = self.embedding_engine.compute_performance_embedding(record.platform_metrics) | |
| # Extract sequence features | |
| sequence_features = self.sequence_engine.extract_feature_sequence(record.audio_features) | |
| # Get predictions with uncertainty | |
| predictions = self.virality_predictor.predict_with_uncertainty( | |
| record.niche, | |
| record.platform, | |
| audio_embedding, | |
| confidence_level=0.95 | |
| ) | |
| # Calculate trend momentum | |
| pattern_key = f"{record.niche}:{record.beat_id}" | |
| self.trend_tracker.add_performance_point( | |
| pattern_key, | |
| datetime.fromisoformat(record.timestamp), | |
| record.platform_metrics.platform_engagement_score | |
| ) | |
| momentum_metrics = self.trend_tracker.calculate_momentum(pattern_key) | |
| # Adaptive anomaly detection with predictive failure | |
| is_anomaly, reason = self.anomaly_detector.detect_anomalies(record) | |
| failure_prob, risk_factors = self.anomaly_detector.predict_failure_probability(record) | |
| if risk_factors: | |
| reason = (reason + "; " if reason else "") + ", ".join(risk_factors) | |
| record.is_anomaly = is_anomaly | |
| record.anomaly_reason = reason | |
| # Serialize complex objects | |
| audio_json = json.dumps(asdict(record.audio_features)) | |
| metrics_json = json.dumps(asdict(record.platform_metrics)) | |
| with self._get_connection() as conn: | |
| cursor = conn.cursor() | |
| # Append-only: Check for duplicates | |
| cursor.execute( | |
| "SELECT record_id FROM audio_records WHERE record_id = ?", | |
| (record.record_id,) | |
| ) | |
| if cursor.fetchone(): | |
| raise DataIntegrityError( | |
| f"APPEND-ONLY VIOLATION: Record {record.record_id} already exists" | |
| ) | |
| # Insert main record | |
| cursor.execute(""" | |
| INSERT INTO audio_records ( | |
| record_id, video_id, timestamp, audio_features_json, | |
| platform_metrics_json, niche, platform, beat_id, | |
| beat_version_lineage, voice_profile_hash, orchestration_job_id, | |
| language, trend_id, content_type, schema_version, | |
| ingestion_timestamp, is_anomaly, anomaly_reason | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| record.record_id, record.video_id, record.timestamp, | |
| audio_json, metrics_json, record.niche, record.platform, | |
| record.beat_id, record.beat_version_lineage, | |
| record.voice_profile_hash, record.orchestration_job_id, | |
| record.language, record.trend_id, record.content_type, | |
| record.schema_version, record.ingestion_timestamp, | |
| 1 if record.is_anomaly else 0, record.anomaly_reason | |
| )) | |
| # Update enhanced performance summary | |
| self._update_performance_summary( | |
| cursor, record, audio_embedding, performance_embedding, | |
| predictions, momentum_metrics, failure_prob | |
| ) | |
| conn.commit() | |
| # Update adaptive statistics | |
| self.anomaly_detector.update_statistics( | |
| f"{record.niche}:engagement", | |
| record.platform_metrics.platform_engagement_score | |
| ) | |
| self.anomaly_detector.update_statistics( | |
| f"{record.niche}:beat_alignment", | |
| record.audio_features.beat_alignment.sync_quality_score | |
| ) | |
| self.anomaly_detector.update_statistics( | |
| f"{record.niche}:energy", | |
| record.audio_features.energy_envelope.mean_energy_db | |
| ) | |
| # Add to training data with sequences | |
| self.virality_predictor.add_training_sample( | |
| record.niche, | |
| record.platform, | |
| audio_embedding, | |
| sequence_features, | |
| { | |
| "views_24h": record.platform_metrics.shares * 100, # Proxy | |
| "completion_rate": record.platform_metrics.completion_rate, | |
| "engagement_score": record.platform_metrics.platform_engagement_score, | |
| "retention_2s": record.platform_metrics.retention_2s | |
| } | |
| ) | |
| # Add to sequence model training data | |
| self.sequence_engine.train_sequence_model( | |
| record.niche, | |
| record.platform, | |
| [sequence_features], | |
| [record.platform_metrics.platform_engagement_score], | |
| "engagement_score" | |
| ) | |
| # Update embedding engine | |
| self.embedding_engine.update_incremental(audio_embedding) | |
| # Compute and store similarities (async in production) | |
| self._compute_similarities(record.record_id, audio_embedding) | |
| # Track ingestion | |
| self._ingest_count += 1 | |
| self._check_ingestion_rate() | |
| # Emit events | |
| self._emit_event(EventType.RECORD_STORED, record) | |
| if is_anomaly: | |
| self._emit_event(EventType.ANOMALY_DETECTED, record) | |
| self._log_event(EventType.ANOMALY_DETECTED, record.record_id, { | |
| "reason": reason, | |
| "video_id": record.video_id | |
| }) | |
| # Check extreme thresholds | |
| self._check_extreme_performance(record) | |
| logger.debug(f"Stored record {record.record_id} (anomaly: {is_anomaly})") | |
| return True | |
| except Exception as e: | |
| self._ingest_errors += 1 | |
| logger.error(f"CRITICAL: Failed to store record: {e}", exc_info=True) | |
| return False | |
| def _validate_record(self, record: AudioRecord): | |
| """Validate record before storage. Raises DataIntegrityError if invalid.""" | |
| # Validate required fields | |
| if not record.video_id or not record.orchestration_job_id: | |
| raise DataIntegrityError("Missing required identifiers") | |
| # Validate retention values | |
| pm = record.platform_metrics | |
| if not (0 <= pm.retention_1s <= 1.0): | |
| raise DataIntegrityError(f"Invalid retention_1s: {pm.retention_1s}") | |
| if not (0 <= pm.retention_2s <= 1.0): | |
| raise DataIntegrityError(f"Invalid retention_2s: {pm.retention_2s}") | |
| if not (0 <= pm.retention_3s <= 1.0): | |
| raise DataIntegrityError(f"Invalid retention_3s: {pm.retention_3s}") | |
| # Validate retention ordering | |
| if pm.retention_2s > pm.retention_1s + 0.01: # Small tolerance | |
| raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)") | |
| if pm.retention_3s > pm.retention_2s + 0.01: | |
| raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)") | |
| # Validate audio features | |
| af = record.audio_features | |
| if af.total_duration_ms <= 0: | |
| raise DataIntegrityError("Invalid duration") | |
| if len(af.syllable_features) == 0: | |
| raise DataIntegrityError("Missing syllable features") | |
| if len(af.word_features) == 0: | |
| raise DataIntegrityError("Missing word features") | |
| def _update_performance_summary( | |
| self, | |
| cursor, | |
| record: AudioRecord, | |
| audio_embedding: np.ndarray, | |
| performance_embedding: np.ndarray, | |
| predictions: Dict[str, float], | |
| momentum_metrics: Dict[str, float], | |
| failure_risk: float | |
| ): | |
| """Update enhanced materialized view with ML features.""" | |
| pm = record.platform_metrics | |
| ba = record.audio_features.beat_alignment | |
| # Determine winner/loser status | |
| is_winner = ( | |
| pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and | |
| pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"] | |
| ) | |
| is_loser = ( | |
| pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or | |
| pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"] | |
| ) | |
| # Serialize embeddings | |
| audio_emb_bytes = pickle.dumps(audio_embedding) | |
| perf_emb_bytes = pickle.dumps(performance_embedding) | |
| cursor.execute(""" | |
| INSERT INTO performance_summary ( | |
| record_id, video_id, niche, platform, retention_1s, retention_2s, | |
| retention_3s, completion_rate, engagement_score, mute_rate, | |
| scroll_away_velocity, beat_alignment_score, is_winner, is_loser, | |
| audio_embedding, performance_embedding, | |
| predicted_views_24h, predicted_completion_rate, predicted_engagement_score, | |
| predicted_retention_2s, prediction_confidence, | |
| trend_momentum, decay_rate, is_trending, failure_risk_score, | |
| ingestion_timestamp | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| record.record_id, record.video_id, record.niche, record.platform, | |
| pm.retention_1s, pm.retention_2s, pm.retention_3s, pm.completion_rate, | |
| pm.platform_engagement_score, pm.mute_rate, pm.scroll_away_velocity, | |
| ba.sync_quality_score, 1 if is_winner else 0, 1 if is_loser else 0, | |
| audio_emb_bytes, perf_emb_bytes, | |
| predictions.get("predicted_views_24h", 0.0), | |
| predictions.get("predicted_completion_rate", 0.0), | |
| predictions.get("predicted_engagement_score", 0.0), | |
| predictions.get("predicted_retention_2s", 0.0), | |
| predictions.get("confidence", 0.0), | |
| momentum_metrics["trend_momentum"], | |
| momentum_metrics["decay_rate"], | |
| 1 if momentum_metrics["is_trending"] else 0, | |
| failure_risk, | |
| record.ingestion_timestamp | |
| )) | |
| def _compute_similarities(self, record_id: str, embedding: np.ndarray): | |
| """Compute and store pattern similarities (k-nearest neighbors).""" | |
| try: | |
| with self._get_connection() as conn: | |
| cursor = conn.cursor() | |
| # Get recent embeddings for comparison | |
| cursor.execute(""" | |
| SELECT record_id, audio_embedding | |
| FROM performance_summary | |
| WHERE record_id != ? | |
| ORDER BY ingestion_timestamp DESC | |
| LIMIT 1000 | |
| """, (record_id,)) | |
| rows = cursor.fetchall() | |
| similarities = [] | |
| for row in rows: | |
| other_id = row[0] | |
| other_emb = pickle.loads(row[1]) | |
| # Cosine similarity | |
| cos_sim = 1.0 - cosine(embedding, other_emb) | |
| if cos_sim > 0.7: # Only store high similarities | |
| similarities.append((other_id, cos_sim)) | |
| # Store top-K similarities | |
| similarities.sort(key=lambda x: x[1], reverse=True) | |
| for other_id, score in similarities[:20]: # Top 20 | |
| cursor.execute(""" | |
| INSERT OR REPLACE INTO pattern_similarity | |
| (record_id, similar_record_id, similarity_score, similarity_type, computed_at) | |
| VALUES (?, ?, ?, ?, ?) | |
| """, ( | |
| record_id, other_id, score, "audio_cosine", | |
| datetime.utcnow().isoformat() | |
| )) | |
| conn.commit() | |
| except Exception as e: | |
| logger.error(f"Failed to compute similarities: {e}") | |
| def _check_extreme_performance(self, record: AudioRecord): | |
| """Check for extreme success/failure and emit events.""" | |
| pm = record.platform_metrics | |
| if (pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and | |
| pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"]): | |
| self._emit_event(EventType.EXTREME_SUCCESS, record) | |
| self._log_event(EventType.EXTREME_SUCCESS, record.record_id, { | |
| "retention_2s": pm.retention_2s, | |
| "engagement": pm.platform_engagement_score, | |
| "niche": record.niche | |
| }) | |
| logger.info(f"EXTREME SUCCESS: {record.video_id} (ret={pm.retention_2s:.2f})") | |
| elif (pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or | |
| pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"]): | |
| self._emit_event(EventType.EXTREME_FAILURE, record) | |
| self._log_event(EventType.EXTREME_FAILURE, record.record_id, { | |
| "retention_2s": pm.retention_2s, | |
| "engagement": pm.platform_engagement_score, | |
| "mute_rate": pm.mute_rate, | |
| "niche": record.niche | |
| }) | |
| logger.warning(f"EXTREME FAILURE: {record.video_id} (ret={pm.retention_2s:.2f})") | |
| def get_winners_vs_losers( | |
| self, | |
| filters: Optional[Dict[str, Any]] = None, | |
| limit_per_group: int = 1000 | |
| ) -> Dict[str, List[AudioRecord]]: | |
| """ | |
| Get winners and losers for comparison (critical for RL). | |
| Returns: | |
| {"winners": [...], "losers": [...]} | |
| """ | |
| verify_caller_authorization() | |
| with self.lock: | |
| winners = self._query_performance_group(filters, "winners", limit_per_group) | |
| losers = self._query_performance_group(filters, "losers", limit_per_group) | |
| return { | |
| "winners": winners, | |
| "losers": losers | |
| } | |
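| # Usage sketch (filter values are hypothetical): | |
| # store.get_winners_vs_losers({"niche": "fitness", "platform": "tiktok"}) | |
| # returns up to 1000 records per group for contrastive policy updates. | |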
| def analyze_retention_killers( | |
| self, | |
| filters: Optional[Dict[str, Any]] = None, | |
| threshold_ms: int = 2000 | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Identify what killed retention early (< threshold_ms). | |
| Returns list of analysis dicts with audio feature correlations. | |
| """ | |
| verify_caller_authorization() | |
| with self.lock: | |
| # Get records with early retention failure | |
| query = """ | |
| SELECT ar.* FROM audio_records ar | |
| JOIN performance_summary ps ON ar.record_id = ps.record_id | |
| WHERE ps.retention_2s < 0.3 | |
| """ | |
| params = [] | |
| if filters: | |
| conditions = [] | |
| for key, value in filters.items(): | |
| if key in ["niche", "platform", "beat_id"]: | |
| conditions.append(f"ar.{key} = ?") | |
| params.append(value) | |
| if conditions: | |
| query += " AND " + " AND ".join(conditions) | |
| query += " ORDER BY ps.retention_2s ASC LIMIT 500" | |
| with self._get_connection() as conn: | |
| cursor = conn.cursor() | |
| cursor.execute(query, params) | |
| rows = cursor.fetchall() | |
| if not rows: | |
| return [] | |
| records = [self._row_to_record(row) for row in rows] | |
| # Analyze common patterns | |
| analyses = [] | |
| for record in records: | |
| af = record.audio_features | |
| pm = record.platform_metrics | |
| killers = [] | |
| # Check first 2 seconds of audio | |
| early_syllables = [s for s in af.syllable_features if s.start_time_ms < threshold_ms] | |
| if early_syllables: | |
| avg_energy = mean([s.energy_db for s in early_syllables]) | |
| if avg_energy < -50: | |
| killers.append("low_energy_start") | |
| avg_beat_error = mean([abs(s.beat_alignment_error_ms) for s in early_syllables]) | |
| if avg_beat_error > 150: | |
| killers.append("poor_beat_alignment_start") | |
| # Check pause density early | |
| early_pauses = [p for p in af.pause_metrics. |