Last active
December 31, 2025 00:11
-
-
Save bogged-broker/ff2ff7a67bb97202c4d8f1069e4b4176 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| audio_performance_store.py | |
| CRITICAL: Single source of truth for all audio decisions in autonomous viral content system. | |
| Every downstream learning, punishment, and reward depends on this data. | |
| SCALE: 20k-100k videos/day, append-only with indexed retrieval | |
| INTEGRATION: Orchestration scheduler, enforcement layers, RL feedback loops | |
| FAILURE MODE: Incorrect data = catastrophic system failure | |
| SCHEMA VERSION: 2.0.0 | |
| """ | |
| import sqlite3 | |
| import threading | |
| import time | |
| import hashlib | |
| import json | |
| import pickle | |
| from collections import defaultdict, deque | |
| from contextlib import contextmanager | |
| from dataclasses import dataclass, field, asdict | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any, Tuple, Callable | |
| from enum import Enum | |
| import numpy as np | |
| import logging | |
| from statistics import mean, stdev | |
| from sklearn.decomposition import PCA | |
| from sklearn.preprocessing import StandardScaler | |
| from scipy.spatial.distance import cosine, euclidean | |
| from scipy.stats import pearsonr | |
# Root logging is configured at import time so every component in this module
# shares one timestamped format.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Version string stamped onto stored records and extracted feature sets.
SCHEMA_VERSION = "2.0.0"

# Access control: last dotted component of each module name that may call
# into the store (checked by verify_caller_authorization).
AUTHORIZED_MODULES = {
    "audio_generation_controller",
    "audio_reinforcement_loop",
    "feedback_ingestor",
    "scheduler",
    "audio_pattern_learner",
}
class UnauthorizedAccessError(Exception):
    """Signals that a module outside AUTHORIZED_MODULES tried to use the store."""
class DataIntegrityError(Exception):
    """Signals that a record failed data validation."""
class EventType(Enum):
    """Kinds of events the store can emit to orchestration listeners.

    Each member's value is its lowercase wire name.
    """

    # Outcome extremes
    EXTREME_SUCCESS = "extreme_success"
    EXTREME_FAILURE = "extreme_failure"

    # Data-quality / monitoring signals
    ANOMALY_DETECTED = "anomaly_detected"
    RECORD_STORED = "record_stored"
    THRESHOLD_CROSSED = "threshold_crossed"
def verify_caller_authorization():
    """Verify calling module is authorized. MANDATORY for all public methods.

    Walks two frames up the stack: one hop reaches the function that invoked
    this check, the second reaches *its* caller. The last dotted component of
    that caller's module name must be listed in ``AUTHORIZED_MODULES``.

    Raises:
        UnauthorizedAccessError: the resolved module name is not authorized.

    NOTE(review): when the caller cannot be resolved (no frame support, no
    second frame, or ``inspect.getmodule`` returns None) the check passes
    silently, as it always has. That is fail-open -- confirm it is intended.
    """
    import inspect

    frame = inspect.currentframe()
    method_frame = None
    caller_frame = None
    try:
        # currentframe() may return None on interpreters without stack-frame
        # support, and f_back is None at the bottom of the stack; walk the
        # chain defensively instead of raising AttributeError.
        method_frame = frame.f_back if frame is not None else None
        caller_frame = method_frame.f_back if method_frame is not None else None
        if caller_frame is None:
            return

        caller_module = inspect.getmodule(caller_frame)
        if caller_module is None:
            return

        module_name = caller_module.__name__.split('.')[-1]
        if module_name not in AUTHORIZED_MODULES:
            raise UnauthorizedAccessError(
                f"SECURITY: Module '{module_name}' unauthorized. "
                f"Authorized: {AUTHORIZED_MODULES}"
            )
    finally:
        # Drop every frame reference so no frame <-> local reference cycle
        # outlives the call.
        del frame, method_frame, caller_frame
@dataclass
class SyllableLevelTiming:
    """Per-syllable timing and acoustic measurements for one audio clip."""

    # Position and extent of the syllable
    syllable_index: int
    start_time_ms: float
    duration_ms: float

    # Acoustic measurements
    pitch_hz: float
    energy_db: float

    # Deviation from the expected beat position
    beat_alignment_error_ms: float

    phoneme_sequence: List[str]

    # Ranges from 0.0 to 1.0
    stress_level: float
@dataclass
class WordLevelFeatures:
    """Per-word emotion and delivery measurements."""

    # Position and extent of the word
    word_index: int
    word_text: str
    start_time_ms: float
    duration_ms: float

    # Emotion: intensity ranges from 0.0 to 1.0; class is a label such as
    # "excited", "calm" or "urgent"
    emotion_intensity: float
    emotion_class: str

    # Emphasis relative to the surrounding words
    emphasis_score: float

    # Pronunciation clarity
    clarity_score: float
@dataclass
class PitchContour:
    """Pitch time series together with its summary statistics."""

    # Raw series (timestamps paired with pitch readings)
    timestamps_ms: List[float]
    pitch_hz_values: List[float]

    # Summary statistics
    pitch_variance: float
    pitch_range_semitones: float
    pitch_slope: float  # Slope of a linear regression over the contour

    # Indices of significant pitch changes
    pitch_inflection_points: List[int]
@dataclass
class EnergyEnvelope:
    """Energy/amplitude time series together with its summary statistics."""

    # Raw series (timestamps paired with energy readings)
    timestamps_ms: List[float]
    energy_db_values: List[float]

    # Summary statistics
    peak_energy_db: float
    mean_energy_db: float
    energy_variance: float
    dynamic_range_db: float

    # Transient shape
    attack_times_ms: List[float]  # Time taken to reach each peak
    decay_times_ms: List[float]  # Time from each peak down to the valley
@dataclass
class PauseDensityMetrics:
    """Pause counts, timings and distribution statistics."""

    total_pause_count: int

    # Per-pause series
    pause_durations_ms: List[float]
    pause_positions_ms: List[float]

    # Distribution statistics
    mean_pause_duration_ms: float
    pause_variance_ms: float
    inter_pause_intervals_ms: List[float]

    # Pauses placed before hooks / key phrases
    strategic_pause_count: int
@dataclass
class BeatAlignmentMetrics:
    """Beat-synchronisation measurements across multiple timestamps."""

    # Expected versus actual positions
    beat_timestamps_ms: List[float]  # Where beats were expected
    syllable_timestamps_ms: List[float]  # Where syllables actually landed
    alignment_errors_ms: List[float]  # Deviation for each syllable

    # Aggregate error statistics
    mean_error_ms: float
    max_error_ms: float

    # Share of syllables within the acceptable tolerance
    on_beat_percentage: float

    # Ranges from 0.0 to 1.0
    sync_quality_score: float
@dataclass
class SpectralFeatures:
    """Per-segment spectral measurements split across frequency bands."""

    timestamps_ms: List[float]

    # Energy per frequency band
    low_band_energy: List[float]  # 0-200 Hz
    mid_low_energy: List[float]  # 200-500 Hz
    mid_energy: List[float]  # 500-2000 Hz
    mid_high_energy: List[float]  # 2000-4000 Hz
    high_energy: List[float]  # 4000+ Hz

    # Whole-spectrum descriptors
    spectral_centroid: List[float]
    spectral_rolloff: List[float]
    dominant_band_per_segment: List[str]
@dataclass
class SegmentedAudioFeatures:
    """Bundle of every time-segmented audio feature group for one clip.

    The two trailing fields are filled automatically: the extraction time is
    the naive UTC ISO-8601 string taken when the instance is created, and the
    extractor version defaults to the module's SCHEMA_VERSION.
    """

    # Fine-grained units
    syllable_features: List[SyllableLevelTiming]
    word_features: List[WordLevelFeatures]

    # Continuous contours
    pitch_contour: PitchContour
    energy_envelope: EnergyEnvelope

    # Timing analysis
    pause_metrics: PauseDensityMetrics
    beat_alignment: BeatAlignmentMetrics

    # Frequency-domain analysis
    spectral_features: SpectralFeatures

    # Whole-clip summary numbers
    total_duration_ms: float
    words_per_minute: float
    syllables_per_second: float

    # Provenance of the extraction
    extraction_timestamp: str = field(
        default_factory=lambda: datetime.utcnow().isoformat()
    )
    extractor_version: str = SCHEMA_VERSION
@dataclass
class PlatformMetrics:
    """Platform-normalised performance numbers, including negative signals.

    NOTE(review): other code in this file computes ``1.0 - skip_rate`` and
    ``1.0 - mute_rate`` and compares ``mute_rate`` against 0.5, so the rate
    fields are presumably 0-1 fractions rather than 0-100 percentages --
    confirm with the collector.
    """

    # Retention curve (the first seconds are the critical early hook)
    retention_1s: float
    retention_2s: float
    retention_3s: float
    completion_rate: float

    # Positive engagement
    rewatch_count: int
    rewatch_rate: float  # Rewatches per view
    shares: int
    shares_per_impression: float
    saves: int

    # Negative signals (CRITICAL for failure detection)
    scroll_away_velocity: float  # Average ms before the viewer skips
    mute_rate: float  # Share of viewers who muted the audio
    skip_rate: float  # Share of viewers who skipped before completion
    negative_feedback_count: int  # "Not interested" and similar

    # Normalised scores
    platform_engagement_score: float  # Platform-specific normalisation
    virality_coefficient: float  # Shares / (views * time_decay)

    # Collection provenance
    platform: str
    collection_timestamp: str = field(
        default_factory=lambda: datetime.utcnow().isoformat()
    )
    metrics_version: str = SCHEMA_VERSION
@dataclass
class AudioRecord:
    """Complete append-only record for a single video's audio performance.

    Pass a falsy ``record_id`` (e.g. ``""``) to have ``__post_init__`` derive
    one from ``video_id``, ``timestamp`` and ``orchestration_job_id``.

    Field order: ``timestamp`` carries a default, so it is declared after
    every required field. Declaring it before them (as this class previously
    did) makes ``@dataclass`` raise ``TypeError: non-default argument ...
    follows default argument`` while the class is being created, which
    prevents the whole module from importing. Keyword construction is
    unaffected by the move.
    """

    # Primary identifiers
    record_id: str  # Unique hash: video_id + timestamp + job id (see below)
    video_id: str

    # Audio features (immutable after creation)
    audio_features: SegmentedAudioFeatures

    # Platform performance (immutable after final collection)
    platform_metrics: PlatformMetrics

    # Mandatory tags for retrieval
    niche: str
    platform: str
    beat_id: str
    beat_version_lineage: str  # e.g., "beat_v3 <- beat_v2 <- beat_v1"
    voice_profile_hash: str  # SHA256 of voice config
    orchestration_job_id: str

    # Additional context
    language: str

    # ---- defaulted fields: must follow every required field ----

    # Creation time as a naive UTC ISO-8601 string; part of the record_id hash
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    trend_id: Optional[str] = None
    content_type: str = "audio_overlay"  # For future expansion

    # Metadata (append-only guarantees)
    schema_version: str = SCHEMA_VERSION
    ingestion_timestamp: str = field(
        default_factory=lambda: datetime.utcnow().isoformat()
    )
    is_anomaly: bool = False
    anomaly_reason: Optional[str] = None

    def __post_init__(self):
        """Generate a deterministic record_id when the caller left it empty."""
        if not self.record_id:
            hash_input = f"{self.video_id}:{self.timestamp}:{self.orchestration_job_id}"
            # First 16 hex characters of the SHA-256 digest
            self.record_id = hashlib.sha256(hash_input.encode()).hexdigest()[:16]
class EmbeddingEngine:
    """
    Computes and manages embeddings for audio patterns.

    Audio features are summarised into a fixed-order vector of 40 statistics,
    optionally standardised and PCA-projected once a model has been trained,
    and always returned padded or truncated to ``embedding_dim`` entries.
    """

    def __init__(self):
        self.embedding_dim = 128
        self.pca = None  # Set by train_pca(); None means "not trained yet"
        self.scaler = StandardScaler()
        # Re-entrant on purpose: update_incremental() calls train_pca() while
        # already holding this lock. With a plain threading.Lock that nested
        # acquire blocks forever on the 1000th cached sample.
        self.lock = threading.RLock()
        self.feature_cache = deque(maxlen=10000)  # Samples awaiting a retrain

    def compute_audio_embedding(self, audio_features: SegmentedAudioFeatures) -> np.ndarray:
        """
        Compute a 128-dimensional embedding from audio features.

        The raw vector holds 40 values, in this order: syllable stats (10),
        word emotion stats (5), pitch contour (4), energy envelope (6), pause
        metrics (4), beat alignment (4), spectral means (4), overall (3).
        NaN/inf entries are replaced with 0 / +-1e6.

        Returns:
            The PCA projection when a model is trained, otherwise the raw
            vector; zero-padded or truncated to ``embedding_dim`` either way.
        """
        features = []

        # 1. Syllable timing features (statistics over the first 20 syllables)
        syllables = audio_features.syllable_features
        if syllables:
            head = syllables[:20]
            syl_durations = [s.duration_ms for s in head]
            syl_energies = [s.energy_db for s in head]
            syl_pitches = [s.pitch_hz for s in head]
            syl_beat_errors = [abs(s.beat_alignment_error_ms) for s in head]
            features.extend([
                np.mean(syl_durations), np.std(syl_durations), np.max(syl_durations),
                np.mean(syl_energies), np.std(syl_energies), np.max(syl_energies),
                np.mean(syl_pitches), np.std(syl_pitches),
                np.mean(syl_beat_errors), np.max(syl_beat_errors),
            ])
        else:
            features.extend([0.0] * 10)

        # 2. Word-level emotion features (statistics over the first 15 words)
        words = audio_features.word_features
        if words:
            head = words[:15]
            emotions = [w.emotion_intensity for w in head]
            emphasis = [w.emphasis_score for w in head]
            features.extend([
                np.mean(emotions), np.std(emotions), np.max(emotions),
                np.mean(emphasis), np.std(emphasis),
            ])
        else:
            features.extend([0.0] * 5)

        # 3. Pitch contour features
        pc = audio_features.pitch_contour
        features.extend([
            pc.pitch_variance,
            pc.pitch_range_semitones,
            pc.pitch_slope,
            # Inflection density; max(..., 1) guards an empty contour
            len(pc.pitch_inflection_points) / max(len(pc.timestamps_ms), 1),
        ])

        # 4. Energy envelope features
        ee = audio_features.energy_envelope
        features.extend([
            ee.peak_energy_db,
            ee.mean_energy_db,
            ee.energy_variance,
            ee.dynamic_range_db,
            np.mean(ee.attack_times_ms) if ee.attack_times_ms else 0.0,
            np.mean(ee.decay_times_ms) if ee.decay_times_ms else 0.0,
        ])

        # 5. Pause metrics
        pm = audio_features.pause_metrics
        features.extend([
            pm.total_pause_count,
            pm.mean_pause_duration_ms,
            pm.pause_variance_ms,
            pm.strategic_pause_count,
        ])

        # 6. Beat alignment
        ba = audio_features.beat_alignment
        features.extend([
            ba.mean_error_ms,
            ba.max_error_ms,
            ba.on_beat_percentage,
            ba.sync_quality_score,
        ])

        # 7. Spectral features (means over the first 10 segments). An empty
        # mid/high band yields NaN here, which nan_to_num below maps to 0.
        sf = audio_features.spectral_features
        if sf.low_band_energy:
            features.extend([
                np.mean(sf.low_band_energy[:10]),
                np.mean(sf.mid_energy[:10]),
                np.mean(sf.high_energy[:10]),
                np.mean(sf.spectral_centroid[:10]) if sf.spectral_centroid else 0.0,
            ])
        else:
            features.extend([0.0] * 4)

        # 8. Overall metrics
        features.extend([
            audio_features.total_duration_ms,
            audio_features.words_per_minute,
            audio_features.syllables_per_second,
        ])

        feature_array = np.array(features, dtype=np.float32)

        # Handle NaN/inf
        feature_array = np.nan_to_num(feature_array, nan=0.0, posinf=1e6, neginf=-1e6)

        if self.pca is not None:
            # Scaler and PCA are swapped together under the lock in
            # train_pca(), so use them as a pair under the same lock.
            with self.lock:
                scaled = self.scaler.transform(feature_array.reshape(1, -1))
                embedding = self.pca.transform(scaled)[0]
        else:
            # Not yet trained: fall back to the raw feature vector
            embedding = feature_array

        # Ensure output is exactly embedding_dim entries long
        if len(embedding) < self.embedding_dim:
            embedding = np.pad(embedding, (0, self.embedding_dim - len(embedding)))
        elif len(embedding) > self.embedding_dim:
            embedding = embedding[:self.embedding_dim]

        return embedding

    def compute_performance_embedding(self, platform_metrics: PlatformMetrics) -> np.ndarray:
        """Return a 12-value float32 vector summarising platform performance."""
        features = [
            platform_metrics.retention_1s,
            platform_metrics.retention_2s,
            platform_metrics.retention_3s,
            platform_metrics.completion_rate,
            platform_metrics.rewatch_rate,
            platform_metrics.shares_per_impression,
            1.0 - platform_metrics.skip_rate,  # scroll_stop_probability
            1.0 - platform_metrics.mute_rate,  # audio_kept_on
            platform_metrics.platform_engagement_score,
            platform_metrics.virality_coefficient,
            # log1p compresses the raw counts
            np.log1p(platform_metrics.rewatch_count),
            np.log1p(platform_metrics.shares),
        ]
        return np.array(features, dtype=np.float32)

    def train_pca(self, feature_matrix: np.ndarray):
        """Fit the scaler and PCA on ``feature_matrix`` (rows are samples).

        The component count is capped by ``embedding_dim``, the feature count
        and the sample count. Safe to call while already holding ``self.lock``
        because the lock is re-entrant.
        """
        with self.lock:
            self.scaler.fit(feature_matrix)
            normalized = self.scaler.transform(feature_matrix)

            n_components = min(self.embedding_dim, feature_matrix.shape[1], feature_matrix.shape[0])
            self.pca = PCA(n_components=n_components)
            self.pca.fit(normalized)

            logger.info(f"PCA trained: {n_components} components, "
                        f"explained variance: {self.pca.explained_variance_ratio_.sum():.2%}")

    def update_incremental(self, features: np.ndarray):
        """Cache one feature vector; retrain PCA once 1000 have accumulated.

        NOTE(review): the cached vectors must have the same length as the raw
        vector built inside compute_audio_embedding (40 values), otherwise the
        fitted scaler will reject later inputs. Confirm callers do not pass
        the padded 128-value embedding here.
        """
        with self.lock:
            self.feature_cache.append(features)

            # Retrain PCA every 1000 samples, then start a fresh batch
            if len(self.feature_cache) >= 1000:
                matrix = np.vstack(list(self.feature_cache))
                self.train_pca(matrix)  # Re-enters self.lock (RLock)
                self.feature_cache.clear()
class ViralityPredictor:
    """
    Estimates performance metrics ahead of posting from historical samples.

    One linear model per metric is kept for every "niche:platform" bucket.
    """

    def __init__(self):
        self.models = {}  # "niche:platform" -> {metric name: coefficient vector}
        self.lock = threading.Lock()
        self.training_data = defaultdict(lambda: {"X": [], "y": {}})

    def add_training_sample(
        self,
        niche: str,
        platform: str,
        audio_embedding: np.ndarray,
        actual_metrics: Dict[str, float]
    ):
        """Record one (embedding, observed metrics) pair for a bucket."""
        bucket_key = f"{niche}:{platform}"
        with self.lock:
            bucket = self.training_data[bucket_key]
            bucket["X"].append(audio_embedding)
            targets = bucket["y"]
            for metric, value in actual_metrics.items():
                targets.setdefault(metric, []).append(value)

    def train_models(self, niche: str, platform: str):
        """Fit one least-squares model per metric for the given bucket.

        Does nothing except log a warning when the bucket holds fewer than 50
        samples. Metrics whose target count differs from the sample count are
        skipped.
        """
        key = f"{niche}:{platform}"
        with self.lock:
            data = self.training_data[key]
            sample_count = len(data["X"])
            if sample_count < 50:  # Minimum sample requirement
                logger.warning(f"Insufficient training data for {key}: {sample_count} samples")
                return

            X = np.vstack(data["X"])

            # Plain linear regression per metric, solved with lstsq
            fitted = {}
            for metric, y_values in data["y"].items():
                if len(y_values) != len(X):
                    continue
                targets = np.array(y_values)
                try:
                    # Leading column of ones provides the intercept
                    design = np.c_[np.ones(len(X)), X]
                    solution = np.linalg.lstsq(design, targets, rcond=None)
                    fitted[metric] = solution[0]
                except Exception as e:
                    logger.error(f"Failed to train {metric} model: {e}")

            self.models[key] = fitted
            logger.info(f"Trained {len(fitted)} prediction models for {key}")

    def predict(
        self,
        niche: str,
        platform: str,
        audio_embedding: np.ndarray
    ) -> Dict[str, float]:
        """
        Predict performance metrics before posting.

        Returns:
            A dict of ``predicted_<metric>`` values plus ``confidence``. When
            the bucket has no trained model a fixed neutral baseline with
            confidence 0.1 is returned instead.
        """
        key = f"{niche}:{platform}"
        bounded_tags = ("rate", "retention", "score")
        with self.lock:
            trained = self.models.get(key) if key in self.models else None
            if key not in self.models:
                # Nothing trained for this bucket: neutral baseline
                return {
                    "predicted_views_24h": 50000.0,  # Conservative baseline
                    "predicted_completion_rate": 0.5,
                    "predicted_engagement_score": 0.5,
                    "predicted_retention_2s": 0.6,
                    "confidence": 0.1  # Low confidence
                }

            result = {}
            x_vec = np.r_[1.0, audio_embedding]  # Prepend the bias term
            for metric, theta in trained.items():
                # Skip models fitted for a different embedding width
                if len(theta) != len(x_vec):
                    continue
                estimate = np.dot(x_vec, theta)

                # Keep the estimate inside the metric's valid range
                if any(tag in metric for tag in bounded_tags):
                    estimate = np.clip(estimate, 0.0, 1.0)
                elif "views" in metric:
                    estimate = max(0.0, estimate)
                result[f"predicted_{metric}"] = float(estimate)

            # Confidence grows with the sample count and saturates at 500
            seen = len(self.training_data[key]["X"])
            result["confidence"] = min(seen / 500.0, 1.0)
            return result
class TrendMomentumTracker:
    """
    Tracks temporal momentum and decay for patterns.

    Keeps, per pattern key, a list of (timestamp, engagement) points limited
    to the last seven windows, and derives growth/decay figures from it.
    """

    def __init__(self, window_hours: int = 24):
        self.window_hours = window_hours
        self.pattern_timeseries = defaultdict(list)  # pattern_key -> [(timestamp, engagement)]
        self.lock = threading.Lock()

    def add_performance_point(
        self,
        pattern_key: str,
        timestamp: datetime,
        engagement_score: float
    ):
        """Append one data point for ``pattern_key`` and prune stale points.

        Points older than ``7 * window_hours`` relative to the current naive
        UTC time are dropped, so ``timestamp`` must be a naive datetime
        (comparing it with an aware one would raise TypeError).
        """
        with self.lock:
            series = self.pattern_timeseries[pattern_key]
            series.append((timestamp, engagement_score))

            # Prune old data (keep 7 windows)
            cutoff = datetime.utcnow() - timedelta(hours=self.window_hours * 7)
            self.pattern_timeseries[pattern_key] = [
                (ts, score) for ts, score in series if ts > cutoff
            ]

    def calculate_momentum(self, pattern_key: str) -> Dict[str, float]:
        """
        Calculate trend momentum and decay rate.

        Returns:
            {
                "trend_momentum": relative growth of the newer half's mean
                    over the older half's mean (0.0 if the older mean <= 0),
                "decay_rate": slope of log(score) against hours since the
                    first point; negative means decay,
                "is_trending": plain bool, momentum > 0.1 and
                    decay_rate > -0.05,
                "velocity": score change per hour across the newer half
            }
            All-zero / False values when fewer than 3 points are stored.
        """
        with self.lock:
            # .get() so that querying an unknown key does not create it
            data = self.pattern_timeseries.get(pattern_key, [])
            if len(data) < 3:
                return {
                    "trend_momentum": 0.0,
                    "decay_rate": 0.0,
                    "is_trending": False,
                    "velocity": 0.0
                }

            # Sort by timestamp, then split into an older and a newer half
            data = sorted(data, key=lambda point: point[0])
            midpoint = len(data) // 2
            scores = [score for _, score in data]
            older_scores = scores[:midpoint]
            recent_scores = scores[midpoint:]

            # Momentum: relative growth of the recent mean
            older_avg = np.mean(older_scores)
            recent_avg = np.mean(recent_scores)
            if older_avg > 0:
                momentum = (recent_avg - older_avg) / older_avg
            else:
                momentum = 0.0

            # Exponential fit y = a * exp(b * t) via log-linear regression;
            # b is reported as the decay rate.
            origin = data[0][0]
            timestamps = [(ts - origin).total_seconds() / 3600.0 for ts, _ in data]
            try:
                # Floor the scores so log() never sees zero or a negative
                log_scores = np.log(np.maximum(scores, 1e-6))
                coeffs = np.polyfit(timestamps, log_scores, 1)
                decay_rate = float(coeffs[0])
            except (ValueError, TypeError, FloatingPointError, np.linalg.LinAlgError):
                # Best effort: an unusable fit reports "no decay". Only
                # numeric failures are caught, so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                decay_rate = 0.0

            # Velocity: score change per hour over the recent half, with the
            # elapsed time floored at one hour
            if len(recent_scores) >= 2:
                recent_times = timestamps[midpoint:]
                elapsed = max(recent_times[-1] - recent_times[0], 1.0)
                velocity = (recent_scores[-1] - recent_scores[0]) / elapsed
            else:
                velocity = 0.0

            # bool(): the comparison on a numpy float yields numpy.bool_,
            # which is not JSON-serialisable and fails `is True/False` checks
            is_trending = bool(momentum > 0.1 and decay_rate > -0.05)

            return {
                "trend_momentum": float(momentum),
                "decay_rate": float(decay_rate),
                "is_trending": is_trending,
                "velocity": float(velocity)
            }
class AdaptiveAnomalyDetector:
    """
    Anomaly detection with adaptive thresholds and drift monitoring.

    Per key it keeps an exponential moving mean/variance, a bounded rolling
    window of raw values, and a drift counter.
    """

    def __init__(self, alpha: float = 0.2, window_size: int = 1000):
        self.alpha = alpha  # EMA learning rate
        self.window_size = window_size
        self.ema_stats = defaultdict(lambda: {"mean": 0.0, "var": 1.0, "n": 0})
        self.rolling_windows = defaultdict(lambda: deque(maxlen=window_size))
        self.drift_detector = defaultdict(lambda: {"last_mean": 0.0, "drift_count": 0})
        self.lock = threading.Lock()

    def update_statistics(
        self,
        key: str,
        value: float,
        detect_drift: bool = True
    ):
        """Fold ``value`` into the EMA statistics and rolling window of ``key``.

        Drift detection runs when ``detect_drift`` is true and the window
        holds at least 100 values.
        """
        with self.lock:
            stats = self.ema_stats[key]
            window = self.rolling_windows[key]

            window.append(value)

            if stats["n"] == 0:
                # First observation seeds the mean with zero variance
                stats["mean"] = value
                stats["var"] = 0.0
            else:
                # Exponentially weighted mean and variance update
                delta = value - stats["mean"]
                stats["mean"] += self.alpha * delta
                stats["var"] = (1 - self.alpha) * (stats["var"] + self.alpha * delta ** 2)
            stats["n"] += 1

            if detect_drift and len(window) >= 100:
                self._detect_drift(key, window)

    def _detect_drift(self, key: str, window: deque):
        """Flag drift when the newest and oldest 50 values differ strongly.

        "Strongly" means the two means differ by more than twice the standard
        deviation of the whole window. Called with ``self.lock`` held.
        """
        drift = self.drift_detector[key]

        # Snapshot once; slicing a deque directly is not supported
        samples = list(window)
        recent_mean = np.mean(samples[-50:])
        older_mean = np.mean(samples[:50])

        if abs(recent_mean - older_mean) > 2 * np.std(samples):
            drift["drift_count"] += 1
            logger.warning(f"Drift detected for {key}: {older_mean:.3f} -> {recent_mean:.3f}")

        drift["last_mean"] = recent_mean

    def is_anomalous(
        self,
        key: str,
        value: float,
        n_sigma: float = 3.0
    ) -> Tuple[bool, float]:
        """
        Check if value is anomalous using adaptive thresholds.

        Keys with fewer than 10 observations (including unknown keys) are
        never anomalous. Unknown keys are NOT created by this lookup.

        Returns:
            (is_anomaly, z_score) as plain ``bool`` and ``float``.
        """
        with self.lock:
            # .get(): indexing the defaultdict would insert a default entry
            # for every key that is merely queried.
            stats = self.ema_stats.get(key)
            if stats is None or stats["n"] < 10:
                return False, 0.0

            # Variance is floored so a constant series cannot divide by zero
            std = np.sqrt(max(stats["var"], 1e-6))
            z_score = abs(value - stats["mean"]) / std
            return bool(z_score > n_sigma), float(z_score)

    def _low_side_risk(self, key: str, value: float) -> Optional[float]:
        """Return a 0-1 risk score if ``value`` is anomalously below the mean.

        Returns None when ``key`` has no statistics, the value is not
        anomalous, or it lies at or above the tracked mean.
        """
        stats = self.ema_stats.get(key)
        if stats is None:
            return None
        is_anom, z = self.is_anomalous(key, value)
        if is_anom and value < stats["mean"]:
            return min(z / 3.0, 1.0)
        return None

    def predict_failure_probability(
        self,
        record: AudioRecord
    ) -> Tuple[float, List[str]]:
        """
        Predict probability of failure BEFORE posting.

        Checks beat sync quality, mean energy and early-word emotion against
        the statistics tracked under "<niche>:beat_alignment",
        "<niche>:energy" and "<niche>:emotion".

        Returns:
            (failure_probability, risk_factors). The probability is the mean
            of the triggered risk scores, or 0.1 when none triggered.
        """
        af = record.audio_features
        niche = record.niche

        # (risk label, statistics key, observed value), in reporting order
        checks = [
            ("poor_beat_alignment", f"{niche}:beat_alignment",
             af.beat_alignment.sync_quality_score),
            ("low_energy", f"{niche}:energy",
             af.energy_envelope.mean_energy_db),
        ]
        if af.word_features:
            # Emotion of the opening words only (first five)
            early_emotion = np.mean([w.emotion_intensity for w in af.word_features[:5]])
            checks.append(("low_emotional_intensity", f"{niche}:emotion", early_emotion))

        risks = []
        risk_scores = []
        for label, key, observed in checks:
            score = self._low_side_risk(key, observed)
            if score is not None:
                risks.append(label)
                risk_scores.append(score)

        if risk_scores:
            failure_prob = float(min(np.mean(risk_scores), 1.0))
        else:
            failure_prob = 0.1  # Baseline risk

        return failure_prob, risks

    def detect_anomalies(self, record: AudioRecord) -> Tuple[bool, Optional[str]]:
        """
        Detect anomalies using fixed rules plus adaptive thresholds.

        Returns:
            (is_anomaly, reason) where ``reason`` joins every triggered rule
            with "; ", or None when nothing triggered.
        """
        metrics = record.platform_metrics
        audio = record.audio_features
        anomalies = []

        # Retention cliff between the first and second second
        if metrics.retention_1s > 0.8 and metrics.retention_2s < 0.3:
            anomalies.append("retention_cliff_1s_to_2s")

        # Extreme negative signals
        if metrics.mute_rate > 0.5:
            anomalies.append("high_mute_rate")
        if metrics.scroll_away_velocity < 500:
            anomalies.append("instant_scroll_away")

        # Beat alignment failure
        if audio.beat_alignment.on_beat_percentage < 0.3:
            anomalies.append("severe_beat_misalignment")

        # Extreme pitch variance
        if audio.pitch_contour.pitch_variance > 5000:
            anomalies.append("extreme_pitch_variance")

        # Near-silent audio
        if audio.energy_envelope.mean_energy_db < -60:
            anomalies.append("audio_too_quiet")

        # Adaptive statistical outlier detection on engagement
        key = f"{record.niche}:{record.platform}"
        is_eng_anom, z_eng = self.is_anomalous(
            f"{key}:engagement",
            metrics.platform_engagement_score,
            n_sigma=3.5
        )
        if is_eng_anom and z_eng > 4.0:
            anomalies.append(f"statistical_outlier_z{z_eng:.1f}")

        # Adaptive beat-sync check, reported only for low absolute quality
        sync_quality = audio.beat_alignment.sync_quality_score
        is_beat_anom, z_beat = self.is_anomalous(f"{key}:beat", sync_quality)
        if is_beat_anom and sync_quality < 0.5:
            anomalies.append(f"poor_beat_sync_z{z_beat:.1f}")

        if anomalies:
            return True, "; ".join(anomalies)
        return False, None
| class AudioPerformanceStore: | |
| """ | |
| Production-grade audio performance store for autonomous viral content system. | |
| CRITICAL PROPERTIES: | |
| - Append-only: No silent overwrites | |
| - Indexed retrieval: Optimized for RL queries | |
| - Event emission: Real-time orchestration integration | |
| - Anomaly detection: Automatic failure flagging | |
| - Scale: 20k-100k videos/day | |
| """ | |
def __init__(self, db_path: str = "audio_performance_ground_truth.db"):
    """Set up the store: access check, helper components, then the database.

    Args:
        db_path: Location of the SQLite database file.

    The caller-authorization check runs first, so anything it raises
    propagates before any state is created.
    """
    verify_caller_authorization()

    self.db_path = Path(db_path)
    self.lock = threading.RLock()

    # Winner/loser cut-offs used for extreme-outcome classification
    self.thresholds = {
        "extreme_success_retention_2s": 0.85,
        "extreme_failure_retention_2s": 0.15,
        "extreme_success_engagement": 0.9,
        "extreme_failure_engagement": 0.1,
    }

    # Ingest counters and the time they were last reset
    self._ingest_count = 0
    self._ingest_errors = 0
    self._last_stats_reset = time.time()

    # Listener registry: event type -> list of callbacks
    self._event_listeners: Dict[EventType, List[Callable]] = defaultdict(list)

    # Analysis components
    self.embedding_engine = EmbeddingEngine()
    self.virality_predictor = ViralityPredictor()
    self.trend_tracker = TrendMomentumTracker()
    self.anomaly_detector = AdaptiveAnomalyDetector()

    # Create tables and indices last, once every attribute exists
    self._init_database()

    for line in (
        f"AudioPerformanceStore initialized (schema v{SCHEMA_VERSION})",
        f"Database: {self.db_path.absolute()}",
        "Append-only mode: ENABLED",
        "ML capabilities: Embeddings, Predictions, Trend Tracking",
    ):
        logger.info(line)
| def _init_database(self): | |
| """Initialize production-grade database schema with indices.""" | |
| with self._get_connection() as conn: | |
| cursor = conn.cursor() | |
| # Main records table (append-only) | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS audio_records ( | |
| record_id TEXT PRIMARY KEY, | |
| video_id TEXT NOT NULL, | |
| timestamp TEXT NOT NULL, | |
| audio_features_json TEXT NOT NULL, | |
| platform_metrics_json TEXT NOT NULL, | |
| niche TEXT NOT NULL, | |
| platform TEXT NOT NULL, | |
| beat_id TEXT NOT NULL, | |
| beat_version_lineage TEXT NOT NULL, | |
| voice_profile_hash TEXT NOT NULL, | |
| orchestration_job_id TEXT NOT NULL, | |
| language TEXT NOT NULL, | |
| trend_id TEXT, | |
| content_type TEXT NOT NULL, | |
| schema_version TEXT NOT NULL, | |
| ingestion_timestamp TEXT NOT NULL, | |
| is_anomaly INTEGER NOT NULL, | |
| anomaly_reason TEXT | |
| ) | |
| """) | |
| # Performance-critical indices | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_video_id ON audio_records(video_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_job_id ON audio_records(orchestration_job_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_platform_niche ON audio_records(platform, niche)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_beat_id ON audio_records(beat_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_voice_hash ON audio_records(voice_profile_hash)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_ingestion_time ON audio_records(ingestion_timestamp)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_anomaly ON audio_records(is_anomaly)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON audio_records(timestamp)") | |
| # Materialized view for fast winner/loser queries (ENHANCED) | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS performance_summary ( | |
| record_id TEXT PRIMARY KEY, | |
| video_id TEXT NOT NULL, | |
| niche TEXT NOT NULL, | |
| platform TEXT NOT NULL, | |
| retention_1s REAL NOT NULL, | |
| retention_2s REAL NOT NULL, | |
| retention_3s REAL NOT NULL, | |
| completion_rate REAL NOT NULL, | |
| engagement_score REAL NOT NULL, | |
| mute_rate REAL NOT NULL, | |
| scroll_away_velocity REAL NOT NULL, | |
| beat_alignment_score REAL NOT NULL, | |
| is_winner INTEGER NOT NULL, | |
| is_loser INTEGER NOT NULL, | |
| audio_embedding BLOB, | |
| performance_embedding BLOB, | |
| predicted_views_24h REAL, | |
| predicted_completion_rate REAL, | |
| predicted_engagement_score REAL, | |
| predicted_retention_2s REAL, | |
| prediction_confidence REAL, | |
| trend_momentum REAL, | |
| decay_rate REAL, | |
| is_trending INTEGER, | |
| failure_risk_score REAL, | |
| ingestion_timestamp TEXT NOT NULL, | |
| FOREIGN KEY (record_id) REFERENCES audio_records(record_id) | |
| ) | |
| """) | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_winners ON performance_summary(is_winner, engagement_score DESC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_losers ON performance_summary(is_loser, retention_2s ASC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_engagement ON performance_summary(engagement_score DESC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_predicted_views ON performance_summary(predicted_views_24h DESC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_trending ON performance_summary(is_trending DESC, trend_momentum DESC)") | |
| # Pattern similarity graph | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS pattern_similarity ( | |
| record_id TEXT NOT NULL, | |
| similar_record_id TEXT NOT NULL, | |
| similarity_score REAL NOT NULL, | |
| similarity_type TEXT NOT NULL, | |
| computed_at TEXT NOT NULL, | |
| PRIMARY KEY (record_id, similar_record_id, similarity_type), | |
| FOREIGN KEY (record_id) REFERENCES audio_records(record_id), | |
| FOREIGN KEY (similar_record_id) REFERENCES audio_records(record_id) | |
| ) | |
| """) | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_similarity_score ON pattern_similarity(similarity_score DESC)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_similarity_record ON pattern_similarity(record_id)") | |
| # Temporal performance trajectory | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS performance_trajectory ( | |
| trajectory_id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| record_id TEXT NOT NULL, | |
| timestamp TEXT NOT NULL, | |
| hours_since_post REAL NOT NULL, | |
| views INTEGER NOT NULL, | |
| engagement_score REAL NOT NULL, | |
| velocity REAL NOT NULL, | |
| FOREIGN KEY (record_id) REFERENCES audio_records(record_id) | |
| ) | |
| """) | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_trajectory_record ON performance_trajectory(record_id)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_trajectory_time ON performance_trajectory(hours_since_post)") | |
| # System metadata | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS system_metadata ( | |
| key TEXT PRIMARY KEY, | |
| value TEXT NOT NULL, | |
| updated_at TEXT NOT NULL | |
| ) | |
| """) | |
| # Event log | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS event_log ( | |
| event_id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| event_type TEXT NOT NULL, | |
| record_id TEXT, | |
| event_data TEXT NOT NULL, | |
| timestamp TEXT NOT NULL | |
| ) | |
| """) | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_type ON event_log(event_type)") | |
| cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_time ON event_log(timestamp)") | |
| # Store schema version | |
| cursor.execute(""" | |
| INSERT OR REPLACE INTO system_metadata (key, value, updated_at) | |
| VALUES (?, ?, ?) | |
| """, ("schema_version", SCHEMA_VERSION, datetime.utcnow().isoformat())) | |
| conn.commit() | |
| @contextmanager | |
| def _get_connection(self): | |
| """Thread-safe database connection with WAL mode for concurrency.""" | |
| conn = sqlite3.connect( | |
| str(self.db_path), | |
| timeout=30.0, | |
| check_same_thread=False | |
| ) | |
| conn.row_factory = sqlite3.Row | |
| conn.execute("PRAGMA journal_mode=WAL") | |
| conn.execute("PRAGMA synchronous=NORMAL") | |
| conn.execute("PRAGMA cache_size=-64000") # 64MB cache | |
| try: | |
| yield conn | |
| finally: | |
| conn.close() | |
def store_audio_record(self, record: AudioRecord) -> bool:
    """
    Validate, enrich and durably store a single audio record.

    APPEND-ONLY: No overwrites. A record whose ``record_id`` already exists
    is rejected and reported as a failed store.

    The work is done in two phases while holding ``self.lock``:

    1. Persistence: validation, embeddings, predictions, trend momentum,
       anomaly detection, then the ``audio_records`` insert and the
       ``performance_summary`` insert, committed together.
    2. Post-commit bookkeeping: adaptive statistics, predictor training
       data, embedding engine update, similarity computation, ingestion
       counters and event emission.

    Args:
        record: Complete AudioRecord instance. Its ``is_anomaly`` and
            ``anomaly_reason`` attributes are overwritten in place.

    Returns:
        bool: True once the record has been committed, even if a later
        bookkeeping step failed (that failure is logged and counted in
        ``self._ingest_errors``). False if anything up to and including
        the commit failed.

    Note:
        DataIntegrityError (validation failures and duplicates) is NOT
        propagated to the caller: like every other exception raised during
        phase 1 it is logged, counted and reported as ``False``. Only
        exceptions raised by ``verify_caller_authorization()`` propagate,
        because that call happens before the guarded section.
    """
    verify_caller_authorization()
    with self.lock:
        # ---- Phase 1: everything that must succeed for the record to exist.
        try:
            # Validate record
            self._validate_record(record)
            # Compute embeddings
            audio_embedding = self.embedding_engine.compute_audio_embedding(record.audio_features)
            performance_embedding = self.embedding_engine.compute_performance_embedding(record.platform_metrics)
            # Get predictions
            predictions = self.virality_predictor.predict(
                record.niche,
                record.platform,
                audio_embedding
            )
            # Calculate trend momentum
            # NOTE(review): the trend tracker is updated before the duplicate
            # check below, so a rejected record still leaves a point in the
            # tracker - confirm whether that is acceptable.
            pattern_key = f"{record.niche}:{record.beat_id}"
            self.trend_tracker.add_performance_point(
                pattern_key,
                datetime.fromisoformat(record.timestamp),
                record.platform_metrics.platform_engagement_score
            )
            momentum_metrics = self.trend_tracker.calculate_momentum(pattern_key)
            # Adaptive anomaly detection
            is_anomaly, reason = self.anomaly_detector.detect_anomalies(record)
            failure_prob, risk_factors = self.anomaly_detector.predict_failure_probability(record)
            if risk_factors:
                # Append risk factors to any existing anomaly reason.
                reason = (reason + "; " if reason else "") + ", ".join(risk_factors)
            record.is_anomaly = is_anomaly
            record.anomaly_reason = reason
            # Serialize complex objects
            audio_json = json.dumps(asdict(record.audio_features))
            metrics_json = json.dumps(asdict(record.platform_metrics))
            with self._get_connection() as conn:
                cursor = conn.cursor()
                # Append-only: Check for duplicates
                cursor.execute(
                    "SELECT record_id FROM audio_records WHERE record_id = ?",
                    (record.record_id,)
                )
                if cursor.fetchone():
                    raise DataIntegrityError(
                        f"APPEND-ONLY VIOLATION: Record {record.record_id} already exists"
                    )
                # Insert main record
                cursor.execute("""
                    INSERT INTO audio_records (
                        record_id, video_id, timestamp, audio_features_json,
                        platform_metrics_json, niche, platform, beat_id,
                        beat_version_lineage, voice_profile_hash, orchestration_job_id,
                        language, trend_id, content_type, schema_version,
                        ingestion_timestamp, is_anomaly, anomaly_reason
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    record.record_id, record.video_id, record.timestamp,
                    audio_json, metrics_json, record.niche, record.platform,
                    record.beat_id, record.beat_version_lineage,
                    record.voice_profile_hash, record.orchestration_job_id,
                    record.language, record.trend_id, record.content_type,
                    record.schema_version, record.ingestion_timestamp,
                    1 if record.is_anomaly else 0, record.anomaly_reason
                ))
                # Update enhanced performance summary (same transaction)
                self._update_performance_summary(
                    cursor, record, audio_embedding, performance_embedding,
                    predictions, momentum_metrics, failure_prob
                )
                conn.commit()
        except Exception as e:
            self._ingest_errors += 1
            logger.error(f"CRITICAL: Failed to store record: {e}", exc_info=True)
            return False

        # ---- Phase 2: the record is durable from here on. A failure below
        # must not be reported as "not stored", otherwise a caller that
        # retries would hit the append-only duplicate rejection.
        self._ingest_count += 1
        try:
            # Update adaptive statistics
            self.anomaly_detector.update_statistics(
                f"{record.niche}:engagement",
                record.platform_metrics.platform_engagement_score
            )
            self.anomaly_detector.update_statistics(
                f"{record.niche}:beat_alignment",
                record.audio_features.beat_alignment.sync_quality_score
            )
            self.anomaly_detector.update_statistics(
                f"{record.niche}:energy",
                record.audio_features.energy_envelope.mean_energy_db
            )
            # Add to training data
            self.virality_predictor.add_training_sample(
                record.niche,
                record.platform,
                audio_embedding,
                {
                    "views_24h": record.platform_metrics.shares * 100,  # Proxy
                    "completion_rate": record.platform_metrics.completion_rate,
                    "engagement_score": record.platform_metrics.platform_engagement_score,
                    "retention_2s": record.platform_metrics.retention_2s
                }
            )
            # Update embedding engine
            self.embedding_engine.update_incremental(audio_embedding)
            # Compute and store similarities (async in production)
            self._compute_similarities(record.record_id, audio_embedding)
            # Track ingestion rate
            self._check_ingestion_rate()
            # Emit events
            self._emit_event(EventType.RECORD_STORED, record)
            if is_anomaly:
                self._emit_event(EventType.ANOMALY_DETECTED, record)
                self._log_event(EventType.ANOMALY_DETECTED, record.record_id, {
                    "reason": reason,
                    "video_id": record.video_id
                })
            # Check extreme thresholds
            self._check_extreme_performance(record)
            logger.debug(f"Stored record {record.record_id} (anomaly: {is_anomaly})")
        except Exception as e:
            # Still counted so monitoring sees the problem, but the return
            # value reflects that the record itself was committed.
            self._ingest_errors += 1
            logger.error(
                f"CRITICAL: Record {record.record_id} stored but post-commit processing failed: {e}",
                exc_info=True
            )
        return True
| def _validate_record(self, record: AudioRecord): | |
| """Validate record before storage. Raises DataIntegrityError if invalid.""" | |
| # Validate required fields | |
| if not record.video_id or not record.orchestration_job_id: | |
| raise DataIntegrityError("Missing required identifiers") | |
| # Validate retention values | |
| pm = record.platform_metrics | |
| if not (0 <= pm.retention_1s <= 1.0): | |
| raise DataIntegrityError(f"Invalid retention_1s: {pm.retention_1s}") | |
| if not (0 <= pm.retention_2s <= 1.0): | |
| raise DataIntegrityError(f"Invalid retention_2s: {pm.retention_2s}") | |
| if not (0 <= pm.retention_3s <= 1.0): | |
| raise DataIntegrityError(f"Invalid retention_3s: {pm.retention_3s}") | |
| # Validate retention ordering | |
| if pm.retention_2s > pm.retention_1s + 0.01: # Small tolerance | |
| raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)") | |
| if pm.retention_3s > pm.retention_2s + 0.01: | |
| raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)") | |
| # Validate audio features | |
| af = record.audio_features | |
| if af.total_duration_ms <= 0: | |
| raise DataIntegrityError("Invalid duration") | |
| if len(af.syllable_features) == 0: | |
| raise DataIntegrityError("Missing syllable features") | |
| if len(af.word_features) == 0: | |
| raise DataIntegrityError("Missing word features") | |
| def _update_performance_summary( | |
| self, | |
| cursor, | |
| record: AudioRecord, | |
| audio_embedding: np.ndarray, | |
| performance_embedding: np.ndarray, | |
| predictions: Dict[str, float], | |
| momentum_metrics: Dict[str, float], | |
| failure_risk: float | |
| ): | |
| """Update enhanced materialized view with ML features.""" | |
| pm = record.platform_metrics | |
| ba = record.audio_features.beat_alignment | |
| # Determine winner/loser status | |
| is_winner = ( | |
| pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and | |
| pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"] | |
| ) | |
| is_loser = ( | |
| pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or | |
| pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"] | |
| ) | |
| # Serialize embeddings | |
| audio_emb_bytes = pickle.dumps(audio_embedding) | |
| perf_emb_bytes = pickle.dumps(performance_embedding) | |
| cursor.execute(""" | |
| INSERT INTO performance_summary ( | |
| record_id, video_id, niche, platform, retention_1s, retention_2s, | |
| retention_3s, completion_rate, engagement_score, mute_rate, | |
| scroll_away_velocity, beat_alignment_score, is_winner, is_loser, | |
| audio_embedding, performance_embedding, | |
| predicted_views_24h, predicted_completion_rate, predicted_engagement_score, | |
| predicted_retention_2s, prediction_confidence, | |
| trend_momentum, decay_rate, is_trending, failure_risk_score, | |
| ingestion_timestamp | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| record.record_id, record.video_id, record.niche, record.platform, | |
| pm.retention_1s, pm.retention_2s, pm.retention_3s, pm.completion_rate, | |
| pm.platform_engagement_score, pm.mute_rate, pm.scroll_away_velocity, | |
| ba.sync_quality_score, 1 if is_winner else 0, 1 if is_loser else 0, | |
| audio_emb_bytes, perf_emb_bytes, | |
| predictions.get("predicted_views_24h", 0.0), | |
| predictions.get("predicted_completion_rate", 0.0), | |
| predictions.get("predicted_engagement_score", 0.0), | |
| predictions.get("predicted_retention_2s", 0.0), | |
| predictions.get("confidence", 0.0), | |
| momentum_metrics["trend_momentum"], | |
| momentum_metrics["decay_rate"], | |
| 1 if momentum_metrics["is_trending"] else 0, | |
| failure_risk, | |
| record.ingestion_timestamp | |
| )) | |
| def _compute_similarities(self, record_id: str, embedding: np.ndarray): | |
| """Compute and store pattern similarities (k-nearest neighbors).""" | |
| try: | |
| with self._get_connection() as conn: | |
| cursor = conn.cursor() | |
| # Get recent embeddings for comparison | |
| cursor.execute(""" | |
| SELECT record_id, audio_embedding | |
| FROM performance_summary | |
| WHERE record_id != ? | |
| ORDER BY ingestion_timestamp DESC | |
| LIMIT 1000 | |
| """, (record_id,)) | |
| rows = cursor.fetchall() | |
| similarities = [] | |
| for row in rows: | |
| other_id = row[0] | |
| other_emb = pickle.loads(row[1]) | |
| # Cosine similarity | |
| cos_sim = 1.0 - cosine(embedding, other_emb) | |
| if cos_sim > 0.7: # Only store high similarities | |
| similarities.append((other_id, cos_sim)) | |
| # Store top-K similarities | |
| similarities.sort(key=lambda x: x[1], reverse=True) | |
| for other_id, score in similarities[:20]: # Top 20 | |
| cursor.execute(""" | |
| INSERT OR REPLACE INTO pattern_similarity | |
| (record_id, similar_record_id, similarity_score, similarity_type, computed_at) | |
| VALUES (?, ?, ?, ?, ?) | |
| """, ( | |
| record_id, other_id, score, "audio_cosine", | |
| datetime.utcnow().isoformat() | |
| )) | |
| conn.commit() | |
| except Exception as e: | |
| logger.error(f"Failed to compute similarities: {e}") | |
| def _check_extreme_performance(self, record: AudioRecord): | |
| """Check for extreme success/failure and emit events.""" | |
| pm = record.platform_metrics | |
| if (pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and | |
| pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"]): | |
| self._emit_event(EventType.EXTREME_SUCCESS, record) | |
| self._log_event(EventType.EXTREME_SUCCESS, record.record_id, { | |
| "retention_2s": pm.retention_2s, | |
| "engagement": pm.platform_engagement_score, | |
| "niche": record.niche | |
| }) | |
| logger.info(f"EXTREME SUCCESS: {record.video_id} (ret={pm.retention_2s:.2f})") | |
| elif (pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or | |
| pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"]): | |
| self._emit_event(EventType.EXTREME_FAILURE, record) | |
| self._log_event(EventType.EXTREME_FAILURE, record.record_id, { | |
| "retention_2s": pm.retention_2s, | |
| "engagement": pm.platform_engagement_score, | |
| "mute_rate": pm.mute_rate, | |
| "niche": record.niche | |
| }) | |
| logger.warning(f"EXTREME FAILURE: {record.video_id} (ret={pm.retention_2s:.2f})") | |
def get_winners_vs_losers(
    self,
    filters: Optional[Dict[str, Any]] = None,
    limit_per_group: int = 1000
) -> Dict[str, List[AudioRecord]]:
    """
    Fetch the winner and loser groups side by side (critical for RL).

    The caller is authorised first; both groups are then queried through
    ``_query_performance_group`` while holding ``self.lock``, winners
    before losers, with the same filters and per-group limit.

    Args:
        filters: Optional filters passed through to each group query.
        limit_per_group: Limit passed through to each group query.

    Returns:
        {"winners": [...], "losers": [...]}
    """
    verify_caller_authorization()
    with self.lock:
        return {
            group: self._query_performance_group(filters, group, limit_per_group)
            for group in ("winners", "losers")
        }
| def analyze_retention_killers( | |
| self, | |
| filters: Optional[Dict[str, Any]] = None, | |
| threshold_ms: int = 2000 | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Identify what killed retention early (< threshold_ms). | |
| Returns list of analysis dicts with audio feature correlations. | |
| """ | |
| verify_caller_authorization() | |
| with self.lock: | |
| # Get records with early retention failure | |
| query = """ | |
| SELECT ar.* FROM audio_records ar | |
| JOIN performance_summary ps ON ar.record_id = ps.record_id | |
| WHERE ps.retention_2s < 0.3 | |
| """ | |
| params = [] | |
| if filters: | |
| conditions = [] | |
| for key, value in filters.items(): | |
| if key in ["niche", "platform", "beat_id"]: | |
| conditions.append(f"ar.{key} = ?") | |
| params.append(value) | |
| if conditions: | |
| query += " AND " + " AND ".join(conditions) | |
| query += " ORDER BY ps.retention_2s ASC LIMIT 500" | |
| with self._get_connection() as conn: | |
| cursor = conn.cursor() | |
| cursor.execute(query, params) | |
| rows = cursor.fetchall() | |
| if not rows: | |
| return [] | |
| records = [self._row_to_record(row) for row in rows] | |
| # Analyze common patterns | |
| analyses = [] | |
| for record in records: | |
| af = record.audio_features | |
| pm = record.platform_metrics | |
| killers = [] | |
| # Check first 2 seconds of audio | |
| early_syllables = [s for s in af.syllable_features if s.start_time_ms < threshold_ms] | |
| if early_syllables: | |
| avg_energy = mean([s.energy_db for s in early_syllables]) | |
| if avg_energy < -50: | |
| killers.append("low_energy_start") | |
| avg_beat_error = mean([abs(s.beat_alignment_error_ms) for s in early_syllables]) | |
| if avg_beat_error > 150: | |
| killers.append("poor_beat_alignment_start") | |
| # Check pause density early | |
| early_pauses = [p for p in af.pause_metrics. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment