Skip to content

Instantly share code, notes, and snippets.

@bogged-broker
Created December 31, 2025 00:48
Show Gist options
  • Select an option

  • Save bogged-broker/b8902f6c637c00594274dd89516896a3 to your computer and use it in GitHub Desktop.

Select an option

Save bogged-broker/b8902f6c637c00594274dd89516896a3 to your computer and use it in GitHub Desktop.
"""
audio_playback_simulator.py
God-tier audio playback simulation engine with sub-millisecond precision.
Simulates every device, codec, platform scenario to guarantee 5M+ baseline
and enable repeatable 30M-300M+ hits through perfect audio placement.
Integrates with audio_memory_manager.py and audio_pattern_learner.py.
"""
import numpy as np
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from enum import Enum
import json
import warnings
from scipy import signal
from scipy.fft import fft, ifft
class Platform(Enum):
TIKTOK = "tiktok"
INSTAGRAM = "instagram"
YOUTUBE = "youtube"
TWITTER = "twitter"
class Device(Enum):
IPHONE_SPEAKER = "iphone_speaker"
ANDROID_SPEAKER = "android_speaker"
CHEAP_EARBUDS = "cheap_earbuds"
HEADPHONES = "headphones"
LAPTOP_SPEAKER = "laptop_speaker"
TABLET = "tablet"
class Codec(Enum):
AAC = "aac"
MP3 = "mp3"
OGG = "ogg"
OPUS = "opus"
@dataclass
class PlatformProfile:
"""Platform-specific audio requirements and constraints"""
name: str
target_lufs: float
max_peak: float
codec: Codec
stereo_mode: str # "stereo", "mono", "adaptive"
preferred_freq_range: Tuple[int, int] # Hz
compression_ratio: float
buffer_latency_ms: float
# Viral optimization targets
hook_clarity_threshold: float = 0.85
beat_alignment_tolerance_ms: float = 0.5
silence_precision_ms: float = 0.3
# Platform profiles based on each platform's specifications
PLATFORM_PROFILES = {
Platform.TIKTOK: PlatformProfile(
name="TikTok",
target_lufs=-14.0,
max_peak=-1.0,
codec=Codec.AAC,
stereo_mode="adaptive",
preferred_freq_range=(80, 15000),
compression_ratio=4.5,
buffer_latency_ms=22.0
),
Platform.INSTAGRAM: PlatformProfile(
name="Instagram",
target_lufs=-14.0,
max_peak=-1.0,
codec=Codec.AAC,
stereo_mode="mono",
preferred_freq_range=(100, 14000),
compression_ratio=5.0,
buffer_latency_ms=18.0
),
Platform.YOUTUBE: PlatformProfile(
name="YouTube",
target_lufs=-14.0,
max_peak=-1.0,
codec=Codec.OPUS,
stereo_mode="stereo",
preferred_freq_range=(50, 18000),
compression_ratio=3.0,
buffer_latency_ms=12.0
),
Platform.TWITTER: PlatformProfile(
name="Twitter",
target_lufs=-13.0,
max_peak=-1.0,
codec=Codec.AAC,
stereo_mode="mono",
preferred_freq_range=(100, 12000),
compression_ratio=5.5,
buffer_latency_ms=25.0
)
}
@dataclass
class DeviceProfile:
"""Device-specific playback characteristics"""
name: str
frequency_response: Dict[int, float] # Hz -> dB adjustment
phase_drift_ms: float
dynamic_range_db: float
distortion_threshold: float
mono_downmix: bool
low_end_rolloff_hz: int
high_end_rolloff_hz: int
# Device profiles based on real-world measurements
DEVICE_PROFILES = {
Device.IPHONE_SPEAKER: DeviceProfile(
name="iPhone Speaker",
frequency_response={
80: -12.0, 200: -3.0, 500: 0.0, 1000: 0.0,
2000: 1.5, 4000: 2.0, 8000: -2.0, 12000: -8.0
},
phase_drift_ms=-12.0, # Low-end smear
dynamic_range_db=55.0,
distortion_threshold=0.8,
mono_downmix=True,
low_end_rolloff_hz=200,
high_end_rolloff_hz=10000
),
Device.ANDROID_SPEAKER: DeviceProfile(
name="Android Speaker",
frequency_response={
80: -15.0, 200: -5.0, 500: -1.0, 1000: 0.0,
2000: 0.5, 4000: 1.0, 8000: -3.0, 12000: -10.0
},
phase_drift_ms=38.0, # Codec micro-delay
dynamic_range_db=50.0,
distortion_threshold=0.75,
mono_downmix=True,
low_end_rolloff_hz=250,
high_end_rolloff_hz=9000
),
Device.CHEAP_EARBUDS: DeviceProfile(
name="Cheap Earbuds",
frequency_response={
80: -8.0, 200: -2.0, 500: 0.0, 1000: 0.0,
2000: -1.0, 4000: 2.5, 8000: -1.0, 12000: -6.0
},
phase_drift_ms=5.0,
dynamic_range_db=65.0,
distortion_threshold=0.85,
mono_downmix=False,
low_end_rolloff_hz=100,
high_end_rolloff_hz=12000
),
Device.HEADPHONES: DeviceProfile(
name="Headphones",
frequency_response={
80: 0.0, 200: 0.0, 500: 0.0, 1000: 0.0,
2000: 0.0, 4000: 0.0, 8000: 0.0, 12000: 0.0
},
phase_drift_ms=0.0,
dynamic_range_db=90.0,
distortion_threshold=0.95,
mono_downmix=False,
low_end_rolloff_hz=20,
high_end_rolloff_hz=20000
),
Device.LAPTOP_SPEAKER: DeviceProfile(
name="Laptop Speaker",
frequency_response={
80: -18.0, 200: -8.0, 500: -2.0, 1000: 0.0,
2000: 1.0, 4000: 1.5, 8000: -4.0, 12000: -12.0
},
phase_drift_ms=15.0,
dynamic_range_db=48.0,
distortion_threshold=0.7,
mono_downmix=False,
low_end_rolloff_hz=300,
high_end_rolloff_hz=8000
),
Device.TABLET: DeviceProfile(
name="Tablet",
frequency_response={
80: -10.0, 200: -4.0, 500: -1.0, 1000: 0.0,
2000: 0.5, 4000: 1.0, 8000: -2.5, 12000: -9.0
},
phase_drift_ms=8.0,
dynamic_range_db=58.0,
distortion_threshold=0.8,
mono_downmix=False,
low_end_rolloff_hz=180,
high_end_rolloff_hz=11000
)
}
@dataclass
class AudioFork:
"""Micro-variant fork of audio timing"""
id: str
audio_data: np.ndarray
sample_rate: int
hook_offset_ms: float
silence_offset_ms: float
beat_offset_ms: float
metadata: Dict = field(default_factory=dict)
@dataclass
class SimulationResult:
"""Results from playback simulation"""
fork_id: str
platform: Platform
device: Device
predicted_retention: float
engagement_score: float
phase_alignment_score: float
clarity_score: float
intelligibility_score: float
stress_flags: List[str]
warnings: List[str]
risk_score: float
recommended_adjustments: List[Dict]
timing_offsets: Dict[str, float]
class PhaseDriftCorrector:
"""Simulates and corrects phase drift from codecs and devices"""
def __init__(self):
self.drift_lookup_table = {}
self._initialize_lookup_table()
def _initialize_lookup_table(self):
"""Precompute phase drift for common codec+device combinations"""
for platform in Platform:
for device in Device:
profile = PLATFORM_PROFILES[platform]
dev_profile = DEVICE_PROFILES[device]
key = f"{platform.value}_{device.value}_{profile.codec.value}"
# Calculate composite drift
codec_drift = self._estimate_codec_drift(profile.codec)
buffer_drift = profile.buffer_latency_ms
device_drift = dev_profile.phase_drift_ms
total_drift = codec_drift + buffer_drift + device_drift
self.drift_lookup_table[key] = {
"total_drift_ms": total_drift,
"codec_drift": codec_drift,
"buffer_drift": buffer_drift,
"device_drift": device_drift
}
def _estimate_codec_drift(self, codec: Codec) -> float:
"""Estimate phase drift introduced by codec compression"""
codec_drift_map = {
Codec.AAC: 8.5,
Codec.MP3: 12.0,
Codec.OGG: 6.5,
Codec.OPUS: 4.0
}
return codec_drift_map.get(codec, 10.0)
def apply_phase_correction(self, audio: np.ndarray, platform: Platform,
device: Device, sample_rate: int) -> np.ndarray:
"""Apply phase correction to compensate for predicted drift"""
profile = PLATFORM_PROFILES[platform]
key = f"{platform.value}_{device.value}_{profile.codec.value}"
drift_info = self.drift_lookup_table.get(key, {"total_drift_ms": 0.0})
drift_samples = int((drift_info["total_drift_ms"] / 1000.0) * sample_rate)
if drift_samples > 0:
# Shift audio earlier to compensate for latency
corrected = np.roll(audio, -drift_samples)
corrected[-drift_samples:] = 0.0
return corrected
elif drift_samples < 0:
# Add delay
corrected = np.roll(audio, abs(drift_samples))
corrected[:abs(drift_samples)] = 0.0
return corrected
return audio.copy()
def simulate_drift(self, audio: np.ndarray, platform: Platform,
device: Device, sample_rate: int) -> np.ndarray:
"""Simulate the drift that would occur during playback"""
profile = PLATFORM_PROFILES[platform]
key = f"{platform.value}_{device.value}_{profile.codec.value}"
drift_info = self.drift_lookup_table.get(key, {"total_drift_ms": 0.0})
drift_samples = int((drift_info["total_drift_ms"] / 1000.0) * sample_rate)
if drift_samples != 0:
drifted = np.roll(audio, drift_samples)
if drift_samples > 0:
drifted[:drift_samples] = 0.0
else:
drifted[drift_samples:] = 0.0
return drifted
return audio.copy()
class MultiForkSimulator:
"""Generates and simulates micro-variant forks"""
def __init__(self):
self.fork_counter = 0
def generate_forks(self, audio: np.ndarray, sample_rate: int,
num_forks: int = 12) -> List[AudioFork]:
"""Generate micro-variant forks with timing adjustments"""
forks = []
# Base fork (no offset)
forks.append(AudioFork(
id=f"fork_{self.fork_counter}",
audio_data=audio.copy(),
sample_rate=sample_rate,
hook_offset_ms=0.0,
silence_offset_ms=0.0,
beat_offset_ms=0.0
))
self.fork_counter += 1
# Generate variants with micro-offsets
offset_variants = [
(0.5, 0.0, 0.0), (-0.5, 0.0, 0.0), # Hook timing
(0.0, 0.3, 0.0), (0.0, -0.3, 0.0), # Silence timing
(0.0, 0.0, 0.5), (0.0, 0.0, -0.5), # Beat timing
(5.0, 0.0, 0.0), (-5.0, 0.0, 0.0), # Larger hook shifts
(10.0, 2.0, 1.0), (-10.0, -2.0, -1.0), # Combined shifts
(20.0, 5.0, 2.0), (-20.0, -5.0, -2.0) # Major shifts
]
for hook_ms, silence_ms, beat_ms in offset_variants[:num_forks-1]:
total_offset_ms = hook_ms + silence_ms + beat_ms
offset_samples = int((total_offset_ms / 1000.0) * sample_rate)
fork_audio = np.roll(audio, offset_samples)
if offset_samples > 0:
fork_audio[:offset_samples] = 0.0
elif offset_samples < 0:
fork_audio[offset_samples:] = 0.0
forks.append(AudioFork(
id=f"fork_{self.fork_counter}",
audio_data=fork_audio,
sample_rate=sample_rate,
hook_offset_ms=hook_ms,
silence_offset_ms=silence_ms,
beat_offset_ms=beat_ms
))
self.fork_counter += 1
return forks
class LatencyCompensator:
"""Applies device and platform-specific latency compensation"""
@staticmethod
def apply(fork: AudioFork, platform: Platform, device: Device) -> AudioFork:
"""Apply latency compensation to fork"""
profile = PLATFORM_PROFILES[platform]
dev_profile = DEVICE_PROFILES[device]
# Calculate total compensation needed
total_latency_ms = profile.buffer_latency_ms + dev_profile.phase_drift_ms
latency_samples = int((total_latency_ms / 1000.0) * fork.sample_rate)
# Compensate by shifting audio
compensated = np.roll(fork.audio_data, -latency_samples)
if latency_samples > 0:
compensated[-latency_samples:] = 0.0
return AudioFork(
id=fork.id + "_compensated",
audio_data=compensated,
sample_rate=fork.sample_rate,
hook_offset_ms=fork.hook_offset_ms,
silence_offset_ms=fork.silence_offset_ms,
beat_offset_ms=fork.beat_offset_ms,
metadata={**fork.metadata, "latency_compensated": True}
)
class HumanArousalModel:
"""Predicts human attention, arousal, and retention based on audio"""
def __init__(self):
self.fatigue_onset_seconds = 3.5
self.hook_window_seconds = 0.8
self.silence_tension_factor = 1.4
def predict(self, audio: np.ndarray, sample_rate: int) -> float:
"""Predict retention probability based on audio characteristics"""
duration_sec = len(audio) / sample_rate
# Detect hooks (sudden amplitude increases)
hook_score = self._detect_hooks(audio, sample_rate)
# Detect silence patterns (tension and anticipation)
silence_score = self._detect_silences(audio, sample_rate)
# Fatigue modeling
fatigue_penalty = self._calculate_fatigue(duration_sec)
# Energy consistency
energy_score = self._calculate_energy_consistency(audio)
# Composite arousal score
arousal_score = (
hook_score * 0.4 +
silence_score * 0.3 +
energy_score * 0.2 +
(1.0 - fatigue_penalty) * 0.1
)
return np.clip(arousal_score, 0.0, 1.0)
def _detect_hooks(self, audio: np.ndarray, sample_rate: int) -> float:
"""Detect hook timing and strength"""
window_samples = int(self.hook_window_seconds * sample_rate)
energy = np.abs(audio)
if len(energy) < window_samples:
return 0.5
# Calculate energy increase in first window
initial_energy = np.mean(energy[:window_samples])
peak_energy = np.max(energy[:window_samples * 3])
if initial_energy > 0:
hook_ratio = peak_energy / initial_energy
return np.clip(hook_ratio / 3.0, 0.0, 1.0)
return 0.5
def _detect_silences(self, audio: np.ndarray, sample_rate: int) -> float:
"""Detect silence patterns that build tension"""
threshold = 0.01
silence_mask = np.abs(audio) < threshold
# Find silence runs
silence_changes = np.diff(silence_mask.astype(int))
silence_starts = np.where(silence_changes == 1)[0]
silence_ends = np.where(silence_changes == -1)[0]
if len(silence_starts) == 0:
return 0.3 # No strategic silence
# Score based on presence and timing of silences
total_silence_duration = np.sum(silence_mask) / sample_rate
if total_silence_duration < 0.1:
return 0.4
elif total_silence_duration < 0.5:
return 0.8 * self.silence_tension_factor
else:
return 0.6 # Too much silence
def _calculate_fatigue(self, duration_sec: float) -> float:
"""Calculate listener fatigue penalty"""
if duration_sec < self.fatigue_onset_seconds:
return 0.0
excess_time = duration_sec - self.fatigue_onset_seconds
fatigue = excess_time / 10.0
return np.clip(fatigue, 0.0, 0.5)
def _calculate_energy_consistency(self, audio: np.ndarray) -> float:
"""Calculate energy consistency (not too flat, not too chaotic)"""
energy = np.abs(audio)
energy_std = np.std(energy)
energy_mean = np.mean(energy)
if energy_mean == 0:
return 0.0
coefficient_of_variation = energy_std / energy_mean
# Optimal CV is around 0.5-1.5
if 0.5 <= coefficient_of_variation <= 1.5:
return 1.0
elif coefficient_of_variation < 0.5:
return 0.6 # Too flat
else:
return 0.7 # Too chaotic
class StressTestEngine:
"""Runs edge-case stress tests to detect failure patterns"""
def run(self, audio: np.ndarray, sample_rate: int,
platform: Platform, device: Device) -> List[str]:
"""Run stress tests and return failure flags"""
flags = []
# Test 1: Extreme latency
extreme_latency_ms = 100.0
latency_samples = int((extreme_latency_ms / 1000.0) * sample_rate)
extreme_audio = np.roll(audio, latency_samples)
if self._check_timing_collapse(extreme_audio, sample_rate):
flags.append("EXTREME_LATENCY_FAILURE")
# Test 2: Missing micro-silence
if not self._detect_micro_silences(audio, sample_rate):
flags.append("MISSING_MICRO_SILENCE")
# Test 3: Pitch shift tolerance
if not self._check_pitch_robustness(audio):
flags.append("PITCH_SHIFT_VULNERABLE")
# Test 4: Clipping and distortion
if np.max(np.abs(audio)) > 0.95:
flags.append("CLIPPING_RISK")
# Test 5: Low-frequency energy on mobile
dev_profile = DEVICE_PROFILES[device]
if dev_profile.mono_downmix and self._excessive_low_freq(audio, sample_rate):
flags.append("EXCESSIVE_LOW_FREQ_MOBILE")
# Test 6: High-frequency loss
if self._high_freq_loss(audio, sample_rate, dev_profile):
flags.append("HIGH_FREQ_CLARITY_LOSS")
return flags
def _check_timing_collapse(self, audio: np.ndarray, sample_rate: int) -> bool:
"""Check if extreme latency causes timing collapse"""
energy = np.abs(audio)
window = int(0.1 * sample_rate)
if len(energy) < window:
return False
initial_energy = np.mean(energy[:window])
return initial_energy < 0.01
def _detect_micro_silences(self, audio: np.ndarray, sample_rate: int) -> bool:
"""Detect presence of strategic micro-silences"""
threshold = 0.005
silence_mask = np.abs(audio) < threshold
min_silence_samples = int(0.05 * sample_rate)
silence_runs = []
current_run = 0
for is_silent in silence_mask:
if is_silent:
current_run += 1
else:
if current_run > 0:
silence_runs.append(current_run)
current_run = 0
return any(run >= min_silence_samples for run in silence_runs)
def _check_pitch_robustness(self, audio: np.ndarray) -> bool:
"""Check if audio maintains quality under pitch shift"""
# Simple check: ensure audio has spectral diversity
spectrum = np.abs(fft(audio))
spectral_centroid = np.sum(np.arange(len(spectrum)) * spectrum) / np.sum(spectrum)
# Robust audio has centroid in mid-range
normalized_centroid = spectral_centroid / len(spectrum)
return 0.1 < normalized_centroid < 0.6
def _excessive_low_freq(self, audio: np.ndarray, sample_rate: int) -> bool:
"""Check for excessive low-frequency content that degrades on mobile"""
spectrum = np.abs(fft(audio))
freq_bins = np.fft.fftfreq(len(audio), 1/sample_rate)
low_freq_mask = np.abs(freq_bins) < 150
low_freq_energy = np.sum(spectrum[low_freq_mask])
total_energy = np.sum(spectrum)
if total_energy == 0:
return False
return (low_freq_energy / total_energy) > 0.4
def _high_freq_loss(self, audio: np.ndarray, sample_rate: int,
dev_profile: DeviceProfile) -> bool:
"""Check if high-frequency content is lost on device"""
spectrum = np.abs(fft(audio))
freq_bins = np.fft.fftfreq(len(audio), 1/sample_rate)
critical_freq_mask = (np.abs(freq_bins) > 2000) & (np.abs(freq_bins) < 6000)
critical_energy = np.sum(spectrum[critical_freq_mask])
high_rolloff = dev_profile.high_end_rolloff_hz
lost_freq_mask = np.abs(freq_bins) > high_rolloff
lost_energy = np.sum(spectrum[lost_freq_mask])
if critical_energy == 0:
return False
return (lost_energy / critical_energy) > 0.5
class AudioMastering:
"""Loudness normalization, compression, and mastering per platform"""
@staticmethod
def normalize_loudness(audio: np.ndarray, sample_rate: int,
target_lufs: float) -> np.ndarray:
"""Normalize audio to target LUFS"""
# Simple RMS-based approximation of LUFS
rms = np.sqrt(np.mean(audio ** 2))
if rms == 0:
return audio
# Approximate LUFS from RMS (not perfectly accurate but sufficient)
current_lufs = 20 * np.log10(rms) - 0.691
gain_db = target_lufs - current_lufs
gain_linear = 10 ** (gain_db / 20)
normalized = audio * gain_linear
# Peak limiting
peak = np.max(np.abs(normalized))
if peak > 0.95:
normalized = normalized * (0.95 / peak)
return normalized
@staticmethod
def apply_compression(audio: np.ndarray, ratio: float = 4.0,
threshold: float = -20.0) -> np.ndarray:
"""Apply dynamic range compression"""
threshold_linear = 10 ** (threshold / 20)
compressed = audio.copy()
# Simple envelope follower
envelope = np.abs(audio)
# Attack and release smoothing
attack_samples = 100
release_samples = 500
smoothed_envelope = envelope.copy()
for i in range(1, len(smoothed_envelope)):
if envelope[i] > smoothed_envelope[i-1]:
alpha = 1.0 / attack_samples
else:
alpha = 1.0 / release_samples
smoothed_envelope[i] = (alpha * envelope[i] +
(1 - alpha) * smoothed_envelope[i-1])
# Apply compression
gain_reduction = np.ones_like(audio)
over_threshold = smoothed_envelope > threshold_linear
gain_reduction[over_threshold] = (
threshold_linear / smoothed_envelope[over_threshold]
) ** (1 - 1/ratio)
compressed = audio * gain_reduction
return compressed
@staticmethod
def simulate_device_playback(audio: np.ndarray, sample_rate: int,
device: Device) -> np.ndarray:
"""Simulate device-specific frequency response and limitations"""
dev_profile = DEVICE_PROFILES[device]
# Apply frequency response curve
spectrum = fft(audio)
freq_bins = np.fft.fftfreq(len(audio), 1/sample_rate)
# Create filter based on device frequency response
filter_curve = np.ones_like(spectrum, dtype=complex)
for freq_point, db_adjust in dev_profile.frequency_response.items():
# Find closest frequency bins
freq_mask = np.abs(np.abs(freq_bins) - freq_point) < 200
linear_adjust = 10 ** (db_adjust / 20)
filter_curve[freq_mask] *= linear_adjust
# Apply rolloffs
low_rolloff_mask = np.abs(freq_bins) < dev_profile.low_end_rolloff_hz
high_rolloff_mask = np.abs(freq_bins) > dev_profile.high_end_rolloff_hz
filter_curve[low_rolloff_mask] *= 0.1
filter_curve[high_rolloff_mask] *= 0.1
# Apply filter
filtered_spectrum = spectrum * filter_curve
simulated_audio = np.real(ifft(filtered_spectrum))
# Apply dynamic range limitation
simulated_audio = np.clip(simulated_audio, -1.0, 1.0)
# Mono downmix if needed
if dev_profile.mono_downmix and len(simulated_audio.shape) > 1:
simulated_audio = np.mean(simulated_audio, axis=0)
return simulated_audio
class PlaybackSimulatorEngine:
"""Main orchestrator for playback simulation"""
def __init__(self, memory_manager=None, pattern_learner=None):
self.memory_manager = memory_manager
self.pattern_learner = pattern_learner
self.phase_corrector = PhaseDriftCorrector()
self.fork_simulator = MultiForkSimulator()
self.arousal_model = HumanArousalModel()
self.stress_tester = StressTestEngine()
self.mastering = AudioMastering()
def simulate_video(self, audio: np.ndarray, sample_rate: int,
platform: Platform, device: Device,
num_forks: int = 12) -> SimulationResult:
"""Simulate video playback and return best fork with predictions"""
# Generate micro-variant forks
forks = self.fork_simulator.generate_forks(audio, sample_rate, num_forks)
results = []
for fork in forks:
# Apply latency compensation
compensated_fork = LatencyCompensator.apply(fork, platform, device)
# Apply phase drift correction
corrected_audio = self.phase_corrector.apply_phase_correction(
compensated_fork.audio_data, platform, device, sample_rate
)
# Master for platform
profile = PLATFORM_PROFILES[platform]
normalized = self.mastering.normalize_loudness(
corrected_audio, sample_rate, profile.target_lufs
)
compressed = self.mastering.apply_compression(
normalized, profile.compression_ratio
)
# Simulate device playback
simulated = self.mastering.simulate_device_playback(
compressed, sample_rate, device
)
# Predict human arousal and retention
arousal_score = self.arousal_model.predict(simulated, sample_rate)
# Run stress tests
stress_flags = self.stress_tester.run(simulated, sample_rate, platform, device)
# Calculate clarity and intelligibility
clarity_score = self._calculate_clarity(simulated, sample_rate, device)
intelligibility_score = self._calculate_intelligibility(simulated, sample_rate)
# Calculate phase alignment score
phase_score = self._calculate_phase_alignment(
fork, compensated_fork, sample_rate
)
# Calculate engagement score
engagement_score = self._calculate_engagement(
arousal_score, clarity_score, intelligibility_score, phase_score
)
# Calculate risk score
risk_score = self._calculate_risk(stress_flags, clarity_score,
intelligibility_score, phase_score)
# Generate warnings
warnings = self._generate_warnings(stress_flags, clarity_score,
intelligibility_score, risk_score)
# Generate recommendations
recommendations = self._generate_recommendations(
stress_flags, clarity_score, intelligibility_score,
fork, profile, device
)
# Timing offsets
timing_offsets = {
"hook_offset_ms": fork.hook_offset_ms,
"silence_offset_ms": fork.silence_offset_ms,
"beat_offset_ms": fork.beat_offset_ms,
"total_drift_ms": self.phase_corrector.drift_lookup_table.get(
f"{platform.value}_{device.value}_{profile.codec.value}",
{}
).get("total_drift_ms", 0.0)
}
result = SimulationResult(
fork_id=fork.id,
platform=platform,
device=device,
predicted_retention=arousal_score,
engagement_score=engagement_score,
phase_alignment_score=phase_score,
clarity_score=clarity_score,
intelligibility_score=intelligibility_score,
stress_flags=stress_flags,
warnings=warnings,
risk_score=risk_score,
recommended_adjustments=recommendations,
timing_offsets=timing_offsets
)
results.append(result)
# Select best fork based on engagement score
best_result = max(results, key=lambda x: x.engagement_score)
# Update memory and pattern learner if available
if self.memory_manager and self.pattern_learner:
self._update_learning_systems(results, best_result)
return best_result
def simulate_all_platforms(self, audio: np.ndarray, sample_rate: int,
num_forks: int = 12) -> Dict[Platform, Dict[Device, SimulationResult]]:
"""Simulate across all platforms and devices"""
all_results = {}
for platform in Platform:
platform_results = {}
for device in Device:
result = self.simulate_video(audio, sample_rate, platform,
device, num_forks)
platform_results[device] = result
all_results[platform] = platform_results
return all_results
def _calculate_clarity(self, audio: np.ndarray, sample_rate: int,
device: Device) -> float:
"""Calculate audio clarity score after device simulation"""
# Measure spectral clarity
spectrum = np.abs(fft(audio))
# Focus on mid-range frequencies (human speech/music clarity)
freq_bins = np.fft.fftfreq(len(audio), 1/sample_rate)
mid_range_mask = (np.abs(freq_bins) > 500) & (np.abs(freq_bins) < 4000)
mid_range_energy = np.sum(spectrum[mid_range_mask])
total_energy = np.sum(spectrum)
if total_energy == 0:
return 0.0
clarity_ratio = mid_range_energy / total_energy
# Device-specific adjustments
dev_profile = DEVICE_PROFILES[device]
if dev_profile.dynamic_range_db < 55:
clarity_ratio *= 0.9 # Penalize low dynamic range devices
return np.clip(clarity_ratio * 1.5, 0.0, 1.0)
def _calculate_intelligibility(self, audio: np.ndarray, sample_rate: int) -> float:
"""Calculate speech/vocal intelligibility score"""
# Measure energy in speech frequency range (300-3400 Hz)
spectrum = np.abs(fft(audio))
freq_bins = np.fft.fftfreq(len(audio), 1/sample_rate)
speech_mask = (np.abs(freq_bins) > 300) & (np.abs(freq_bins) < 3400)
speech_energy = np.sum(spectrum[speech_mask])
total_energy = np.sum(spectrum)
if total_energy == 0:
return 0.0
# High speech energy ratio = high intelligibility
intelligibility = speech_energy / total_energy
# Check for temporal clarity (not too much reverb/echo)
autocorr = np.correlate(audio, audio, mode='same')
normalized_autocorr = autocorr / autocorr[len(autocorr)//2]
# Low autocorrelation at small lags = good temporal clarity
temporal_clarity = 1.0 - np.mean(np.abs(normalized_autocorr[
len(autocorr)//2 + 100:len(autocorr)//2 + 500
]))
combined_score = (intelligibility * 0.7 + temporal_clarity * 0.3)
return np.clip(combined_score * 1.2, 0.0, 1.0)
def _calculate_phase_alignment(self, original_fork: AudioFork,
compensated_fork: AudioFork,
sample_rate: int) -> float:
"""Calculate how well phase alignment is maintained"""
# Compare original and compensated audio
max_len = min(len(original_fork.audio_data),
len(compensated_fork.audio_data))
orig = original_fork.audio_data[:max_len]
comp = compensated_fork.audio_data[:max_len]
# Cross-correlation to measure alignment
correlation = np.correlate(orig, comp, mode='same')
peak_corr = np.max(np.abs(correlation))
# Normalize
orig_energy = np.sum(orig ** 2)
comp_energy = np.sum(comp ** 2)
if orig_energy == 0 or comp_energy == 0:
return 0.5
normalized_corr = peak_corr / np.sqrt(orig_energy * comp_energy)
return np.clip(normalized_corr, 0.0, 1.0)
def _calculate_engagement(self, arousal: float, clarity: float,
intelligibility: float, phase: float) -> float:
"""Calculate composite engagement score"""
# Weighted combination
engagement = (
arousal * 0.40 +
clarity * 0.25 +
intelligibility * 0.20 +
phase * 0.15
)
return np.clip(engagement, 0.0, 1.0)
def _calculate_risk(self, stress_flags: List[str], clarity: float,
intelligibility: float, phase: float) -> float:
"""Calculate risk score (0 = low risk, 1 = high risk)"""
base_risk = len(stress_flags) * 0.15
# Add risk from poor scores
if clarity < 0.5:
base_risk += 0.2
if intelligibility < 0.5:
base_risk += 0.2
if phase < 0.7:
base_risk += 0.15
return np.clip(base_risk, 0.0, 1.0)
def _generate_warnings(self, stress_flags: List[str], clarity: float,
intelligibility: float, risk: float) -> List[str]:
"""Generate human-readable warnings"""
warnings = []
if risk > 0.7:
warnings.append("⚠️ HIGH RISK: Multiple failure indicators detected")
elif risk > 0.4:
warnings.append("⚠️ MEDIUM RISK: Some quality concerns present")
if clarity < 0.6:
warnings.append("⚠️ Low clarity on mobile speakers - may sound muddy")
if intelligibility < 0.6:
warnings.append("⚠️ Reduced speech intelligibility - vocals may be unclear")
if "CLIPPING_RISK" in stress_flags:
warnings.append("⚠️ Audio clipping detected - reduce peak levels")
if "EXCESSIVE_LOW_FREQ_MOBILE" in stress_flags:
warnings.append("⚠️ Excessive bass will distort on phone speakers")
if "HIGH_FREQ_CLARITY_LOSS" in stress_flags:
warnings.append("⚠️ High-frequency detail lost on this device")
if "MISSING_MICRO_SILENCE" in stress_flags:
warnings.append("⚠️ Missing strategic silence - may reduce engagement")
if "EXTREME_LATENCY_FAILURE" in stress_flags:
warnings.append("⚠️ Vulnerable to latency-induced timing collapse")
return warnings
def _generate_recommendations(self, stress_flags: List[str], clarity: float,
intelligibility: float, fork: AudioFork,
profile: PlatformProfile,
device: Device) -> List[Dict]:
"""Generate actionable adjustment recommendations"""
recommendations = []
if clarity < 0.6:
recommendations.append({
"issue": "Low clarity",
"action": "Boost mid-range frequencies (1-4 kHz)",
"priority": "HIGH"
})
if intelligibility < 0.6:
recommendations.append({
"issue": "Poor intelligibility",
"action": "Reduce reverb and boost vocal clarity (300-3400 Hz)",
"priority": "HIGH"
})
if "EXCESSIVE_LOW_FREQ_MOBILE" in stress_flags:
recommendations.append({
"issue": "Excessive bass",
"action": "Apply high-pass filter at 150 Hz",
"priority": "CRITICAL"
})
if "CLIPPING_RISK" in stress_flags:
recommendations.append({
"issue": "Clipping risk",
"action": "Reduce gain by -2 dB and apply soft limiting",
"priority": "CRITICAL"
})
if "MISSING_MICRO_SILENCE" in stress_flags:
recommendations.append({
"issue": "Missing strategic silence",
"action": f"Add 0.2-0.5s silence at {profile.silence_precision_ms}ms precision",
"priority": "MEDIUM"
})
if abs(fork.hook_offset_ms) > 10:
recommendations.append({
"issue": "Hook timing offset",
"action": f"Adjust hook timing by {-fork.hook_offset_ms:.1f}ms",
"priority": "MEDIUM"
})
return recommendations
def _update_learning_systems(self, all_results: List[SimulationResult],
best_result: SimulationResult):
"""Update memory manager and pattern learner with simulation results"""
# This would integrate with your actual memory manager and pattern learner
# Placeholder implementation
for result in all_results:
fork_performance = {
"fork_id": result.fork_id,
"engagement_score": result.engagement_score,
"retention_score": result.predicted_retention,
"timing_offsets": result.timing_offsets,
"platform": result.platform.value,
"device": result.device.value
}
# Would send to memory manager for storage
# Would send to pattern learner for RL updates
pass
def export_analytics(self, results: Dict[Platform, Dict[Device, SimulationResult]],
output_file: str = "playback_analytics.json"):
"""Export simulation results for analysis"""
export_data = {
"simulation_timestamp": "2024-01-01T00:00:00Z", # Would use real timestamp
"platforms": {}
}
for platform, device_results in results.items():
platform_data = {
"devices": {}
}
for device, result in device_results.items():
device_data = {
"fork_id": result.fork_id,
"predicted_retention": float(result.predicted_retention),
"engagement_score": float(result.engagement_score),
"phase_alignment": float(result.phase_alignment_score),
"clarity": float(result.clarity_score),
"intelligibility": float(result.intelligibility_score),
"risk_score": float(result.risk_score),
"warnings": result.warnings,
"stress_flags": result.stress_flags,
"recommendations": result.recommended_adjustments,
"timing_offsets": result.timing_offsets
}
platform_data["devices"][device.value] = device_data
export_data["platforms"][platform.value] = platform_data
with open(output_file, 'w') as f:
json.dump(export_data, f, indent=2)
return export_data
def main():
"""Example usage"""
# Create dummy audio (5 seconds)
sample_rate = 44100
duration = 5.0
t = np.linspace(0, duration, int(sample_rate * duration))
# Complex audio with hook, silence, and beats
audio = (
0.3 * np.sin(2 * np.pi * 440 * t) + # Base tone
0.2 * np.sin(2 * np.pi * 880 * t) + # Harmonic
0.1 * np.random.randn(len(t)) # Noise
)
# Add silence at 1.5s
silence_start = int(1.5 * sample_rate)
silence_end = int(1.7 * sample_rate)
audio[silence_start:silence_end] = 0.0
# Add hook at 0.5s
hook_start = int(0.5 * sample_rate)
hook_end = int(1.0 * sample_rate)
audio[hook_start:hook_end] *= 2.0
# Normalize
audio = audio / np.max(np.abs(audio)) * 0.8
# Initialize simulator
simulator = PlaybackSimulatorEngine()
# Simulate for TikTok on iPhone
print("Simulating TikTok playback on iPhone speaker...")
result = simulator.simulate_video(
audio, sample_rate,
Platform.TIKTOK,
Device.IPHONE_SPEAKER,
num_forks=12
)
print(f"\n✅ Best Fork: {result.fork_id}")
print(f" Predicted Retention: {result.predicted_retention:.2%}")
print(f" Engagement Score: {result.engagement_score:.2%}")
print(f" Clarity: {result.clarity_score:.2%}")
print(f" Intelligibility: {result.intelligibility_score:.2%}")
print(f" Risk Score: {result.risk_score:.2%}")
if result.warnings:
print(f"\n⚠️ Warnings:")
for warning in result.warnings:
print(f" {warning}")
if result.recommended_adjustments:
print(f"\n🔧 Recommended Adjustments:")
for rec in result.recommended_adjustments:
print(f" [{rec['priority']}] {rec['issue']}: {rec['action']}")
# Simulate across all platforms
print("\n\nSimulating across all platforms and devices...")
all_results = simulator.simulate_all_platforms(audio, sample_rate, num_forks=6)
# Export analytics
analytics = simulator.export_analytics(all_results)
print("\n✅ Analytics exported to playback_analytics.json")
# Find best platform/device combination
best_combo = None
best_score = 0.0
for platform, device_results in all_results.items():
for device, result in device_results.items():
if result.engagement_score > best_score:
best_score = result.engagement_score
best_combo = (platform, device, result)
if best_combo:
platform, device, result = best_combo
print(f"\n🏆 Best Platform/Device Combo:")
print(f" {platform.value} on {device.value}")
print(f" Engagement Score: {result.engagement_score:.2%}")
print(f" Risk Score: {result.risk_score:.2%}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment