Created
December 30, 2025 22:56
-
-
Save bogged-broker/07f4b31b94788c54d88765b66dc504ec to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| audio_memory_manager_authority.py - ULTIMATE 15/10+ AUTHORITY ENGINE | |
| THE ENFORCEMENT & TRUST LAYER FOR 30M-200M+ VIEW INEVITABILITY | |
| CORE IDENTITY: | |
| ✅ CONSERVATIVE - Only accepts multi-video confirmed patterns | |
| ✅ SKEPTICAL - Gates all candidate patterns with strict criteria | |
| ✅ PRECISION-ORIENTED - Outputs hard timing constraints with ±ms accuracy | |
| ✅ ACCOUNTABLE - Tracks confidence, expiry, platform compensation | |
| THIS FILE ENFORCES: | |
| - Canonical timing authority (drop windows, silence bands, phase locks) | |
| - Platform/device/codec latency compensation | |
| - Volatility-adaptive decay (hours to months) | |
| - Multi-fork approved pattern libraries (3-7 safe variants) | |
| - Predictive failure pre-checks before posting | |
| - Silence memory enforcement (tension-building windows) | |
| - Near-miss reinforcement learning | |
| - Hard interface contracts (downstream MUST obey) | |
| THIS FILE NEVER: | |
| ❌ Explores raw data | |
| ❌ Detects trends itself | |
| ❌ Guesses early | |
| ❌ Chases volatility | |
| ❌ Outputs unvalidated candidates | |
| AUTHORITY CONTRACT: | |
| Pattern Learner → Memory: "I think this timing might work" | |
| Memory → Pattern Learner: "Explore this narrower band" OR "This trend is dying" | |
| Memory → Downstream Modules: "ENFORCE: Drop at 2830-2905ms OR REGENERATE" | |
| """ | |
| import time | |
| import json | |
| import sqlite3 | |
| from typing import Dict, List, Tuple, Optional, Set, Any, Union | |
| from dataclasses import dataclass, field | |
| from collections import defaultdict, deque | |
| from datetime import datetime, timedelta | |
| import numpy as np | |
| from pathlib import Path | |
| import pickle | |
| from enum import Enum | |
| try: | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| import torch.nn.functional as F | |
| TORCH_AVAILABLE = True | |
| except ImportError: | |
| TORCH_AVAILABLE = False | |
| # ============================================================================ | |
| # AUTHORITY DATA STRUCTURES | |
| # ============================================================================ | |
| class PatternConfidenceLevel(Enum): | |
| """Confidence gating levels for pattern acceptance.""" | |
| HYPOTHESIS = 0.3 # From learner, not trusted | |
| CANDIDATE = 0.5 # Seen 2-3 times, needs more data | |
| VALIDATED = 0.7 # Multi-video confirmed, production-safe | |
| CANONICAL = 0.85 # Cross-platform stable, high trust | |
| EVERGREEN = 0.95 # Months of stability, maximum trust | |
| class TrendDecayRate(Enum): | |
| """Volatility-adaptive decay rates.""" | |
| HYPER_VOLATILE = 0.3 # Decay in 6-12 hours (memes, breaking news) | |
| VOLATILE = 0.7 # Decay in 2-3 days (viral challenges) | |
| MODERATE = 0.9 # Decay in 1-2 weeks (general content) | |
| STABLE = 0.97 # Decay in 1-2 months (evergreen niches) | |
| EVERGREEN = 0.99 # Decay in 3-6 months (timeless patterns) | |
| class MemoryLayer(Enum): | |
| """Hierarchical memory authority layers.""" | |
| HOT = "hot" # <3 days, active enforcement | |
| MEDIUM = "medium" # <30 days, moderate authority | |
| LONG_TERM = "long_term" # 30+ days, archived wisdom | |
| CANONICAL = "canonical" # Permanent, highest authority | |
| @dataclass | |
| class PlatformCompensation: | |
| """Platform/device/codec-specific latency compensation.""" | |
| platform: str # tiktok, youtube_shorts, instagram_reels | |
| device: str # ios, android, web | |
| codec: str # aac, mp3, ogg, opus | |
| latency_ms: float # Measured playback delay | |
| compression_smear_ms: float # Phase shift from codec | |
| audio_start_offset_ms: float # Platform-specific audio start delay | |
| confidence: float # Measurement confidence | |
| last_calibrated: float # Unix timestamp | |
| sample_count: int # Number of calibration measurements | |
| @dataclass | |
| class TimingConstraint: | |
| """Hard timing constraint enforced by authority.""" | |
| constraint_type: str # drop, hook, silence, transition | |
| window_start_ms: float # Minimum allowed timing | |
| window_end_ms: float # Maximum allowed timing | |
| optimal_ms: float # Recommended center point | |
| confidence: float # Authority confidence | |
| platform_specific: Dict[str, Tuple[float, float]] # Platform overrides | |
| expires_at: float # Unix timestamp when constraint expires | |
| validation_count: int # Number of confirming videos | |
| last_success_rate: float # Recent success rate with this constraint | |
| @dataclass | |
| class ApprovedTimingFork: | |
| """Production-safe timing variant with win probability.""" | |
| fork_id: str | |
| base_pattern_id: str | |
| offset_ms: float # Relative to canonical timing | |
| drop_window_ms: Tuple[float, float] | |
| silence_window_ms: Tuple[float, float] | |
| hook_timing_ms: Optional[float] | |
| win_probability: float # Historical success rate | |
| platform_compensation: Dict[str, PlatformCompensation] | |
| usage_count: int | |
| last_used: float | |
| avg_performance: float # EMA of performance scores | |
| @dataclass | |
| class SilenceEnforcementPattern: | |
| """Silence timing patterns that amplify emotional impact.""" | |
| silence_id: str | |
| pre_drop_silence_ms: Tuple[float, float] # Silence before drop | |
| post_hook_silence_ms: Tuple[float, float] # Silence after hook | |
| tension_building_duration_ms: float # Duration for tension | |
| emotional_impact_score: float # Measured emotional resonance | |
| platform: str | |
| niche: str | |
| validation_count: int | |
| avg_retention_lift: float # % improvement in retention | |
| @dataclass | |
| class NearMissAdjustment: | |
| """Near-miss pattern adjustment for refinement.""" | |
| original_pattern_id: str | |
| failure_offset_ms: float # How far off was the near-miss | |
| suggested_correction_ms: float # Recommended adjustment | |
| confidence: float | |
| failure_count: int # How many times this near-miss occurred | |
| success_after_correction: int # Successes after applying correction | |
| @dataclass | |
| class PredictiveFailureCheck: | |
| """Pre-posting failure prediction result.""" | |
| pattern_id: str | |
| risk_score: float # 0-1, higher = more likely to fail | |
| compression_risk: float # Risk from codec compression | |
| latency_risk: float # Risk from platform playback delay | |
| fatigue_risk: float # Risk from pattern saturation | |
| platform_specific_risks: Dict[str, float] | |
| rejection_flags: List[str] # Reasons for rejection | |
| safe_to_post: bool | |
| recommended_adjustments: List[str] | |
| @dataclass | |
| class CanonicalPattern: | |
| """Fully validated, production-authority pattern.""" | |
| pattern_id: str | |
| pattern_type: str # tts, voice_sync, beat, transition | |
| # HARD TIMING CONSTRAINTS (ENFORCED) | |
| enforced_drop_window_ms: Tuple[float, float] | |
| enforced_silence_window_ms: Tuple[float, float] | |
| enforced_hook_timing_ms: Optional[Tuple[float, float]] | |
| enforced_phase_alignment_tolerance_ms: float | |
| # AUTHORITY METADATA | |
| confidence_level: PatternConfidenceLevel | |
| decay_rate: TrendDecayRate | |
| memory_layer: MemoryLayer | |
| # VALIDATION TRACKING | |
| validation_count: int # Multi-video confirmations | |
| cross_platform_validated: bool | |
| platform_success_rates: Dict[str, float] | |
| # APPROVED FORKS | |
| approved_forks: List[ApprovedTimingFork] | |
| # PLATFORM COMPENSATION | |
| platform_compensations: Dict[str, PlatformCompensation] | |
| # SILENCE ENFORCEMENT | |
| silence_patterns: List[SilenceEnforcementPattern] | |
| # PERFORMANCE TRACKING | |
| total_usage_count: int | |
| success_count: int | |
| failure_count: int | |
| near_miss_count: int | |
| avg_performance_score: float | |
| performance_history: List[Tuple[float, float]] # (timestamp, score) | |
| # DECAY & EXPIRY | |
| created_at: float | |
| last_validated: float | |
| last_used: float | |
| expires_at: float | |
| decay_factor: float | |
| saturation_level: float # 0-1, overuse detection | |
| # NEAR-MISS LEARNING | |
| near_miss_adjustments: List[NearMissAdjustment] | |
| # FEATURES & CONTEXT | |
| features: Dict | |
| niche: str | |
| platform: str | |
| semantic_tags: List[str] | |
| # PREDICTIVE CHECKS | |
| last_failure_check: Optional[PredictiveFailureCheck] | |
| # ============================================================================ | |
| # PLATFORM COMPENSATION ENGINE | |
| # ============================================================================ | |
| class PlatformCompensationEngine: | |
| """ | |
| Manages platform/device/codec-specific timing compensation. | |
| Calibrates and applies latency offsets for perfect live playback. | |
| """ | |
| def __init__(self): | |
| self.compensations: Dict[str, PlatformCompensation] = {} | |
| self._load_default_compensations() | |
| def _load_default_compensations(self): | |
| """Load empirically measured default compensations.""" | |
| defaults = [ | |
| # TikTok | |
| ("tiktok", "ios", "aac", 38, 12, 22, 0.85), | |
| ("tiktok", "android", "aac", 45, 15, 28, 0.82), | |
| ("tiktok", "web", "aac", 52, 18, 35, 0.78), | |
| # YouTube Shorts | |
| ("youtube_shorts", "ios", "aac", 42, 10, 25, 0.87), | |
| ("youtube_shorts", "android", "aac", 48, 14, 30, 0.83), | |
| ("youtube_shorts", "web", "opus", 55, 20, 38, 0.80), | |
| # Instagram Reels | |
| ("instagram_reels", "ios", "aac", 35, 11, 20, 0.88), | |
| ("instagram_reels", "android", "aac", 41, 13, 26, 0.84), | |
| ("instagram_reels", "web", "aac", 49, 16, 32, 0.81), | |
| ] | |
| for platform, device, codec, latency, smear, start_offset, conf in defaults: | |
| key = f"{platform}_{device}_{codec}" | |
| self.compensations[key] = PlatformCompensation( | |
| platform=platform, | |
| device=device, | |
| codec=codec, | |
| latency_ms=latency, | |
| compression_smear_ms=smear, | |
| audio_start_offset_ms=start_offset, | |
| confidence=conf, | |
| last_calibrated=time.time(), | |
| sample_count=100 | |
| ) | |
| def get_compensation(self, platform: str, device: str = "ios", codec: str = "aac") -> PlatformCompensation: | |
| """Get compensation for specific platform/device/codec combo.""" | |
| key = f"{platform}_{device}_{codec}" | |
| if key in self.compensations: | |
| return self.compensations[key] | |
| # Fallback to platform default | |
| for comp_key, comp in self.compensations.items(): | |
| if comp.platform == platform: | |
| return comp | |
| # Ultimate fallback | |
| return PlatformCompensation( | |
| platform=platform, device=device, codec=codec, | |
| latency_ms=40, compression_smear_ms=15, audio_start_offset_ms=25, | |
| confidence=0.5, last_calibrated=time.time(), sample_count=0 | |
| ) | |
| def apply_compensation(self, timing_ms: float, platform: str, device: str = "ios", codec: str = "aac") -> float: | |
| """Apply compensation to raw timing to get live-perfect timing.""" | |
| comp = self.get_compensation(platform, device, codec) | |
| # Total compensation = latency + smear + start_offset | |
| total_compensation = comp.latency_ms + comp.compression_smear_ms + comp.audio_start_offset_ms | |
| # Shift timing earlier to account for delays | |
| compensated_timing = timing_ms - total_compensation | |
| return max(0, compensated_timing) | |
| def update_compensation(self, platform: str, device: str, codec: str, | |
| measured_latency: float, confidence: float): | |
| """Update compensation based on new measurements.""" | |
| key = f"{platform}_{device}_{codec}" | |
| if key in self.compensations: | |
| comp = self.compensations[key] | |
| # EMA update | |
| alpha = 0.2 | |
| comp.latency_ms = alpha * measured_latency + (1 - alpha) * comp.latency_ms | |
| comp.confidence = alpha * confidence + (1 - alpha) * comp.confidence | |
| comp.last_calibrated = time.time() | |
| comp.sample_count += 1 | |
| else: | |
| # Create new compensation entry | |
| self.compensations[key] = PlatformCompensation( | |
| platform=platform, device=device, codec=codec, | |
| latency_ms=measured_latency, compression_smear_ms=15, | |
| audio_start_offset_ms=25, confidence=confidence, | |
| last_calibrated=time.time(), sample_count=1 | |
| ) | |
| # ============================================================================ | |
| # PREDICTIVE FAILURE ENGINE | |
| # ============================================================================ | |
| class PredictiveFailureEngine: | |
| """ | |
| Pre-posting failure prediction engine. | |
| Simulates compression, latency, and fatigue to block risky patterns. | |
| """ | |
| def __init__(self, compensation_engine: PlatformCompensationEngine): | |
| self.compensation_engine = compensation_engine | |
| self.fatigue_threshold = 0.75 # Saturation level that triggers fatigue | |
| self.risk_threshold = 0.6 # Overall risk above which we reject | |
| def check_pattern(self, pattern: CanonicalPattern, target_platform: str = "tiktok", | |
| target_device: str = "ios") -> PredictiveFailureCheck: | |
| """ | |
| Run comprehensive pre-posting failure check. | |
| Returns PredictiveFailureCheck with risk assessment and rejection flags. | |
| """ | |
| rejection_flags = [] | |
| risks = {} | |
| # 1. Compression Risk | |
| compression_risk = self._assess_compression_risk(pattern, target_platform) | |
| risks['compression'] = compression_risk | |
| if compression_risk > 0.7: | |
| rejection_flags.append(f"HIGH_COMPRESSION_RISK: {compression_risk:.2f}") | |
| # 2. Latency Risk | |
| latency_risk = self._assess_latency_risk(pattern, target_platform, target_device) | |
| risks['latency'] = latency_risk | |
| if latency_risk > 0.7: | |
| rejection_flags.append(f"HIGH_LATENCY_RISK: {latency_risk:.2f}") | |
| # 3. Fatigue Risk | |
| fatigue_risk = self._assess_fatigue_risk(pattern) | |
| risks['fatigue'] = fatigue_risk | |
| if fatigue_risk > 0.7: | |
| rejection_flags.append(f"PATTERN_SATURATION: {fatigue_risk:.2f}") | |
| # 4. Platform-Specific Risks | |
| platform_risks = self._assess_platform_specific_risks(pattern, target_platform) | |
| # Overall risk score (weighted combination) | |
| overall_risk = ( | |
| 0.3 * compression_risk + | |
| 0.3 * latency_risk + | |
| 0.4 * fatigue_risk | |
| ) | |
| # Decision: Safe to post? | |
| safe_to_post = overall_risk < self.risk_threshold and len(rejection_flags) == 0 | |
| # Recommended adjustments | |
| adjustments = [] | |
| if compression_risk > 0.5: | |
| adjustments.append("Reduce high-frequency content near drop") | |
| if latency_risk > 0.5: | |
| adjustments.append(f"Apply +{self.compensation_engine.get_compensation(target_platform, target_device).latency_ms:.0f}ms compensation") | |
| if fatigue_risk > 0.5: | |
| adjustments.append("Switch to alternative fork or wait for decay") | |
| return PredictiveFailureCheck( | |
| pattern_id=pattern.pattern_id, | |
| risk_score=overall_risk, | |
| compression_risk=compression_risk, | |
| latency_risk=latency_risk, | |
| fatigue_risk=fatigue_risk, | |
| platform_specific_risks=platform_risks, | |
| rejection_flags=rejection_flags, | |
| safe_to_post=safe_to_post, | |
| recommended_adjustments=adjustments | |
| ) | |
| def _assess_compression_risk(self, pattern: CanonicalPattern, platform: str) -> float: | |
| """Assess risk from codec compression smearing.""" | |
| # Patterns with tight timing windows are more sensitive to compression | |
| drop_window_width = pattern.enforced_drop_window_ms[1] - pattern.enforced_drop_window_ms[0] | |
| if drop_window_width < 50: # Very tight window | |
| return 0.8 | |
| elif drop_window_width < 100: | |
| return 0.5 | |
| else: | |
| return 0.2 | |
| def _assess_latency_risk(self, pattern: CanonicalPattern, platform: str, device: str) -> float: | |
| """Assess risk from platform playback latency.""" | |
| comp = self.compensation_engine.get_compensation(platform, device) | |
| # High latency with low confidence = high risk | |
| risk = (comp.latency_ms / 100.0) * (1.0 - comp.confidence) | |
| return np.clip(risk, 0, 1) | |
| def _assess_fatigue_risk(self, pattern: CanonicalPattern) -> float: | |
| """Assess risk from pattern overuse/saturation.""" | |
| return pattern.saturation_level | |
| def _assess_platform_specific_risks(self, pattern: CanonicalPattern, platform: str) -> Dict[str, float]: | |
| """Assess platform-specific risks.""" | |
| risks = {} | |
| if platform in pattern.platform_success_rates: | |
| success_rate = pattern.platform_success_rates[platform] | |
| risks[platform] = 1.0 - success_rate | |
| else: | |
| risks[platform] = 0.5 # Unknown platform = moderate risk | |
| return risks | |
| # ============================================================================ | |
| # SILENCE ENFORCEMENT ENGINE | |
| # ============================================================================ | |
| class SilenceEnforcementEngine: | |
| """ | |
| Manages silence timing patterns that amplify emotional impact. | |
| Enforces tension-building silence windows. | |
| """ | |
| def __init__(self): | |
| self.silence_patterns: Dict[str, SilenceEnforcementPattern] = {} | |
| def create_silence_pattern( | |
| self, | |
| silence_id: str, | |
| pre_drop_silence_ms: Tuple[float, float], | |
| post_hook_silence_ms: Tuple[float, float], | |
| tension_duration_ms: float, | |
| emotional_impact: float, | |
| platform: str, | |
| niche: str | |
| ) -> SilenceEnforcementPattern: | |
| """Create and register new silence pattern.""" | |
| pattern = SilenceEnforcementPattern( | |
| silence_id=silence_id, | |
| pre_drop_silence_ms=pre_drop_silence_ms, | |
| post_hook_silence_ms=post_hook_silence_ms, | |
| tension_building_duration_ms=tension_duration_ms, | |
| emotional_impact_score=emotional_impact, | |
| platform=platform, | |
| niche=niche, | |
| validation_count=1, | |
| avg_retention_lift=0.0 | |
| ) | |
| self.silence_patterns[silence_id] = pattern | |
| return pattern | |
| def get_optimal_silence(self, niche: str, platform: str) -> Optional[SilenceEnforcementPattern]: | |
| """Get highest-impact silence pattern for niche/platform.""" | |
| candidates = [ | |
| p for p in self.silence_patterns.values() | |
| if p.niche == niche and p.platform == platform and p.validation_count >= 3 | |
| ] | |
| if not candidates: | |
| return None | |
| return max(candidates, key=lambda p: p.emotional_impact_score * p.avg_retention_lift) | |
| def enforce_silence_windows(self, base_timing_ms: float, silence_pattern: SilenceEnforcementPattern) -> Dict[str, Tuple[float, float]]: | |
| """Apply silence enforcement to timing.""" | |
| return { | |
| 'pre_drop_silence': ( | |
| base_timing_ms - silence_pattern.pre_drop_silence_ms[1], | |
| base_timing_ms - silence_pattern.pre_drop_silence_ms[0] | |
| ), | |
| 'post_hook_silence': ( | |
| base_timing_ms + silence_pattern.post_hook_silence_ms[0], | |
| base_timing_ms + silence_pattern.post_hook_silence_ms[1] | |
| ) | |
| } | |
| # ============================================================================ | |
| # VOLATILITY-ADAPTIVE DECAY ENGINE | |
| # ============================================================================ | |
| class VolatilityAdaptiveDecayEngine: | |
| """ | |
| Dynamically adjusts pattern decay rates based on trend volatility. | |
| Fast trends decay in hours, evergreens persist for months. | |
| """ | |
| def __init__(self): | |
| self.niche_volatility = { | |
| 'memes': TrendDecayRate.HYPER_VOLATILE, | |
| 'breaking_news': TrendDecayRate.HYPER_VOLATILE, | |
| 'viral_challenges': TrendDecayRate.VOLATILE, | |
| 'gaming': TrendDecayRate.VOLATILE, | |
| 'music_trends': TrendDecayRate.MODERATE, | |
| 'fitness': TrendDecayRate.STABLE, | |
| 'education': TrendDecayRate.EVERGREEN, | |
| 'asmr': TrendDecayRate.EVERGREEN, | |
| } | |
| self.platform_volatility_multiplier = { | |
| 'tiktok': 1.5, # Faster decay | |
| 'instagram_reels': 1.3, | |
| 'youtube_shorts': 1.0, | |
| 'youtube': 0.7, # Slower decay | |
| } | |
| def get_decay_rate(self, niche: str, platform: str, recent_performance: List[float]) -> TrendDecayRate: | |
| """Determine appropriate decay rate based on niche, platform, and performance.""" | |
| base_rate = self.niche_volatility.get(niche, TrendDecayRate.MODERATE) | |
| platform_mult = self.platform_volatility_multiplier.get(platform, 1.0) | |
| # Analyze performance volatility | |
| if len(recent_performance) > 5: | |
| variance = np.var(recent_performance) | |
| if variance > 0.3: # High variance = volatile | |
| return TrendDecayRate.VOLATILE | |
| elif variance < 0.05: # Low variance = stable | |
| return TrendDecayRate.STABLE | |
| # Adjust base rate by platform | |
| adjusted_value = base_rate.value * platform_mult | |
| if adjusted_value < 0.4: | |
| return TrendDecayRate.HYPER_VOLATILE | |
| elif adjusted_value < 0.75: | |
| return TrendDecayRate.VOLATILE | |
| elif adjusted_value < 0.92: | |
| return TrendDecayRate.MODERATE | |
| elif adjusted_value < 0.98: | |
| return TrendDecayRate.STABLE | |
| else: | |
| return TrendDecayRate.EVERGREEN | |
| def compute_decay_factor(self, pattern: CanonicalPattern, time_since_last_use: float) -> float: | |
| """Compute current decay factor based on volatility-adaptive rate.""" | |
| decay_rate = pattern.decay_rate | |
| # Time-based decay with volatility adjustment | |
| if decay_rate == TrendDecayRate.HYPER_VOLATILE: | |
| half_life = 6 * 3600 # 6 hours | |
| elif decay_rate == TrendDecayRate.VOLATILE: | |
| half_life = 2.5 * 86400 # 2.5 days | |
| elif decay_rate == TrendDecayRate.MODERATE: | |
| half_life = 10 * 86400 # 10 days | |
| elif decay_rate == TrendDecayRate.STABLE: | |
| half_life = 45 * 86400 # 45 days | |
| else: # EVERGREEN | |
| half_life = 120 * 86400 # 120 days | |
| decay_factor = decay_rate.value ** (time_since_last_use / half_life) | |
| # Saturation penalty | |
| saturation_penalty = 1.0 - (0.5 * pattern.saturation_level) | |
| return decay_factor * saturation_penalty | |
| # ============================================================================ | |
| # NEAR-MISS REINFORCEMENT ENGINE | |
| # ============================================================================ | |
| class NearMissReinforcementEngine: | |
| """ | |
| Learns from near-miss failures to refine timing constraints. | |
| "Almost viral" patterns get micro-adjustments. | |
| """ | |
| def __init__(self): | |
| self.adjustment_history: Dict[str, List[NearMissAdjustment]] = defaultdict(list) | |
| self.success_threshold = 0.7 # Performance above this = success | |
| self.near_miss_threshold = 0.5 # Performance above this but below success = near-miss | |
| def analyze_near_miss( | |
| self, | |
| pattern_id: str, | |
| actual_timing_ms: float, | |
| optimal_timing_ms: float, | |
| performance_score: float | |
| ) -> Optional[NearMissAdjustment]: | |
| """Analyze a near-miss failure and suggest correction.""" | |
| if performance_score >= self.success_threshold: | |
| return None # Not a failure | |
| if performance_score < self.near_miss_threshold: | |
| return None # Complete failure, not a near-miss | |
| # This is a near-miss - calculate offset | |
| offset = actual_timing_ms - optimal_timing_ms | |
| # Suggest correction (inverse of offset, but conservative) | |
| suggested_correction = -offset * 0.7 # 70% correction to avoid overcorrection | |
| adjustment = NearMissAdjustment( | |
| original_pattern_id=pattern_id, | |
| failure_offset_ms=offset, | |
| suggested_correction_ms=suggested_correction, | |
| confidence=performance_score, # Higher performance = higher confidence in correction | |
| failure_count=1, | |
| success_after_correction=0 | |
| ) | |
| self.adjustment_history[pattern_id].append(adjustment) | |
| return adjustment | |
| def get_aggregated_correction(self, pattern_id: str) -> Optional[float]: | |
| """Get aggregated correction from multiple near-misses.""" | |
| if pattern_id not in self.adjustment_history: | |
| return None | |
| adjustments = self.adjustment_history[pattern_id] | |
| if not adjustments: | |
| return None | |
| # Weighted average by confidence | |
| total_weight = sum(adj.confidence * adj.failure_count for adj in adjustments) | |
| if total_weight == 0: | |
| return None | |
| weighted_correction = sum( | |
| adj.suggested_correction_ms * adj.confidence * adj.failure_count | |
| for adj in adjustments | |
| ) / total_weight | |
| return weighted_correction | |
| def apply_correction_to_pattern(self, pattern: CanonicalPattern) -> CanonicalPattern: | |
| """Apply aggregated near-miss corrections to pattern constraints.""" | |
| correction = self.get_aggregated_correction(pattern.pattern_id) | |
| if correction is None: | |
| return pattern | |
| # Apply correction to drop window | |
| pattern.enforced_drop_window_ms = ( | |
| pattern.enforced_drop_window_ms[0] + correction, | |
| pattern.enforced_drop_window_ms[1] + correction | |
| ) | |
| # Update forks | |
| for fork in pattern.approved_forks: | |
| fork.offset_ms += correction | |
| fork.drop_window_ms = ( | |
| fork.drop_window_ms[0] + correction, | |
| fork.drop_window_ms[1] + correction | |
| ) | |
| return pattern | |
| # ============================================================================ | |
| # MULTI-FORK LIBRARY MANAGER | |
| # ============================================================================ | |
| class MultiForkLibraryManager: | |
| """ | |
| Manages 3-7 approved timing forks per pattern with win probabilities. | |
| """ | |
| def __init__(self, compensation_engine: PlatformCompensationEngine): | |
| self.compensation_engine = compensation_engine | |
| self.forks: Dict[str, List[ApprovedTimingFork]] = defaultdict(list) | |
| def generate_forks( | |
| self, | |
| base_pattern: CanonicalPattern, | |
| num_forks: int = 5 | |
| ) -> List[ApprovedTimingFork]: | |
| """ | |
| Generate micro-timing fork variants around canonical pattern. | |
| Creates forks at: [-60ms, -30ms, 0ms, +20ms, +50ms] offsets | |
| """ | |
| base_drop = (base_pattern.enforced_drop_window_ms[0] + base_pattern.enforced_drop_window_ms[1]) / 2 | |
| base_silence = base_pattern.enforced_silence_window_ms | |
| offsets = [-60, -30, 0, +20, +50] if num_forks == 5 else [-40, -20, 0, +18, +35, +55, +75] | |
| forks = [] | |
| for i, offset in enumerate(offsets[:num_forks]): | |
| fork_id = f"{base_pattern.pattern_id}_fork_{i}" | |
| # Apply offset to drop window | |
| drop_window = ( | |
| base_pattern.enforced_drop_window_ms[0] + offset, | |
| base_pattern.enforced_drop_window_ms[1] + offset | |
| ) | |
| # Generate platform compensations | |
| platform_comps = {} | |
| for platform in ['tiktok', 'youtube_shorts', 'instagram_reels']: | |
| for device in ['ios', 'android']: | |
| comp = self.compensation_engine.get_compensation(platform, device) | |
| key = f"{platform}_{device}" | |
| platform_comps[key] = comp | |
| fork = ApprovedTimingFork( | |
| fork_id=fork_id, | |
| base_pattern_id=base_pattern.pattern_id, | |
| offset_ms=offset, | |
| drop_window_ms=drop_window, | |
| silence_window_ms=base_silence, | |
| hook_timing_ms=None, | |
| win_probability=0.5 if offset == 0 else 0.3 + 0.2 * np.exp(-abs(offset) / 50), | |
| platform_compensation=platform_comps, | |
| usage_count=0, | |
| last_used=time.time(), | |
| avg_performance=0.5 | |
| ) | |
| forks.append(fork) | |
| self.forks[base_pattern.pattern_id] = forks | |
| return forks | |
| def select_best_fork( | |
| self, | |
| pattern_id: str, | |
| platform: str = "tiktok |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment