"""
audio_performance_store.py
CRITICAL: Single source of truth for all audio decisions in autonomous viral content system.
Every downstream learning, punishment, and reward depends on this data.
SCALE: 20k-100k videos/day, append-only with indexed retrieval
INTEGRATION: Orchestration scheduler, enforcement layers, RL feedback loops
FAILURE MODE: Incorrect data = catastrophic system failure
SCHEMA VERSION: 2.0.0
"""
import sqlite3
import threading
import time
import hashlib
import json
import pickle
from collections import defaultdict, deque
from contextlib import contextmanager
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Callable
from enum import Enum
import numpy as np
import logging
from statistics import mean, stdev
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from scipy.spatial.distance import cosine, euclidean
from scipy.stats import pearsonr
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Schema version
SCHEMA_VERSION = "2.0.0"
# Access control
AUTHORIZED_MODULES = {
"audio_generation_controller",
"audio_reinforcement_loop",
"feedback_ingestor",
"scheduler",
"audio_pattern_learner"
}
class UnauthorizedAccessError(Exception):
"""Raised when unauthorized module attempts access."""
pass
class DataIntegrityError(Exception):
"""Raised when data validation fails."""
pass
class EventType(Enum):
"""Event types for orchestration integration."""
EXTREME_SUCCESS = "extreme_success"
EXTREME_FAILURE = "extreme_failure"
ANOMALY_DETECTED = "anomaly_detected"
RECORD_STORED = "record_stored"
THRESHOLD_CROSSED = "threshold_crossed"
def verify_caller_authorization():
"""Verify calling module is authorized. MANDATORY for all public methods."""
import inspect
frame = inspect.currentframe()
try:
caller_frame = frame.f_back.f_back
caller_module = inspect.getmodule(caller_frame)
if caller_module:
module_name = caller_module.__name__.split('.')[-1]
if module_name not in AUTHORIZED_MODULES:
raise UnauthorizedAccessError(
f"SECURITY: Module '{module_name}' unauthorized. "
f"Authorized: {AUTHORIZED_MODULES}"
)
finally:
del frame
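# Illustrative note (not part of the store): the gate above inspects the *caller's caller*
# frame, so it must be invoked at the top of each public method. A call originating from a
# module listed in AUTHORIZED_MODULES (e.g. scheduler.py) proceeds; one made from an
# unlisted module, such as an ad-hoc "__main__" script, raises UnauthorizedAccessError
# before any database work happens.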
@dataclass
class SyllableLevelTiming:
"""Time-segmented syllable-level audio features."""
syllable_index: int
start_time_ms: float
duration_ms: float
pitch_hz: float
energy_db: float
beat_alignment_error_ms: float # Deviation from expected beat position
phoneme_sequence: List[str]
stress_level: float # 0.0 to 1.0
@dataclass
class WordLevelFeatures:
"""Word-level emotion and delivery metrics."""
word_index: int
word_text: str
start_time_ms: float
duration_ms: float
emotion_intensity: float # 0.0 to 1.0
emotion_class: str # e.g., "excited", "calm", "urgent"
emphasis_score: float # Relative emphasis vs surrounding words
clarity_score: float # Pronunciation clarity
@dataclass
class PitchContour:
"""Time-series pitch data with statistical features."""
timestamps_ms: List[float]
pitch_hz_values: List[float]
pitch_variance: float
pitch_range_semitones: float
pitch_slope: float # Linear regression slope
pitch_inflection_points: List[int] # Indices of significant changes
@dataclass
class EnergyEnvelope:
"""Time-series energy/amplitude data."""
timestamps_ms: List[float]
energy_db_values: List[float]
peak_energy_db: float
mean_energy_db: float
energy_variance: float
dynamic_range_db: float
attack_times_ms: List[float] # Time to reach peaks
decay_times_ms: List[float] # Time from peaks to valleys
@dataclass
class PauseDensityMetrics:
"""Pause timing and distribution analysis."""
total_pause_count: int
pause_durations_ms: List[float]
pause_positions_ms: List[float]
mean_pause_duration_ms: float
pause_variance_ms: float
inter_pause_intervals_ms: List[float]
strategic_pause_count: int # Pauses before hooks/key phrases
@dataclass
class BeatAlignmentMetrics:
"""Multi-timestamp beat synchronization metrics."""
beat_timestamps_ms: List[float] # Expected beat positions
syllable_timestamps_ms: List[float] # Actual syllable positions
alignment_errors_ms: List[float] # Per-syllable deviations
mean_error_ms: float
max_error_ms: float
on_beat_percentage: float # % within acceptable tolerance
sync_quality_score: float # 0.0 to 1.0
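# Illustrative sketch of how the alignment fields above relate to one another; the real
# feature extractor is not part of this file, and the 70 ms "on beat" tolerance used here
# is an assumption, not a documented constant.
def _demo_beat_alignment(beat_ms: List[float], syllable_ms: List[float],
                         tolerance_ms: float = 70.0) -> Tuple[float, float]:
    """Return (mean_error_ms, on_beat_fraction) for positionally paired beat/syllable times."""
    errors = [abs(s - b) for s, b in zip(syllable_ms, beat_ms)]
    mean_error = float(np.mean(errors)) if errors else 0.0
    on_beat = sum(e <= tolerance_ms for e in errors) / max(len(errors), 1)
    return mean_error, on_beat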
@dataclass
class SpectralFeatures:
"""Spectral analysis across frequency bands."""
timestamps_ms: List[float]
low_band_energy: List[float] # 0-200 Hz
mid_low_energy: List[float] # 200-500 Hz
mid_energy: List[float] # 500-2000 Hz
mid_high_energy: List[float] # 2000-4000 Hz
high_energy: List[float] # 4000+ Hz
spectral_centroid: List[float]
spectral_rolloff: List[float]
dominant_band_per_segment: List[str]
@dataclass
class SegmentedAudioFeatures:
"""Complete time-segmented audio feature set."""
# Syllable-level data
syllable_features: List[SyllableLevelTiming]
# Word-level data
word_features: List[WordLevelFeatures]
# Time-series data
pitch_contour: PitchContour
energy_envelope: EnergyEnvelope
# Timing metrics
pause_metrics: PauseDensityMetrics
beat_alignment: BeatAlignmentMetrics
# Spectral data
spectral_features: SpectralFeatures
# Overall metrics
total_duration_ms: float
words_per_minute: float
syllables_per_second: float
# Extraction metadata
extraction_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
extractor_version: str = SCHEMA_VERSION
@dataclass
class PlatformMetrics:
"""Platform-normalized performance metrics with negative signals."""
# Retention breakdown
retention_1s: float # Critical early hook
retention_2s: float
retention_3s: float
completion_rate: float
# Engagement signals
rewatch_count: int
rewatch_rate: float # Rewatches per view
shares: int
shares_per_impression: float
saves: int
# Negative signals (CRITICAL for failure detection)
scroll_away_velocity: float # Avg ms before skip
mute_rate: float # % who muted audio
skip_rate: float # % who skipped before completion
negative_feedback_count: int # "Not interested" etc.
# Normalized scores
platform_engagement_score: float # Platform-specific normalization
virality_coefficient: float # Shares / (views * time_decay)
# Collection metadata
platform: str
collection_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
metrics_version: str = SCHEMA_VERSION
@dataclass
class AudioRecord:
"""Complete append-only record for a single video's audio performance."""
# Primary identifiers
record_id: str # Unique hash: video_id + timestamp
video_id: str
# Audio features (immutable after creation)
audio_features: SegmentedAudioFeatures
# Platform performance (immutable after final collection)
platform_metrics: PlatformMetrics
# Mandatory tags for retrieval
niche: str
platform: str
beat_id: str
beat_version_lineage: str # e.g., "beat_v3 <- beat_v2 <- beat_v1"
voice_profile_hash: str # SHA256 of voice config
orchestration_job_id: str
# Additional context
language: str
# Fields with defaults must follow all non-default fields (dataclass requirement)
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
trend_id: Optional[str] = None
content_type: str = "audio_overlay" # For future expansion
# Metadata (append-only guarantees)
schema_version: str = SCHEMA_VERSION
ingestion_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
is_anomaly: bool = False
anomaly_reason: Optional[str] = None
def __post_init__(self):
"""Generate deterministic record_id."""
if not self.record_id:
hash_input = f"{self.video_id}:{self.timestamp}:{self.orchestration_job_id}"
self.record_id = hashlib.sha256(hash_input.encode()).hexdigest()[:16]
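# Minimal sketch of the deterministic record_id scheme used in __post_init__ (the helper
# name below is illustrative only and is not referenced elsewhere in the store): the same
# (video_id, timestamp, orchestration_job_id) triple always yields the same 16-character
# id, which is what lets store_audio_record reject duplicate ingestion attempts.
def _example_record_id(video_id: str, timestamp: str, job_id: str) -> str:
    hash_input = f"{video_id}:{timestamp}:{job_id}"
    return hashlib.sha256(hash_input.encode()).hexdigest()[:16]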
class EmbeddingEngine:
"""
Computes and manages embeddings for audio patterns.
Enables fast similarity search and ML-based pattern matching.
"""
def __init__(self):
self.embedding_dim = 128
self.pca = None
self.scaler = StandardScaler()
self.lock = threading.Lock()
self.feature_cache = deque(maxlen=10000) # Cache for incremental PCA
def compute_audio_embedding(self, audio_features: SegmentedAudioFeatures) -> np.ndarray:
"""
Compute 128-dimensional embedding from audio features.
Combines syllable-timing, word-emotion, pitch, energy, pause, beat-alignment, and spectral-band summary statistics.
"""
features = []
# 1. Syllable timing features (statistical summaries)
if audio_features.syllable_features:
syl_durations = [s.duration_ms for s in audio_features.syllable_features[:20]]
syl_energies = [s.energy_db for s in audio_features.syllable_features[:20]]
syl_pitches = [s.pitch_hz for s in audio_features.syllable_features[:20]]
syl_beat_errors = [abs(s.beat_alignment_error_ms) for s in audio_features.syllable_features[:20]]
features.extend([
np.mean(syl_durations), np.std(syl_durations), np.max(syl_durations),
np.mean(syl_energies), np.std(syl_energies), np.max(syl_energies),
np.mean(syl_pitches), np.std(syl_pitches),
np.mean(syl_beat_errors), np.max(syl_beat_errors)
])
else:
features.extend([0.0] * 10)
# 2. Word-level emotion features
if audio_features.word_features:
emotions = [w.emotion_intensity for w in audio_features.word_features[:15]]
emphasis = [w.emphasis_score for w in audio_features.word_features[:15]]
features.extend([
np.mean(emotions), np.std(emotions), np.max(emotions),
np.mean(emphasis), np.std(emphasis)
])
else:
features.extend([0.0] * 5)
# 3. Pitch contour features
pc = audio_features.pitch_contour
features.extend([
pc.pitch_variance,
pc.pitch_range_semitones,
pc.pitch_slope,
len(pc.pitch_inflection_points) / max(len(pc.timestamps_ms), 1)
])
# 4. Energy envelope features
ee = audio_features.energy_envelope
features.extend([
ee.peak_energy_db,
ee.mean_energy_db,
ee.energy_variance,
ee.dynamic_range_db,
np.mean(ee.attack_times_ms) if ee.attack_times_ms else 0.0,
np.mean(ee.decay_times_ms) if ee.decay_times_ms else 0.0
])
# 5. Pause metrics
pm = audio_features.pause_metrics
features.extend([
pm.total_pause_count,
pm.mean_pause_duration_ms,
pm.pause_variance_ms,
pm.strategic_pause_count
])
# 6. Beat alignment
ba = audio_features.beat_alignment
features.extend([
ba.mean_error_ms,
ba.max_error_ms,
ba.on_beat_percentage,
ba.sync_quality_score
])
# 7. Spectral features (sample from time-series)
sf = audio_features.spectral_features
if sf.low_band_energy:
features.extend([
np.mean(sf.low_band_energy[:10]),
np.mean(sf.mid_energy[:10]),
np.mean(sf.high_energy[:10]),
np.mean(sf.spectral_centroid[:10]) if sf.spectral_centroid else 0.0
])
else:
features.extend([0.0] * 4)
# 8. Overall metrics
features.extend([
audio_features.total_duration_ms,
audio_features.words_per_minute,
audio_features.syllables_per_second
])
# Assemble raw feature vector (padded/truncated to embedding_dim after the optional PCA step)
feature_array = np.array(features, dtype=np.float32)
# Handle NaN/inf
feature_array = np.nan_to_num(feature_array, nan=0.0, posinf=1e6, neginf=-1e6)
# If we have enough data, apply PCA
if self.pca is not None:
with self.lock:
feature_array = self.scaler.transform(feature_array.reshape(1, -1))
embedding = self.pca.transform(feature_array)[0]
else:
# Not yet trained, return raw features (will be reduced later)
embedding = feature_array
# Ensure output is exactly 128 dimensions
if len(embedding) < self.embedding_dim:
embedding = np.pad(embedding, (0, self.embedding_dim - len(embedding)))
elif len(embedding) > self.embedding_dim:
embedding = embedding[:self.embedding_dim]
return embedding
def compute_performance_embedding(self, platform_metrics: PlatformMetrics) -> np.ndarray:
"""Compute performance-focused embedding."""
features = [
platform_metrics.retention_1s,
platform_metrics.retention_2s,
platform_metrics.retention_3s,
platform_metrics.completion_rate,
platform_metrics.rewatch_rate,
platform_metrics.shares_per_impression,
1.0 - platform_metrics.skip_rate, # scroll_stop_probability
1.0 - platform_metrics.mute_rate, # audio_kept_on
platform_metrics.platform_engagement_score,
platform_metrics.virality_coefficient,
np.log1p(platform_metrics.rewatch_count),
np.log1p(platform_metrics.shares)
]
return np.array(features, dtype=np.float32)
def train_pca(self, feature_matrix: np.ndarray):
"""Train PCA for dimensionality reduction."""
with self.lock:
self.scaler.fit(feature_matrix)
normalized = self.scaler.transform(feature_matrix)
n_components = min(self.embedding_dim, feature_matrix.shape[1], feature_matrix.shape[0])
self.pca = PCA(n_components=n_components)
self.pca.fit(normalized)
logger.info(f"PCA trained: {n_components} components, "
f"explained variance: {self.pca.explained_variance_ratio_.sum():.2%}")
def update_incremental(self, features: np.ndarray):
"""Add features to cache for incremental PCA updates."""
with self.lock:
self.feature_cache.append(features)
# Retrain PCA every 1000 samples
if len(self.feature_cache) >= 1000:
matrix = np.vstack(list(self.feature_cache))
self.train_pca(matrix)
self.feature_cache.clear()
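# Rough usage sketch for the PCA path above, on synthetic vectors standing in for the ~40
# raw summary statistics produced by compute_audio_embedding (everything named _demo_* is
# illustrative, not part of the store API). Once train_pca has fit the scaler and PCA,
# compute_audio_embedding routes new feature vectors through both.
def _demo_pca_reduction(n_samples: int = 200, n_features: int = 40) -> np.ndarray:
    rng = np.random.default_rng(0)
    engine = EmbeddingEngine()
    engine.train_pca(rng.normal(size=(n_samples, n_features)).astype(np.float32))
    new_vector = rng.normal(size=(1, n_features)).astype(np.float32)
    # Same scale-then-project step compute_audio_embedding applies when self.pca is set.
    return engine.pca.transform(engine.scaler.transform(new_vector))[0]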
class ViralityPredictor:
"""
Predicts virality metrics before posting using historical patterns.
Enables pre-posting quality gates and RL reward estimation.
"""
def __init__(self):
self.models = {} # Will store trained models per niche/platform
self.lock = threading.Lock()
self.training_data = defaultdict(lambda: {"X": [], "y": {}})
def add_training_sample(
self,
niche: str,
platform: str,
audio_embedding: np.ndarray,
actual_metrics: Dict[str, float]
):
"""Add sample to training data."""
key = f"{niche}:{platform}"
with self.lock:
self.training_data[key]["X"].append(audio_embedding)
for metric, value in actual_metrics.items():
if metric not in self.training_data[key]["y"]:
self.training_data[key]["y"][metric] = []
self.training_data[key]["y"][metric].append(value)
def train_models(self, niche: str, platform: str):
"""Train prediction models for specific niche/platform."""
key = f"{niche}:{platform}"
with self.lock:
data = self.training_data[key]
if len(data["X"]) < 50: # Need minimum samples
logger.warning(f"Insufficient training data for {key}: {len(data['X'])} samples")
return
X = np.vstack(data["X"])
# Train simple linear models for each metric
# In production, use gradient boosting or neural nets
models = {}
for metric, y_values in data["y"].items():
if len(y_values) != len(X):
continue
y = np.array(y_values)
# Simple linear least-squares fit with a bias column; lstsq solves the same problem as
# the normal equation θ = (X^T X)^-1 X^T y but is numerically more stable
try:
X_with_bias = np.c_[np.ones(len(X)), X]
theta = np.linalg.lstsq(X_with_bias, y, rcond=None)[0]
models[metric] = theta
except Exception as e:
logger.error(f"Failed to train {metric} model: {e}")
self.models[key] = models
logger.info(f"Trained {len(models)} prediction models for {key}")
def predict(
self,
niche: str,
platform: str,
audio_embedding: np.ndarray
) -> Dict[str, float]:
"""
Predict performance metrics before posting.
Returns:
Dict with predicted_views_24h, predicted_completion_rate, etc.
"""
key = f"{niche}:{platform}"
with self.lock:
if key not in self.models:
# No model trained yet, return neutral predictions
return {
"predicted_views_24h": 50000.0, # Conservative baseline
"predicted_completion_rate": 0.5,
"predicted_engagement_score": 0.5,
"predicted_retention_2s": 0.6,
"confidence": 0.1 # Low confidence
}
models = self.models[key]
predictions = {}
X = np.r_[1.0, audio_embedding] # Add bias term
for metric, theta in models.items():
if len(theta) != len(X):
continue
pred = np.dot(X, theta)
# Clip to valid ranges
if "rate" in metric or "retention" in metric or "score" in metric:
pred = np.clip(pred, 0.0, 1.0)
elif "views" in metric:
pred = max(0.0, pred)
predictions[f"predicted_{metric}"] = float(pred)
# Add confidence based on training samples
predictions["confidence"] = min(len(self.training_data[key]["X"]) / 500.0, 1.0)
return predictions
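# Minimal sketch of the bias-augmented least-squares cycle that train_models / predict rely
# on, shown on synthetic data (the _demo_* name is illustrative only): fit theta with lstsq
# on [1 | X], then predict with the same [1 | x] convention used in predict().
def _demo_lstsq_cycle() -> float:
    rng = np.random.default_rng(1)
    X = rng.normal(size=(200, 8))
    y = 0.3 + X @ rng.normal(size=8)  # synthetic linear target with an intercept
    X_with_bias = np.c_[np.ones(len(X)), X]
    theta = np.linalg.lstsq(X_with_bias, y, rcond=None)[0]
    x_new = rng.normal(size=8)
    return float(np.dot(np.r_[1.0, x_new], theta))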
class TrendMomentumTracker:
"""
Tracks temporal momentum and decay for patterns.
Enables adaptive trending vs stale pattern detection.
"""
def __init__(self, window_hours: int = 24):
self.window_hours = window_hours
self.pattern_timeseries = defaultdict(list) # pattern_key -> [(timestamp, engagement)]
self.lock = threading.Lock()
def add_performance_point(
self,
pattern_key: str,
timestamp: datetime,
engagement_score: float
):
"""Add performance data point for pattern."""
with self.lock:
self.pattern_timeseries[pattern_key].append((timestamp, engagement_score))
# Prune old data
cutoff = datetime.utcnow() - timedelta(hours=self.window_hours * 7) # Keep 7 windows
self.pattern_timeseries[pattern_key] = [
(ts, score) for ts, score in self.pattern_timeseries[pattern_key]
if ts > cutoff
]
def calculate_momentum(self, pattern_key: str) -> Dict[str, float]:
"""
Calculate trend momentum and decay rate.
Returns:
{
"trend_momentum": weighted recent growth,
"decay_rate": exponential decay coefficient,
"is_trending": boolean indicator,
"velocity": rate of change
}
"""
with self.lock:
data = self.pattern_timeseries.get(pattern_key, [])
if len(data) < 3:
return {
"trend_momentum": 0.0,
"decay_rate": 0.0,
"is_trending": False,
"velocity": 0.0
}
# Sort by timestamp
data = sorted(data, key=lambda x: x[0])
# Split into recent and older
midpoint = len(data) // 2
older_scores = [score for _, score in data[:midpoint]]
recent_scores = [score for _, score in data[midpoint:]]
# Calculate momentum as weighted growth
older_avg = np.mean(older_scores)
recent_avg = np.mean(recent_scores)
if older_avg > 0:
momentum = (recent_avg - older_avg) / older_avg
else:
momentum = 0.0
# Calculate exponential decay rate
# Fit exponential: y = a * exp(b * t)
timestamps = [(ts - data[0][0]).total_seconds() / 3600.0 for ts, _ in data]
scores = [score for _, score in data]
try:
# Log-linear regression
log_scores = np.log(np.maximum(scores, 1e-6))
coeffs = np.polyfit(timestamps, log_scores, 1)
decay_rate = float(coeffs[0]) # Negative means decay
except Exception:
decay_rate = 0.0
# Velocity (recent rate of change)
if len(recent_scores) >= 2:
recent_times = timestamps[midpoint:]
velocity = (recent_scores[-1] - recent_scores[0]) / max(recent_times[-1] - recent_times[0], 1.0)
else:
velocity = 0.0
is_trending = momentum > 0.1 and decay_rate > -0.05
return {
"trend_momentum": float(momentum),
"decay_rate": float(decay_rate),
"is_trending": is_trending,
"velocity": float(velocity)
}
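# Sketch of the log-linear decay fit used in calculate_momentum: for y = a * exp(b * t),
# a degree-1 polyfit of log(y) against t recovers b as the slope (negative means decay).
# Synthetic data; the helper name is illustrative only.
def _demo_decay_rate(hours: List[float], scores: List[float]) -> float:
    log_scores = np.log(np.maximum(scores, 1e-6))
    slope, _intercept = np.polyfit(hours, log_scores, 1)
    return float(slope)
# e.g. _demo_decay_rate([0, 1, 2, 3], [1.0, 0.8, 0.64, 0.512]) ≈ ln(0.8) ≈ -0.22 per hour.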
class AdaptiveAnomalyDetector:
"""
Enhanced anomaly detection with drift modeling and predictive capabilities.
Uses exponential moving averages and rolling windows for adaptive thresholds.
"""
def __init__(self, alpha: float = 0.2, window_size: int = 1000):
self.alpha = alpha # EMA learning rate
self.window_size = window_size
self.ema_stats = defaultdict(lambda: {"mean": 0.0, "var": 1.0, "n": 0})
self.rolling_windows = defaultdict(lambda: deque(maxlen=window_size))
self.drift_detector = defaultdict(lambda: {"last_mean": 0.0, "drift_count": 0})
self.lock = threading.Lock()
def update_statistics(
self,
key: str,
value: float,
detect_drift: bool = True
):
"""Update EMA statistics with drift detection."""
with self.lock:
stats = self.ema_stats[key]
window = self.rolling_windows[key]
# Update rolling window
window.append(value)
# Update EMA
if stats["n"] == 0:
stats["mean"] = value
stats["var"] = 0.0
else:
# Exponential moving average
delta = value - stats["mean"]
stats["mean"] += self.alpha * delta
stats["var"] = (1 - self.alpha) * (stats["var"] + self.alpha * delta ** 2)
stats["n"] += 1
# Drift detection
if detect_drift and len(window) >= 100:
self._detect_drift(key, window)
def _detect_drift(self, key: str, window: deque):
"""Detect concept drift in distribution."""
drift = self.drift_detector[key]
# Compare recent mean to older mean
recent = list(window)[-50:]
older = list(window)[:50]
recent_mean = np.mean(recent)
older_mean = np.mean(older)
# Check for significant shift
if abs(recent_mean - older_mean) > 2 * np.std(list(window)):
drift["drift_count"] += 1
logger.warning(f"Drift detected for {key}: {older_mean:.3f} -> {recent_mean:.3f}")
drift["last_mean"] = recent_mean
def is_anomalous(
self,
key: str,
value: float,
n_sigma: float = 3.0
) -> Tuple[bool, float]:
"""
Check if value is anomalous using adaptive thresholds.
Returns:
(is_anomaly, z_score)
"""
with self.lock:
stats = self.ema_stats[key]
if stats["n"] < 10:
return False, 0.0
std = np.sqrt(max(stats["var"], 1e-6))
z_score = abs(value - stats["mean"]) / std
is_anomaly = z_score > n_sigma
return is_anomaly, float(z_score)
def predict_failure_probability(
self,
record: AudioRecord
) -> Tuple[float, List[str]]:
"""
Predict probability of failure BEFORE posting.
Returns:
(failure_probability, risk_factors)
"""
risks = []
risk_scores = []
af = record.audio_features
pm = record.platform_metrics
# Check beat alignment risk
beat_key = f"{record.niche}:beat_alignment"
if beat_key in self.ema_stats:
is_anom, z = self.is_anomalous(beat_key, af.beat_alignment.sync_quality_score)
if is_anom and af.beat_alignment.sync_quality_score < self.ema_stats[beat_key]["mean"]:
risks.append("poor_beat_alignment")
risk_scores.append(min(z / 3.0, 1.0))
# Check energy risk
energy_key = f"{record.niche}:energy"
if energy_key in self.ema_stats:
is_anom, z = self.is_anomalous(energy_key, af.energy_envelope.mean_energy_db)
if is_anom and af.energy_envelope.mean_energy_db < self.ema_stats[energy_key]["mean"]:
risks.append("low_energy")
risk_scores.append(min(z / 3.0, 1.0))
# Check emotional intensity risk
if af.word_features:
early_emotion = np.mean([w.emotion_intensity for w in af.word_features[:5]])
emotion_key = f"{record.niche}:emotion"
if emotion_key in self.ema_stats:
is_anom, z = self.is_anomalous(emotion_key, early_emotion)
if is_anom and early_emotion < self.ema_stats[emotion_key]["mean"]:
risks.append("low_emotional_intensity")
risk_scores.append(min(z / 3.0, 1.0))
# Aggregate risk
if risk_scores:
failure_prob = min(np.mean(risk_scores), 1.0)
else:
failure_prob = 0.1 # Baseline risk
return failure_prob, risks
def detect_anomalies(self, record: AudioRecord) -> Tuple[bool, Optional[str]]:
"""
Detect anomalies using adaptive thresholds.
Returns:
(is_anomaly, reason)
"""
anomalies = []
# Check retention cliff
ret_1s = record.platform_metrics.retention_1s
ret_2s = record.platform_metrics.retention_2s
if ret_1s > 0.8 and ret_2s < 0.3:
anomalies.append("retention_cliff_1s_to_2s")
# Check extreme negative signals
if record.platform_metrics.mute_rate > 0.5:
anomalies.append("high_mute_rate")
if record.platform_metrics.scroll_away_velocity < 500:
anomalies.append("instant_scroll_away")
# Check beat alignment failure
if record.audio_features.beat_alignment.on_beat_percentage < 0.3:
anomalies.append("severe_beat_misalignment")
# Check extreme pitch variance
if record.audio_features.pitch_contour.pitch_variance > 5000:
anomalies.append("extreme_pitch_variance")
# Check silence detection
if record.audio_features.energy_envelope.mean_energy_db < -60:
anomalies.append("audio_too_quiet")
# Adaptive statistical outlier detection
key = f"{record.niche}:{record.platform}"
is_eng_anom, z_eng = self.is_anomalous(
f"{key}:engagement",
record.platform_metrics.platform_engagement_score,
n_sigma=3.5
)
if is_eng_anom and z_eng > 4.0:
anomalies.append(f"statistical_outlier_z{z_eng:.1f}")
# Check beat alignment adaptive
is_beat_anom, z_beat = self.is_anomalous(
f"{key}:beat",
record.audio_features.beat_alignment.sync_quality_score
)
if is_beat_anom and record.audio_features.beat_alignment.sync_quality_score < 0.5:
anomalies.append(f"poor_beat_sync_z{z_beat:.1f}")
if anomalies:
return True, "; ".join(anomalies)
return False, None
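# Scalar sketch of the EMA update behind update_statistics / is_anomalous (illustrative
# helper only): mean <- mean + alpha * delta and var <- (1 - alpha) * (var + alpha * delta^2),
# with a point flagged once |x - mean| / sqrt(var) exceeds n_sigma. The real detector also
# waits for at least 10 observations before flagging anything.
def _demo_ema_flags(values: List[float], alpha: float = 0.2, n_sigma: float = 3.0) -> List[bool]:
    mean_est, var_est = values[0], 0.0
    flags = [False]
    for x in values[1:]:
        std = np.sqrt(max(var_est, 1e-6))
        flags.append(abs(x - mean_est) / std > n_sigma)
        delta = x - mean_est
        mean_est += alpha * delta
        var_est = (1 - alpha) * (var_est + alpha * delta ** 2)
    return flags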
class AudioPerformanceStore:
"""
Production-grade audio performance store for autonomous viral content system.
CRITICAL PROPERTIES:
- Append-only: No silent overwrites
- Indexed retrieval: Optimized for RL queries
- Event emission: Real-time orchestration integration
- Anomaly detection: Automatic failure flagging
- Scale: 20k-100k videos/day
"""
def __init__(self, db_path: str = "audio_performance_ground_truth.db"):
"""Initialize store with production-grade SQLite backend."""
verify_caller_authorization()
self.db_path = Path(db_path)
self.lock = threading.RLock()
# Enhanced components
self.embedding_engine = EmbeddingEngine()
self.virality_predictor = ViralityPredictor()
self.trend_tracker = TrendMomentumTracker()
self.anomaly_detector = AdaptiveAnomalyDetector()
# Event system
self._event_listeners: Dict[EventType, List[Callable]] = defaultdict(list)
# Performance tracking
self._ingest_count = 0
self._ingest_errors = 0
self._last_stats_reset = time.time()
# Threshold configuration
self.thresholds = {
"extreme_success_retention_2s": 0.85,
"extreme_failure_retention_2s": 0.15,
"extreme_success_engagement": 0.9,
"extreme_failure_engagement": 0.1,
}
# Initialize database
self._init_database()
logger.info(f"AudioPerformanceStore initialized (schema v{SCHEMA_VERSION})")
logger.info(f"Database: {self.db_path.absolute()}")
logger.info(f"Append-only mode: ENABLED")
logger.info(f"ML capabilities: Embeddings, Predictions, Trend Tracking")
def _init_database(self):
"""Initialize production-grade database schema with indices."""
with self._get_connection() as conn:
cursor = conn.cursor()
# Main records table (append-only)
cursor.execute("""
CREATE TABLE IF NOT EXISTS audio_records (
record_id TEXT PRIMARY KEY,
video_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
audio_features_json TEXT NOT NULL,
platform_metrics_json TEXT NOT NULL,
niche TEXT NOT NULL,
platform TEXT NOT NULL,
beat_id TEXT NOT NULL,
beat_version_lineage TEXT NOT NULL,
voice_profile_hash TEXT NOT NULL,
orchestration_job_id TEXT NOT NULL,
language TEXT NOT NULL,
trend_id TEXT,
content_type TEXT NOT NULL,
schema_version TEXT NOT NULL,
ingestion_timestamp TEXT NOT NULL,
is_anomaly INTEGER NOT NULL,
anomaly_reason TEXT
)
""")
# Performance-critical indices
cursor.execute("CREATE INDEX IF NOT EXISTS idx_video_id ON audio_records(video_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_job_id ON audio_records(orchestration_job_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_platform_niche ON audio_records(platform, niche)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_beat_id ON audio_records(beat_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_voice_hash ON audio_records(voice_profile_hash)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_ingestion_time ON audio_records(ingestion_timestamp)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_anomaly ON audio_records(is_anomaly)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON audio_records(timestamp)")
# Materialized view for fast winner/loser queries (ENHANCED)
cursor.execute("""
CREATE TABLE IF NOT EXISTS performance_summary (
record_id TEXT PRIMARY KEY,
video_id TEXT NOT NULL,
niche TEXT NOT NULL,
platform TEXT NOT NULL,
retention_1s REAL NOT NULL,
retention_2s REAL NOT NULL,
retention_3s REAL NOT NULL,
completion_rate REAL NOT NULL,
engagement_score REAL NOT NULL,
mute_rate REAL NOT NULL,
scroll_away_velocity REAL NOT NULL,
beat_alignment_score REAL NOT NULL,
is_winner INTEGER NOT NULL,
is_loser INTEGER NOT NULL,
audio_embedding BLOB,
performance_embedding BLOB,
predicted_views_24h REAL,
predicted_completion_rate REAL,
predicted_engagement_score REAL,
predicted_retention_2s REAL,
prediction_confidence REAL,
trend_momentum REAL,
decay_rate REAL,
is_trending INTEGER,
failure_risk_score REAL,
ingestion_timestamp TEXT NOT NULL,
FOREIGN KEY (record_id) REFERENCES audio_records(record_id)
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_winners ON performance_summary(is_winner, engagement_score DESC)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_losers ON performance_summary(is_loser, retention_2s ASC)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_engagement ON performance_summary(engagement_score DESC)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_predicted_views ON performance_summary(predicted_views_24h DESC)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_trending ON performance_summary(is_trending DESC, trend_momentum DESC)")
# Pattern similarity graph
cursor.execute("""
CREATE TABLE IF NOT EXISTS pattern_similarity (
record_id TEXT NOT NULL,
similar_record_id TEXT NOT NULL,
similarity_score REAL NOT NULL,
similarity_type TEXT NOT NULL,
computed_at TEXT NOT NULL,
PRIMARY KEY (record_id, similar_record_id, similarity_type),
FOREIGN KEY (record_id) REFERENCES audio_records(record_id),
FOREIGN KEY (similar_record_id) REFERENCES audio_records(record_id)
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_similarity_score ON pattern_similarity(similarity_score DESC)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_similarity_record ON pattern_similarity(record_id)")
# Temporal performance trajectory
cursor.execute("""
CREATE TABLE IF NOT EXISTS performance_trajectory (
trajectory_id INTEGER PRIMARY KEY AUTOINCREMENT,
record_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
hours_since_post REAL NOT NULL,
views INTEGER NOT NULL,
engagement_score REAL NOT NULL,
velocity REAL NOT NULL,
FOREIGN KEY (record_id) REFERENCES audio_records(record_id)
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_trajectory_record ON performance_trajectory(record_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_trajectory_time ON performance_trajectory(hours_since_post)")
# System metadata
cursor.execute("""
CREATE TABLE IF NOT EXISTS system_metadata (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
# Event log
cursor.execute("""
CREATE TABLE IF NOT EXISTS event_log (
event_id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type TEXT NOT NULL,
record_id TEXT,
event_data TEXT NOT NULL,
timestamp TEXT NOT NULL
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_type ON event_log(event_type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_time ON event_log(timestamp)")
# Store schema version
cursor.execute("""
INSERT OR REPLACE INTO system_metadata (key, value, updated_at)
VALUES (?, ?, ?)
""", ("schema_version", SCHEMA_VERSION, datetime.utcnow().isoformat()))
conn.commit()
@contextmanager
def _get_connection(self):
"""Thread-safe database connection with WAL mode for concurrency."""
conn = sqlite3.connect(
str(self.db_path),
timeout=30.0,
check_same_thread=False
)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA cache_size=-64000") # 64MB cache
try:
yield conn
finally:
conn.close()
def store_audio_record(self, record: AudioRecord) -> bool:
"""
Store audio record with validation and anomaly detection.
APPEND-ONLY: No overwrites. Duplicates are rejected.
Args:
record: Complete AudioRecord instance
Returns:
bool: True if stored successfully
Raises:
DataIntegrityError: If validation fails
"""
verify_caller_authorization()
with self.lock:
try:
# Validate record
self._validate_record(record)
# Compute embeddings
audio_embedding = self.embedding_engine.compute_audio_embedding(record.audio_features)
performance_embedding = self.embedding_engine.compute_performance_embedding(record.platform_metrics)
# Get predictions
predictions = self.virality_predictor.predict(
record.niche,
record.platform,
audio_embedding
)
# Calculate trend momentum
pattern_key = f"{record.niche}:{record.beat_id}"
self.trend_tracker.add_performance_point(
pattern_key,
datetime.fromisoformat(record.timestamp),
record.platform_metrics.platform_engagement_score
)
momentum_metrics = self.trend_tracker.calculate_momentum(pattern_key)
# Adaptive anomaly detection
is_anomaly, reason = self.anomaly_detector.detect_anomalies(record)
failure_prob, risk_factors = self.anomaly_detector.predict_failure_probability(record)
if risk_factors:
reason = (reason + "; " if reason else "") + ", ".join(risk_factors)
record.is_anomaly = is_anomaly
record.anomaly_reason = reason
# Serialize complex objects
audio_json = json.dumps(asdict(record.audio_features))
metrics_json = json.dumps(asdict(record.platform_metrics))
with self._get_connection() as conn:
cursor = conn.cursor()
# Append-only: Check for duplicates
cursor.execute(
"SELECT record_id FROM audio_records WHERE record_id = ?",
(record.record_id,)
)
if cursor.fetchone():
raise DataIntegrityError(
f"APPEND-ONLY VIOLATION: Record {record.record_id} already exists"
)
# Insert main record
cursor.execute("""
INSERT INTO audio_records (
record_id, video_id, timestamp, audio_features_json,
platform_metrics_json, niche, platform, beat_id,
beat_version_lineage, voice_profile_hash, orchestration_job_id,
language, trend_id, content_type, schema_version,
ingestion_timestamp, is_anomaly, anomaly_reason
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
record.record_id, record.video_id, record.timestamp,
audio_json, metrics_json, record.niche, record.platform,
record.beat_id, record.beat_version_lineage,
record.voice_profile_hash, record.orchestration_job_id,
record.language, record.trend_id, record.content_type,
record.schema_version, record.ingestion_timestamp,
1 if record.is_anomaly else 0, record.anomaly_reason
))
# Update enhanced performance summary
self._update_performance_summary(
cursor, record, audio_embedding, performance_embedding,
predictions, momentum_metrics, failure_prob
)
conn.commit()
# Update adaptive statistics
self.anomaly_detector.update_statistics(
f"{record.niche}:engagement",
record.platform_metrics.platform_engagement_score
)
self.anomaly_detector.update_statistics(
f"{record.niche}:beat_alignment",
record.audio_features.beat_alignment.sync_quality_score
)
self.anomaly_detector.update_statistics(
f"{record.niche}:energy",
record.audio_features.energy_envelope.mean_energy_db
)
# Add to training data
self.virality_predictor.add_training_sample(
record.niche,
record.platform,
audio_embedding,
{
"views_24h": record.platform_metrics.shares * 100, # Proxy
"completion_rate": record.platform_metrics.completion_rate,
"engagement_score": record.platform_metrics.platform_engagement_score,
"retention_2s": record.platform_metrics.retention_2s
}
)
# Update embedding engine
self.embedding_engine.update_incremental(audio_embedding)
# Compute and store similarities (async in production)
self._compute_similarities(record.record_id, audio_embedding)
# Track ingestion
self._ingest_count += 1
self._check_ingestion_rate()
# Emit events
self._emit_event(EventType.RECORD_STORED, record)
if is_anomaly:
self._emit_event(EventType.ANOMALY_DETECTED, record)
self._log_event(EventType.ANOMALY_DETECTED, record.record_id, {
"reason": reason,
"video_id": record.video_id
})
# Check extreme thresholds
self._check_extreme_performance(record)
logger.debug(f"Stored record {record.record_id} (anomaly: {is_anomaly})")
return True
except Exception as e:
self._ingest_errors += 1
logger.error(f"CRITICAL: Failed to store record: {e}", exc_info=True)
return False
def _validate_record(self, record: AudioRecord):
"""Validate record before storage. Raises DataIntegrityError if invalid."""
# Validate required fields
if not record.video_id or not record.orchestration_job_id:
raise DataIntegrityError("Missing required identifiers")
# Validate retention values
pm = record.platform_metrics
if not (0 <= pm.retention_1s <= 1.0):
raise DataIntegrityError(f"Invalid retention_1s: {pm.retention_1s}")
if not (0 <= pm.retention_2s <= 1.0):
raise DataIntegrityError(f"Invalid retention_2s: {pm.retention_2s}")
if not (0 <= pm.retention_3s <= 1.0):
raise DataIntegrityError(f"Invalid retention_3s: {pm.retention_3s}")
# Validate retention ordering
if pm.retention_2s > pm.retention_1s + 0.01: # Small tolerance
raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)")
if pm.retention_3s > pm.retention_2s + 0.01:
raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)")
# Validate audio features
af = record.audio_features
if af.total_duration_ms <= 0:
raise DataIntegrityError("Invalid duration")
if len(af.syllable_features) == 0:
raise DataIntegrityError("Missing syllable features")
if len(af.word_features) == 0:
raise DataIntegrityError("Missing word features")
def _update_performance_summary(
self,
cursor,
record: AudioRecord,
audio_embedding: np.ndarray,
performance_embedding: np.ndarray,
predictions: Dict[str, float],
momentum_metrics: Dict[str, float],
failure_risk: float
):
"""Update enhanced materialized view with ML features."""
pm = record.platform_metrics
ba = record.audio_features.beat_alignment
# Determine winner/loser status
is_winner = (
pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and
pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"]
)
is_loser = (
pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or
pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"]
)
# Serialize embeddings
audio_emb_bytes = pickle.dumps(audio_embedding)
perf_emb_bytes = pickle.dumps(performance_embedding)
cursor.execute("""
INSERT INTO performance_summary (
record_id, video_id, niche, platform, retention_1s, retention_2s,
retention_3s, completion_rate, engagement_score, mute_rate,
scroll_away_velocity, beat_alignment_score, is_winner, is_loser,
audio_embedding, performance_embedding,
predicted_views_24h, predicted_completion_rate, predicted_engagement_score,
predicted_retention_2s, prediction_confidence,
trend_momentum, decay_rate, is_trending, failure_risk_score,
ingestion_timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
record.record_id, record.video_id, record.niche, record.platform,
pm.retention_1s, pm.retention_2s, pm.retention_3s, pm.completion_rate,
pm.platform_engagement_score, pm.mute_rate, pm.scroll_away_velocity,
ba.sync_quality_score, 1 if is_winner else 0, 1 if is_loser else 0,
audio_emb_bytes, perf_emb_bytes,
predictions.get("predicted_views_24h", 0.0),
predictions.get("predicted_completion_rate", 0.0),
predictions.get("predicted_engagement_score", 0.0),
predictions.get("predicted_retention_2s", 0.0),
predictions.get("confidence", 0.0),
momentum_metrics["trend_momentum"],
momentum_metrics["decay_rate"],
1 if momentum_metrics["is_trending"] else 0,
failure_risk,
record.ingestion_timestamp
))
def _compute_similarities(self, record_id: str, embedding: np.ndarray):
"""Compute and store pattern similarities (k-nearest neighbors)."""
try:
with self._get_connection() as conn:
cursor = conn.cursor()
# Get recent embeddings for comparison
cursor.execute("""
SELECT record_id, audio_embedding
FROM performance_summary
WHERE record_id != ?
ORDER BY ingestion_timestamp DESC
LIMIT 1000
""", (record_id,))
rows = cursor.fetchall()
similarities = []
for row in rows:
other_id = row[0]
other_emb = pickle.loads(row[1])
# Cosine similarity
cos_sim = 1.0 - cosine(embedding, other_emb)
if cos_sim > 0.7: # Only store high similarities
similarities.append((other_id, cos_sim))
# Store top-K similarities
similarities.sort(key=lambda x: x[1], reverse=True)
for other_id, score in similarities[:20]: # Top 20
cursor.execute("""
INSERT OR REPLACE INTO pattern_similarity
(record_id, similar_record_id, similarity_score, similarity_type, computed_at)
VALUES (?, ?, ?, ?, ?)
""", (
record_id, other_id, score, "audio_cosine",
datetime.utcnow().isoformat()
))
conn.commit()
except Exception as e:
logger.error(f"Failed to compute similarities: {e}")
def _check_extreme_performance(self, record: AudioRecord):
"""Check for extreme success/failure and emit events."""
pm = record.platform_metrics
if (pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and
pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"]):
self._emit_event(EventType.EXTREME_SUCCESS, record)
self._log_event(EventType.EXTREME_SUCCESS, record.record_id, {
"retention_2s": pm.retention_2s,
"engagement": pm.platform_engagement_score,
"niche": record.niche
})
logger.info(f"EXTREME SUCCESS: {record.video_id} (ret={pm.retention_2s:.2f})")
elif (pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or
pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"]):
self._emit_event(EventType.EXTREME_FAILURE, record)
self._log_event(EventType.EXTREME_FAILURE, record.record_id, {
"retention_2s": pm.retention_2s,
"engagement": pm.platform_engagement_score,
"mute_rate": pm.mute_rate,
"niche": record.niche
})
logger.warning(f"EXTREME FAILURE: {record.video_id} (ret={pm.retention_2s:.2f})")
def get_winners_vs_losers(
self,
filters: Optional[Dict[str, Any]] = None,
limit_per_group: int = 1000
) -> Dict[str, List[AudioRecord]]:
"""
Get winners and losers for comparison (critical for RL).
Returns:
{"winners": [...], "losers": [...]}
"""
verify_caller_authorization()
with self.lock:
winners = self._query_performance_group(filters, "winners", limit_per_group)
losers = self._query_performance_group(filters, "losers", limit_per_group)
return {
"winners": winners,
"losers": losers
}
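# Illustrative shape of the retrieval these summary rows and the idx_winners / idx_losers
# indices serve. The actual SQL is built in _query_performance_group (defined further down
# in this module); the statement below is an assumption about its form, not a quote of it:
#
#   SELECT record_id, engagement_score
#   FROM performance_summary
#   WHERE is_winner = 1 AND niche = ? AND platform = ?
#   ORDER BY engagement_score DESC
#   LIMIT 1000;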
def analyze_retention_killers(
self,
filters: Optional[Dict[str, Any]] = None,
threshold_ms: int = 2000
) -> List[Dict[str, Any]]:
"""
Identify what killed retention early (< threshold_ms).
Returns list of analysis dicts with audio feature correlations.
"""
verify_caller_authorization()
with self.lock:
# Get records with early retention failure
query = """
SELECT ar.* FROM audio_records ar
JOIN performance_summary ps ON ar.record_id = ps.record_id
WHERE ps.retention_2s < 0.3
"""
params = []
if filters:
conditions = []
for key, value in filters.items():
if key in ["niche", "platform", "beat_id"]:
conditions.append(f"ar.{key} = ?")
params.append(value)
if conditions:
query += " AND " + " AND ".join(conditions)
query += " ORDER BY ps.retention_2s ASC LIMIT 500"
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
rows = cursor.fetchall()
if not rows:
return []
records = [self._row_to_record(row) for row in rows]
# Analyze common patterns
analyses = []
for record in records:
af = record.audio_features
pm = record.platform_metrics
killers = []
# Check first 2 seconds of audio
early_syllables = [s for s in af.syllable_features if s.start_time_ms < threshold_ms]
if early_syllables:
avg_energy = mean([s.energy_db for s in early_syllables])
if avg_energy < -50:
killers.append("low_energy_start")
avg_beat_error = mean([abs(s.beat_alignment_error_ms) for s in early_syllables])
if avg_beat_error > 150:
killers.append("poor_beat_alignment_start")
# Check pause density early
early_pauses = [p for p in af.pause_metrics.pause_positions_ms if p < threshold_ms]