| """ | |
| audio_performance_store.py | |
| CRITICAL: Single source of truth for all audio decisions in autonomous viral content system. | |
| Every downstream learning, punishment, and reward depends on this data. | |
| SCALE: 20k-100k videos/day, append-only with indexed retrieval | |
| INTEGRATION: Orchestration scheduler, enforcement layers, RL feedback loops | |
| FAILURE MODE: Incorrect data = catastrophic system failure | |
| SCHEMA VERSION: 2.0.0 | |
| """ | |
| import sqlite3 | |
| import threading | |
| import time | |
| import hashlib | |
| import json | |
| from collections import defaultdict | |
| from contextlib import contextmanager | |
| from dataclasses import dataclass, field, asdict | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any, Tuple, Callable | |
| from enum import Enum | |
| import numpy as np | |
| import logging | |
| from statistics import mean, stdev | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Schema version | |
| SCHEMA_VERSION = "2.0.0" | |
| # Access control | |
| AUTHORIZED_MODULES = { | |
| "audio_generation_controller", | |
| "audio_reinforcement_loop", | |
| "feedback_ingestor", | |
| "scheduler", | |
| "audio_pattern_learner" | |
| } | |
| class UnauthorizedAccessError(Exception): | |
| """Raised when unauthorized module attempts access.""" | |
| pass | |
| class DataIntegrityError(Exception): | |
| """Raised when data validation fails.""" | |
| pass | |
| class EventType(Enum): | |
| """Event types for orchestration integration.""" | |
| EXTREME_SUCCESS = "extreme_success" | |
| EXTREME_FAILURE = "extreme_failure" | |
| ANOMALY_DETECTED = "anomaly_detected" | |
| RECORD_STORED = "record_stored" | |
| THRESHOLD_CROSSED = "threshold_crossed" | |
| def verify_caller_authorization(): | |
| """Verify calling module is authorized. MANDATORY for all public methods.""" | |
| import inspect | |
| frame = inspect.currentframe() | |
| try: | |
| caller_frame = frame.f_back.f_back | |
| caller_module = inspect.getmodule(caller_frame) | |
| if caller_module: | |
| module_name = caller_module.__name__.split('.')[-1] | |
| if module_name not in AUTHORIZED_MODULES: | |
| raise UnauthorizedAccessError( | |
| f"SECURITY: Module '{module_name}' unauthorized. " | |
| f"Authorized: {AUTHORIZED_MODULES}" | |
| ) | |
| finally: | |
| del frame | |
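
# Illustrative sketch (not part of the API): the stack-inspection guard above
# admits calls whose immediate external caller is defined in an authorized
# module. A call originating in a file named scheduler.py passes; one from,
# say, notebook.py raises UnauthorizedAccessError. Note this is an advisory
# guard based on frame inspection, not a hard security boundary: a determined
# caller can route through an authorized-looking module name.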

@dataclass
class SyllableLevelTiming:
    """Time-segmented syllable-level audio features."""
    syllable_index: int
    start_time_ms: float
    duration_ms: float
    pitch_hz: float
    energy_db: float
    beat_alignment_error_ms: float  # Deviation from expected beat position
    phoneme_sequence: List[str]
    stress_level: float  # 0.0 to 1.0


@dataclass
class WordLevelFeatures:
    """Word-level emotion and delivery metrics."""
    word_index: int
    word_text: str
    start_time_ms: float
    duration_ms: float
    emotion_intensity: float  # 0.0 to 1.0
    emotion_class: str  # e.g., "excited", "calm", "urgent"
    emphasis_score: float  # Relative emphasis vs. surrounding words
    clarity_score: float  # Pronunciation clarity


@dataclass
class PitchContour:
    """Time-series pitch data with statistical features."""
    timestamps_ms: List[float]
    pitch_hz_values: List[float]
    pitch_variance: float
    pitch_range_semitones: float
    pitch_slope: float  # Linear regression slope
    pitch_inflection_points: List[int]  # Indices of significant changes


@dataclass
class EnergyEnvelope:
    """Time-series energy/amplitude data."""
    timestamps_ms: List[float]
    energy_db_values: List[float]
    peak_energy_db: float
    mean_energy_db: float
    energy_variance: float
    dynamic_range_db: float
    attack_times_ms: List[float]  # Time to reach peaks
    decay_times_ms: List[float]  # Time from peaks to valleys


@dataclass
class PauseDensityMetrics:
    """Pause timing and distribution analysis."""
    total_pause_count: int
    pause_durations_ms: List[float]
    pause_positions_ms: List[float]
    mean_pause_duration_ms: float
    pause_variance_ms: float
    inter_pause_intervals_ms: List[float]
    strategic_pause_count: int  # Pauses before hooks/key phrases


@dataclass
class BeatAlignmentMetrics:
    """Multi-timestamp beat synchronization metrics."""
    beat_timestamps_ms: List[float]  # Expected beat positions
    syllable_timestamps_ms: List[float]  # Actual syllable positions
    alignment_errors_ms: List[float]  # Per-syllable deviations
    mean_error_ms: float
    max_error_ms: float
    on_beat_percentage: float  # % within acceptable tolerance
    sync_quality_score: float  # 0.0 to 1.0


@dataclass
class SpectralFeatures:
    """Spectral analysis across frequency bands."""
    timestamps_ms: List[float]
    low_band_energy: List[float]  # 0-200 Hz
    mid_low_energy: List[float]  # 200-500 Hz
    mid_energy: List[float]  # 500-2000 Hz
    mid_high_energy: List[float]  # 2000-4000 Hz
    high_energy: List[float]  # 4000+ Hz
    spectral_centroid: List[float]
    spectral_rolloff: List[float]
    dominant_band_per_segment: List[str]


@dataclass
class SegmentedAudioFeatures:
    """Complete time-segmented audio feature set."""
    # Syllable-level data
    syllable_features: List[SyllableLevelTiming]
    # Word-level data
    word_features: List[WordLevelFeatures]
    # Time-series data
    pitch_contour: PitchContour
    energy_envelope: EnergyEnvelope
    # Timing metrics
    pause_metrics: PauseDensityMetrics
    beat_alignment: BeatAlignmentMetrics
    # Spectral data
    spectral_features: SpectralFeatures
    # Overall metrics
    total_duration_ms: float
    words_per_minute: float
    syllables_per_second: float
    # Extraction metadata
    extraction_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    extractor_version: str = SCHEMA_VERSION


@dataclass
class PlatformMetrics:
    """Platform-normalized performance metrics with negative signals."""
    # Retention breakdown
    retention_1s: float  # Critical early hook
    retention_2s: float
    retention_3s: float
    completion_rate: float
    # Engagement signals
    rewatch_count: int
    rewatch_rate: float  # Rewatches per view
    shares: int
    shares_per_impression: float
    saves: int
    # Negative signals (CRITICAL for failure detection)
    scroll_away_velocity: float  # Avg ms before skip
    mute_rate: float  # % who muted audio
    skip_rate: float  # % who skipped before completion
    negative_feedback_count: int  # "Not interested" etc.
    # Normalized scores
    platform_engagement_score: float  # Platform-specific normalization
    virality_coefficient: float  # Shares / (views * time_decay)
    # Collection metadata
    platform: str
    collection_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    metrics_version: str = SCHEMA_VERSION

@dataclass
class AudioRecord:
    """Complete append-only record for a single video's audio performance."""
    # Primary identifiers (record_id is a deterministic hash of
    # video_id + timestamp + orchestration_job_id; see __post_init__)
    record_id: str
    video_id: str
    # Audio features (immutable after creation)
    audio_features: SegmentedAudioFeatures
    # Platform performance (immutable after final collection)
    platform_metrics: PlatformMetrics
    # Mandatory tags for retrieval
    niche: str
    platform: str
    beat_id: str
    beat_version_lineage: str  # e.g., "beat_v3 <- beat_v2 <- beat_v1"
    voice_profile_hash: str  # SHA256 of voice config
    orchestration_job_id: str
    # Additional context
    language: str
    # Defaulted fields must follow all required fields, otherwise the
    # dataclass definition raises a TypeError at import time.
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    trend_id: Optional[str] = None
    content_type: str = "audio_overlay"  # For future expansion
    # Metadata (append-only guarantees)
    schema_version: str = SCHEMA_VERSION
    ingestion_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    is_anomaly: bool = False
    anomaly_reason: Optional[str] = None

    def __post_init__(self):
        """Generate a deterministic record_id when none is supplied."""
        if not self.record_id:
            hash_input = f"{self.video_id}:{self.timestamp}:{self.orchestration_job_id}"
            self.record_id = hashlib.sha256(hash_input.encode()).hexdigest()[:16]
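
# Example (hypothetical values): because record_id is derived from
# (video_id, timestamp, orchestration_job_id), a retried ingest of the same
# triple reproduces the same 16-hex-char id and is rejected by the store's
# append-only duplicate check rather than silently stored twice:
#
#   hashlib.sha256(b"vid_123:2025-01-01T00:00:00:job_9").hexdigest()[:16]
#   # -> identical digest on every retry of this triple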

class AnomalyDetector:
    """Real-time anomaly detection for audio performance data."""

    def __init__(self):
        self.baseline_stats = defaultdict(lambda: {"mean": 0.0, "std": 1.0, "n": 0})
        self.lock = threading.Lock()

    def detect_anomalies(self, record: AudioRecord) -> Tuple[bool, Optional[str]]:
        """
        Detect anomalies in audio features and performance.

        Returns:
            (is_anomaly, reason)
        """
        anomalies = []
        # Check for a retention cliff (major drop between 1s and 2s)
        ret_1s = record.platform_metrics.retention_1s
        ret_2s = record.platform_metrics.retention_2s
        if ret_1s > 0.8 and ret_2s < 0.3:
            anomalies.append("retention_cliff_1s_to_2s")
        # Check extreme negative signals
        if record.platform_metrics.mute_rate > 0.5:
            anomalies.append("high_mute_rate")
        if record.platform_metrics.scroll_away_velocity < 500:  # < 500ms
            anomalies.append("instant_scroll_away")
        # Check beat alignment failure
        if record.audio_features.beat_alignment.on_beat_percentage < 0.3:
            anomalies.append("severe_beat_misalignment")
        # Check extreme pitch variance (potentially distorted audio)
        if record.audio_features.pitch_contour.pitch_variance > 5000:
            anomalies.append("extreme_pitch_variance")
        # Check for near-silence (energy too low)
        if record.audio_features.energy_envelope.mean_energy_db < -60:
            anomalies.append("audio_too_quiet")
        # Statistical outlier detection
        with self.lock:
            key = f"{record.niche}:{record.platform}"
            stats = self.baseline_stats[key]
            if stats["n"] > 100:  # Need a baseline first
                engagement = record.platform_metrics.platform_engagement_score
                z_score = abs((engagement - stats["mean"]) / max(stats["std"], 0.01))
                if z_score > 4.0:  # 4 standard deviations
                    anomalies.append(f"statistical_outlier_z{z_score:.1f}")
        if anomalies:
            return True, "; ".join(anomalies)
        return False, None

    def update_baseline(self, record: AudioRecord):
        """Update baseline statistics for anomaly detection."""
        with self.lock:
            key = f"{record.niche}:{record.platform}"
            stats = self.baseline_stats[key]
            engagement = record.platform_metrics.platform_engagement_score
            n = stats["n"]
            # Online mean and variance update
            if n == 0:
                stats["mean"] = engagement
                stats["std"] = 0.0
            else:
                old_mean = stats["mean"]
                stats["mean"] = (old_mean * n + engagement) / (n + 1)
                if n > 1:
                    old_var = stats["std"] ** 2
                    new_var = ((n - 1) * old_var + (engagement - old_mean) * (engagement - stats["mean"])) / n
                    stats["std"] = max(np.sqrt(new_var), 0.01)
            stats["n"] = n + 1

class AudioPerformanceStore:
    """
    Production-grade audio performance store for the autonomous viral content system.

    CRITICAL PROPERTIES:
    - Append-only: no silent overwrites
    - Indexed retrieval: optimized for RL queries
    - Event emission: real-time orchestration integration
    - Anomaly detection: automatic failure flagging
    - Scale: 20k-100k videos/day
    """

    def __init__(self, db_path: str = "audio_performance_ground_truth.db"):
        """Initialize the store with a production-grade SQLite backend."""
        verify_caller_authorization()
        self.db_path = Path(db_path)
        self.lock = threading.RLock()
        self.anomaly_detector = AnomalyDetector()
        # Event system
        self._event_listeners: Dict[EventType, List[Callable]] = defaultdict(list)
        # Performance tracking
        self._ingest_count = 0
        self._ingest_errors = 0
        self._last_stats_reset = time.time()
        # Threshold configuration
        self.thresholds = {
            "extreme_success_retention_2s": 0.85,
            "extreme_failure_retention_2s": 0.15,
            "extreme_success_engagement": 0.9,
            "extreme_failure_engagement": 0.1,
        }
        # Initialize database
        self._init_database()
        logger.info(f"AudioPerformanceStore initialized (schema v{SCHEMA_VERSION})")
        logger.info(f"Database: {self.db_path.absolute()}")
        logger.info("Append-only mode: ENABLED")

    def _init_database(self):
        """Initialize the database schema with performance-critical indices."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            # Main records table (append-only)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audio_records (
                    record_id TEXT PRIMARY KEY,
                    video_id TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    audio_features_json TEXT NOT NULL,
                    platform_metrics_json TEXT NOT NULL,
                    niche TEXT NOT NULL,
                    platform TEXT NOT NULL,
                    beat_id TEXT NOT NULL,
                    beat_version_lineage TEXT NOT NULL,
                    voice_profile_hash TEXT NOT NULL,
                    orchestration_job_id TEXT NOT NULL,
                    language TEXT NOT NULL,
                    trend_id TEXT,
                    content_type TEXT NOT NULL,
                    schema_version TEXT NOT NULL,
                    ingestion_timestamp TEXT NOT NULL,
                    is_anomaly INTEGER NOT NULL,
                    anomaly_reason TEXT
                )
            """)
            # Performance-critical indices
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_video_id ON audio_records(video_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_job_id ON audio_records(orchestration_job_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_platform_niche ON audio_records(platform, niche)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_beat_id ON audio_records(beat_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_voice_hash ON audio_records(voice_profile_hash)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_ingestion_time ON audio_records(ingestion_timestamp)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_anomaly ON audio_records(is_anomaly)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON audio_records(timestamp)")
            # Materialized view for fast winner/loser queries
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS performance_summary (
                    record_id TEXT PRIMARY KEY,
                    video_id TEXT NOT NULL,
                    niche TEXT NOT NULL,
                    platform TEXT NOT NULL,
                    retention_1s REAL NOT NULL,
                    retention_2s REAL NOT NULL,
                    retention_3s REAL NOT NULL,
                    completion_rate REAL NOT NULL,
                    engagement_score REAL NOT NULL,
                    mute_rate REAL NOT NULL,
                    scroll_away_velocity REAL NOT NULL,
                    beat_alignment_score REAL NOT NULL,
                    is_winner INTEGER NOT NULL,
                    is_loser INTEGER NOT NULL,
                    ingestion_timestamp TEXT NOT NULL,
                    FOREIGN KEY (record_id) REFERENCES audio_records(record_id)
                )
            """)
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_winners ON performance_summary(is_winner, engagement_score DESC)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_losers ON performance_summary(is_loser, retention_2s ASC)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_engagement ON performance_summary(engagement_score DESC)")
            # System metadata
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS system_metadata (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
            """)
            # Event log
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS event_log (
                    event_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_type TEXT NOT NULL,
                    record_id TEXT,
                    event_data TEXT NOT NULL,
                    timestamp TEXT NOT NULL
                )
            """)
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_type ON event_log(event_type)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_time ON event_log(timestamp)")
            # Store schema version
            cursor.execute("""
                INSERT OR REPLACE INTO system_metadata (key, value, updated_at)
                VALUES (?, ?, ?)
            """, ("schema_version", SCHEMA_VERSION, datetime.utcnow().isoformat()))
            conn.commit()

    @contextmanager
    def _get_connection(self):
        """Thread-safe database connection with WAL mode for concurrency."""
        conn = sqlite3.connect(
            str(self.db_path),
            timeout=30.0,
            check_same_thread=False
        )
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA cache_size=-64000")  # 64MB cache
        try:
            yield conn
        finally:
            conn.close()
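
    # Design note (assumption about intent, not stated in the source): WAL
    # journal mode lets concurrent readers proceed alongside a single writer,
    # which fits this workload of steady ingest plus frequent RL-side reads.
    # synchronous=NORMAL trades a small durability window on power loss for
    # write throughput; at the top of the stated 20k-100k/day range, a
    # server-grade database would be the more conservative choice.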

    def store_audio_record(self, record: AudioRecord) -> bool:
        """
        Store an audio record with validation and anomaly detection.

        APPEND-ONLY: No overwrites. Duplicates are rejected.

        Args:
            record: Complete AudioRecord instance

        Returns:
            bool: True if stored successfully; False on any failure.
            Validation failures (including DataIntegrityError) are caught,
            logged, and reported via the False return, so the ingest path
            never raises.
        """
        verify_caller_authorization()
        with self.lock:
            try:
                # Validate record
                self._validate_record(record)
                # Detect anomalies
                is_anomaly, reason = self.anomaly_detector.detect_anomalies(record)
                record.is_anomaly = is_anomaly
                record.anomaly_reason = reason
                # Serialize complex objects
                audio_json = json.dumps(asdict(record.audio_features))
                metrics_json = json.dumps(asdict(record.platform_metrics))
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    # Append-only: check for duplicates
                    cursor.execute(
                        "SELECT record_id FROM audio_records WHERE record_id = ?",
                        (record.record_id,)
                    )
                    if cursor.fetchone():
                        raise DataIntegrityError(
                            f"APPEND-ONLY VIOLATION: Record {record.record_id} already exists"
                        )
                    # Insert main record
                    cursor.execute("""
                        INSERT INTO audio_records (
                            record_id, video_id, timestamp, audio_features_json,
                            platform_metrics_json, niche, platform, beat_id,
                            beat_version_lineage, voice_profile_hash, orchestration_job_id,
                            language, trend_id, content_type, schema_version,
                            ingestion_timestamp, is_anomaly, anomaly_reason
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        record.record_id, record.video_id, record.timestamp,
                        audio_json, metrics_json, record.niche, record.platform,
                        record.beat_id, record.beat_version_lineage,
                        record.voice_profile_hash, record.orchestration_job_id,
                        record.language, record.trend_id, record.content_type,
                        record.schema_version, record.ingestion_timestamp,
                        1 if record.is_anomaly else 0, record.anomaly_reason
                    ))
                    # Update materialized view
                    self._update_performance_summary(cursor, record)
                    conn.commit()
                # Update anomaly baseline (clean records only)
                if not is_anomaly:
                    self.anomaly_detector.update_baseline(record)
                # Track ingestion
                self._ingest_count += 1
                self._check_ingestion_rate()
                # Emit events
                self._emit_event(EventType.RECORD_STORED, record)
                if is_anomaly:
                    self._emit_event(EventType.ANOMALY_DETECTED, record)
                    self._log_event(EventType.ANOMALY_DETECTED, record.record_id, {
                        "reason": reason,
                        "video_id": record.video_id
                    })
                # Check extreme thresholds
                self._check_extreme_performance(record)
                logger.debug(f"Stored record {record.record_id} (anomaly: {is_anomaly})")
                return True
            except Exception as e:
                self._ingest_errors += 1
                logger.error(f"CRITICAL: Failed to store record: {e}", exc_info=True)
                return False
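
    # Ingest sketch (record is a hypothetical, fully populated AudioRecord):
    # a well-formed record returns True; a duplicate record_id or a
    # non-monotonic retention curve is logged and returns False rather than
    # raising, so callers branch on the return value:
    #
    #   if not store.store_audio_record(record):
    #       retry_queue.put(record)  # retry_queue: hypothetical dead-letter path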

    def _validate_record(self, record: AudioRecord):
        """Validate a record before storage. Raises DataIntegrityError if invalid."""
        # Validate required identifiers
        if not record.video_id or not record.orchestration_job_id:
            raise DataIntegrityError("Missing required identifiers")
        # Validate retention values
        pm = record.platform_metrics
        if not (0 <= pm.retention_1s <= 1.0):
            raise DataIntegrityError(f"Invalid retention_1s: {pm.retention_1s}")
        if not (0 <= pm.retention_2s <= 1.0):
            raise DataIntegrityError(f"Invalid retention_2s: {pm.retention_2s}")
        if not (0 <= pm.retention_3s <= 1.0):
            raise DataIntegrityError(f"Invalid retention_3s: {pm.retention_3s}")
        # Validate retention ordering (small tolerance for measurement noise)
        if pm.retention_2s > pm.retention_1s + 0.01:
            raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)")
        if pm.retention_3s > pm.retention_2s + 0.01:
            raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)")
        # Validate audio features
        af = record.audio_features
        if af.total_duration_ms <= 0:
            raise DataIntegrityError("Invalid duration")
        if len(af.syllable_features) == 0:
            raise DataIntegrityError("Missing syllable features")
        if len(af.word_features) == 0:
            raise DataIntegrityError("Missing word features")

    def _update_performance_summary(self, cursor, record: AudioRecord):
        """Update the materialized view for fast queries."""
        pm = record.platform_metrics
        ba = record.audio_features.beat_alignment
        # Determine winner/loser status
        is_winner = (
            pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and
            pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"]
        )
        is_loser = (
            pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or
            pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"]
        )
        cursor.execute("""
            INSERT INTO performance_summary (
                record_id, video_id, niche, platform, retention_1s, retention_2s,
                retention_3s, completion_rate, engagement_score, mute_rate,
                scroll_away_velocity, beat_alignment_score, is_winner, is_loser,
                ingestion_timestamp
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            record.record_id, record.video_id, record.niche, record.platform,
            pm.retention_1s, pm.retention_2s, pm.retention_3s, pm.completion_rate,
            pm.platform_engagement_score, pm.mute_rate, pm.scroll_away_velocity,
            ba.sync_quality_score, 1 if is_winner else 0, 1 if is_loser else 0,
            record.ingestion_timestamp
        ))

    def _check_extreme_performance(self, record: AudioRecord):
        """Check for extreme success/failure and emit events."""
        pm = record.platform_metrics
        if (pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and
                pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"]):
            self._emit_event(EventType.EXTREME_SUCCESS, record)
            self._log_event(EventType.EXTREME_SUCCESS, record.record_id, {
                "retention_2s": pm.retention_2s,
                "engagement": pm.platform_engagement_score,
                "niche": record.niche
            })
            logger.info(f"EXTREME SUCCESS: {record.video_id} (ret={pm.retention_2s:.2f})")
        elif (pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or
                pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"]):
            self._emit_event(EventType.EXTREME_FAILURE, record)
            self._log_event(EventType.EXTREME_FAILURE, record.record_id, {
                "retention_2s": pm.retention_2s,
                "engagement": pm.platform_engagement_score,
                "mute_rate": pm.mute_rate,
                "niche": record.niche
            })
            logger.warning(f"EXTREME FAILURE: {record.video_id} (ret={pm.retention_2s:.2f})")

    def get_winners_vs_losers(
        self,
        filters: Optional[Dict[str, Any]] = None,
        limit_per_group: int = 1000
    ) -> Dict[str, List[AudioRecord]]:
        """
        Get winners and losers for comparison (critical for RL).

        Returns:
            {"winners": [...], "losers": [...]}
        """
        verify_caller_authorization()
        with self.lock:
            winners = self._query_performance_group(filters, "winners", limit_per_group)
            losers = self._query_performance_group(filters, "losers", limit_per_group)
            return {
                "winners": winners,
                "losers": losers
            }

    def analyze_retention_killers(
        self,
        filters: Optional[Dict[str, Any]] = None,
        threshold_ms: int = 2000
    ) -> List[Dict[str, Any]]:
        """
        Identify what killed retention early (before threshold_ms).

        Returns a list of analysis dicts with audio feature correlations.
        """
        verify_caller_authorization()
        with self.lock:
            # Get records with early retention failure
            query = """
                SELECT ar.* FROM audio_records ar
                JOIN performance_summary ps ON ar.record_id = ps.record_id
                WHERE ps.retention_2s < 0.3
            """
            params = []
            if filters:
                conditions = []
                for key, value in filters.items():
                    if key in ["niche", "platform", "beat_id"]:
                        conditions.append(f"ar.{key} = ?")
                        params.append(value)
                if conditions:
                    query += " AND " + " AND ".join(conditions)
            query += " ORDER BY ps.retention_2s ASC LIMIT 500"
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(query, params)
                rows = cursor.fetchall()
            if not rows:
                return []
            records = [self._row_to_record(row) for row in rows]
            # Analyze common patterns
            analyses = []
            for record in records:
                af = record.audio_features
                pm = record.platform_metrics
                killers = []
                # Check the first threshold_ms of audio
                early_syllables = [s for s in af.syllable_features if s.start_time_ms < threshold_ms]
                avg_energy = None
                if early_syllables:
                    avg_energy = mean([s.energy_db for s in early_syllables])
                    if avg_energy < -50:
                        killers.append("low_energy_start")
                    avg_beat_error = mean([abs(s.beat_alignment_error_ms) for s in early_syllables])
                    if avg_beat_error > 150:
                        killers.append("poor_beat_alignment_start")
                # Check early pause density
                early_pauses = [p for p in af.pause_metrics.pause_positions_ms if p < threshold_ms]
                if len(early_pauses) > 3:
                    killers.append("excessive_early_pauses")
                # Check mute rate correlation
                if pm.mute_rate > 0.4:
                    killers.append("high_mute_rate")
                # Check scroll-away velocity
                if pm.scroll_away_velocity < 1000:
                    killers.append("instant_scroll_away")
                analyses.append({
                    "video_id": record.video_id,
                    "retention_2s": pm.retention_2s,
                    "killers": killers,
                    "beat_alignment_score": af.beat_alignment.sync_quality_score,
                    "early_energy_db": avg_energy,
                    "mute_rate": pm.mute_rate,
                    "scroll_away_ms": pm.scroll_away_velocity
                })
            return analyses

    def sample_for_rl(
        self,
        batch_size: int = 32,
        filters: Optional[Dict[str, Any]] = None,
        strategy: str = "balanced"
    ) -> List[AudioRecord]:
        """
        Sample records for RL training with configurable strategies.

        Args:
            batch_size: Number of records to sample
            filters: Optional filters (niche, platform, etc.)
            strategy: "balanced" (50/50 winners/losers), "recent", or "diverse"

        Returns:
            List of AudioRecord instances
        """
        verify_caller_authorization()
        with self.lock:
            if strategy == "balanced":
                # 50% winners, 50% losers
                half = batch_size // 2
                winners = self._query_performance_group(filters, "winners", half)
                losers = self._query_performance_group(filters, "losers", batch_size - half)
                samples = winners + losers
                np.random.shuffle(samples)
                return samples[:batch_size]
            elif strategy == "recent":
                # Recent records with recency bias
                records = self._query_records(filters, order_by="ingestion_timestamp DESC", limit=batch_size * 3)
                if len(records) <= batch_size:
                    return records
                # Apply recency weighting
                timestamps = [datetime.fromisoformat(r.ingestion_timestamp).timestamp() for r in records]
                max_ts = max(timestamps)
                min_ts = min(timestamps)
                if max_ts > min_ts:
                    weights = [(ts - min_ts) / (max_ts - min_ts) + 0.1 for ts in timestamps]
                    weights = np.array(weights)
                    weights = weights / weights.sum()
                    indices = np.random.choice(len(records), size=batch_size, replace=False, p=weights)
                    return [records[i] for i in indices]
                else:
                    return np.random.choice(records, size=batch_size, replace=False).tolist()
            elif strategy == "diverse":
                # Maximize diversity across niches and voice profiles
                records = self._query_records(filters, limit=batch_size * 5)
                if len(records) <= batch_size:
                    return records
                # Cluster by niche and voice_profile_hash
                clusters = defaultdict(list)
                for record in records:
                    key = f"{record.niche}:{record.voice_profile_hash}"
                    clusters[key].append(record)
                # Sample proportionally from each cluster
                samples = []
                cluster_list = list(clusters.values())
                samples_per_cluster = max(1, batch_size // len(cluster_list))
                for cluster in cluster_list:
                    n = min(samples_per_cluster, len(cluster))
                    samples.extend(np.random.choice(cluster, size=n, replace=False).tolist())
                if len(samples) > batch_size:
                    samples = np.random.choice(samples, size=batch_size, replace=False).tolist()
                return samples
            else:
                raise ValueError(f"Unknown sampling strategy: {strategy}")
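
    # Usage sketch (from an authorized module; filter values are hypothetical):
    #
    #   batch = store.sample_for_rl(
    #       batch_size=64,
    #       filters={"platform": "tiktok", "niche": "fitness"},
    #       strategy="balanced",  # ~50% winners, ~50% losers for contrastive RL
    #   )
    #
    # "recent" weights candidates linearly by ingestion time before sampling;
    # "diverse" stratifies across (niche, voice_profile_hash) clusters so no
    # single voice or niche dominates a training batch.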

    def get_records_by_job(self, orchestration_job_id: str) -> List[AudioRecord]:
        """Get all records for a specific orchestration job."""
        verify_caller_authorization()
        with self.lock:
            return self._query_records({"orchestration_job_id": orchestration_job_id})

    def get_records_by_beat(self, beat_id: str) -> List[AudioRecord]:
        """Get all records using a specific beat."""
        verify_caller_authorization()
        with self.lock:
            return self._query_records({"beat_id": beat_id})

    def get_recent_records(self, n: int = 1000) -> List[AudioRecord]:
        """Get the N most recent records."""
        verify_caller_authorization()
        with self.lock:
            return self._query_records(None, order_by="ingestion_timestamp DESC", limit=n)

    def _query_performance_group(
        self,
        filters: Optional[Dict[str, Any]],
        group: str,
        limit: int
    ) -> List[AudioRecord]:
        """Query winners or losers from the materialized view."""
        query = """
            SELECT ar.* FROM audio_records ar
            JOIN performance_summary ps ON ar.record_id = ps.record_id
            WHERE ps.is_{} = 1
        """.format(group[:-1])  # "winners" -> "winner", "losers" -> "loser"
        params = []
        if filters:
            conditions = []
            for key, value in filters.items():
                if key in ["niche", "platform"]:
                    conditions.append(f"ps.{key} = ?")
                    params.append(value)
                elif key in ["beat_id", "voice_profile_hash"]:
                    conditions.append(f"ar.{key} = ?")
                    params.append(value)
            if conditions:
                query += " AND " + " AND ".join(conditions)
        if group == "winners":
            query += " ORDER BY ps.engagement_score DESC"
        else:
            query += " ORDER BY ps.retention_2s ASC"
        query += f" LIMIT {limit}"
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()
        return [self._row_to_record(row) for row in rows]

    def _query_records(
        self,
        filters: Optional[Dict[str, Any]] = None,
        order_by: str = "ingestion_timestamp DESC",
        limit: Optional[int] = None
    ) -> List[AudioRecord]:
        """General record query with filters."""
        query = "SELECT * FROM audio_records"
        params = []
        if filters:
            conditions = []
            for key, value in filters.items():
                if key in ["niche", "platform", "beat_id", "voice_profile_hash",
                           "orchestration_job_id", "language", "trend_id"]:
                    conditions.append(f"{key} = ?")
                    params.append(value)
                elif key == "is_anomaly":
                    conditions.append("is_anomaly = ?")
                    params.append(1 if value else 0)
            if conditions:
                query += " WHERE " + " AND ".join(conditions)
        query += f" ORDER BY {order_by}"
        if limit:
            query += f" LIMIT {limit}"
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()
        return [self._row_to_record(row) for row in rows]

    def _row_to_record(self, row: sqlite3.Row) -> AudioRecord:
        """Convert a database row to an AudioRecord."""
        audio_dict = json.loads(row["audio_features_json"])
        metrics_dict = json.loads(row["platform_metrics_json"])
        # Reconstruct nested dataclasses
        audio_features = self._dict_to_audio_features(audio_dict)
        platform_metrics = PlatformMetrics(**metrics_dict)
        return AudioRecord(
            record_id=row["record_id"],
            video_id=row["video_id"],
            timestamp=row["timestamp"],
            audio_features=audio_features,
            platform_metrics=platform_metrics,
            niche=row["niche"],
            platform=row["platform"],
            beat_id=row["beat_id"],
            beat_version_lineage=row["beat_version_lineage"],
            voice_profile_hash=row["voice_profile_hash"],
            orchestration_job_id=row["orchestration_job_id"],
            language=row["language"],
            trend_id=row["trend_id"],
            content_type=row["content_type"],
            schema_version=row["schema_version"],
            ingestion_timestamp=row["ingestion_timestamp"],
            is_anomaly=bool(row["is_anomaly"]),
            anomaly_reason=row["anomaly_reason"]
        )

    def _dict_to_audio_features(self, data: Dict) -> SegmentedAudioFeatures:
        """Reconstruct SegmentedAudioFeatures from a dict."""
        syllables = [SyllableLevelTiming(**s) for s in data["syllable_features"]]
        words = [WordLevelFeatures(**w) for w in data["word_features"]]
        pitch = PitchContour(**data["pitch_contour"])
        energy = EnergyEnvelope(**data["energy_envelope"])
        pauses = PauseDensityMetrics(**data["pause_metrics"])
        beat = BeatAlignmentMetrics(**data["beat_alignment"])
        spectral = SpectralFeatures(**data["spectral_features"])
        return SegmentedAudioFeatures(
            syllable_features=syllables,
            word_features=words,
            pitch_contour=pitch,
            energy_envelope=energy,
            pause_metrics=pauses,
            beat_alignment=beat,
            spectral_features=spectral,
            total_duration_ms=data["total_duration_ms"],
            words_per_minute=data["words_per_minute"],
            syllables_per_second=data["syllables_per_second"],
            extraction_timestamp=data["extraction_timestamp"],
            extractor_version=data["extractor_version"]
        )

    def register_event_listener(self, event_type: EventType, callback: Callable):
        """Register a callback for a specific event type."""
        verify_caller_authorization()
        with self.lock:
            self._event_listeners[event_type].append(callback)
            logger.info(f"Registered listener for {event_type.value}")

    def _emit_event(self, event_type: EventType, data: Any):
        """Emit an event to registered listeners."""
        for callback in self._event_listeners[event_type]:
            try:
                callback(event_type, data)
            except Exception as e:
                logger.error(f"Event listener error for {event_type.value}: {e}")

    def _log_event(self, event_type: EventType, record_id: str, event_data: Dict):
        """Log an event to the database for the audit trail."""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO event_log (event_type, record_id, event_data, timestamp)
                    VALUES (?, ?, ?, ?)
                """, (
                    event_type.value,
                    record_id,
                    json.dumps(event_data),
                    datetime.utcnow().isoformat()
                ))
                conn.commit()
        except Exception as e:
            logger.error(f"Failed to log event: {e}")

    def _check_ingestion_rate(self):
        """Monitor the ingestion rate and emit warnings."""
        elapsed = time.time() - self._last_stats_reset
        if elapsed >= 3600:  # Check hourly
            hourly_rate = self._ingest_count
            daily_projection = hourly_rate * 24
            error_rate = self._ingest_errors / max(self._ingest_count + self._ingest_errors, 1)
            logger.info(
                f"Ingestion stats: {hourly_rate}/hour "
                f"(projected: {daily_projection}/day, errors: {error_rate:.2%})"
            )
            if daily_projection < 20000:
                logger.warning(f"Below minimum ingestion rate: {daily_projection}/day")
            if error_rate > 0.01:  # 1% error rate
                logger.error(f"High error rate: {error_rate:.2%}")
            self._ingest_count = 0
            self._ingest_errors = 0
            self._last_stats_reset = time.time()

    def get_system_stats(self) -> Dict[str, Any]:
        """Get comprehensive system statistics."""
        verify_caller_authorization()
        with self.lock:
            try:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    # Total records
                    cursor.execute("SELECT COUNT(*) FROM audio_records")
                    total_records = cursor.fetchone()[0]
                    # Anomaly count
                    cursor.execute("SELECT COUNT(*) FROM audio_records WHERE is_anomaly = 1")
                    anomaly_count = cursor.fetchone()[0]
                    # Winner/loser counts
                    cursor.execute("SELECT COUNT(*) FROM performance_summary WHERE is_winner = 1")
                    winner_count = cursor.fetchone()[0]
                    cursor.execute("SELECT COUNT(*) FROM performance_summary WHERE is_loser = 1")
                    loser_count = cursor.fetchone()[0]
                    # By platform
                    cursor.execute("""
                        SELECT platform, COUNT(*) as count
                        FROM audio_records
                        GROUP BY platform
                    """)
                    by_platform = dict(cursor.fetchall())
                    # By niche
                    cursor.execute("""
                        SELECT niche, COUNT(*) as count
                        FROM audio_records
                        GROUP BY niche
                    """)
                    by_niche = dict(cursor.fetchall())
                    # Event counts
                    cursor.execute("""
                        SELECT event_type, COUNT(*) as count
                        FROM event_log
                        GROUP BY event_type
                    """)
                    event_counts = dict(cursor.fetchall())
                    return {
                        "schema_version": SCHEMA_VERSION,
                        "total_records": total_records,
                        "anomaly_count": anomaly_count,
                        "anomaly_rate": anomaly_count / max(total_records, 1),
                        "winner_count": winner_count,
                        "loser_count": loser_count,
                        "by_platform": by_platform,
                        "by_niche": by_niche,
                        "event_counts": event_counts,
                        "db_size_bytes": self.db_path.stat().st_size if self.db_path.exists() else 0,
                        "current_ingest_rate": self._ingest_count,
                        "current_error_rate": self._ingest_errors / max(self._ingest_count + self._ingest_errors, 1)
                    }
            except Exception as e:
                logger.error(f"Failed to get system stats: {e}")
                return {}


# Singleton instance
_store_instance: Optional[AudioPerformanceStore] = None
_store_lock = threading.Lock()


def get_store(db_path: str = "audio_performance_ground_truth.db") -> AudioPerformanceStore:
    """Get or create the singleton store instance (double-checked locking)."""
    verify_caller_authorization()
    global _store_instance
    if _store_instance is None:
        with _store_lock:
            if _store_instance is None:
                _store_instance = AudioPerformanceStore(db_path)
    return _store_instance
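
# End-to-end usage sketch (illustrative; must run from an authorized module
# such as scheduler.py, `record` stands for a fully populated AudioRecord,
# and the filter values are hypothetical):
#
#   store = get_store()
#   store.register_event_listener(
#       EventType.EXTREME_FAILURE,
#       lambda event_type, rec: logger.warning(f"Punish audio for {rec.video_id}"),
#   )
#   store.store_audio_record(record)
#   groups = store.get_winners_vs_losers(filters={"platform": "tiktok"})
#   killers = store.analyze_retention_killers(filters={"niche": "fitness"})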

if __name__ == "__main__":
    print(f"AudioPerformanceStore v{SCHEMA_VERSION}")
    print("CRITICAL SYSTEM COMPONENT - Ground Truth Ledger")
    print(f"Authorized modules: {AUTHORIZED_MODULES}")
    print("\nThis module is the single source of truth for all audio decisions.")
    print("Incorrect data = catastrophic system failure.")
| </parameter> |