Skip to content

Instantly share code, notes, and snippets.

@bogged-broker
Created December 30, 2025 21:57
Show Gist options
  • Select an option

  • Save bogged-broker/718a2c2f134ce397925df178467bb79a to your computer and use it in GitHub Desktop.

Select an option

Save bogged-broker/718a2c2f134ce397925df178467bb79a to your computer and use it in GitHub Desktop.
"""
🔊 AUDIO NORMALIZATION ENGINE
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Viral Audio Survivability System - 15/10 Grade | 5M+ Views Baseline Engine
This is NOT a normalizer. This is a VIRAL SURVIVABILITY ENGINE.
Mission:
Transform generated audio into algorithm-optimized, platform-perfect,
retention-maximizing audio that survives compression, mobile playback,
and ranking heuristics without losing emotional impact.
Core Philosophy:
- Platform-aware (not generic LUFS)
- Emotion-preserving (not just loud)
- Learning-based (improves over time)
- Playback-reality-tested (simulates degradation)
- Beat-aligned (preserves rhythm)
- Intelligibility-first (prevents strain)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
import numpy as np
import logging
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import json
import scipy.signal as signal
from scipy.fft import fft, ifft
from scipy.interpolate import interp1d
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════════════════
# CORE DATA STRUCTURES
# ═══════════════════════════════════════════════════════════════════════════
class Platform(Enum):
"""Platform-specific audio requirements"""
TIKTOK = "tiktok"
INSTAGRAM = "instagram"
YOUTUBE_SHORTS = "youtube_shorts"
YOUTUBE = "youtube"
TWITTER = "twitter"
FACEBOOK = "facebook"
class AudioQuality(Enum):
"""Quality assessment levels"""
VIRAL_READY = "viral_ready"
GOOD = "good"
ACCEPTABLE = "acceptable"
NEEDS_WORK = "needs_work"
FAILED = "failed"
@dataclass
class PlatformAudioProfile:
"""Platform-specific audio requirements and biases"""
platform: Platform
target_lufs: float
true_peak_ceiling: float
short_term_lufs: float
momentary_lufs: float
hook_boost_db: float
compression_ratio: float
algorithm_bias: Dict[str, float]
transcoding_loss: float
mobile_speaker_bias: float
@dataclass
class SyllableMetrics:
"""Per-syllable intelligibility tracking"""
start_time: float
end_time: float
text: str
consonant_energy: float
vowel_clarity: float
formant_integrity: float
is_hook: bool
emotional_weight: float
@dataclass
class BeatAlignment:
"""Beat timing and emphasis data"""
beat_times: List[float]
beat_strengths: List[float]
tempo: float
time_signature: Tuple[int, int]
hook_beats: List[int]
@dataclass
class EmotionProfile:
"""Emotional contour preservation data"""
time_points: List[float]
intensity: List[float]
valence: List[float]
arousal: List[float]
critical_peaks: List[Tuple[float, float]] # (time, importance)
@dataclass
class NormalizationResult:
"""Complete normalization output with diagnostics"""
audio: np.ndarray
sample_rate: int
platform: Platform
quality_score: float
quality_level: AudioQuality
metrics: Dict[str, Any]
confidence_score: float # Likelihood of 5M+ performance
degradation_report: Dict[str, float]
variant_id: str
normalization_params: Dict[str, Any]
@dataclass
class LoudnessMetrics:
"""Complete loudness analysis"""
integrated_lufs: float
short_term_lufs: List[float]
momentary_lufs: List[float]
true_peak: float
hook_lufs: float
first_2s_lufs: float
dynamic_range: float
crest_factor: float
# ═══════════════════════════════════════════════════════════════════════════
# PLATFORM AUDIO PROFILES
# ═══════════════════════════════════════════════════════════════════════════
class PlatformProfileManager:
"""Manages platform-specific audio requirements"""
def __init__(self):
self.profiles = self._initialize_profiles()
self.performance_history: Dict[Platform, List[Dict]] = {}
def _initialize_profiles(self) -> Dict[Platform, PlatformAudioProfile]:
"""Initialize viral-optimized platform profiles"""
return {
Platform.TIKTOK: PlatformAudioProfile(
platform=Platform.TIKTOK,
target_lufs=-14.0, # TikTok actually prefers slightly louder
true_peak_ceiling=-1.0,
short_term_lufs=-12.0,
momentary_lufs=-11.0,
hook_boost_db=1.5,
compression_ratio=3.5,
algorithm_bias={
"early_loudness": 1.8, # First 2s heavily weighted
"consistency": 1.4,
"speech_clarity": 2.0,
"beat_emphasis": 1.6
},
transcoding_loss=0.3,
mobile_speaker_bias=1.2
),
Platform.INSTAGRAM: PlatformAudioProfile(
platform=Platform.INSTAGRAM,
target_lufs=-14.5,
true_peak_ceiling=-1.0,
short_term_lufs=-13.0,
momentary_lufs=-12.0,
hook_boost_db=1.2,
compression_ratio=3.0,
algorithm_bias={
"early_loudness": 1.6,
"consistency": 1.5,
"speech_clarity": 1.8,
"beat_emphasis": 1.4
},
transcoding_loss=0.4,
mobile_speaker_bias=1.3
),
Platform.YOUTUBE_SHORTS: PlatformAudioProfile(
platform=Platform.YOUTUBE_SHORTS,
target_lufs=-14.0,
true_peak_ceiling=-1.0,
short_term_lufs=-13.0,
momentary_lufs=-12.5,
hook_boost_db=1.0,
compression_ratio=2.8,
algorithm_bias={
"early_loudness": 1.5,
"consistency": 1.6,
"speech_clarity": 1.9,
"beat_emphasis": 1.3
},
transcoding_loss=0.2,
mobile_speaker_bias=1.1
),
Platform.YOUTUBE: PlatformAudioProfile(
platform=Platform.YOUTUBE,
target_lufs=-14.0,
true_peak_ceiling=-1.0,
short_term_lufs=-14.0,
momentary_lufs=-13.0,
hook_boost_db=0.8,
compression_ratio=2.5,
algorithm_bias={
"early_loudness": 1.3,
"consistency": 1.7,
"speech_clarity": 1.7,
"beat_emphasis": 1.2
},
transcoding_loss=0.15,
mobile_speaker_bias=1.0
)
}
def get_profile(self, platform: Platform) -> PlatformAudioProfile:
"""Get platform-specific profile"""
return self.profiles[platform]
def update_profile_from_performance(self, platform: Platform,
performance_data: Dict):
"""Adapt profile based on actual performance data"""
if platform not in self.performance_history:
self.performance_history[platform] = []
self.performance_history[platform].append(performance_data)
# Learning: adjust profile based on what's working
if len(self.performance_history[platform]) >= 10:
self._optimize_profile(platform)
def _optimize_profile(self, platform: Platform):
"""Optimize profile based on performance history"""
history = self.performance_history[platform]
profile = self.profiles[platform]
# Find top performers
top_performers = sorted(history,
key=lambda x: x.get('views', 0),
reverse=True)[:5]
if top_performers:
# Adjust target LUFS based on winners
avg_lufs = np.mean([p.get('lufs', profile.target_lufs)
for p in top_performers])
profile.target_lufs = 0.7 * profile.target_lufs + 0.3 * avg_lufs
logger.info(f"📊 Optimized {platform.value} profile: "
f"LUFS={profile.target_lufs:.1f}")
# ═══════════════════════════════════════════════════════════════════════════
# LOUDNESS ANALYSIS ENGINE
# ═══════════════════════════════════════════════════════════════════════════
class LoudnessAnalyzer:
"""ITU-R BS.1770-4 compliant loudness measurement with viral extensions"""
def __init__(self, sample_rate: int = 44100):
self.sample_rate = sample_rate
self.window_size = int(0.4 * sample_rate) # 400ms
self.overlap = int(0.3 * sample_rate) # 300ms overlap
def analyze(self, audio: np.ndarray,
hook_segments: Optional[List[Tuple[float, float]]] = None) -> LoudnessMetrics:
"""Complete loudness analysis"""
# Ensure mono for analysis
if len(audio.shape) > 1:
audio = np.mean(audio, axis=1)
# K-weighting filter (ITU-R BS.1770-4)
audio_weighted = self._apply_k_weighting(audio)
# Integrated LUFS
integrated_lufs = self._calculate_integrated_lufs(audio_weighted)
# Short-term LUFS (3s windows)
short_term_lufs = self._calculate_short_term_lufs(audio_weighted)
# Momentary LUFS (400ms windows)
momentary_lufs = self._calculate_momentary_lufs(audio_weighted)
# True peak
true_peak = self._calculate_true_peak(audio)
# Hook LUFS (if provided)
hook_lufs = integrated_lufs
if hook_segments:
hook_lufs = self._calculate_hook_lufs(audio_weighted, hook_segments)
# First 2 seconds LUFS (critical for algorithms)
first_2s_samples = int(2.0 * self.sample_rate)
first_2s_lufs = self._calculate_integrated_lufs(
audio_weighted[:first_2s_samples]
)
# Dynamic range
dynamic_range = self._calculate_dynamic_range(audio)
# Crest factor
crest_factor = self._calculate_crest_factor(audio)
return LoudnessMetrics(
integrated_lufs=integrated_lufs,
short_term_lufs=short_term_lufs,
momentary_lufs=momentary_lufs,
true_peak=true_peak,
hook_lufs=hook_lufs,
first_2s_lufs=first_2s_lufs,
dynamic_range=dynamic_range,
crest_factor=crest_factor
)
def _apply_k_weighting(self, audio: np.ndarray) -> np.ndarray:
"""Apply ITU-R BS.1770-4 K-weighting filter"""
# Stage 1: High-pass filter (pre-filter)
b_hp, a_hp = signal.butter(2, 100, 'hp', fs=self.sample_rate)
audio_filtered = signal.filtfilt(b_hp, a_hp, audio)
# Stage 2: High-frequency shelving filter
# Simplified implementation of RLB weighting
b_shelf, a_shelf = signal.butter(2, 1000, 'hp', fs=self.sample_rate)
audio_weighted = signal.filtfilt(b_shelf, a_shelf, audio_filtered)
return audio_weighted
def _calculate_integrated_lufs(self, audio: np.ndarray) -> float:
"""Calculate integrated LUFS"""
# Mean square with gating
mean_square = np.mean(audio ** 2)
if mean_square > 0:
lufs = -0.691 + 10 * np.log10(mean_square)
else:
lufs = -70.0 # Silence threshold
return lufs
def _calculate_short_term_lufs(self, audio: np.ndarray) -> List[float]:
"""Calculate short-term LUFS (3s windows)"""
window_size = int(3.0 * self.sample_rate)
hop_size = int(1.0 * self.sample_rate)
short_term = []
for i in range(0, len(audio) - window_size, hop_size):
window = audio[i:i + window_size]
lufs = self._calculate_integrated_lufs(window)
short_term.append(lufs)
return short_term
def _calculate_momentary_lufs(self, audio: np.ndarray) -> List[float]:
"""Calculate momentary LUFS (400ms windows)"""
momentary = []
for i in range(0, len(audio) - self.window_size, self.overlap):
window = audio[i:i + self.window_size]
lufs = self._calculate_integrated_lufs(window)
momentary.append(lufs)
return momentary
def _calculate_true_peak(self, audio: np.ndarray) -> float:
"""Calculate true peak (4x oversampled)"""
# Upsample 4x for true peak detection
upsampled = signal.resample(audio, len(audio) * 4)
true_peak = 20 * np.log10(np.max(np.abs(upsampled)) + 1e-10)
return true_peak
def _calculate_hook_lufs(self, audio: np.ndarray,
hook_segments: List[Tuple[float, float]]) -> float:
"""Calculate LUFS specifically for hook segments"""
hook_audio = []
for start, end in hook_segments:
start_sample = int(start * self.sample_rate)
end_sample = int(end * self.sample_rate)
hook_audio.extend(audio[start_sample:end_sample])
if hook_audio:
return self._calculate_integrated_lufs(np.array(hook_audio))
return -70.0
def _calculate_dynamic_range(self, audio: np.ndarray) -> float:
"""Calculate dynamic range (DR)"""
# RMS of loudest 20% vs average RMS
rms_values = []
window_size = int(0.1 * self.sample_rate)
for i in range(0, len(audio) - window_size, window_size // 2):
window = audio[i:i + window_size]
rms = np.sqrt(np.mean(window ** 2))
rms_values.append(rms)
if rms_values:
rms_values = sorted(rms_values, reverse=True)
top_20_percent = rms_values[:max(1, len(rms_values) // 5)]
peak_rms = np.mean(top_20_percent)
avg_rms = np.mean(rms_values)
if avg_rms > 0:
dr = 20 * np.log10(peak_rms / avg_rms)
return max(0, min(20, dr))
return 0.0
def _calculate_crest_factor(self, audio: np.ndarray) -> float:
"""Calculate crest factor (peak to RMS ratio)"""
peak = np.max(np.abs(audio))
rms = np.sqrt(np.mean(audio ** 2))
if rms > 0:
return 20 * np.log10(peak / rms)
return 0.0
# ═══════════════════════════════════════════════════════════════════════════
# VIRAL-TUNED COMPRESSOR
# ═══════════════════════════════════════════════════════════════════════════
class ViralCompressor:
"""Context-aware, emotion-preserving dynamic range compressor"""
def __init__(self, sample_rate: int = 44100):
self.sample_rate = sample_rate
def compress(self, audio: np.ndarray,
profile: PlatformAudioProfile,
syllables: Optional[List[SyllableMetrics]] = None,
emotion_profile: Optional[EmotionProfile] = None) -> np.ndarray:
"""Apply viral-optimized compression"""
# Multi-band compression for speech clarity
audio_compressed = self._multiband_compress(audio, profile)
# Context-aware compression adjustments
if syllables:
audio_compressed = self._syllable_aware_compress(
audio_compressed, syllables, profile
)
# Emotion-preserving compression
if emotion_profile:
audio_compressed = self._emotion_aware_compress(
audio_compressed, emotion_profile, profile
)
# Upward compression for dead zones
audio_compressed = self._upward_compress_quiet_sections(
audio_compressed, profile
)
return audio_compressed
def _multiband_compress(self, audio: np.ndarray,
profile: PlatformAudioProfile) -> np.ndarray:
"""Multi-band compression with speech band isolation"""
# Define speech-critical bands
bands = [
(80, 250), # Low fundamentals
(250, 2000), # Core speech (most critical)
(2000, 6000), # Consonants and clarity
(6000, 16000) # Air and presence
]
compressed_bands = []
for low, high in bands:
# Bandpass filter
band_audio = self._bandpass_filter(audio, low, high)
# Band-specific compression
if 250 <= low <= 2000: # Core speech band
ratio = profile.compression_ratio * 0.8 # Gentler
elif 2000 <= low <= 6000: # Consonant band
ratio = profile.compression_ratio * 0.6 # Very gentle
else:
ratio = profile.compression_ratio
compressed_band = self._apply_compression(
band_audio,
ratio=ratio,
threshold=-20.0,
attack_ms=5.0,
release_ms=50.0
)
compressed_bands.append(compressed_band)
# Sum bands
return np.sum(compressed_bands, axis=0)
def _bandpass_filter(self, audio: np.ndarray,
low_freq: float, high_freq: float) -> np.ndarray:
"""Apply bandpass filter"""
nyquist = self.sample_rate / 2
low = low_freq / nyquist
high = min(high_freq / nyquist, 0.99)
b, a = signal.butter(4, [low, high], btype='band')
return signal.filtfilt(b, a, audio)
def _apply_compression(self, audio: np.ndarray,
ratio: float,
threshold: float,
attack_ms: float,
release_ms: float,
knee_db: float = 6.0) -> np.ndarray:
"""Apply dynamic range compression"""
# Convert to dB
audio_db = 20 * np.log10(np.abs(audio) + 1e-10)
# Calculate gain reduction with soft knee
gain_reduction = np.zeros_like(audio_db)
for i, db in enumerate(audio_db):
if db > threshold + knee_db:
# Above knee - full compression
gain_reduction[i] = (db - threshold) * (1 - 1/ratio)
elif db > threshold - knee_db:
# In knee - soft transition
knee_factor = ((db - threshold + knee_db) / (2 * knee_db)) ** 2
gain_reduction[i] = knee_factor * (db - threshold) * (1 - 1/ratio)
# Apply attack/release envelope
gain_reduction = self._apply_envelope(
gain_reduction, attack_ms, release_ms
)
# Apply gain reduction
gain_linear = 10 ** (-gain_reduction / 20)
return audio * gain_linear
def _apply_envelope(self, gain_reduction: np.ndarray,
attack_ms: float, release_ms: float) -> np.ndarray:
"""Apply attack/release envelope to gain reduction"""
attack_samples = int(attack_ms * self.sample_rate / 1000)
release_samples = int(release_ms * self.sample_rate / 1000)
envelope = np.zeros_like(gain_reduction)
current_gain = 0.0
for i in range(len(gain_reduction)):
target_gain = gain_reduction[i]
if target_gain > current_gain:
# Attack
alpha = 1.0 / attack_samples if attack_samples > 0 else 1.0
else:
# Release
alpha = 1.0 / release_samples if release_samples > 0 else 1.0
current_gain = current_gain + alpha * (target_gain - current_gain)
envelope[i] = current_gain
return envelope
def _syllable_aware_compress(self, audio: np.ndarray,
syllables: List[SyllableMetrics],
profile: PlatformAudioProfile) -> np.ndarray:
"""Adjust compression based on syllable importance"""
result = audio.copy()
for syl in syllables:
start_sample = int(syl.start_time * self.sample_rate)
end_sample = int(syl.end_time * self.sample_rate)
if start_sample >= len(audio) or end_sample > len(audio):
continue
# Hook syllables get priority
if syl.is_hook:
# Reduce compression (preserve dynamics)
boost = 1.0 + (profile.hook_boost_db / 20)
result[start_sample:end_sample] *= boost
# Preserve consonant transients
if syl.consonant_energy > 0.7:
# Protect first 20ms of syllable
transient_samples = min(int(0.02 * self.sample_rate),
end_sample - start_sample)
transient_boost = 1.1
result[start_sample:start_sample + transient_samples] *= transient_boost
return result
def _emotion_aware_compress(self, audio: np.ndarray,
emotion: EmotionProfile,
profile: PlatformAudioProfile) -> np.ndarray:
"""Preserve emotional peaks during compression"""
result = audio.copy()
for time, importance in emotion.critical_peaks:
sample = int(time * self.sample_rate)
# Protect window around emotional peak
window_size = int(0.1 * self.sample_rate) # 100ms
start = max(0, sample - window_size // 2)
end = min(len(audio), sample + window_size // 2)
# Reduce compression around peak
preservation_factor = 1.0 + (importance * 0.2)
result[start:end] *= preservation_factor
return result
def _upward_compress_quiet_sections(self, audio: np.ndarray,
profile: PlatformAudioProfile) -> np.ndarray:
"""Apply upward compression to prevent disengagement"""
# Find quiet sections (potential dead zones)
window_size = int(0.5 * self.sample_rate)
threshold_rms = 0.05 # Quiet threshold
result = audio.copy()
for i in range(0, len(audio) - window_size, window_size // 2):
window = audio[i:i + window_size]
rms = np.sqrt(np.mean(window ** 2))
if rms < threshold_rms and rms > 0:
# Boost quiet sections
boost = threshold_rms / rms
boost = min(boost, 3.0) # Limit boost
result[i:i + window_size] *= boost
return result
# ═══════════════════════════════════════════════════════════════════════════
# PSYCHOACOUSTIC LIMITER
# ═══════════════════════════════════════════════════════════════════════════
class PsychoacousticLimiter:
"""Emotion-preserving peak limiter"""
def __init__(self, sample_rate: int = 44100):
self.sample_rate = sample_rate
def limit(self, audio: np.ndarray,
ceiling: float,
emotion_profile: Optional[EmotionProfile] = None,
syllables: Optional[List[SyllableMetrics]] = None) -> np.ndarray:
"""Apply intelligent peak limiting"""
# Convert ceiling to linear
ceiling_linear = 10 ** (ceiling / 20)
# Identify critical regions (emotion peaks, hook syllables)
critical_regions = self._identify_critical_regions(
len(audio), emotion_profile, syllables
)
# Apply adaptive limiting
limited = audio.copy()
for i in range(len(audio)):
if np.abs(audio[i]) > ceiling_linear:
# Check if in critical region
is_critical = any(start <= i < end
for start, end, _ in critical_regions)
if is_critical:
# Gentle soft-clipping for critical regions
limited[i] = self._soft_clip(audio[i], ceiling_linear,
hardness=0.3)
else:
# Standard limiting for non-critical regions
limited[i] = self._soft_clip(audio[i], ceiling_linear,
hardness=0.8)
# Transient preservation
limited = self._preserve_transients(audio, limited, syllables)
return limited
def _identify_critical_regions(self, audio_length: int,
emotion_profile: Optional[EmotionProfile],
syllables: Optional[List[SyllableMetrics]]
) -> List[Tuple[int, int, float]]:
"""Identify regions where limiting should be gentle"""
regions = []
# Emotion peaks
if emotion_profile:
for time, importance in emotion_profile.critical_peaks:
sample = int(time * self.sample_rate)
window = int(0.1 * self.sample_rate)
regions.append((
max(0, sample - window),
min(audio_length, sample + window),
importance
))
# Hook syllables
if syllables:
for syl in syllables:
if syl.is_hook or syl.emotional_weight > 0.7:
start = int(syl.start_time * self.sample_rate)
end = int(syl.end_time * self.sample_rate)
regions.append((start, end, syl.emotional_weight))
return regions
def _soft_clip(self, sample: float, ceiling: float,
hardness: float = 0.5) -> float:
"""Soft clipping function"""
if np.abs(sample) <= ceiling:
return sample
# Tanh-based soft clipping
sign = np.sign(sample)
excess = np.abs(sample) - ceiling
# Softer clipping for lower hardness
clipped_excess = excess * (1 - hardness) + \
ceiling * np.tanh(excess / ceiling) * hardness
return sign * (ceiling + clipped_excess * 0.5)
def _preserve_transients(self, original: np.ndarray,
limited: np.ndarray,
syllables: Optional[List[SyllableMetrics]]
) -> np.ndarray:
"""Restore transient energy lost during limiting"""
if not syllables:
return limited
result = limited.copy()
for syl in syllables:
if syl.consonant_energy > 0.6:
# Find transient at syllable start
start_sample = int(syl.start_time * self.sample_rate)
transient_length = min(int(0.02 * self.sample_rate),
int((syl.end_time - syl.start_time)
* self.sample_rate))
if start_sample + transient_length > len(result):
continue
# Restore some transient energy
original_transient = original[start_sample:start_sample + transient_length]
limited_transient = limited[start_sample:start_sample + transient_length]
# Blend to restore sharpness
restoration_factor = 0.3
result[start_sample:start_sample + transient_length] = \
limited_transient * (1 - restoration_factor) + \
original_transient * restoration_factor
return result
# ═══════════════════════════════════════════════════════════════════════════
# INTELLIGIBILITY GUARDIAN
# ═══════════════════════════════════════════════════════════════════════════
class IntelligibilityGuardian:
"""Ensures speech remains crystal clear after processing"""
def __init__(self, sample_rate: int = 44100):
self.sample_rate = sample_rate
def analyze_intelligibility(self, audio: np.ndarray,
syllables: List[SyllableMetrics]
) -> Dict[str, float]:
"""Analyze speech intelligibility metrics"""
metrics = {
'consonant_clarity': 0.0,
'vowel_quality': 0.0,
'formant_integrity': 0.0,
'first_2s_clarity': 0.0,
'overall_score': 0.0
}
if not syllables:
return metrics
# Per-syllable analysis
consonant_scores = []
vowel_scores = []
formant_scores = []
for syl in syllables:
consonant_scores.append(self._measure_consonant_energy(audio, syl))
vowel_scores.append(self._measure_vowel_clarity(audio, syl))
formant_scores.append(self._measure_formant_integrity(audio, syl))
metrics['consonant_clarity'] = np.mean(consonant_scores) if consonant_scores else 0.0
metrics['vowel_quality'] = np.mean(vowel_scores) if vowel_scores else 0.0
metrics['formant_integrity'] = np.mean(formant_scores) if formant_scores else 0.0
# First 2 seconds analysis (CRITICAL for algorithms)
first_2s_syllables = [s for s in syllables if s.start_time < 2.0]
if first_2s_syllables:
first_2s_scores = [self._measure_syllable_clarity(audio, s)
for s in first_2s_syllables]
metrics['first_2s_clarity'] = np.mean(first_2s_scores)
# Overall score
metrics['overall_score'] = (
metrics['consonant_clarity'] * 0.35 +
metrics['vowel_quality'] * 0.25 +
metrics['formant_integrity'] * 0.20 +
metrics['first_2s_clarity'] * 0.20
)
return metrics
def enhance_intelligibility(self, audio: np.ndarray,
syllables: List[SyllableMetrics],
target_score: float = 0.85
) -> np.ndarray:
"""Enhance speech intelligibility"""
enhanced = audio.copy()
for syl in syllables:
start = int(syl.start_time * self.sample_rate)
end = int(syl.end_time * self.sample_rate)
if start >= len(audio) or end > len(audio):
continue
# Extract syllable
syllable_audio = audio[start:end]
# Enhance based on needs
if syl.consonant_energy < 0.6:
syllable_audio = self._boost_consonants(syllable_audio)
if syl.vowel_clarity < 0.7:
syllable_audio = self._enhance_vowels(syllable_audio)
if syl.formant_integrity < 0.75:
syllable_audio = self._restore_formants(syllable_audio)
enhanced[start:end] = syllable_audio
return enhanced
def _measure_consonant_energy(self, audio: np.ndarray,
syl: SyllableMetrics) -> float:
"""Measure consonant energy in syllable"""
start = int(syl.start_time * self.sample_rate)
end = int(syl.end_time * self.sample_rate)
if start >= len(audio) or end > len(audio):
return 0.0
# High-frequency energy (consonants)
syllable = audio[start:end]
hf_energy = self._bandpass_energy(syllable, 2000, 8000)
total_energy = np.sum(syllable ** 2) + 1e-10
return min(1.0, hf_energy / total_energy * 5.0)
def _measure_vowel_clarity(self, audio: np.ndarray,
syl: SyllableMetrics) -> float:
"""Measure vowel clarity"""
start = int(syl.start_time * self.sample_rate)
end = int(syl.end_time * self.sample_rate)
if start >= len(audio) or end > len(audio):
return 0.0
# Mid-frequency stability (vowels)
syllable = audio[start:end]
mf_energy = self._bandpass_energy(syllable, 300, 3000)
total_energy = np.sum(syllable ** 2) + 1e-10
return min(1.0, mf_energy / total_energy * 2.0)
def _measure_formant_integrity(self, audio: np.ndarray,
syl: SyllableMetrics) -> float:
"""Measure formant structure preservation"""
start = int(syl.start_time * self.sample_rate)
end = int(syl.end_time * self.sample_rate)
if start >= len(audio) or end > len(audio):
return 0.0
syllable = audio[start:end]
# Check formant bands (F1, F2, F3)
f1_energy = self._bandpass_energy(syllable, 300, 900)
f2_energy = self._bandpass_energy(syllable, 900, 2500)
f3_energy = self._bandpass_energy(syllable, 2500, 4000)
# Good formant structure has clear peaks
formant_ratio = (f1_energy + f2_energy + f3_energy) / (np.sum(syllable ** 2) + 1e-10)
return min(1.0, formant_ratio * 3.0)
def _measure_syllable_clarity(self, audio: np.ndarray,
syl: SyllableMetrics) -> float:
"""Overall syllable clarity score"""
consonant = self._measure_consonant_energy(audio, syl)
vowel = self._measure_vowel_clarity(audio, syl)
formant = self._measure_formant_integrity(audio, syl)
return (consonant * 0.4 + vowel * 0.3 + formant * 0.3)
def _bandpass_energy(self, audio: np.ndarray,
low_freq: float, high_freq: float) -> float:
"""Calculate energy in frequency band"""
nyquist = self.sample_rate / 2
low = low_freq / nyquist
high = min(high_freq / nyquist, 0.99)
b, a = signal.butter(4, [low, high], btype='band')
filtered = signal.filtfilt(b, a, audio)
return np.sum(filtered ** 2)
def _boost_consonants(self, audio: np.ndarray) -> np.ndarray:
"""Boost high-frequency consonant energy"""
# Gentle high-shelf boost
nyquist = self.sample_rate / 2
freq = 2500 / nyquist
b, a = signal.butter(2, freq, btype='high')
hf = signal.filtfilt(b, a, audio)
return audio + hf * 0.3
def _enhance_vowels(self, audio: np.ndarray) -> np.ndarray:
"""Enhance vowel clarity"""
# Mid-frequency emphasis
nyquist = self.sample_rate / 2
b, a = signal.butter(4, [300/nyquist, 3000/nyquist], btype='band')
mf = signal.filtfilt(b, a, audio)
return audio * 0.8 + mf * 0.4
def _restore_formants(self, audio: np.ndarray) -> np.ndarray:
"""Restore formant structure"""
# Multi-band formant enhancement
f1 = self._bandpass_filter(audio, 300, 900) * 1.2
f2 = self._bandpass_filter(audio, 900, 2500) * 1.15
f3 = self._bandpass_filter(audio, 2500, 4000) * 1.1
return audio * 0.7 + (f1 + f2 + f3) * 0.3
def _bandpass_filter(self, audio: np.ndarray,
low_freq: float, high_freq: float) -> np.ndarray:
"""Apply bandpass filter"""
nyquist = self.sample_rate / 2
low = low_freq / nyquist
high = min(high_freq / nyquist, 0.99)
b, a = signal.butter(4, [low, high], btype='band')
return signal.filtfilt(b, a, audio)
# ═══════════════════════════════════════════════════════════════════════════
# PLAYBACK REALITY SIMULATOR
# ═══════════════════════════════════════════════════════════════════════════
class PlaybackSimulator:
"""Simulates real-world playback degradation"""
def __init__(self, sample_rate: int = 44100):
self.sample_rate = sample_rate
def simulate_degradation(self, audio: np.ndarray,
platform: Platform) -> Dict[str, np.ndarray]:
"""Simulate various playback scenarios"""
scenarios = {
'iphone_speaker': self._simulate_iphone_speaker(audio),
'android_budget': self._simulate_android_budget(audio),
'cheap_earbuds': self._simulate_cheap_earbuds(audio),
'airpods': self._simulate_airpods(audio),
'platform_transcode': self._simulate_platform_transcode(audio, platform)
}
return scenarios
def evaluate_degraded_audio(self, original: np.ndarray,
degraded_scenarios: Dict[str, np.ndarray],
syllables: List[SyllableMetrics]
) -> Dict[str, float]:
"""Evaluate how audio survives degradation"""
scores = {}
analyzer = LoudnessAnalyzer(self.sample_rate)
intel_guardian = IntelligibilityGuardian(self.sample_rate)
for scenario, degraded in degraded_scenarios.items():
# Loudness consistency
orig_metrics = analyzer.analyze(original)
deg_metrics = analyzer.analyze(degraded)
loudness_delta = abs(orig_metrics.integrated_lufs -
deg_metrics.integrated_lufs)
loudness_score = max(0, 1.0 - loudness_delta / 5.0)
# Intelligibility preservation
intel_metrics = intel_guardian.analyze_intelligibility(degraded, syllables)
intel_score = intel_metrics['overall_score']
# Overall score
scores[scenario] = (loudness_score * 0.4 + intel_score * 0.6)
return scores
def _simulate_iphone_speaker(self, audio: np.ndarray) -> np.ndarray:
"""Simulate iPhone mono speaker"""
# Mono conversion
if len(audio.shape) > 1:
audio = np.mean(audio, axis=1)
# Limited frequency response (200Hz - 8kHz)
nyquist = self.sample_rate / 2
b_hp, a_hp = signal.butter(4, 200/nyquist, 'high')
b_lp, a_lp = signal.butter(4, 8000/nyquist, 'low')
filtered = signal.filtfilt(b_hp, a_hp, audio)
filtered = signal.filtfilt(b_lp, a_lp, filtered)
# Small speaker resonance (boost around 1-2kHz)
b_peak, a_peak = signal.butter(2, [1000/nyquist, 2000/nyquist], 'band')
resonance = signal.filtfilt(b_peak, a_peak, filtered)
return filtered + resonance * 0.3
def _simulate_android_budget(self, audio: np.ndarray) -> np.ndarray:
"""Simulate budget Android device speaker"""
if len(audio.shape) > 1:
audio = np.mean(audio, axis=1)
# Even more limited response (300Hz - 6kHz)
nyquist = self.sample_rate / 2
b_hp, a_hp = signal.butter(3, 300/nyquist, 'high')
b_lp, a_lp = signal.butter(3, 6000/nyquist, 'low')
filtered = signal.filtfilt(b_hp, a_hp, audio)
filtered = signal.filtfilt(b_lp, a_lp, filtered)
# Add slight distortion
filtered = np.tanh(filtered * 1.2) * 0.85
return filtered
def _simulate_cheap_earbuds(self, audio: np.ndarray) -> np.ndarray:
"""Simulate low-quality earbuds"""
if len(audio.shape) > 1:
audio = np.mean(audio, axis=1)
# Boosted bass, harsh treble
nyquist = self.sample_rate / 2
# Bass boost (80-200Hz)
b_bass, a_bass = signal.butter(2, [80/nyquist, 200/nyquist], 'band')
bass = signal.filtfilt(b_bass, a_bass, audio)
# Harsh treble (5-10kHz)
b_treble, a_treble = signal.butter(2, [5000/nyquist, 10000/nyquist], 'band')
treble = signal.filtfilt(b_treble, a_treble, audio)
return audio + bass * 0.4 + treble * 0.3
def _simulate_airpods(self, audio: np.ndarray) -> np.ndarray:
"""Simulate AirPods with spatial audio compression"""
if len(audio.shape) > 1:
audio = np.mean(audio, axis=1)
# Relatively flat but with AAC compression artifacts
# Gentle high-frequency roll-off
nyquist = self.sample_rate / 2
b_lp, a_lp = signal.butter(6, 12000/nyquist, 'low')
filtered = signal.filtfilt(b_lp, a_lp, audio)
# Slight compression
filtered = np.tanh(filtered * 1.1) * 0.95
return filtered
def _simulate_platform_transcode(self, audio: np.ndarray,
platform: Platform) -> np.ndarray:
"""Simulate platform transcoding"""
# Platforms re-encode audio, losing some quality
# Simulate lossy compression (simplified)
# Real platforms use AAC/Opus with various bitrates
# Low-pass filter to simulate bandwidth limitation
nyquist = self.sample_rate / 2
cutoff = 15000 if platform == Platform.YOUTUBE else 12000
b_lp, a_lp = signal.butter(8, cutoff/nyquist, 'low')
transcoded = signal.filtfilt(b_lp, a_lp, audio)
# Slight volume reduction (normalization by platform)
transcoded *= 0.95
return transcoded
# ═══════════════════════════════════════════════════════════════════════════
# BEAT ALIGNMENT PRESERVATION ENGINE
# ═══════════════════════════════════════════════════════════════════════════
class BeatPreservationEngine:
"""Ensures normalization doesn't destroy rhythmic integrity"""
def __init__(self, sample_rate: int = 44100):
self.sample_rate = sample_rate
def preserve_beats(self, audio: np.ndarray,
original_audio: np.ndarray,
beat_alignment: BeatAlignment) -> np.ndarray:
"""Restore beat emphasis lost during normalization"""
result = audio.copy()
for i, beat_time in enumerate(beat_alignment.beat_times):
beat_sample = int(beat_time * self.sample_rate)
strength = beat_alignment.beat_strengths[i]
is_hook = i in beat_alignment.hook_beats
if beat_sample >= len(audio):
continue
# Define beat window
window_size = int(0.1 * self.sample_rate) # 100ms
start = max(0, beat_sample - window_size // 4)
end = min(len(audio), beat_sample + window_size)
# Calculate transient loss
orig_window = original_audio[start:end]
proc_window = audio[start:end]
orig_peak = np.max(np.abs(orig_window))
proc_peak = np.max(np.abs(proc_window))
if orig_peak > 0 and proc_peak > 0:
loss_ratio = proc_peak / orig_peak
# Restore transient if significantly lost
if loss_ratio < 0.9:
restoration = min(0.3, (1.0 - loss_ratio) * 0.5)
# Extra restoration for hook beats
if is_hook:
restoration *= 1.5
# Blend to restore punch
result[start:end] = (
proc_window * (1 - restoration) +
orig_window * restoration
)
return result
def detect_timing_drift(self, audio: np.ndarray,
beat_alignment: BeatAlignment) -> List[float]:
"""Detect micro-timing drift caused by processing"""
drift_amounts = []
for beat_time in beat_alignment.beat_times:
beat_sample = int(beat_time * self.sample_rate)
if beat_sample >= len(audio) - 1000:
continue
# Search for actual transient near expected beat
search_window = int(0.05 * self.sample_rate) # ±50ms
start = max(0, beat_sample - search_window)
end = min(len(audio), beat_sample + search_window)
window = audio[start:end]
# Find peak transient
envelope = np.abs(window)
peak_idx = np.argmax(envelope)
# Calculate drift
expected_peak = search_window
drift_samples = peak_idx - expected_peak
drift_ms = (drift_samples / self.sample_rate) * 1000
drift_amounts.append(drift_ms)
return drift_amounts
def apply_envelope_shaping(self, audio: np.ndarray,
beat_alignment: BeatAlignment) -> np.ndarray:
"""Apply beat-preserving envelope shaping"""
result = audio.copy()
envelope = np.ones_like(audio)
for i, beat_time in enumerate(beat_alignment.beat_times):
beat_sample = int(beat_time * self.sample_rate)
strength = beat_alignment.beat_strengths[i]
if beat_sample >= len(audio):
continue
# Create emphasis envelope around beat
window_size = int(0.15 * self.sample_rate)
start = max(0, beat_sample - window_size // 2)
end = min(len(audio), beat_sample + window_size // 2)
# Gaussian-ish envelope
x = np.linspace(-2, 2, end - start)
beat_envelope = 1.0 + strength * 0.15 * np.exp(-x**2)
envelope[start:end] *= beat_envelope
return result * envelope
# ═══════════════════════════════════════════════════════════════════════════
# LEARNING & REINFORCEMENT SYSTEM
# ═══════════════════════════════════════════════════════════════════════════
class NormalizationLearner:
"""Learns optimal normalization parameters from performance data"""
def __init__(self, storage_path: Path):
self.storage_path = storage_path
self.storage_path.mkdir(parents=True, exist_ok=True)
self.history_file = self.storage_path / "normalization_history.json"
self.history = self._load_history()
def _load_history(self) -> List[Dict]:
"""Load historical performance data"""
if self.history_file.exists():
with open(self.history_file, 'r') as f:
return json.load(f)
return []
def _save_history(self):
"""Save history to disk"""
with open(self.history_file, 'w') as f:
json.dump(self.history[-1000:], f, indent=2) # Keep last 1000
def record_performance(self, normalization_params: Dict,
performance_metrics: Dict):
"""Record normalization parameters and resulting performance"""
entry = {
'timestamp': performance_metrics.get('timestamp', ''),
'platform': performance_metrics.get('platform', ''),
'niche': performance_metrics.get('niche', ''),
'params': normalization_params,
'views': performance_metrics.get('views', 0),
'retention_2s': performance_metrics.get('retention_2s', 0.0),
'hook_replay_rate': performance_metrics.get('hook_replay_rate', 0.0),
'completion_rate': performance_metrics.get('completion_rate', 0.0),
'engagement_score': performance_metrics.get('engagement_score', 0.0)
}
self.history.append(entry)
self._save_history()
logger.info(f"📊 Recorded performance: {entry['views']} views, "
f"{entry['retention_2s']:.2%} 2s retention")
def get_optimal_params(self, platform: str, niche: str) -> Dict:
"""Get optimal parameters based on historical performance"""
# Filter relevant history
relevant = [h for h in self.history
if h['platform'] == platform and h['niche'] == niche]
if len(relevant) < 5:
# Not enough data, return defaults
return self._get_default_params()
# Find top performers
top_performers = sorted(relevant,
key=lambda x: x['engagement_score'],
reverse=True)[:10]
# Average their parameters
optimal = self._average_params([p['params'] for p in top_performers])
logger.info(f"🎯 Using learned optimal params for {platform}/{niche}")
return optimal
def _get_default_params(self) -> Dict:
"""Default normalization parameters"""
return {
'target_lufs': -14.0,
'compression_ratio': 3.0,
'hook_boost_db': 1.2,
'attack_ms': 5.0,
'release_ms': 50.0
}
def _average_params(self, param_list: List[Dict]) -> Dict:
"""Average multiple parameter sets"""
if not param_list:
return self._get_default_params()
averaged = {}
keys = param_list[0].keys()
for key in keys:
values = [p[key] for p in param_list if key in p]
averaged[key] = np.mean(values) if values else 0.0
return averaged
# ═══════════════════════════════════════════════════════════════════════════
# MAIN NORMALIZATION ENGINE
# ═══════════════════════════════════════════════════════════════════════════
class AudioNormalizationEngine:
"""
🔥 VIRAL AUDIO SURVIVABILITY ENGINE 🔥
The gatekeeper between good audio and 5M+ view inevitability.
"""
def __init__(self, storage_path: Path = Path("./normalization_data")):
self.sample_rate = 44100
# Initialize all subsystems
self.platform_manager = PlatformProfileManager()
self.loudness_analyzer = LoudnessAnalyzer(self.sample_rate)
self.compressor = ViralCompressor(self.sample_rate)
self.limiter = PsychoacousticLimiter(self.sample_rate)
self.intelligibility = IntelligibilityGuardian(self.sample_rate)
self.playback_sim = PlaybackSimulator(self.sample_rate)
self.beat_preserver = BeatPreservationEngine(self.sample_rate)
self.learner = NormalizationLearner(storage_path)
logger.info("🔊 Audio Normalization Engine initialized - VIRAL MODE ACTIVE")
def normalize(self, audio: np.ndarray,
platform: Platform,
syllables: Optional[List[SyllableMetrics]] = None,
beat_alignment: Optional[BeatAlignment] = None,
emotion_profile: Optional[EmotionProfile] = None,
hook_segments: Optional[List[Tuple[float, float]]] = None,
niche: str = "general",
variant_id: str = "default") -> NormalizationResult:
"""
🎯 NORMALIZE AUDIO FOR VIRAL SUCCESS
This is the main entry point. Everything flows through here.
"""
logger.info(f"🎬 Normalizing for {platform.value} | Variant: {variant_id}")
# Get platform profile
profile = self.platform_manager.get_profile(platform)
# Apply learning (get optimal params for this platform/niche)
learned_params = self.learner.get_optimal_params(platform.value, niche)
self._apply_learned_params(profile, learned_params)
# Store original for comparison
original_audio = audio.copy()
# ═══════════════════════════════════════════════════════════════════
# STEP 1: PRE-ANALYSIS
# ═══════════════════════════════════════════════════════════════════
pre_loudness = self.loudness_analyzer.analyze(audio, hook_segments)
logger.info(f"📊 Pre-normalization: {pre_loudness.integrated_lufs:.1f} LUFS")
# ═══════════════════════════════════════════════════════════════════
# STEP 2: INTELLIGENT COMPRESSION
# ═══════════════════════════════════════════════════════════════════
audio = self.compressor.compress(
audio, profile, syllables, emotion_profile
)
logger.info("✅ Viral-tuned compression applied")
# ═══════════════════════════════════════════════════════════════════
# STEP 3: LOUDNESS NORMALIZATION
# ═══════════════════════════════════════════════════════════════════
audio = self._normalize_loudness(
audio, profile, hook_segments, syllables
)
logger.info(f"🎚️ Normalized to target: {profile.target_lufs:.1f} LUFS")
# ═══════════════════════════════════════════════════════════════════
# STEP 4: PSYCHOACOUSTIC LIMITING
# ═══════════════════════════════════════════════════════════════════
audio = self.limiter.limit(
audio, profile.true_peak_ceiling, emotion_profile, syllables
)
logger.info(f"🎚️ Limited to {profile.true_peak_ceiling:.1f} dB peak")
# ═══════════════════════════════════════════════════════════════════
# STEP 5: INTELLIGIBILITY PROTECTION
# ═══════════════════════════════════════════════════════════════════
if syllables:
intel_metrics = self.intelligibility.analyze_intelligibility(
audio, syllables
)
if intel_metrics['overall_score'] < 0.75:
logger.warning(f"⚠️ Low intelligibility: "
f"{intel_metrics['overall_score']:.2%}")
audio = self.intelligibility.enhance_intelligibility(
audio, syllables, target_score=0.85
)
logger.info("✅ Intelligibility enhanced")
# ═══════════════════════════════════════════════════════════════════
# STEP 6: BEAT ALIGNMENT PRESERVATION
# ═══════════════════════════════════════════════════════════════════
if beat_alignment:
audio = self.beat_preserver.preserve_beats(
audio, original_audio, beat_alignment
)
drift = self.beat_preserver.detect_timing_drift(audio, beat_alignment)
avg_drift = np.mean(np.abs(drift)) if drift else 0.0
if avg_drift > 5.0: # More than 5ms average drift
logger.warning(f"⚠️ Beat timing drift detected: {avg_drift:.1f}ms")
else:
logger.info(f"✅ Beat alignment preserved (drift: {avg_drift:.1f}ms)")
# ═══════════════════════════════════════════════════════════════════
# STEP 7: PLAYBACK REALITY TESTING
# ═══════════════════════════════════════════════════════════════════
degraded_scenarios = self.playback_sim.simulate_degradation(
audio, platform
)
degradation_scores = self.playback_sim.evaluate_degraded_audio(
audio, degraded_scenarios, syllables or []
)
min_score = min(degradation_scores.values())
logger.info(f"📱 Playback survival: {min_score:.2%} (worst case)")
# FAIL CHECK: If audio doesn't survive playback
if min_score < 0.70:
logger.error(f"❌ FAILED: Audio doesn't survive real-world playback")
quality_level = AudioQuality.FAILED
else:
quality_level = self._assess_quality(min_score)
# ═══════════════════════════════════════════════════════════════════
# STEP 8: FINAL ANALYSIS & SCORING
# ═══════════════════════════════════════════════════════════════════
post_loudness = self.loudness_analyzer.analyze(audio, hook_segments)
metrics = {
'pre_lufs': pre_loudness.integrated_lufs,
'post_lufs': post_loudness.integrated_lufs,
'target_lufs': profile.target_lufs,
'true_peak': post_loudness.true_peak,
'dynamic_range': post_loudness.dynamic_range,
'hook_lufs': post_loudness.hook_lufs,
'first_2s_lufs': post_loudness.first_2s_lufs,
'playback_survival': degradation_scores,
'intelligibility': intel_metrics if syllables else {},
'beat_drift_ms': avg_drift if beat_alignment else 0.0
}
# Calculate confidence score (likelihood of viral success)
confidence_score = self._calculate_confidence(
metrics, quality_level, profile
)
# Quality score (0-100)
quality_score = self._calculate_quality_score(metrics, confidence_score)
logger.info(f"🎯 Quality: {quality_score:.1f}/100 | "
f"Confidence: {confidence_score:.1%} | "
f"Status: {quality_level.value}")
# ═══════════════════════════════════════════════════════════════════
# RETURN COMPLETE RESULT
# ═══════════════════════════════════════════════════════════════════
return NormalizationResult(
audio=audio,
sample_rate=self.sample_rate,
platform=platform,
quality_score=quality_score,
quality_level=quality_level,
metrics=metrics,
confidence_score=confidence_score,
degradation_report=degradation_scores,
variant_id=variant_id,
normalization_params=learned_params
)
def normalize_variants(self, audio_variants: List[np.ndarray],
platform: Platform,
**kwargs) -> NormalizationResult:
"""
🔥 NORMALIZE MULTIPLE VARIANTS AND SELECT BEST
5M+ videos come from selection, not hope.
"""
logger.info(f"🎲 Normalizing {len(audio_variants)} variants...")
results = []
for i, audio in enumerate(audio_variants):
variant_id = f"variant_{i+1}"
result = self.normalize(
audio, platform, variant_id=variant_id, **kwargs
)
results.append(result)
# Select best variant
best = max(results, key=lambda r: r.confidence_score)
logger.info(f"🏆 Best variant: {best.variant_id} "
f"(confidence: {best.confidence_score:.1%})")
return best
def _normalize_loudness(self, audio: np.ndarray,
profile: PlatformAudioProfile,
hook_segments: Optional[List[Tuple[float, float]]],
syllables: Optional[List[SyllableMetrics]]) -> np.ndarray:
"""Platform-aware loudness normalization"""
# Analyze current loudness
current_loudness = self.loudness_analyzer.analyze(audio, hook_segments)
# Calculate gain needed
target = profile.target_lufs
current = current_loudness.integrated_lufs
gain_db = target - current
gain_linear = 10 ** (gain_db / 20)
# Apply base gain
audio = audio * gain_linear
# Hook-weighted normalization
if hook_segments and syllables:
audio = self._apply_hook_weighting(
audio, hook_segments, syllables, profile
)
# First 2s boost (critical for algorithms)
audio = self._boost_first_2_seconds(audio, profile)
return audio
def _apply_hook_weighting(self, audio: np.ndarray,
hook_segments: List[Tuple[float, float]],
syllables: List[SyllableMetrics],
profile: PlatformAudioProfile) -> np.ndarray:
"""Apply hook-priority loudness weighting"""
result = audio.copy()
for start, end in hook_segments:
start_sample = int(start * self.sample_rate)
end_sample = int(end * self.sample_rate)
if start_sample >= len(audio) or end_sample > len(audio):
continue
# Boost hook segments
boost_db = profile.hook_boost_db
boost_linear = 10 ** (boost_db / 20)
result[start_sample:end_sample] *= boost_linear
return result
def _boost_first_2_seconds(self, audio: np.ndarray,
profile: PlatformAudioProfile) -> np.ndarray:
"""Boost first 2 seconds (algorithms heavily weight this)"""
first_2s_samples = int(2.0 * self.sample_rate)
if len(audio) < first_2s_samples:
return audio
# Analyze first 2s loudness
first_2s = audio[:first_2s_samples]
first_2s_loudness = self.loudness_analyzer._calculate_integrated_lufs(
self.loudness_analyzer._apply_k_weighting(first_2s)
)
# If first 2s is quieter than target, boost it
target_first_2s = profile.momentary_lufs
if first_2s_loudness < target_first_2s:
boost_db = min(2.0, target_first_2s - first_2s_loudness)
boost_linear = 10 ** (boost_db / 20)
# Apply smooth fade-in boost
fade_samples = int(0.5 * self.sample_rate)
fade_curve = np.linspace(boost_linear, 1.0, fade_samples)
audio[:fade_samples] *= fade_curve
audio[fade_samples:first_2s_samples] *= boost_linear
return audio
def _assess_quality(self, min_survival_score: float) -> AudioQuality:
"""Assess overall audio quality"""
if min_survival_score >= 0.90:
return AudioQuality.VIRAL_READY
elif min_survival_score >= 0.80:
return AudioQuality.GOOD
elif min_survival_score >= 0.70:
return AudioQuality.ACCEPTABLE
else:
return AudioQuality.FAILED
def _calculate_confidence(self, metrics: Dict,
quality_level: AudioQuality,
profile: PlatformAudioProfile) -> float:
"""Calculate confidence score for viral success"""
score = 0.0
# LUFS accuracy (20%)
lufs_error = abs(metrics['post_lufs'] - metrics['target_lufs'])
lufs_score = max(0, 1.0 - lufs_error / 3.0)
score += lufs_score * 0.20
# First 2s loudness (25% - CRITICAL)
first_2s_target = profile.momentary_lufs
first_2s_error = abs(metrics['first_2s_lufs'] - first_2s_target)
first_2s_score = max(0, 1.0 - first_2s_error / 3.0)
score += first_2s_score * 0.25
# Playback survival (30%)
avg_survival = np.mean(list(metrics['playback_survival'].values()))
score += avg_survival * 0.30
# Intelligibility (15%)
if metrics.get('intelligibility'):
intel_score = metrics['intelligibility'].get('overall_score', 0.8)
score += intel_score * 0.15
else:
score += 0.12 # Assume decent if no data
# Beat preservation (10%)
beat_score = max(0, 1.0 - metrics.get('beat_drift_ms', 0) / 10.0)
score += beat_score * 0.10
# Quality level bonus/penalty
quality_multipliers = {
AudioQuality.VIRAL_READY: 1.1,
AudioQuality.GOOD: 1.0,
AudioQuality.ACCEPTABLE: 0.9,
AudioQuality.NEEDS_WORK: 0.7,
AudioQuality.FAILED: 0.5
}
score *= quality_multipliers[quality_level]
return min(1.0, max(0.0, score))
def _calculate_quality_score(self, metrics: Dict,
confidence: float) -> float:
"""Calculate 0-100 quality score"""
return confidence * 100
def _apply_learned_params(self, profile: PlatformAudioProfile,
learned_params: Dict):
"""Apply learned parameters to profile"""
if 'target_lufs' in learned_params:
profile.target_lufs = learned_params['target_lufs']
if 'compression_ratio' in learned_params:
profile.compression_ratio = learned_params['compression_ratio']
if 'hook_boost_db' in learned_params:
profile.hook_boost_db = learned_params['hook_boost_db']
def report_performance(self, result: NormalizationResult,
performance_metrics: Dict):
"""Report performance back to learning system"""
self.learner.record_performance(
result.normalization_params,
performance_metrics
)
# Update platform profile
self.platform_manager.update_profile_from_performance(
result.platform,
performance_metrics
)
# ═══════════════════════════════════════════════════════════════════════════
# CONVENIENCE FUNCTIONS
# ═══════════════════════════════════════════════════════════════════════════
def normalize_audio_for_viral_success(
audio: np.ndarray,
platform: str = "tiktok",
**kwargs
) -> NormalizationResult:
"""
🔥 ONE-LINE VIRAL AUDIO NORMALIZATION 🔥
Usage:
result = normalize_audio_for_viral_success(
audio,
platform="tiktok",
syllables=syllables,
beat_alignment=beats
)
"""
engine = AudioNormalizationEngine()
platform_enum = Platform(platform.lower())
return engine.normalize(audio, platform_enum, **kwargs)
if __name__ == "__main__":
logger.info("🔊 Audio Normalization Engine - Ready for 5M+ Views")
logger.info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
logger.info("This is NOT a normalizer. This is a SURVIVAL ENGINE.")
logger.info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment