"""
audio_performance_store.py
CRITICAL: Single source of truth for all audio decisions in autonomous viral content system.
Every downstream learning, punishment, and reward depends on this data.
SCALE: 20k-100k videos/day, append-only with indexed retrieval
INTEGRATION: Orchestration scheduler, enforcement layers, RL feedback loops
FAILURE MODE: Incorrect data = catastrophic system failure
SCHEMA VERSION: 2.0.0
"""
import sqlite3
import threading
import time
import hashlib
import json
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Callable
from enum import Enum
import numpy as np
import logging
from statistics import mean, stdev
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Schema version
SCHEMA_VERSION = "2.0.0"
# Access control
AUTHORIZED_MODULES = {
    "audio_generation_controller",
    "audio_reinforcement_loop",
    "feedback_ingestor",
    "scheduler",
    "audio_pattern_learner"
}
class UnauthorizedAccessError(Exception):
    """Raised when an unauthorized module attempts access."""
    pass
class DataIntegrityError(Exception):
    """Raised when data validation fails."""
    pass
class EventType(Enum):
    """Event types for orchestration integration."""
    EXTREME_SUCCESS = "extreme_success"
    EXTREME_FAILURE = "extreme_failure"
    ANOMALY_DETECTED = "anomaly_detected"
    RECORD_STORED = "record_stored"
    THRESHOLD_CROSSED = "threshold_crossed"
def verify_caller_authorization():
    """Verify the calling module is authorized. MANDATORY for all public methods."""
    import inspect
    frame = inspect.currentframe()
    try:
        # Two frames up: past this function and past the public method that called it.
        caller_frame = frame.f_back.f_back
        caller_module = inspect.getmodule(caller_frame)
        if caller_module:
            module_name = caller_module.__name__.split('.')[-1]
            if module_name not in AUTHORIZED_MODULES:
                raise UnauthorizedAccessError(
                    f"SECURITY: Module '{module_name}' unauthorized. "
                    f"Authorized: {AUTHORIZED_MODULES}"
                )
    finally:
        # Break the reference cycle created by holding a frame object.
        del frame
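# Example of the intended gate (a sketch; "ad_hoc_script" is a hypothetical
# unauthorized caller, not part of this system):
#
#     # inside scheduler.py -- module name "scheduler" is authorized, call passes
#     store.store_audio_record(record)
#
#     # inside ad_hoc_script.py -- raises UnauthorizedAccessError
#     store.store_audio_record(record)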
@dataclass
class SyllableLevelTiming:
    """Time-segmented syllable-level audio features."""
    syllable_index: int
    start_time_ms: float
    duration_ms: float
    pitch_hz: float
    energy_db: float
    beat_alignment_error_ms: float  # Deviation from expected beat position
    phoneme_sequence: List[str]
    stress_level: float  # 0.0 to 1.0
@dataclass
class WordLevelFeatures:
    """Word-level emotion and delivery metrics."""
    word_index: int
    word_text: str
    start_time_ms: float
    duration_ms: float
    emotion_intensity: float  # 0.0 to 1.0
    emotion_class: str  # e.g., "excited", "calm", "urgent"
    emphasis_score: float  # Relative emphasis vs surrounding words
    clarity_score: float  # Pronunciation clarity
@dataclass
class PitchContour:
    """Time-series pitch data with statistical features."""
    timestamps_ms: List[float]
    pitch_hz_values: List[float]
    pitch_variance: float
    pitch_range_semitones: float
    pitch_slope: float  # Linear regression slope
    pitch_inflection_points: List[int]  # Indices of significant changes
@dataclass
class EnergyEnvelope:
    """Time-series energy/amplitude data."""
    timestamps_ms: List[float]
    energy_db_values: List[float]
    peak_energy_db: float
    mean_energy_db: float
    energy_variance: float
    dynamic_range_db: float
    attack_times_ms: List[float]  # Time to reach peaks
    decay_times_ms: List[float]  # Time from peaks to valleys
@dataclass
class PauseDensityMetrics:
    """Pause timing and distribution analysis."""
    total_pause_count: int
    pause_durations_ms: List[float]
    pause_positions_ms: List[float]
    mean_pause_duration_ms: float
    pause_variance_ms: float
    inter_pause_intervals_ms: List[float]
    strategic_pause_count: int  # Pauses before hooks/key phrases
@dataclass
class BeatAlignmentMetrics:
    """Multi-timestamp beat synchronization metrics."""
    beat_timestamps_ms: List[float]  # Expected beat positions
    syllable_timestamps_ms: List[float]  # Actual syllable positions
    alignment_errors_ms: List[float]  # Per-syllable deviations
    mean_error_ms: float
    max_error_ms: float
    on_beat_percentage: float  # % within acceptable tolerance
    sync_quality_score: float  # 0.0 to 1.0
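# A hedged sketch of how these aggregates relate to the per-syllable errors
# (the 50ms tolerance is an assumption; this file does not define the value):
#
#     errors = [abs(e) for e in alignment_errors_ms]
#     mean_error_ms = sum(errors) / len(errors)
#     on_beat_percentage = sum(e <= 50.0 for e in errors) / len(errors)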
@dataclass
class SpectralFeatures:
    """Spectral analysis across frequency bands."""
    timestamps_ms: List[float]
    low_band_energy: List[float]  # 0-200 Hz
    mid_low_energy: List[float]  # 200-500 Hz
    mid_energy: List[float]  # 500-2000 Hz
    mid_high_energy: List[float]  # 2000-4000 Hz
    high_energy: List[float]  # 4000+ Hz
    spectral_centroid: List[float]
    spectral_rolloff: List[float]
    dominant_band_per_segment: List[str]
@dataclass
class SegmentedAudioFeatures:
    """Complete time-segmented audio feature set."""
    # Syllable-level data
    syllable_features: List[SyllableLevelTiming]
    # Word-level data
    word_features: List[WordLevelFeatures]
    # Time-series data
    pitch_contour: PitchContour
    energy_envelope: EnergyEnvelope
    # Timing metrics
    pause_metrics: PauseDensityMetrics
    beat_alignment: BeatAlignmentMetrics
    # Spectral data
    spectral_features: SpectralFeatures
    # Overall metrics
    total_duration_ms: float
    words_per_minute: float
    syllables_per_second: float
    # Extraction metadata
    extraction_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    extractor_version: str = SCHEMA_VERSION
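# The rate fields above are presumably derived from the segment data by the
# extractor (which is not part of this file); a sketch of the obvious derivation:
#
#     words_per_minute = len(word_features) / (total_duration_ms / 60_000.0)
#     syllables_per_second = len(syllable_features) / (total_duration_ms / 1_000.0)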
@dataclass
class PlatformMetrics:
    """Platform-normalized performance metrics with negative signals."""
    # Retention breakdown
    retention_1s: float  # Critical early hook
    retention_2s: float
    retention_3s: float
    completion_rate: float
    # Engagement signals
    rewatch_count: int
    rewatch_rate: float  # Rewatches per view
    shares: int
    shares_per_impression: float
    saves: int
    # Negative signals (CRITICAL for failure detection)
    scroll_away_velocity: float  # Avg ms before skip
    mute_rate: float  # % who muted audio
    skip_rate: float  # % who skipped before completion
    negative_feedback_count: int  # "Not interested" etc.
    # Normalized scores
    platform_engagement_score: float  # Platform-specific normalization
    virality_coefficient: float  # Shares / (views * time_decay)
    # Collection metadata
    platform: str
    collection_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    metrics_version: str = SCHEMA_VERSION
@dataclass
class AudioRecord:
    """Complete append-only record for a single video's audio performance."""
    # Primary identifiers
    record_id: str  # Unique hash of video_id + timestamp + job_id (see __post_init__)
    video_id: str
    # Audio features (immutable after creation)
    audio_features: SegmentedAudioFeatures
    # Platform performance (immutable after final collection)
    platform_metrics: PlatformMetrics
    # Mandatory tags for retrieval
    niche: str
    platform: str
    beat_id: str
    beat_version_lineage: str  # e.g., "beat_v3 <- beat_v2 <- beat_v1"
    voice_profile_hash: str  # SHA256 of voice config
    orchestration_job_id: str
    # Additional context
    language: str
    # Defaulted fields must follow all non-default fields
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    trend_id: Optional[str] = None
    content_type: str = "audio_overlay"  # For future expansion
    # Metadata (append-only guarantees)
    schema_version: str = SCHEMA_VERSION
    ingestion_timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    is_anomaly: bool = False
    anomaly_reason: Optional[str] = None
    def __post_init__(self):
        """Generate a deterministic record_id if one was not supplied."""
        if not self.record_id:
            hash_input = f"{self.video_id}:{self.timestamp}:{self.orchestration_job_id}"
            self.record_id = hashlib.sha256(hash_input.encode()).hexdigest()[:16]
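# Deterministic record_ids make retries idempotent: re-submitting the same
# (video_id, timestamp, orchestration_job_id) triple reproduces the same ID, so
# the store's append-only duplicate check rejects it instead of double-counting.
# A minimal illustration (values hypothetical):
#
#     hashlib.sha256(b"vid_123:2025-01-01T00:00:00:job_9").hexdigest()[:16]
#     # -> the same 16-hex-character ID on every call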
class AnomalyDetector:
    """Real-time anomaly detection for audio performance data."""
    def __init__(self):
        self.baseline_stats = defaultdict(lambda: {"mean": 0.0, "std": 1.0, "n": 0})
        self.lock = threading.Lock()
    def detect_anomalies(self, record: AudioRecord) -> Tuple[bool, Optional[str]]:
        """
        Detect anomalies in audio features and performance.
        Returns:
            (is_anomaly, reason)
        """
        anomalies = []
        # Check for a retention cliff (major drop between 1s and 2s)
        ret_1s = record.platform_metrics.retention_1s
        ret_2s = record.platform_metrics.retention_2s
        if ret_1s > 0.8 and ret_2s < 0.3:
            anomalies.append("retention_cliff_1s_to_2s")
        # Check extreme negative signals
        if record.platform_metrics.mute_rate > 0.5:
            anomalies.append("high_mute_rate")
        if record.platform_metrics.scroll_away_velocity < 500:  # < 500ms
            anomalies.append("instant_scroll_away")
        # Check beat alignment failure
        if record.audio_features.beat_alignment.on_beat_percentage < 0.3:
            anomalies.append("severe_beat_misalignment")
        # Check extreme pitch variance (potentially distorted audio)
        if record.audio_features.pitch_contour.pitch_variance > 5000:
            anomalies.append("extreme_pitch_variance")
        # Check for near-silence (mean energy too low)
        if record.audio_features.energy_envelope.mean_energy_db < -60:
            anomalies.append("audio_too_quiet")
        # Statistical outlier detection
        with self.lock:
            key = f"{record.niche}:{record.platform}"
            stats = self.baseline_stats[key]
            if stats["n"] > 100:  # Need a baseline before flagging outliers
                engagement = record.platform_metrics.platform_engagement_score
                z_score = abs((engagement - stats["mean"]) / max(stats["std"], 0.01))
                if z_score > 4.0:  # 4 standard deviations
                    anomalies.append(f"statistical_outlier_z{z_score:.1f}")
        if anomalies:
            return True, "; ".join(anomalies)
        return False, None
    def update_baseline(self, record: AudioRecord):
        """Update baseline statistics for anomaly detection."""
        with self.lock:
            key = f"{record.niche}:{record.platform}"
            stats = self.baseline_stats[key]
            engagement = record.platform_metrics.platform_engagement_score
            n = stats["n"]
            # Online (Welford-style) mean and variance update
            if n == 0:
                stats["mean"] = engagement
                stats["std"] = 0.0
            else:
                old_mean = stats["mean"]
                stats["mean"] = (old_mean * n + engagement) / (n + 1)
                # Incremental sample-variance update; valid for every n >= 1,
                # including the second sample (n == 1, where old_var is 0).
                old_var = stats["std"] ** 2
                new_var = ((n - 1) * old_var + (engagement - old_mean) * (engagement - stats["mean"])) / n
                stats["std"] = max(np.sqrt(new_var), 0.01)
            stats["n"] = n + 1
class AudioPerformanceStore:
    """
    Production-grade audio performance store for autonomous viral content system.
    CRITICAL PROPERTIES:
    - Append-only: No silent overwrites
    - Indexed retrieval: Optimized for RL queries
    - Event emission: Real-time orchestration integration
    - Anomaly detection: Automatic failure flagging
    - Scale: 20k-100k videos/day
    """
    def __init__(self, db_path: str = "audio_performance_ground_truth.db"):
        """Initialize store with production-grade SQLite backend."""
        verify_caller_authorization()
        self.db_path = Path(db_path)
        self.lock = threading.RLock()
        self.anomaly_detector = AnomalyDetector()
        # Event system
        self._event_listeners: Dict[EventType, List[Callable]] = defaultdict(list)
        # Performance tracking
        self._ingest_count = 0
        self._ingest_errors = 0
        self._last_stats_reset = time.time()
        # Threshold configuration
        self.thresholds = {
            "extreme_success_retention_2s": 0.85,
            "extreme_failure_retention_2s": 0.15,
            "extreme_success_engagement": 0.9,
            "extreme_failure_engagement": 0.1,
        }
        # Initialize database
        self._init_database()
        logger.info(f"AudioPerformanceStore initialized (schema v{SCHEMA_VERSION})")
        logger.info(f"Database: {self.db_path.absolute()}")
        logger.info("Append-only mode: ENABLED")
    def _init_database(self):
        """Initialize production-grade database schema with indices."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            # Main records table (append-only)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audio_records (
                    record_id TEXT PRIMARY KEY,
                    video_id TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    audio_features_json TEXT NOT NULL,
                    platform_metrics_json TEXT NOT NULL,
                    niche TEXT NOT NULL,
                    platform TEXT NOT NULL,
                    beat_id TEXT NOT NULL,
                    beat_version_lineage TEXT NOT NULL,
                    voice_profile_hash TEXT NOT NULL,
                    orchestration_job_id TEXT NOT NULL,
                    language TEXT NOT NULL,
                    trend_id TEXT,
                    content_type TEXT NOT NULL,
                    schema_version TEXT NOT NULL,
                    ingestion_timestamp TEXT NOT NULL,
                    is_anomaly INTEGER NOT NULL,
                    anomaly_reason TEXT
                )
            """)
            # Performance-critical indices
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_video_id ON audio_records(video_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_job_id ON audio_records(orchestration_job_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_platform_niche ON audio_records(platform, niche)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_beat_id ON audio_records(beat_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_voice_hash ON audio_records(voice_profile_hash)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_ingestion_time ON audio_records(ingestion_timestamp)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_anomaly ON audio_records(is_anomaly)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON audio_records(timestamp)")
            # Materialized view for fast winner/loser queries
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS performance_summary (
                    record_id TEXT PRIMARY KEY,
                    video_id TEXT NOT NULL,
                    niche TEXT NOT NULL,
                    platform TEXT NOT NULL,
                    retention_1s REAL NOT NULL,
                    retention_2s REAL NOT NULL,
                    retention_3s REAL NOT NULL,
                    completion_rate REAL NOT NULL,
                    engagement_score REAL NOT NULL,
                    mute_rate REAL NOT NULL,
                    scroll_away_velocity REAL NOT NULL,
                    beat_alignment_score REAL NOT NULL,
                    is_winner INTEGER NOT NULL,
                    is_loser INTEGER NOT NULL,
                    ingestion_timestamp TEXT NOT NULL,
                    FOREIGN KEY (record_id) REFERENCES audio_records(record_id)
                )
            """)
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_winners ON performance_summary(is_winner, engagement_score DESC)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_losers ON performance_summary(is_loser, retention_2s ASC)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_engagement ON performance_summary(engagement_score DESC)")
            # System metadata
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS system_metadata (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
            """)
            # Event log
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS event_log (
                    event_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_type TEXT NOT NULL,
                    record_id TEXT,
                    event_data TEXT NOT NULL,
                    timestamp TEXT NOT NULL
                )
            """)
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_type ON event_log(event_type)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_time ON event_log(timestamp)")
            # Store schema version
            cursor.execute("""
                INSERT OR REPLACE INTO system_metadata (key, value, updated_at)
                VALUES (?, ?, ?)
            """, ("schema_version", SCHEMA_VERSION, datetime.utcnow().isoformat()))
            conn.commit()
    @contextmanager
    def _get_connection(self):
        """Thread-safe database connection with WAL mode for concurrency."""
        conn = sqlite3.connect(
            str(self.db_path),
            timeout=30.0,
            check_same_thread=False
        )
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA cache_size=-64000")  # ~64MB page cache (negative value = KiB)
        try:
            yield conn
        finally:
            conn.close()
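    # WAL mode allows one writer alongside many concurrent readers, which fits
    # this workload (continuous ingest plus RL-side reads). A quick sanity
    # check (a sketch, not part of the store's API):
    #
    #     with store._get_connection() as conn:
    #         assert conn.execute("PRAGMA journal_mode").fetchone()[0] == "wal"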
    def store_audio_record(self, record: AudioRecord) -> bool:
        """
        Store audio record with validation and anomaly detection.
        APPEND-ONLY: No overwrites. Duplicates are rejected.
        Args:
            record: Complete AudioRecord instance
        Returns:
            bool: True if stored successfully; False if validation, the
            duplicate check, or the write fails (errors are logged and
            counted, never propagated to the caller).
        """
        verify_caller_authorization()
        with self.lock:
            try:
                # Validate record
                self._validate_record(record)
                # Detect anomalies
                is_anomaly, reason = self.anomaly_detector.detect_anomalies(record)
                record.is_anomaly = is_anomaly
                record.anomaly_reason = reason
                # Serialize complex objects
                audio_json = json.dumps(asdict(record.audio_features))
                metrics_json = json.dumps(asdict(record.platform_metrics))
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    # Append-only: Check for duplicates
                    cursor.execute(
                        "SELECT record_id FROM audio_records WHERE record_id = ?",
                        (record.record_id,)
                    )
                    if cursor.fetchone():
                        raise DataIntegrityError(
                            f"APPEND-ONLY VIOLATION: Record {record.record_id} already exists"
                        )
                    # Insert main record
                    cursor.execute("""
                        INSERT INTO audio_records (
                            record_id, video_id, timestamp, audio_features_json,
                            platform_metrics_json, niche, platform, beat_id,
                            beat_version_lineage, voice_profile_hash, orchestration_job_id,
                            language, trend_id, content_type, schema_version,
                            ingestion_timestamp, is_anomaly, anomaly_reason
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        record.record_id, record.video_id, record.timestamp,
                        audio_json, metrics_json, record.niche, record.platform,
                        record.beat_id, record.beat_version_lineage,
                        record.voice_profile_hash, record.orchestration_job_id,
                        record.language, record.trend_id, record.content_type,
                        record.schema_version, record.ingestion_timestamp,
                        1 if record.is_anomaly else 0, record.anomaly_reason
                    ))
                    # Update materialized view
                    self._update_performance_summary(cursor, record)
                    conn.commit()
                # Update anomaly baseline
                if not is_anomaly:
                    self.anomaly_detector.update_baseline(record)
                # Track ingestion
                self._ingest_count += 1
                self._check_ingestion_rate()
                # Emit events
                self._emit_event(EventType.RECORD_STORED, record)
                if is_anomaly:
                    self._emit_event(EventType.ANOMALY_DETECTED, record)
                    self._log_event(EventType.ANOMALY_DETECTED, record.record_id, {
                        "reason": reason,
                        "video_id": record.video_id
                    })
                # Check extreme thresholds
                self._check_extreme_performance(record)
                logger.debug(f"Stored record {record.record_id} (anomaly: {is_anomaly})")
                return True
            except Exception as e:
                self._ingest_errors += 1
                logger.error(f"CRITICAL: Failed to store record: {e}", exc_info=True)
                return False
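    # A hedged usage sketch (values illustrative; the nested feature objects
    # would come from the real extraction pipeline and are elided here):
    #
    #     record = AudioRecord(
    #         record_id="",              # empty -> derived in __post_init__
    #         video_id="vid_123",
    #         audio_features=features,   # SegmentedAudioFeatures
    #         platform_metrics=metrics,  # PlatformMetrics
    #         niche="fitness", platform="tiktok", beat_id="beat_v3",
    #         beat_version_lineage="beat_v3 <- beat_v2 <- beat_v1",
    #         voice_profile_hash=voice_hash, orchestration_job_id="job_9",
    #         language="en",
    #     )
    #     store.store_audio_record(record)  # True on first call
    #     store.store_audio_record(record)  # False: duplicate rejected, not overwritten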
    def _validate_record(self, record: AudioRecord):
        """Validate record before storage. Raises DataIntegrityError if invalid."""
        # Validate required identifiers
        if not record.video_id or not record.orchestration_job_id:
            raise DataIntegrityError("Missing required identifiers")
        # Validate retention values
        pm = record.platform_metrics
        if not (0 <= pm.retention_1s <= 1.0):
            raise DataIntegrityError(f"Invalid retention_1s: {pm.retention_1s}")
        if not (0 <= pm.retention_2s <= 1.0):
            raise DataIntegrityError(f"Invalid retention_2s: {pm.retention_2s}")
        if not (0 <= pm.retention_3s <= 1.0):
            raise DataIntegrityError(f"Invalid retention_3s: {pm.retention_3s}")
        # Validate retention ordering (small tolerance for measurement noise)
        if pm.retention_2s > pm.retention_1s + 0.01:
            raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)")
        if pm.retention_3s > pm.retention_2s + 0.01:
            raise DataIntegrityError("Retention must be monotonic (1s >= 2s >= 3s)")
        # Validate audio features
        af = record.audio_features
        if af.total_duration_ms <= 0:
            raise DataIntegrityError("Invalid duration")
        if len(af.syllable_features) == 0:
            raise DataIntegrityError("Missing syllable features")
        if len(af.word_features) == 0:
            raise DataIntegrityError("Missing word features")
    def _update_performance_summary(self, cursor, record: AudioRecord):
        """Update materialized view for fast queries."""
        pm = record.platform_metrics
        ba = record.audio_features.beat_alignment
        # Determine winner/loser status
        is_winner = (
            pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and
            pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"]
        )
        is_loser = (
            pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or
            pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"]
        )
        cursor.execute("""
            INSERT INTO performance_summary (
                record_id, video_id, niche, platform, retention_1s, retention_2s,
                retention_3s, completion_rate, engagement_score, mute_rate,
                scroll_away_velocity, beat_alignment_score, is_winner, is_loser,
                ingestion_timestamp
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            record.record_id, record.video_id, record.niche, record.platform,
            pm.retention_1s, pm.retention_2s, pm.retention_3s, pm.completion_rate,
            pm.platform_engagement_score, pm.mute_rate, pm.scroll_away_velocity,
            ba.sync_quality_score, 1 if is_winner else 0, 1 if is_loser else 0,
            record.ingestion_timestamp
        ))
    def _check_extreme_performance(self, record: AudioRecord):
        """Check for extreme success/failure and emit events."""
        pm = record.platform_metrics
        if (pm.retention_2s >= self.thresholds["extreme_success_retention_2s"] and
                pm.platform_engagement_score >= self.thresholds["extreme_success_engagement"]):
            self._emit_event(EventType.EXTREME_SUCCESS, record)
            self._log_event(EventType.EXTREME_SUCCESS, record.record_id, {
                "retention_2s": pm.retention_2s,
                "engagement": pm.platform_engagement_score,
                "niche": record.niche
            })
            logger.info(f"EXTREME SUCCESS: {record.video_id} (ret={pm.retention_2s:.2f})")
        elif (pm.retention_2s <= self.thresholds["extreme_failure_retention_2s"] or
                pm.platform_engagement_score <= self.thresholds["extreme_failure_engagement"]):
            self._emit_event(EventType.EXTREME_FAILURE, record)
            self._log_event(EventType.EXTREME_FAILURE, record.record_id, {
                "retention_2s": pm.retention_2s,
                "engagement": pm.platform_engagement_score,
                "mute_rate": pm.mute_rate,
                "niche": record.niche
            })
            logger.warning(f"EXTREME FAILURE: {record.video_id} (ret={pm.retention_2s:.2f})")
    def get_winners_vs_losers(
        self,
        filters: Optional[Dict[str, Any]] = None,
        limit_per_group: int = 1000
    ) -> Dict[str, List[AudioRecord]]:
        """
        Get winners and losers for comparison (critical for RL).
        Returns:
            {"winners": [...], "losers": [...]}
        """
        verify_caller_authorization()
        with self.lock:
            winners = self._query_performance_group(filters, "winners", limit_per_group)
            losers = self._query_performance_group(filters, "losers", limit_per_group)
            return {
                "winners": winners,
                "losers": losers
            }
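    # A hedged sketch of the intended RL-side consumption (called from an
    # authorized module such as audio_reinforcement_loop; reward_signal and
    # punish_signal are hypothetical helpers, not part of this file):
    #
    #     groups = store.get_winners_vs_losers(filters={"niche": "fitness", "platform": "tiktok"})
    #     for rec in groups["winners"]:
    #         reward_signal(rec.beat_id, rec.voice_profile_hash)
    #     for rec in groups["losers"]:
    #         punish_signal(rec.beat_id, rec.voice_profile_hash)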
    def analyze_retention_killers(
        self,
        filters: Optional[Dict[str, Any]] = None,
        threshold_ms: int = 2000
    ) -> List[Dict[str, Any]]:
        """
        Identify what killed retention early (< threshold_ms).
        Returns a list of analysis dicts with audio feature correlations.
        """
        verify_caller_authorization()
        with self.lock:
            # Get records with early retention failure
            query = """
                SELECT ar.* FROM audio_records ar
                JOIN performance_summary ps ON ar.record_id = ps.record_id
                WHERE ps.retention_2s < 0.3
            """
            params = []
            if filters:
                conditions = []
                for key, value in filters.items():
                    if key in ["niche", "platform", "beat_id"]:
                        conditions.append(f"ar.{key} = ?")
                        params.append(value)
                if conditions:
                    query += " AND " + " AND ".join(conditions)
            query += " ORDER BY ps.retention_2s ASC LIMIT 500"
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(query, params)
                rows = cursor.fetchall()
            if not rows:
                return []
            records = [self._row_to_record(row) for row in rows]
            # Analyze common patterns
            analyses = []
            for record in records:
                af = record.audio_features
                pm = record.platform_metrics
                killers = []
                # Check the first threshold_ms of audio (2 seconds by default)
                early_syllables = [s for s in af.syllable_features if s.start_time_ms < threshold_ms]
                if early_syllables:
                    avg_energy = mean([s.energy_db for s in early_syllables])
                    if avg_energy < -50:
                        killers.append("low_energy_start")
                    avg_beat_error = mean([abs(s.beat_alignment_error_ms) for s in early_syllables])
                    if avg_beat_error > 150:
                        killers.append("poor_beat_alignment_start")
                # Check early pause density
                early_pauses = [p for p in af.pause_metrics.pause_positions_ms if p < threshold_ms]
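                # NOTE: the source gist is truncated at this point. What follows is
                # a minimal hedged sketch of the method's tail, assuming it mirrors
                # the energy and beat checks above; the pause cutoff and the shape
                # of the result dict are illustrative assumptions, not original code.
                if len(early_pauses) >= 3:  # assumed cutoff for "too many early pauses"
                    killers.append("dense_early_pauses")
                analyses.append({
                    "record_id": record.record_id,
                    "video_id": record.video_id,
                    "retention_2s": pm.retention_2s,
                    "retention_killers": killers,
                })
            return analyses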