Skip to content

Instantly share code, notes, and snippets.

@bogged-broker
Created December 31, 2025 01:19
Show Gist options
  • Select an option

  • Save bogged-broker/805e2a3d5fbaaeb16fed589f40218240 to your computer and use it in GitHub Desktop.

Select an option

Save bogged-broker/805e2a3d5fbaaeb16fed589f40218240 to your computer and use it in GitHub Desktop.
def setup_logging(self):
"""Setup comprehensive logging system"""
log_file = self.storage_path / "pattern_learner.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('AudioPatternLearner')
# ========================================================================
# CORE INGESTION: EXTRACT EVERYTHING WITH SUB-MS PRECISION
# ========================================================================
def ingest_video_comprehensive(self, audio_path: str, video_path: Optional[str],
video_id: str, performance: PerformanceMetrics,
metadata: Dict[str, Any]):
"""
COMPREHENSIVE video ingestion with ALL modalities.
This is the main entry point for learning.
"""
self.logger.info(f"๐Ÿ” Ingesting video {video_id} with full multi-modal extraction")
# 1. Extract micro-timing data (sub-millisecond precision)
timing_data = self.micro_timing_extractor.extract_from_audio(audio_path, video_id)
self.micro_timing_db[video_id] = timing_data
# 2. Extract visual features (if video available)
visual_features = None
if video_path and self.config['enable_multimodal']:
visual_features = self.multimodal_extractor.extract_visual_features(video_path, video_id)
# 3. Extract contextual features
context_features = self.multimodal_extractor.extract_contextual_features(metadata, video_id)
# 4. Learn distributions from timing data
self._learn_timing_distributions(timing_data, metadata.get('niche', 'global'),
metadata.get('platform', 'global'))
# 5. Check for near-miss patterns
if performance.retention_2s > 0.70 and performance.views_total < self.config['viral_threshold']:
self.failure_engine.catalog_near_miss(
video_id, performance.views_total, performance.retention_2s, timing_data
)
self.logger.info(f"๐Ÿ“Š Near-miss cataloged: {video_id}")
# 6. Track distribution shifts (phase detection)
if timing_data.offset_distribution_ms:
self.phase_detector.track_distribution(
'beat_offset',
timing_data.offset_distribution_ms,
metadata.get('niche', 'global')
)
# 7. Extract temporal features for sequence model
temporal_features = TemporalPatternExtractor.extract_temporal_features(timing_data)
# 8. Integrate external trends
trend_boost = 1.0
if self.config['enable_external_trends']:
trend_boost = self.trend_integrator.compute_trend_boost(
metadata.get('audio_track'),
metadata.get('niche', ''),
metadata.get('hashtags', [])
)
# 9. Store in prioritized replay buffer
record = {
'video_id': video_id,
'timing_data': timing_data,
'temporal_features': temporal_features,
'visual_features': visual_features,
'context_features': context_features,
'performance': performance,
'metadata': metadata,
'trend_boost': trend_boost,
'timestamp': datetime.now(),
'priority': performance.viral_score * trend_boost # Priority by viral potential
}
self.replay_buffer.append(record)
self.priority_weights[video_id] = record['priority']
# 10. Update niche statistics
self._update_niche_stats(metadata.get('niche', 'global'), performance)
# 11. Track pattern lifecycle
pattern_key = f"{metadata.get('niche')}_{metadata.get('platform')}"
if pattern_key in self.discovered_patterns:
self.lifecycle_manager.track_pattern_lifecycle(
pattern_key,
performance.viral_score
)
self.logger.info(f"โœ… Video {video_id} fully ingested: viral_score={performance.viral_score:.2f}, "
f"trend_boost={trend_boost:.2f}")
def _learn_timing_distributions(self, timing_data: MicroTimingData, niche: str, platform: str):
"""Learn full distributions (NOT averages) from timing data"""
# Beat-to-syllable offsets
if timing_data.offset_distribution_ms:
self.distribution_learner.learn_distribution(
'beat_offset', timing_data.offset_distribution_ms, niche, platform
)
# Pre-hook silence
if timing_data.pre_hook_silence_ms:
self.distribution_learner.learn_distribution(
'pre_hook_silence', timing_data.pre_hook_silence_ms, niche, platform
)
# Hook beat phase
if timing_data.hook_beat_phase:
self.distribution_learner.learn_distribution(
'hook_phase', timing_data.hook_beat_phase, niche, platform
)
# Stress cadence
if timing_data.stress_cadence_ms:
self.distribution_learner.learn_distribution(
'stress_cadence', timing_data.stress_cadence_ms, niche, platform
)
def _update_niche_stats(self, niche: str, performance: PerformanceMetrics):
"""Update niche performance statistics"""
stats = self.niche_performance[niche]
stats['total_videos'] += 1
n = stats['total_videos']
stats['avg_views'] = ((n - 1) * stats['avg_views'] + performance.views_total) / n
# ========================================================================
# TEMPORAL SEQUENCE MODEL TRAINING
# ========================================================================
def train_temporal_models(self):
"""Train LSTM/Attention models on temporal sequences"""
if not self.config['enable_temporal_models']:
return
if len(self.replay_buffer) < 50:
self.logger.warning("Insufficient data for temporal model training")
return
self.logger.info("๐Ÿง  Training temporal sequence models...")
# Prepare temporal sequences
sequences = []
targets = []
for record in self.replay_buffer:
temporal_seq = record['temporal_features']
viral_score = record['performance'].viral_score
if temporal_seq is not None and temporal_seq.shape[0] > 0:
sequences.append(temporal_seq)
targets.append(viral_score)
if len(sequences) < 20:
return
# Convert to tensors
X = torch.FloatTensor(np.array(sequences)) # (batch, seq_len, features)
y = torch.FloatTensor(targets).unsqueeze(1) # (batch, 1)
# Train for multiple epochs
self.temporal_model.train()
for epoch in range(10):
self.temporal_optimizer.zero_grad()
predictions = self.temporal_model(X)
loss = nn.MSELoss()(predictions, y)
loss.backward()
torch.nn.utils.clip_grad_norm_(self.temporal_model.parameters(), 1.0)
self.temporal_optimizer.step()
if epoch % 3 == 0:
self.logger.info(f" Temporal model epoch {epoch}: loss={loss.item():.4f}")
self.logger.info("โœ… Temporal model training complete")
# ========================================================================
# MULTI-MODAL PREDICTION WITH BAYESIAN CONFIDENCE
# ========================================================================
def predict_viral_probability_multimodal(self, audio_path: str, video_path: Optional[str],
metadata: Dict[str, Any]) -> Dict[str, Any]:
"""
CRITICAL: Full multi-modal prediction with Bayesian uncertainty.
Returns comprehensive prediction with confidence intervals.
"""
video_id = f"predict_{int(datetime.now().timestamp())}"
# Extract all features
timing_data = self.micro_timing_extractor.extract_from_audio(audio_path, video_id)
temporal_features = TemporalPatternExtractor.extract_temporal_features(timing_data)
visual_features = None
if video_path and self.config['enable_multimodal']:
visual_features = self.multimodal_extractor.extract_visual_features(video_path, video_id)
context_features = self.multimodal_extractor.extract_contextual_features(metadata, video_id)
# Temporal model prediction with uncertainty
X_temporal = torch.FloatTensor(temporal_features).unsqueeze(0) # (1, seq_len, features)
mean_pred, std_pred, ci_width = self.bayesian_estimator.predict_with_uncertainty(X_temporal)
confidence = self.bayesian_estimator.compute_confidence_score(std_pred, mean_pred)
# Get trend boost
trend_boost = 1.0
if self.config['enable_external_trends']:
trend_boost = self.trend_integrator.compute_trend_boost(
metadata.get('audio_track'),
metadata.get('niche', ''),
metadata.get('hashtags', [])
)
# Adjust prediction with trends
adjusted_score = mean_pred * trend_boost
# Find similar patterns for context
similar_patterns = self._find_similar_patterns_from_timing(
timing_data,
metadata.get('niche', ''),
metadata.get('platform', '')
)
# Generate timing hypotheses
timing_hypotheses = self._generate_timing_hypotheses(timing_data, metadata)
# Decision logic with gating
decision, reason = self._make_posting_decision(
adjusted_score, confidence, context_features, similar_patterns
)
# RL policy suggestion
rl_adjustments = None
if len(self.rl_reward_buffer) > 20:
state = self._build_rl_state(timing_data, context_features, mean_pred)
action = self.rl_policy.select_action(state, explore=False)
rl_adjustments = self.rl_policy.decode_action_to_params(action)
return {
'predicted_viral_score': float(adjusted_score),
'base_prediction': float(mean_pred),
'uncertainty_std': float(std_pred),
'confidence_interval_width': float(ci_width),
'confidence': float(confidence),
'trend_boost': float(trend_boost),
'decision': decision,
'reason': reason,
'similar_patterns': similar_patterns,
'timing_hypotheses': [h.to_dict() for h in timing_hypotheses],
'rl_adjustments': rl_adjustments,
'micro_timing_summary': {
'first_hook_ms': timing_data.hook_onsets_ms[0] if timing_data.hook_onsets_ms else None,
'avg_beat_offset_ms': float(np.mean(timing_data.offset_distribution_ms)) if timing_data.offset_distribution_ms else 0,
'pre_hook_silence_avg_ms': float(np.mean(timing_data.pre_hook_silence_ms)) if timing_data.pre_hook_silence_ms else 0
},
'phase_shift_warnings': self._check_phase_shifts(metadata.get('niche', 'global'))
}
def _make_posting_decision(self, viral_score: float, confidence: float,
context: ContextualFeatures, similar_patterns: List[Dict]) -> Tuple[str, str]:
"""
GATING LOGIC: Decide POST / REVISE / HOLD based on comprehensive analysis.
"""
# Check confidence threshold
if confidence < 0.5:
return "REVISE", f"Low confidence ({confidence:.2%}) - pattern uncertain"
# Check viral threshold
if viral_score >= 5.0 and confidence >= 0.7:
return "POST", f"High viral probability ({viral_score:.1f}) with strong confidence"
if viral_score >= 3.5 and confidence >= 0.6:
# Check if we're in viral window
if context.is_viral_window:
return "POST", f"Good viral potential ({viral_score:.1f}) during optimal posting window"
else:
return "HOLD", f"Good potential ({viral_score:.1f}) but wait for viral window"
# Check if similar patterns exist
if similar_patterns and len(similar_patterns) >= 2:
avg_similar_score = np.mean([p['viral_score'] for p in similar_patterns])
if avg_similar_score >= 4.0:
return "POST", f"Strong similar patterns exist (avg score: {avg_similar_score:.1f})"
if viral_score < 2.0:
return "REVISE", f"Low viral probability ({viral_score:.1f}) - needs optimization"
return "REVISE", f"Moderate potential ({viral_score:.1f}) - suggest improvements"
def _find_similar_patterns_from_timing(self, timing_data: MicroTimingData,
niche: str, platform: str) -> List[Dict[str, Any]]:
"""Find similar patterns based on timing similarity"""
similar = []
for pattern_id, pattern in self.discovered_patterns.items():
if pattern.niche != niche or pattern.platform != platform:
continue
# Compute timing similarity
similarity_score = self._compute_timing_similarity(timing_data, pattern)
if similarity_score > 0.7:
similar.append({
'pattern_id': pattern_id,
'similarity': float(similarity_score),
'viral_score': pattern.viral_efficacy_score,
'confidence': pattern.confidence
})
# Sort by similarity
similar.sort(key=lambda x: x['similarity'], reverse=True)
return similar[:5]
def _compute_timing_similarity(self, timing_data: MicroTimingData, pattern: AudioPattern) -> float:
"""Compute similarity between timing data and pattern"""
similarity = 0.0
count = 0
# Hook timing similarity
if timing_data.hook_onsets_ms and pattern.hook_timings:
first_hook_diff = abs(timing_data.hook_onsets_ms[0] - pattern.hook_timings[0])
hook_sim = 1.0 - min(first_hook_diff / 5000, 1.0) # Normalize to 5 seconds
similarity += hook_sim
count += 1
# Beat alignment similarity
if timing_data.offset_distribution_ms:
avg_offset = np.mean(np.abs(timing_data.offset_distribution_ms))
target_offset = (1.0 - pattern.beat_alignment_target) * 100 # Convert to ms
offset_diff = abs(avg_offset - target_offset)
offset_sim = 1.0 - min(offset_diff / 100, 1.0)
similarity += offset_sim
count += 1
return similarity / count if count > 0 else 0.0
def _generate_timing_hypotheses(self, timing_data: MicroTimingData,
metadata: Dict[str, Any]) -> List[TimingHypothesis]:
"""Generate multi-variant timing hypotheses"""
hypotheses = []
niche = metadata.get('niche', 'global')
platform = metadata.get('platform', 'global')
# Get promising ranges from distributions
hook_ranges = self.distribution_learner.get_promising_ranges('pre_hook_silence', niche, platform)
# Generate candidates around current timing
if timing_data.hook_onsets_ms:
base_hook_time = timing_data.hook_onsets_ms[0]
candidates = self.candidate_generator.generate_candidates(
base_hook_time,
hook_ranges,
volatility='medium',
num_candidates=5
)
hypotheses.extend(candidates)
return hypotheses
def _check_phase_shifts(self, niche: str) -> List[Dict[str, Any]]:
"""Check for recent phase shift warnings"""
warnings = []
# Check beat offset shifts
shift = self.phase_detector.detect_phase_shift('beat_offset', niche)
if shift:
warnings.append({
'feature': 'beat_offset',
'trend': shift.trend,
'delta_ms': shift.delta_ms,
'volatility': shift.volatility,
'confidence': shift.confidence
})
return warnings
def _build_rl_state(self, timing_data: MicroTimingData, context: ContextualFeatures,
predicted_score: float) -> np.ndarray:
"""Build state vector for RL policy"""
state = [
timing_data.tempo_bpm / 200.0,
timing_data.hook_onsets_ms[0] / 10000 if timing_data.hook_onsets_ms else 0,
np.mean(timing_data.offset_distribution_ms) / 100 if timing_data.offset_distribution_ms else 0,
context.trending_sound_score,
context.hashtag_momentum,
context.upload_hour / 24.0,
1.0 if context.is_viral_window else 0.0,
predicted_score / 10.0,
*([0] * 22) # Pad to 30 dimensions
]
return np.array(state[:30])
# ========================================================================
# DELAYED REWARD TRACKING & RL UPDATE
# ========================================================================
def update_pattern_performance_with_delayed_rewards(self, video_id: str,
performance_24h: PerformanceMetrics,
performance_72h: Optional[PerformanceMetrics] = None):
"""
Update RL policy with delayed rewards (24h, 72h performance).
CRITICAL for learning true viral patterns.
"""
if video_id not in self.delayed_reward_tracker:
self.delayed_reward_tracker[video_id] = []
# Compute reward at 24h
reward_24h = performance_24h.viral_score / 10.0 # Normalize to 0-1
self.delayed_reward_tracker[video_id].append(reward_24h)
# Compute reward at 72h if available
if performance_72h:
reward_72h = performance_72h.viral_score / 10.0
self.delayed_reward_tracker[video_id].append(reward_72h)
# Update RL policy with delayed rewards
self._update_rl_with_delayed_rewards(video_id)
self.logger.info(f"โœ… Delayed rewards tracked for {video_id}: 24h_reward={reward_24h:.3f}")
def _update_rl_with_delayed_rewards(self, video_id: str):
"""Update RL policy using delayed reward signal"""
if video_id not in self.delayed_reward_tracker:
return
rewards = self.delayed_reward_tracker[video_id]
if len(rewards) < 1:
return
# Find original record
record = None
for r in self.replay_buffer:
if r['video_id'] == video_id:
record = r
break
if not record:
return
# Build state
timing_data = record['timing_data']
context = record['context_features']
state = self._build_rl_state(timing_data, context, np.mean(rewards))
# Get action that was taken (approximate from timing)
action = np.zeros(20)
if timing_data.hook_onsets_ms:
action[3] = (timing_data.hook_onsets_ms[0] - 2000) / 2000 # Hook timing deviation
# Compute discounted reward
gamma = 0.95
if len(rewards) == 2:
total_reward = rewards[0] + gamma * rewards[1]
else:
total_reward = rewards[0]
# Update policy
if len(self.rl_reward_buffer) > 20:
states = np.array([state])
actions = np.array([action])
rewards_array = np.array([total_reward])
loss = self.rl_policy.update_policy(states, actions, rewards_array)
self.logger.info(f" RL policy updated: loss={loss:.4f}, reward={total_reward:.3f}")
self.rl_reward_buffer.append(total_reward)
# ========================================================================
# ACTIONABLE RECOMMENDATIONS FOR TTS/VOICE-SYNC
# ========================================================================
def generate_actionable_recommendations(self, niche: str, platform: str,
beat_type: str) -> Dict[str, Any]:
"""
Generate PRECISE, ACTIONABLE recommendations for TTS/voice-sync engines.
Output includes exact parameter values, not just suggestions.
"""
# Find best pattern
matching_patterns = [
p for p in self.discovered_patterns.values()
if p.niche == niche and p.platform == platform
]
if not matching_patterns:
return self._generate_fallback_recommendations(niche, platform)
# Get top pattern by efficacy * weight * confidence
best_pattern = max(matching_patterns,
key=lambda p: p.viral_efficacy_score * p.weight * p.confidence)
# Get adaptive decay rate from lifecycle
pattern_key = f"{niche}_{platform}"
decay_rate = self.lifecycle_manager.compute_adaptive_decay(pattern_key)
momentum = self.lifecycle_manager.get_pattern_momentum(pattern_key)
# Get distribution-based timing ranges
hook_ranges = self.distribution_learner.get_promising_ranges('pre_hook_silence', niche, platform)
offset_ranges = self.distribution_learner.get_promising_ranges('beat_offset', niche, platform)
# Get RL adjustments
rl_adjustments = {}
if len(self.rl_reward_buffer) > 20:
# Use average state for niche
avg_state = np.zeros(30)
avg_state[0] = best_pattern.optimal_pace / 200.0
avg_state[1] = best_pattern.optimal_pitch_range[0] / 300.0
avg_state[2] = best_pattern.optimal_energy / 1.0
action = self.rl_policy.select_action(avg_state, explore=False)
rl_adjustments = self.rl_policy.decode_action_to_params(action)
# Build comprehensive recommendation
recommendation = {
'niche': niche,
'platform': platform,
'beat_type': beat_type,
'confidence': float(best_pattern.confidence),
'pattern_lifecycle_stage': self.lifecycle_manager.lifecycle_data.get(pattern_key, {}).get('lifecycle_stage', 'unknown'),
'pattern_momentum': float(momentum),
# EXACT TTS PARAMETERS
'tts_parameters': {
'pace_wpm': float(best_pattern.optimal_pace + rl_adjustments.get('pace_adjustment', 0)),
'pitch_base_hz': float(best_pattern.optimal_pitch_range[0] + rl_adjustments.get('pitch_adjustment', 0)),
'pitch_variance_hz': float((best_pattern.optimal_pitch_range[1] - best_pattern.optimal_pitch_range[0]) / 2),
'energy_level': float(np.clip(best_pattern.optimal_energy + rl_adjustments.get('energy_adjustment', 0), 0.4, 0.9)),
'voice_style': 'dynamic' if best_pattern.optimal_energy > 0.7 else 'calm'
},
# EXACT TIMING PARAMETERS
'timing_parameters': {
'first_hook_ms': float(best_pattern.hook_timings[0] if best_pattern.hook_timings else 1500),
'hook_intervals_ms': [float(best_pattern.hook_timings[i+1] - best_pattern.hook_timings[i])
for i in range(len(best_pattern.hook_timings) - 1)] if len(best_pattern.hook_timings) > 1 else [],
'pre_hook_silence_ranges_ms': [(float(r[0]), float(r[1])) for r in hook_ranges[:3]],
'beat_offset_target_ms': float(offset_ranges[0][0]) if offset_ranges else 0.0,
'beat_alignment_error_max': 0.05 # 50ms max deviation
},
# PAUSE PATTERNS
'pause_parameters': [
{
'position_ms': float(pos),
'duration_ms': float(dur * rl_adjustments.get('pause_duration_mult', 1.0))
}
for pos, dur in best_pattern.pause_pattern[:5]
],
# EMPHASIS PATTERNS
'emphasis_parameters': {
'syllable_stress_target': 1.3, # 30% above baseline
'hook_amplitude_boost': float(np.mean(best_pattern.hook_timings[:3]) if best_pattern.hook_timings else 1.2),
'beat_emphasis_positions': best_pattern.hook_timings[:5]
},
# GATING CONDITIONS
'gating': {
'min_confidence_to_post': 0.7,
'require_viral_window': momentum < 0, # Require if declining
'allow_experimental': momentum > 0.5 # Allow if rising
},
# ALTERNATIVES (multi-variant)
'alternative_timings': [
{
'variant': i + 1,
'first_hook_ms': float(best_pattern.hook_timings[0] + var if best_pattern.hook_timings else 1500 + var),
'confidence': float(best_pattern.confidence * (1 - i * 0.1))
}
for i, var in enumerate([-200, -100, 100, 200])
],
# TREND CONTEXT
'trend_signals': {
'pattern_decay_rate': float(decay_rate),
'pattern_momentum': float(momentum),
'external_trend_boost': self.trend_integrator.compute_trend_boost(None, niche, [])
},
# WARNINGS
'warnings': self._get_recommendation_warnings(best_pattern, niche)
}
return recommendation
def _generate_fallback_recommendations(self, niche: str, platform: str) -> Dict[str, Any]:
"""Generate fallback recommendations when no patterns exist"""
return {
'niche': niche,
'platform': platform,
'confidence': 0.3,
'tts_parameters': {
'pace_wpm': 160,
'pitch_base_hz': 220,
'pitch_variance_hz': 40,
'energy_level': 0.7,
'voice_style': 'dynamic'
},
'timing_parameters': {
'first_hook_ms': 1500,
'hook_intervals_ms': [6000, 6000],
'pre_hook_silence_ranges_ms': [(100, 200)],
'beat_offset_target_ms': 0,
'beat_alignment_error_max': 0.1
},
'warnings': ['No established patterns - using baseline']
}
def _get_recommendation_warnings(self, pattern: AudioPattern, niche: str) -> List[str]:
"""Generate warnings for recommendations"""
warnings = []
if pattern.confidence < 0.6:
warnings.append("Pattern confidence below 60% - consider A/B testing")
lifecycle_data = self.lifecycle_manager.lifecycle_data.get(f"{niche}_{pattern.platform}", {})
if lifecycle_data.get('lifecycle_stage') == 'declining':
warnings.append("Pattern is in declining phase - monitor closely")
if pattern.sample_count < 10:
warnings.append(f"Limited samples ({pattern.sample_count}) - pattern may not be stable")
# Check for phase shifts
shift = self.phase_detector.detect_phase_shift('beat_offset', niche)
if shift and shift.volatility == 'high':
warnings.append(f"High timing volatility detected - {shift.trend}")
return warnings
# ========================================================================
# CONTINUOUS CALIBRATION & EVALUATION
# ========================================================================
def evaluate_prediction_calibration(self, hours_back: int = 24) -> Dict[str, float]:
"""
Evaluate how well predictions match actual performance.
CRITICAL for maintaining 5M+ baseline accuracy.
"""
cutoff = datetime.now() - timedelta(hours=hours_back)
# Get recent predictions that have actuals
recent_with_actuals = []
for video_id, rewards in self.delayed_reward_tracker.items():
if not rewards:
continue
# Find original prediction
for record in self.replay_buffer:
if record['video_id'] == video_id and record['timestamp'] >= cutoff:
# Get prediction (would have been made before posting)
predicted = record.get('predicted_score', 0)
actual = rewards[0] * 10 # Convert back to viral score
recent_with_actuals.append((predicted, actual))
# Update calibration tracker
confidence = record.get('confidence', 0.5)
self.calibration_tracker.add_prediction(predicted, actual, confidence)
if not recent_with_actuals:
return {'error': 'No recent predictions with actuals'}
predictions = np.array([p[0] for p in recent_with_actuals])
actuals = np.array([p[1] for p in recent_with_actuals])
# Compute metrics
mae = np.mean(np.abs(predictions - actuals))
rmse = np.sqrt(np.mean((predictions - actuals) ** 2))
# 5M+ classification accuracy
pred_viral = predictions >= 5.0
actual_viral = actuals >= 5.0
viral_accuracy = np.mean(pred_viral == actual_viral)
# Calibration error
calibration_error = self.calibration_tracker.compute_calibration_error()
metrics = {# ============================================================================
# ENHANCED AUDIO PATTERN LEARNER (MAIN ORCHESTRATOR)
# ============================================================================
class AudioPatternLearner:
"""
๐Ÿงฌ THE DISCOVERY / HYPOTHESIS ENGINE
15/10 GOD-TIER FEATURES:
โœ… Sub-millisecond micro-timing extraction
โœ… Temporal sequence modeling (LSTM/Attention)
โœ… Multi-modal integration (audio + video + context)
โœ… Bayesian confidence calibration
โœ… External trend integration (Spotify, TikTok, Google)
โœ… Pattern lifecycle management with adaptive decay
โœ… RL policy with delayed rewards
โœ… Phase-aware embeddings and similarity
โœ… Near-miss cataloging and failure attribution
โœ… Distribution learning (no averages, full histograms)
โœ… Multi-variant candidate generation
โœ… Continuous calibration tracking
โœ… Actionable gating recommendations
This module PROPOSES hypotheses. It does NOT enforce constraints.
Memory decides truth. Learner explores possibilities.
"""
def __init__(self, storage_path: str = "./pattern_learner_data"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(exist_ok=True)
# Core engines
self.micro_timing_extractor = MicroTimingExtractor()
self.distribution_learner = DistributionLearner()
self.failure_engine = FailureAttributionEngine()
self.phase_detector = PhaseShiftDetector()
self.candidate_generator = CandidateGenerator()
self.lifecycle_manager = PatternLifecycleManager()
# Multi-modal processors
self.multimodal_extractor = MultiModalFeatureExtractor()
self.trend_integrator = ExternalTrendIntegrator()
# ML models (stratified by platform/niche)
self.global_predictor = ViralityPredictor()
self.stratified_models: Dict[str, ViralityPredictor] = {}
# Temporal sequence model
self.temporal_model = TemporalSequenceModel(input_dim=10, hidden_dim=128, num_layers=3)
self.temporal_optimizer = optim.Adam(self.temporal_model.parameters(), lr=0.001)
# Embedding models
self.embedding_model = EmbeddingNetwork(input_dim=100, embedding_dim=64)
self.embedding_optimizer = optim.Adam(self.embedding_model.parameters(), lr=0.001)
# RL components
self.rl_policy = RLAudioPolicy(state_dim=30, action_dim=20)
self.rl_reward_buffer: deque = deque(maxlen=1000)
self.delayed_reward_tracker: Dict[str, List[float]] = {} # Track 24h, 72h rewards
# Confidence calibration
self.bayesian_estimator = BayesianConfidenceEstimator(self.temporal_model, n_samples=30)
self.calibration_tracker = CalibrationTracker()
# Pattern storage
self.discovered_patterns: Dict[str, AudioPattern] = {}
self.timing_hypotheses: List[TimingHypothesis] = []
self.near_misses: List[NearMissRecord] = []
self.phase_shift_signals: List[PhaseShiftSignal] = []
# Micro-timing database
self.micro_timing_db: Dict[str, MicroTimingData] = {}
# Replay buffer with prioritization
self.replay_buffer: deque = deque(maxlen=10000)
self.priority_weights: Dict[str, float] = {}
# Feature tracking
self.feature_names: List[str] = []
self.niche_performance: Dict[str, Dict] = defaultdict(lambda: {
'total_videos': 0,
'avg_views': 0.0,
'top_patterns': [],
'model': None,
'lifecycle_stage': 'stable'
})
# Configuration
self.config = {
'viral_threshold': 5_000_000,
'super_viral_threshold': 30_000_000,
'min_sample_size': 20,
'pattern_decay_rate': 0.95,
'trend_window_days': 30,
'confidence_threshold': 0.7,
'update_frequency_hours': 6,
'micro_timing_precision_ms': 1.0,
'enable_temporal_models': True,
'enable_multimodal': True,
'enable_external_trends': True
}
# Logging
self.setup_logging()
self.logger.info("๐Ÿงฌ AudioPatternLearner initialized in DISCOVERY MODE")
def setup_logging(self):
"""Setup comprehensive logging
# Confidence decreases with larger variations
confidence = 1.0 - (i / len(variations)) * 0.3
# Build candidate range
candidate_range = (offset - var * 0.2, offset + var * 0.2)
hypothesis = TimingHypothesis(
hypothesis_id=f"candidate_{int(datetime.now().timestamp())}_{i}_{direction}",
candidate_ranges_ms=[candidate_range],
recommended_offset_ms=offset,
confidence=confidence,
volatility=volatility,
sample_size=0, # Will be updated by memory
trend_direction="stable",
niche="",
platform=""
)
candidates.append(hypothesis)
if len(candidates) >= num_candidates:
break
if len(candidates) >= num_candidates:
break
# Also include distribution-based candidates
for dist_range in distribution_ranges[:3]:
mid_point = (dist_range[0] + dist_range[1]) / 2
hypothesis = TimingHypothesis(
hypothesis_id=f"dist_candidate_{int(datetime.now().timestamp())}_{len(candidates)}",
candidate_ranges_ms=[dist_range],
recommended_offset_ms=mid_point,
confidence=0.7,
volatility=volatility,
sample_size=0,
trend_direction="stable"
)
candidates.append(hypothesis)
return candidates[:num_candidates]
# ============================================================================
# TEMPORAL SEQUENCE MODELS (LSTM/TRANSFORMER)
# ============================================================================
class TemporalSequenceModel(nn.Module):
"""
LSTM-based model for learning temporal patterns in audio sequences.
Captures hook timing, beat progression, emotional arcs over time.
"""
def __init__(self, input_dim: int = 50, hidden_dim: int = 128, num_layers: int = 3, output_dim: int = 1):
super().__init__()
self.lstm = nn.LSTM(
input_dim,
hidden_dim,
num_layers,
batch_first=True,
dropout=0.3,
bidirectional=True
)
self.attention = nn.MultiheadAttention(hidden_dim * 2, num_heads=4, dropout=0.2)
self.fc = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(hidden_dim, output_dim)
)
def forward(self, x, return_attention=False):
"""
x: (batch, seq_len, input_dim)
Returns: (batch, output_dim)
"""
lstm_out, _ = self.lstm(x) # (batch, seq_len, hidden_dim*2)
# Apply attention
lstm_out_t = lstm_out.transpose(0, 1) # (seq_len, batch, hidden_dim*2)
attn_out, attn_weights = self.attention(lstm_out_t, lstm_out_t, lstm_out_t)
attn_out = attn_out.transpose(0, 1) # (batch, seq_len, hidden_dim*2)
# Pool across sequence
pooled = torch.mean(attn_out, dim=1) # (batch, hidden_dim*2)
# Final prediction
output = self.fc(pooled)
if return_attention:
return output, attn_weights
return output
class TemporalPatternExtractor:
"""
Extract time-series feature matrices for temporal model training.
Converts micro-timing data into sequence tensors.
"""
@staticmethod
def extract_temporal_features(timing_data: MicroTimingData, max_seq_len: int = 50) -> np.ndarray:
"""
Convert micro-timing data into temporal feature sequence.
Returns: (seq_len, feature_dim) array
"""
features = []
# Create time-indexed features
max_time_ms = max(
max(timing_data.beat_onsets_ms) if timing_data.beat_onsets_ms else 0,
max(timing_data.syllable_onsets_ms) if timing_data.syllable_onsets_ms else 0,
max(timing_data.hook_onsets_ms) if timing_data.hook_onsets_ms else 0
)
if max_time_ms == 0:
return np.zeros((max_seq_len, 10))
# Divide into time windows
window_size_ms = max_time_ms / max_seq_len
for i in range(max_seq_len):
window_start = i * window_size_ms
window_end = (i + 1) * window_size_ms
# Count events in this window
beats_in_window = sum(1 for b in timing_data.beat_onsets_ms if window_start <= b < window_end)
syllables_in_window = sum(1 for s in timing_data.syllable_onsets_ms if window_start <= s < window_end)
hooks_in_window = sum(1 for h in timing_data.hook_onsets_ms if window_start <= h < window_end)
# Get offsets in window
offsets_in_window = [o for o, s in zip(timing_data.offset_distribution_ms, timing_data.syllable_onsets_ms)
if window_start <= s < window_end]
avg_offset = np.mean(offsets_in_window) if offsets_in_window else 0
# Stress ratios in window
stress_in_window = [r for r, s in zip(timing_data.syllable_stress_ratios, timing_data.syllable_onsets_ms[:len(timing_data.syllable_stress_ratios)])
if window_start <= s < window_end]
avg_stress = np.mean(stress_in_window) if stress_in_window else 0
# Build feature vector for this time window
window_features = [
beats_in_window / 10.0, # Normalized
syllables_in_window / 10.0,
hooks_in_window / 5.0,
avg_offset / 100.0, # Normalize to ยฑ1 range
avg_stress,
1.0 if hooks_in_window > 0 else 0.0, # Hook presence
len(offsets_in_window) / 10.0, # Syllable density
window_start / max_time_ms, # Normalized time position
(max_time_ms - window_start) / max_time_ms, # Time remaining
timing_data.tempo_bpm / 200.0 # Normalized tempo
]
features.append(window_features)
return np.array(features)
# ============================================================================
# MULTI-MODAL INTEGRATION (AUDIO + VIDEO + CONTEXT)
# ============================================================================
@dataclass
class VideoVisualFeatures:
"""Visual features extracted from video for multi-modal learning"""
video_id: str
# Scene/cut timing
scene_change_times_ms: List[float]
scene_durations_ms: List[float]
# Motion intensity
motion_intensity_per_second: List[float]
avg_motion_intensity: float
# Visual energy
brightness_variance: float
color_variance: float
# Caption timing (if available)
caption_onsets_ms: List[float]
caption_durations_ms: List[float]
# Thumbnail features
thumbnail_face_count: int
thumbnail_color_dominance: str
extraction_timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class ContextualFeatures:
"""Contextual signals for trend awareness"""
video_id: str
# Trend indicators
trending_sound_score: float # 0-1
hashtag_momentum: float # Rate of hashtag usage growth
topic_velocity: float # How fast topic is trending
# Temporal context
upload_hour: int # 0-23
upload_day_of_week: int # 0-6
is_weekend: bool
is_viral_window: bool # Peak posting hours
# Platform context
platform_saturation: float # Content saturation in niche
niche_momentum: float # Niche trending score
# External signals
google_trends_score: float
spotify_chart_position: Optional[int]
timestamp: datetime = field(default_factory=datetime.now)
class MultiModalFeatureExtractor:
"""Combines audio, video, and contextual features"""
@staticmethod
def extract_visual_features(video_path: str, video_id: str) -> VideoVisualFeatures:
"""
Extract visual features from video file.
In production, this would use CV models (OpenCV, PyTorch vision).
"""
# Placeholder implementation - would need actual video processing
return VideoVisualFeatures(
video_id=video_id,
scene_change_times_ms=[0, 3000, 6000, 10000], # Placeholder
scene_durations_ms=[3000, 3000, 4000, 5000],
motion_intensity_per_second=[0.5] * 15,
avg_motion_intensity=0.5,
brightness_variance=0.3,
color_variance=0.4,
caption_onsets_ms=[1000, 5000, 9000],
caption_durations_ms=[2000, 2000, 2000],
thumbnail_face_count=1,
thumbnail_color_dominance='warm'
)
@staticmethod
def extract_contextual_features(video_metadata: Dict[str, Any], video_id: str) -> ContextualFeatures:
"""Extract contextual features from metadata and external signals"""
upload_time = video_metadata.get('upload_timestamp', datetime.now())
return ContextualFeatures(
video_id=video_id,
trending_sound_score=video_metadata.get('trending_score', 0.0),
hashtag_momentum=video_metadata.get('hashtag_growth', 0.0),
topic_velocity=video_metadata.get('topic_velocity', 0.0),
upload_hour=upload_time.hour,
upload_day_of_week=upload_time.weekday(),
is_weekend=upload_time.weekday() >= 5,
is_viral_window=(9 <= upload_time.hour <= 11) or (18 <= upload_time.hour <= 22),
platform_saturation=video_metadata.get('saturation', 0.5),
niche_momentum=video_metadata.get('niche_momentum', 0.5),
google_trends_score=video_metadata.get('google_trends', 0.0),
spotify_chart_position=video_metadata.get('spotify_position')
)
@staticmethod
def combine_features(audio_features: np.ndarray,
visual_features: VideoVisualFeatures,
context_features: ContextualFeatures) -> np.ndarray:
"""Combine all modalities into joint feature vector"""
# Audio features (already provided)
# Visual features
visual_vec = [
len(visual_features.scene_change_times_ms) / 10.0,
visual_features.avg_motion_intensity,
visual_features.brightness_variance,
visual_features.color_variance,
len(visual_features.caption_onsets_ms) / 10.0,
visual_features.thumbnail_face_count / 5.0
]
# Contextual features
context_vec = [
context_features.trending_sound_score,
context_features.hashtag_momentum,
context_features.topic_velocity,
context_features.upload_hour / 24.0,
context_features.upload_day_of_week / 7.0,
1.0 if context_features.is_weekend else 0.0,
1.0 if context_features.is_viral_window else 0.0,
context_features.platform_saturation,
context_features.niche_momentum,
context_features.google_trends_score
]
# Concatenate all
combined = np.concatenate([
audio_features,
np.array(visual_vec),
np.array(context_vec)
])
return combined
# ============================================================================
# BAYESIAN CONFIDENCE CALIBRATION
# ============================================================================
class BayesianConfidenceEstimator:
"""
Bayesian uncertainty modeling for prediction confidence.
Uses dropout at inference time (MC Dropout).
"""
def __init__(self, model: nn.Module, n_samples: int = 30):
self.model = model
self.n_samples = n_samples
def enable_dropout(self):
"""Enable dropout layers during inference"""
for module in self.model.modules():
if isinstance(module, nn.Dropout):
module.train()
def predict_with_uncertainty(self, X: torch.Tensor) -> Tuple[float, float, float]:
"""
Run multiple forward passes with dropout to estimate uncertainty.
Returns: (mean_prediction, std_deviation, confidence_interval_width)
"""
self.enable_dropout()
predictions = []
with torch.no_grad():
for _ in range(self.n_samples):
output = self.model(X)
predictions.append(output.item())
predictions = np.array(predictions)
mean_pred = np.mean(predictions)
std_pred = np.std(predictions)
# 95% confidence interval
ci_width = 1.96 * std_pred
return mean_pred, std_pred, ci_width
def compute_confidence_score(self, std_pred: float, mean_pred: float) -> float:
"""
Convert uncertainty to confidence score (0-1).
Lower uncertainty = higher confidence.
"""
# Coefficient of variation
cv = std_pred / (abs(mean_pred) + 1e-6)
# Map to confidence (exponential decay)
confidence = np.exp(-cv * 2)
return float(np.clip(confidence, 0.1, 1.0))
class CalibrationTracker:
"""Track prediction calibration over time"""
def __init__(self):
self.predictions: List[Tuple[float, float, float]] = [] # (predicted, actual, confidence)
self.calibration_bins = defaultdict(list)
def add_prediction(self, predicted: float, actual: float, confidence: float):
"""Add a prediction-actual pair"""
self.predictions.append((predicted, actual, confidence))
# Bin by confidence
bin_idx = int(confidence * 10) # 0-10 bins
self.calibration_bins[bin_idx].append((predicted, actual))
def compute_calibration_error(self) -> float:
"""
Compute Expected Calibration Error (ECE).
Measures how well confidence scores match actual accuracy.
"""
if not self.predictions:
return 0.0
ece = 0.0
total_samples = len(self.predictions)
for bin_idx, pairs in self.calibration_bins.items():
if not pairs:
continue
bin_confidence = bin_idx / 10.0
# Compute accuracy in this bin (normalized error)
errors = [abs(pred - actual) / (actual + 1e-6) for pred, actual in pairs]
bin_accuracy = 1.0 - np.mean(errors)
# Weight by bin size
bin_weight = len(pairs) / total_samples
# Add to ECE
ece += bin_weight * abs(bin_accuracy - bin_confidence)
return ece
def get_calibration_curve(self) -> Dict[float, float]:
"""Get confidence vs accuracy curve"""
curve = {}
for bin_idx, pairs in self.calibration_bins.items():
if not pairs:
continue
bin_confidence = bin_idx / 10.0
errors = [abs(pred - actual) / (actual + 1e-6) for pred, actual in pairs]
bin_accuracy = 1.0 - np.mean(errors)
curve[bin_confidence] = bin_accuracy
return curve
# ============================================================================
# EXTERNAL TREND INTEGRATION
# ============================================================================
class ExternalTrendIntegrator:
"""
Integrate external trend signals (Spotify, TikTok, Google Trends).
Boosts pattern weights based on real-time trend data.
"""
def __init__(self):
self.trend_cache: Dict[str, Dict[str, Any]] = {}
self.last_update = datetime.now()
def fetch_spotify_trends(self, track_id: Optional[str] = None) -> Dict[str, Any]:
"""
Fetch Spotify chart data (placeholder - would use Spotify API).
Returns trending score and chart position.
"""
# Placeholder implementation
return {
'trending_score': 0.8,
'chart_position': 15,
'velocity': 'rising',
'timestamp': datetime.now()
}
def fetch_tiktok_trends(self, sound_id: Optional[str] = None) -> Dict[str, Any]:
"""
Fetch TikTok trending sounds (placeholder - would use TikTok API).
"""
return {
'trending_score': 0.9,
'video_count': 1_500_000,
'growth_rate': 0.3, # 30% daily growth
'peak_detected': False,
'timestamp': datetime.now()
}
def fetch_google_trends(self, query: str) -> Dict[str, Any]:
"""
Fetch Google Trends data (placeholder - would use Trends API).
"""
return {
'interest_score': 75, # 0-100
'velocity': 'rising',
'regional_interest': {'US': 85, 'UK': 70},
'timestamp': datetime.now()
}
def compute_trend_boost(self, audio_track: Optional[str], niche: str,
hashtags: List[str]) -> float:
"""
Compute overall trend boost multiplier (1.0 = neutral, >1.0 = trending).
"""
boost = 1.0
# Spotify boost
if audio_track:
spotify_data = self.fetch_spotify_trends(audio_track)
boost *= (1.0 + spotify_data['trending_score'] * 0.3)
# TikTok sound boost
tiktok_data = self.fetch_tiktok_trends()
if tiktok_data['trending_score'] > 0.7:
boost *= 1.2
# Google Trends boost
if hashtags:
for hashtag in hashtags[:3]:
trends_data = self.fetch_google_trends(hashtag)
if trends_data['interest_score'] > 60:
boost *= 1.1
# Cap boost to prevent extreme values
boost = min(boost, 2.0)
return boost
def get_optimal_posting_window(self, platform: str, niche: str) -> Dict[str, Any]:
"""
Determine optimal posting time based on historical trend analysis.
"""
# Placeholder - would use historical analysis
if platform == 'tiktok':
return {
'optimal_hours': [9, 10, 11, 18, 19, 20, 21],
'peak_day': 'Tuesday',
'current_boost': 1.15 if datetime.now().hour in [9, 10, 11, 18, 19, 20, 21] else 1.0
}
elif platform == 'youtube':
return {
'optimal_hours': [14, 15, 16, 17, 20, 21],
'peak_day': 'Friday',
'current_boost': 1.1
}
else:
return {
'optimal_hours': list(range(24)),
'peak_day': 'any',
'current_boost': 1.0
}
# ============================================================================
# PATTERN LIFECYCLE & DECAY MODELING
# ============================================================================
class PatternLifecycleManager:
"""
Manage pattern lifecycles with adaptive decay and reinforcement curves.
Tracks pattern evolution, aging, and trend velocity.
"""
def __init__(self):
self.lifecycle_data: Dict[str, Dict[str, Any]] = {}
def track_pattern_lifecycle(self, pattern_id: str, performance_score: float):
"""Track pattern performance over time"""
if pattern_id not in self.lifecycle_data:
self.lifecycle_data[pattern_id] = {
'creation_time': datetime.now(),
'performance_history': deque(maxlen=50),
'peak_performance': 0.0,
'lifecycle_stage': 'emerging',
'velocity': 0.0,
'acceleration': 0.0
}
data = self.lifecycle_data[pattern_id]
data['performance_history'].append({
'score': performance_score,
'timestamp': datetime.now()
})
# Update peak
if performance_score > data['peak_performance']:
data['peak_performance'] = performance_score
# Compute velocity (rate of change)
if len(data['performance_history']) >= 2:
recent = [p['score'] for p in list(data['performance_history'])[-5:]]
older = [p['score'] for p in list(data['performance_history'])[-10:-5]] if len(data['performance_history']) >= 10 else recent
data['velocity'] = (np.mean(recent) - np.mean(older)) if older else 0.0
# Acceleration (change in velocity)
if len(data['performance_history']) >= 10:
very_old = [p['score'] for p in list(data['performance_history'])[-15:-10]]
old_velocity = (np.mean(older) - np.mean(very_old)) if very_old else 0.0
data['acceleration'] = data['velocity'] - old_velocity
# Determine lifecycle stage
data['lifecycle_stage'] = self._determine_lifecycle_stage(data)
def _determine_lifecycle_stage(self, data: Dict[str, Any]) -> str:
"""Determine current lifecycle stage"""
velocity = data['velocity']
acceleration = data['acceleration']
age_days = (datetime.now() - data['creation_time']).days
recent_performance = list(data['performance_history'])[-5:] if data['performance_history'] else []
avg_recent = np.mean([p['score'] for p in recent_performance]) if recent_performance else 0
# Emerging: new and growing
if age_days < 7 and velocity > 0:
return 'emerging'
# Rising: strong positive velocity
if velocity > 0.1 and acceleration >= 0:
return 'rising'
# Peaked: velocity slowing, performance high
if velocity < 0.05 and avg_recent > data['peak_performance'] * 0.8:
return 'peaked'
# Declining: negative velocity
if velocity < -0.05:
return 'declining'
# Stable: low velocity, consistent performance
if abs(velocity) < 0.05 and age_days > 14:
return 'stable'
return 'unknown'
def compute_adaptive_decay(self, pattern_id: str, base_decay: float = 0.95) -> float:
"""
Compute adaptive decay rate based on lifecycle stage.
Rising patterns decay slower, declining patterns decay faster.
"""
if pattern_id not in self.lifecycle_data:
return base_decay
data = self.lifecycle_data[pattern_id]
stage = data['lifecycle_stage']
velocity = data['velocity']
# Adjust decay based on stage
if stage == 'emerging':
return base_decay ** 0.5 # Decay slower (explore new patterns)
elif stage == 'rising':
return base_decay ** 0.7 # Decay slower
elif stage == 'peaked':
return base_decay # Normal decay
elif stage == 'declining':
return base_decay ** 1.5 # Decay faster
elif stage == 'stable':
return base_decay ** 0.9 # Slightly slower decay
return base_decay
def get_pattern_momentum(self, pattern_id: str) -> float:
"""
Get pattern momentum score (combines velocity and acceleration).
Returns value typically in range [-1, 1].
"""
if pattern_id not in self.lifecycle_data:
return 0.0
data = self.lifecycle_data[pattern_id]
momentum = data['velocity'] * 0.7 + data['acceleration'] * 0.3
return float(np.clip(momentum, -1.0, 1.0))
# ============================================================================
# ENHANCED AUDIO PATTERN LEARNER (MAIN ORCHESTRATOR)
# ============================================================================"""
audio_pattern_learner.py
๐Ÿงฌ THE DISCOVERY / SENSING / HYPOTHESIS ENGINE
This module is ALLOWED to be wrong. Its job is to see reality change before metrics confirm it.
CORE IDENTITY:
โœ… Observes everything
โœ… Measures everything with sub-millisecond precision
โœ… Experiments aggressively
โœ… Detects phase shifts early
โœ… Proposes hypotheses with confidence scores
โœ… Generates multi-variant timing candidates
โŒ NEVER enforces timing
โŒ NEVER blocks posting
โŒ NEVER decides "truth"
โŒ NEVER overrides memory
โŒ NEVER applies hard constraints
This is the CURIOUS, AGGRESSIVE explorer that feeds audio_memory_manager.py
Memory decides truth. Learner proposes hypotheses.
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any, Union
from datetime import datetime, timedelta
from collections import defaultdict, deque
import json
import pickle
from pathlib import Path
import logging
from scipy import stats, signal as scipy_signal
# ML/DL imports
try:
import xgboost as xgb
from sklearn.cluster import KMeans, HDBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.ensemble import IsolationForest
import torch
import torch.nn as nn
import torch.optim as optim
except ImportError:
print("Warning: Some ML libraries not installed. Install: xgboost, scikit-learn, torch")
# Audio processing
try:
import librosa
except ImportError:
print("Warning: Audio libraries not installed. Install: librosa, scipy")
# ============================================================================
# MICRO-TIMING DATA STRUCTURES (SUB-10MS PRECISION)
# ============================================================================
@dataclass
class MicroTimingData:
"""Sub-millisecond precision timing extraction (RAW, NO AGGREGATION)"""
video_id: str
# Raw timestamps (milliseconds)
beat_onsets_ms: List[float]
syllable_onsets_ms: List[float]
hook_onsets_ms: List[float]
pause_onsets_ms: List[float]
drop_onsets_ms: List[float]
# Micro-offsets (beat โ†” syllable alignment)
offset_distribution_ms: List[float] # Each syllable's offset from nearest beat
beat_to_syllable_deltas: List[float]
syllable_to_hook_deltas: List[float]
# Tension/release windows
pre_hook_silence_ms: List[float] # Silence before each hook
pre_drop_silence_ms: List[float] # Tension before drops
post_hook_decay_ms: List[float] # Energy decay after hooks
# Phase alignment
hook_beat_phase: List[float] # Hook position within beat cycle (-180ยฐ to +180ยฐ)
drop_beat_phase: List[float]
# Stress patterns
syllable_stress_ratios: List[float] # Relative amplitude of each syllable
stress_cadence_ms: List[float] # Time between stressed syllables
# Metadata
tempo_bpm: float
extraction_timestamp: datetime
version: str = "learner_v1.0"
def to_dict(self) -> Dict[str, Any]:
"""Convert to JSON-serializable dict"""
return {
'video_id': self.video_id,
'beat_onsets_ms': self.beat_onsets_ms,
'syllable_onsets_ms': self.syllable_onsets_ms,
'hook_onsets_ms': self.hook_onsets_ms,
'pause_onsets_ms': self.pause_onsets_ms,
'drop_onsets_ms': self.drop_onsets_ms,
'offset_distribution_ms': self.offset_distribution_ms,
'beat_to_syllable_deltas': self.beat_to_syllable_deltas,
'syllable_to_hook_deltas': self.syllable_to_hook_deltas,
'pre_hook_silence_ms': self.pre_hook_silence_ms,
'pre_drop_silence_ms': self.pre_drop_silence_ms,
'post_hook_decay_ms': self.post_hook_decay_ms,
'hook_beat_phase': self.hook_beat_phase,
'drop_beat_phase': self.drop_beat_phase,
'syllable_stress_ratios': self.syllable_stress_ratios,
'stress_cadence_ms': self.stress_cadence_ms,
'tempo_bpm': self.tempo_bpm,
'extraction_timestamp': self.extraction_timestamp.isoformat(),
'version': self.version
}
@dataclass
class TimingHypothesis:
"""A proposed timing pattern (UNTRUSTED, for memory to validate)"""
hypothesis_id: str
# Candidate timing ranges
candidate_ranges_ms: List[Tuple[float, float]] # [(min, max), ...]
recommended_offset_ms: float
# Confidence metrics
confidence: float # 0-1
volatility: str # 'low', 'medium', 'high'
sample_size: int
# Source tracking
source: str = "pattern_learner"
niche: str = ""
platform: str = ""
# Trend signals
trend_direction: str = "stable" # 'increasing', 'decreasing', 'stable', 'chaotic'
delta_ms: float = 0.0 # Trend shift magnitude
# Near-miss attribution
near_miss_tags: List[str] = field(default_factory=list)
# Metadata
timestamp: datetime = field(default_factory=datetime.now)
version: str = "learner_v1.0"
def to_dict(self) -> Dict[str, Any]:
return {
'hypothesis_id': self.hypothesis_id,
'candidate_ranges_ms': self.candidate_ranges_ms,
'recommended_offset_ms': self.recommended_offset_ms,
'confidence': self.confidence,
'volatility': self.volatility,
'sample_size': self.sample_size,
'source': self.source,
'niche': self.niche,
'platform': self.platform,
'trend_direction': self.trend_direction,
'delta_ms': self.delta_ms,
'near_miss_tags': self.near_miss_tags,
'timestamp': self.timestamp.isoformat(),
'version': self.version
}
@dataclass
class NearMissRecord:
"""Records videos that "almost worked" for failure attribution"""
video_id: str
views: int
retention_2s: float
# Why it failed
failure_reason: str # 'drop_late', 'hook_early', 'fatigue_onset', 'compression_smear'
estimated_offset_error_ms: float
# What was tried
actual_hook_timing_ms: float
actual_drop_timing_ms: float
beat_alignment_error: float
# Context
niche: str
platform: str
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class PhaseShiftSignal:
"""Early warning signal for timing distribution drift"""
signal_id: str
trend: str # 'beat_anticipation_increasing', 'silence_collapsing', 'variance_exploding'
delta_ms: float
volatility: str
affected_niches: List[str]
confidence: float
timestamp: datetime = field(default_factory=datetime.now)
# ============================================================================
# MICRO-TIMING EXTRACTION ENGINE
# ============================================================================
class MicroTimingExtractor:
"""Extract sub-10ms precision timing data from audio"""
@staticmethod
def extract_from_audio(audio_path: str, video_id: str) -> MicroTimingData:
"""
Extract all micro-timing features with ยฑ1ms precision.
This is RAW extractionโ€”NO aggregation or filtering.
"""
try:
# Load audio
y, sr = librosa.load(audio_path, sr=22050)
# Beat tracking with high precision
tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr, units='frames')
beat_times_ms = librosa.frames_to_time(beat_frames, sr=sr) * 1000
# Onset detection (for syllables/hooks)
onset_frames = librosa.onset.onset_detect(y=y, sr=sr, units='frames',
backtrack=False, delta=0.01)
onset_times_ms = librosa.frames_to_time(onset_frames, sr=sr) * 1000
# Spectral flux for hook detection (high energy changes)
spectral_flux = np.diff(librosa.feature.spectral_centroid(y=y, sr=sr)[0])
hook_candidates = np.where(spectral_flux > np.percentile(spectral_flux, 90))[0]
hook_times_ms = librosa.frames_to_time(hook_candidates, sr=sr, hop_length=512) * 1000
# Silence detection (RMS thresholding)
rms = librosa.feature.rms(y=y)[0]
silence_threshold = np.percentile(rms, 15)
silence_frames = np.where(rms < silence_threshold)[0]
silence_times_ms = librosa.frames_to_time(silence_frames, sr=sr, hop_length=512) * 1000
# Group consecutive silences
pause_onsets = []
if len(silence_times_ms) > 0:
current_pause_start = silence_times_ms[0]
for i in range(1, len(silence_times_ms)):
if silence_times_ms[i] - silence_times_ms[i-1] > 100: # >100ms gap
pause_onsets.append(current_pause_start)
current_pause_start = silence_times_ms[i]
pause_onsets.append(current_pause_start)
# Drop detection (sudden energy increases after silence)
energy = librosa.feature.rms(y=y)[0]
energy_diff = np.diff(energy)
drop_candidates = np.where(energy_diff > np.percentile(energy_diff, 95))[0]
drop_times_ms = librosa.frames_to_time(drop_candidates, sr=sr, hop_length=512) * 1000
# Compute micro-offsets (syllable โ†” beat alignment)
offsets = []
beat_to_syl_deltas = []
for onset_ms in onset_times_ms:
# Find nearest beat
if len(beat_times_ms) > 0:
nearest_beat_idx = np.argmin(np.abs(beat_times_ms - onset_ms))
nearest_beat_ms = beat_times_ms[nearest_beat_idx]
offset = onset_ms - nearest_beat_ms
offsets.append(offset)
beat_to_syl_deltas.append(offset)
# Syllable โ†’ hook deltas
syl_to_hook = []
for hook_ms in hook_times_ms[:10]: # First 10 hooks
if len(onset_times_ms) > 0:
nearest_onset_idx = np.argmin(np.abs(onset_times_ms - hook_ms))
nearest_onset_ms = onset_times_ms[nearest_onset_idx]
syl_to_hook.append(hook_ms - nearest_onset_ms)
# Pre-hook silence windows
pre_hook_silence = []
for hook_ms in hook_times_ms[:10]:
# Find most recent pause before this hook
prior_pauses = [p for p in pause_onsets if p < hook_ms]
if prior_pauses:
silence_duration = hook_ms - prior_pauses[-1]
pre_hook_silence.append(silence_duration)
# Pre-drop silence
pre_drop_silence = []
for drop_ms in drop_times_ms[:5]:
prior_pauses = [p for p in pause_onsets if p < drop_ms]
if prior_pauses:
pre_drop_silence.append(drop_ms - prior_pauses[-1])
# Post-hook decay (energy falloff after hook)
post_hook_decay = []
for hook_ms in hook_times_ms[:10]:
hook_frame = librosa.time_to_frames(hook_ms / 1000, sr=sr, hop_length=512)
if hook_frame + 20 < len(energy):
decay_slope = (energy[hook_frame + 20] - energy[hook_frame]) / 20
post_hook_decay.append(abs(decay_slope) * 1000) # Normalize
# Beat phase alignment for hooks
hook_beat_phases = []
for hook_ms in hook_times_ms[:10]:
if len(beat_times_ms) > 1 and tempo > 0:
beat_interval_ms = 60000 / tempo
phase_in_cycle = (hook_ms % beat_interval_ms) / beat_interval_ms
phase_degrees = (phase_in_cycle - 0.5) * 360 # -180 to +180
hook_beat_phases.append(phase_degrees)
# Drop beat phases
drop_beat_phases = []
for drop_ms in drop_times_ms[:5]:
if len(beat_times_ms) > 1 and tempo > 0:
beat_interval_ms = 60000 / tempo
phase_in_cycle = (drop_ms % beat_interval_ms) / beat_interval_ms
phase_degrees = (phase_in_cycle - 0.5) * 360
drop_beat_phases.append(phase_degrees)
# Syllable stress ratios (amplitude relative to local mean)
onset_amplitudes = []
for onset_frame in onset_frames[:50]:
if onset_frame < len(rms):
onset_amplitudes.append(rms[onset_frame])
if onset_amplitudes:
mean_amp = np.mean(onset_amplitudes)
stress_ratios = [amp / (mean_amp + 1e-6) for amp in onset_amplitudes]
else:
stress_ratios = []
# Stress cadence (time between stressed syllables)
stress_cadence = []
stressed_onsets = [onset_times_ms[i] for i, ratio in enumerate(stress_ratios)
if i < len(onset_times_ms) and ratio > 1.2]
if len(stressed_onsets) > 1:
stress_cadence = [stressed_onsets[i+1] - stressed_onsets[i]
for i in range(len(stressed_onsets) - 1)]
# Build MicroTimingData object
timing_data = MicroTimingData(
video_id=video_id,
beat_onsets_ms=beat_times_ms.tolist(),
syllable_onsets_ms=onset_times_ms.tolist(),
hook_onsets_ms=hook_times_ms.tolist(),
pause_onsets_ms=pause_onsets,
drop_onsets_ms=drop_times_ms.tolist(),
offset_distribution_ms=offsets,
beat_to_syllable_deltas=beat_to_syl_deltas,
syllable_to_hook_deltas=syl_to_hook,
pre_hook_silence_ms=pre_hook_silence,
pre_drop_silence_ms=pre_drop_silence,
post_hook_decay_ms=post_hook_decay,
hook_beat_phase=hook_beat_phases,
drop_beat_phase=drop_beat_phases,
syllable_stress_ratios=stress_ratios,
stress_cadence_ms=stress_cadence,
tempo_bpm=float(tempo),
extraction_timestamp=datetime.now()
)
return timing_data
except Exception as e:
logging.error(f"Micro-timing extraction failed for {video_id}: {e}")
# Return empty timing data on failure
return MicroTimingData(
video_id=video_id,
beat_onsets_ms=[],
syllable_onsets_ms=[],
hook_onsets_ms=[],
pause_onsets_ms=[],
drop_onsets_ms=[],
offset_distribution_ms=[],
beat_to_syllable_deltas=[],
syllable_to_hook_deltas=[],
pre_hook_silence_ms=[],
pre_drop_silence_ms=[],
post_hook_decay_ms=[],
hook_beat_phase=[],
drop_beat_phase=[],
syllable_stress_ratios=[],
stress_cadence_ms=[],
tempo_bpm=0.0,
extraction_timestamp=datetime.now()
)
# ============================================================================
# DISTRIBUTION-BASED LEARNING ENGINE (NO AVERAGES)
# ============================================================================
class DistributionLearner:
"""
Learns FULL distributions, not averages.
Tracks histograms, skew, kurtosis, multi-modal clusters.
"""
def __init__(self):
self.distributions: Dict[str, Dict[str, Any]] = defaultdict(dict)
def learn_distribution(self, feature_name: str, values: List[float],
niche: str = "global", platform: str = "global"):
"""Learn full distribution for a feature"""
if not values:
return
key = f"{niche}_{platform}_{feature_name}"
# Compute distribution statistics
values_array = np.array(values)
# Histogram (20 bins)
hist, bin_edges = np.histogram(values_array, bins=20)
# Statistical moments
mean = np.mean(values_array)
std = np.std(values_array)
skewness = stats.skew(values_array)
kurt = stats.kurtosis(values_array)
# Percentiles
percentiles = {
'p10': np.percentile(values_array, 10),
'p25': np.percentile(values_array, 25),
'p50': np.percentile(values_array, 50),
'p75': np.percentile(values_array, 75),
'p90': np.percentile(values_array, 90)
}
# Multi-modal cluster detection (simple)
try:
from sklearn.mixture import GaussianMixture
gmm = GaussianMixture(n_components=min(3, len(values_array) // 10))
gmm.fit(values_array.reshape(-1, 1))
cluster_centers = gmm.means_.flatten().tolist()
except:
cluster_centers = [mean]
# Store distribution
self.distributions[key] = {
'histogram': hist.tolist(),
'bin_edges': bin_edges.tolist(),
'mean': float(mean),
'std': float(std),
'skewness': float(skewness),
'kurtosis': float(kurt),
'percentiles': percentiles,
'cluster_centers': cluster_centers,
'sample_count': len(values_array),
'last_updated': datetime.now()
}
def get_promising_ranges(self, feature_name: str, niche: str = "global",
platform: str = "global", top_k: int = 3) -> List[Tuple[float, float]]:
"""
Return top-K promising ranges (NOT "best"โ€”just "seems promising").
Returns list of (min, max) tuples.
"""
key = f"{niche}_{platform}_{feature_name}"
if key not in self.distributions:
return []
dist = self.distributions[key]
# Build ranges around cluster centers
ranges = []
for center in dist['cluster_centers'][:top_k]:
# Use ยฑ1 std around each cluster
range_min = center - dist['std'] * 0.5
range_max = center + dist['std'] * 0.5
ranges.append((float(range_min), float(range_max)))
return ranges
def detect_distribution_shift(self, feature_name: str, new_values: List[float],
niche: str = "global", platform: str = "global") -> Optional[Dict[str, Any]]:
"""Detect if distribution has shifted significantly"""
key = f"{niche}_{platform}_{feature_name}"
if key not in self.distributions or not new_values:
return None
old_dist = self.distributions[key]
new_mean = np.mean(new_values)
new_std = np.std(new_values)
# Compute shift
mean_shift = new_mean - old_dist['mean']
std_ratio = new_std / (old_dist['std'] + 1e-6)
# Classify shift
if abs(mean_shift) > old_dist['std'] * 0.5:
direction = 'increasing' if mean_shift > 0 else 'decreasing'
else:
direction = 'stable'
if std_ratio > 1.3:
volatility = 'high'
elif std_ratio > 1.1:
volatility = 'medium'
else:
volatility = 'low'
return {
'feature': feature_name,
'trend_direction': direction,
'delta': float(mean_shift),
'volatility': volatility,
'std_ratio': float(std_ratio)
}
# ============================================================================
# NEAR-MISS & FAILURE ATTRIBUTION ENGINE
# ============================================================================
class FailureAttributionEngine:
"""
Aggressively catalog videos that "almost worked".
Track timing errors and attribute failure reasons.
"""
def __init__(self):
self.near_misses: List[NearMissRecord] = []
self.failure_patterns: Dict[str, int] = defaultdict(int)
def catalog_near_miss(self, video_id: str, views: int, retention_2s: float,
timing_data: MicroTimingData, performance_threshold: int = 1_000_000):
"""
Catalog a video that had early traction but failed to hit viral threshold.
"""
# Check if it's a near-miss (high early retention but low final views)
if retention_2s > 0.75 and views < performance_threshold:
# Analyze timing to attribute failure
failure_reason = self._attribute_failure(timing_data, retention_2s)
# Estimate offset error
offset_error = self._estimate_offset_error(timing_data)
record = NearMissRecord(
video_id=video_id,
views=views,
retention_2s=retention_2s,
failure_reason=failure_reason,
estimated_offset_error_ms=offset_error,
actual_hook_timing_ms=timing_data.hook_onsets_ms[0] if timing_data.hook_onsets_ms else 0,
actual_drop_timing_ms=timing_data.drop_onsets_ms[0] if timing_data.drop_onsets_ms else 0,
beat_alignment_error=np.mean(timing_data.offset_distribution_ms) if timing_data.offset_distribution_ms else 0,
niche="", # Would be passed in
platform=""
)
self.near_misses.append(record)
self.failure_patterns[failure_reason] += 1
def _attribute_failure(self, timing_data: MicroTimingData, retention: float) -> str:
"""Determine likely failure reason from timing analysis"""
# Hook timing analysis
if timing_data.hook_onsets_ms and timing_data.hook_onsets_ms[0] > 2500:
return "hook_late"
if timing_data.hook_onsets_ms and timing_data.hook_onsets_ms[0] < 800:
return "hook_early"
# Drop timing
if timing_data.pre_drop_silence_ms and np.mean(timing_data.pre_drop_silence_ms) > 250:
return "drop_late"
# Beat alignment
if timing_data.offset_distribution_ms:
mean_offset = np.mean(np.abs(timing_data.offset_distribution_ms))
if mean_offset > 80:
return "beat_misalignment"
# Fatigue onset (declining energy)
if len(timing_data.post_hook_decay_ms) > 2:
if timing_data.post_hook_decay_ms[1] < timing_data.post_hook_decay_ms[0] * 0.7:
return "fatigue_onset"
return "unknown"
def _estimate_offset_error(self, timing_data: MicroTimingData) -> float:
"""Estimate how many ms off the timing was"""
if not timing_data.offset_distribution_ms:
return 0.0
# Mean absolute offset from perfect beat alignment
return float(np.mean(np.abs(timing_data.offset_distribution_ms)))
def get_common_failure_patterns(self, top_k: int = 5) -> List[Tuple[str, int]]:
"""Return most common failure reasons"""
sorted_failures = sorted(self.failure_patterns.items(), key=lambda x: x[1], reverse=True)
return sorted_failures[:top_k]
# ============================================================================
# PHASE-SHIFT DETECTION ENGINE
# ============================================================================
class PhaseShiftDetector:
"""
Early warning system for timing distribution drift.
Detects when patterns are changing BEFORE metrics confirm it.
"""
def __init__(self):
self.historical_distributions: Dict[str, deque] = defaultdict(lambda: deque(maxlen=50))
self.shift_signals: List[PhaseShiftSignal] = []
def track_distribution(self, feature_name: str, values: List[float],
niche: str = "global", timestamp: datetime = None):
"""Track distribution over time for drift detection"""
if not values:
return
key = f"{niche}_{feature_name}"
snapshot = {
'mean': np.mean(values),
'std': np.std(values),
'median': np.median(values),
'timestamp': timestamp or datetime.now()
}
self.historical_distributions[key].append(snapshot)
def detect_phase_shift(self, feature_name: str, niche: str = "global",
window_size: int = 10) -> Optional[PhaseShiftSignal]:
"""
Detect if timing distribution has drifted significantly.
Returns early warning signal.
"""
key = f"{niche}_{feature_name}"
if key not in self.historical_distributions:
return None
history = list(self.historical_distributions[key])
if len(history) < window_size:
return None
# Compare recent vs older distributions
recent = history[-window_size:]
older = history[-2*window_size:-window_size] if len(history) >= 2*window_size else history[:window_size]
recent_means = [h['mean'] for h in recent]
older_means = [h['mean'] for h in older]
recent_stds = [h['std'] for h in recent]
older_stds = [h['std'] for h in older]
# Detect mean shift
mean_delta = np.mean(recent_means) - np.mean(older_means)
# Detect variance change
std_ratio = np.mean(recent_stds) / (np.mean(older_stds) + 1e-6)
# Classify trend
if abs(mean_delta) > np.std(older_means) * 0.5:
if mean_delta > 0:
trend = f"{feature_name}_increasing"
else:
trend = f"{feature_name}_decreasing"
elif std_ratio > 1.5:
trend = f"{feature_name}_variance_exploding"
elif std_ratio < 0.7:
trend = f"{feature_name}_variance_collapsing"
else:
return None # No significant shift
# Determine volatility
if std_ratio > 1.5 or std_ratio < 0.7:
volatility = 'high'
elif abs(mean_delta) > np.std(older_means):
volatility = 'medium'
else:
volatility = 'low'
# Create signal
signal = PhaseShiftSignal(
signal_id=f"shift_{niche}_{feature_name}_{int(datetime.now().timestamp())}",
trend=trend,
delta_ms=float(mean_delta),
volatility=volatility,
affected_niches=[niche],
confidence=min(len(recent) / window_size, 1.0)
)
self.shift_signals.append(signal)
return signal
# ============================================================================
# MULTI-VARIANT CANDIDATE GENERATOR
# ============================================================================
class CandidateGenerator:
"""
Generate 3-10 timing candidates per segment with confidence scoring.
This is the EXPERIMENTAL proposal engine.
"""
def generate_candidates(self, base_timing_ms: float, distribution_ranges: List[Tuple[float, float]],
volatility: str = "medium", num_candidates: int = 5) -> List[TimingHypothesis]:
"""
Generate multiple timing candidates around base timing and distribution ranges.
"""
candidates = []
# Variation magnitudes based on volatility
if volatility == 'high':
variations = [10, 20, 40, 60, 80]
elif volatility == 'medium':
variations = [10, 20, 40]
else:
variations = [5, 10, 20]
for i, var in enumerate(variations[:num_candidates]):
# Generate positive and negative variants
for direction in [-1, 1]:
offset = base_timing_ms + (var * direction)
# Confidence decreases with larger variations
confidence = 1.0 - (i / len(variations)) * 0 # ========================================================================
# CRITICAL: CONTINUOUS FEEDBACK & CALIBRATION
# ========================================================================
def update_pattern_performance(self, pattern_id: str, actual_metrics: PerformanceMetrics):
"""
Update pattern with actual performance metrics (CLOSES THE FEEDBACK LOOP).
This is called AFTER video posts to calibrate predictions.
"""
if pattern_id not in self.discovered_patterns:
self.logger.warning(f"Pattern {pattern_id} not found for update")
return
pattern = self.discovered_patterns[pattern_id]
# Update pattern statistics with actual performance
n = pattern.sample_count
pattern.sample_count += 1
# Running average update
pattern.avg_views = ((n * pattern.avg_views) + actual_metrics.views_total) / (n + 1)
pattern.avg_completion = ((n * pattern.avg_completion) + actual_metrics.completion_rate) / (n + 1)
# Recalculate viral efficacy with new data
actual_viral_score = actual_metrics.viral_score
pattern.viral_efficacy_score = (
(pattern.avg_views / 10_000_000) * 0.4 +
pattern.avg_completion * 0.3 +
(pattern.sample_count / (pattern.sample_count + 10)) * 0.3 # Sample size confidence
)
# Update confidence based on prediction accuracy
if hasattr(pattern, 'recent_predictions'):
pattern.recent_predictions.append(actual_viral_score)
if len(pattern.recent_predictions) > 10:
pattern.recent_predictions.pop(0)
# Compute prediction variance (lower = more confident)
variance = np.var(pattern.recent_predictions) if len(pattern.recent_predictions) > 2 else 1.0
pattern.confidence = np.clip(1.0 - (variance / 10.0), 0.3, 0.95)
else:
pattern.recent_predictions = [actual_viral_score]
# Boost weight if exceeded 5M threshold
if actual_metrics.views_total >= self.config['viral_threshold']:
pattern.weight *= 1.15
pattern.trend_status = 'rising'
self.logger.info(f"โœ… Pattern {pattern_id} reinforced: {actual_metrics.views_total:,} views")
else:
# Penalize if underperformed
pattern.weight *= 0.90
if pattern.weight < 0.5:
pattern.trend_status = 'declining'
pattern.last_validated = datetime.now()
# Store in performance history
self.pattern_history.append({
'pattern_id': pattern_id,
'predicted': pattern.viral_efficacy_score,
'actual': actual_viral_score,
'error': abs(pattern.viral_efficacy_score - actual_viral_score),
'timestamp': datetime.now()
})
self.logger.info(f"Pattern {pattern_id} updated: efficacy={pattern.viral_efficacy_score:.3f}, "
f"confidence={pattern.confidence:.2%}, weight={pattern.weight:.2f}")
def reinforce_pattern(self, pattern_id: str, reward_value: float):
"""
RL-style pattern reinforcement based on reward signal.
Reward > 0 = exceeded expectations, < 0 = underperformed.
"""
if pattern_id not in self.discovered_patterns:
return
pattern = self.discovered_patterns[pattern_id]
# Apply reward to weight (exponential moving average)
learning_rate = 0.1
pattern.weight = pattern.weight * (1 - learning_rate) + reward_value * learning_rate
pattern.weight = np.clip(pattern.weight, 0.1, 3.0)
# Update trend status based on reward
if reward_value > 1.5:
pattern.trend_status = 'rising'
elif reward_value > 0.8:
pattern.trend_status = 'stable'
else:
pattern.trend_status = 'declining'
# Store reward in RL history
self.rl_reward_history.append({
'pattern_id': pattern_id,
'reward': reward_value,
'weight_after': pattern.weight,
'timestamp': datetime.now()
})
self.logger.info(f"Pattern {pattern_id} reinforced: reward={reward_value:.2f}, new_weight={pattern.weight:.2f}")
def predict_viral_probability(self, audio_features: AudioFeatures,
context: Dict[str, Any]) -> Dict[str, Any]:
"""
CRITICAL: Pre-post viral prediction with decision recommendation.
Returns: predicted_views, confidence, recommendation (POST/REVISE/HOLD).
"""
# Get base prediction
viral_score, confidence, breakdown = self.predict_viral_success(audio_features, return_confidence=True)
# Get recommended profile for comparison
recommended = self.get_recommended_audio_profile(
audio_features.niche,
audio_features.platform,
audio_features.beat_type,
use_rl=True
)
# Compute alignment score with optimal pattern
alignment_score = 0.5
if recommended:
pace_alignment = 1.0 - abs(audio_features.pace_wpm - recommended.pace_wpm) / 50.0
pitch_alignment = 1.0 - abs(audio_features.pitch_mean - recommended.pitch_base) / 100.0
energy_alignment = 1.0 - abs(audio_features.energy_mean - recommended.energy_level) / 0.3
alignment_score = np.clip((pace_alignment + pitch_alignment + energy_alignment) / 3, 0, 1)
# Factor in trend context
trend_boost = 1.0
if context.get('trending_beat', False) or audio_features.is_trending_beat:
trend_boost = 1.2
if context.get('viral_window', False): # Peak posting hours
trend_boost *= 1.1
# Final adjusted prediction
adjusted_score = viral_score * trend_boost * (0.7 + 0.3 * alignment_score)
# Decision logic
if adjusted_score >= 5.0 and confidence >= 0.7:
decision = "POST"
reason = "High viral probability with strong confidence"
elif adjusted_score >= 3.5 and confidence >= 0.6:
decision = "POST"
reason = "Good viral potential, acceptable confidence"
elif adjusted_score >= 2.0 and alignment_score < 0.6:
decision = "REVISE"
reason = "Moderate potential but poor alignment with optimal pattern"
elif confidence < 0.5:
decision = "REVISE"
reason = "Low confidence in prediction"
else:
decision = "HOLD"
reason = "Insufficient viral probability"
return {
'predicted_views': breakdown['predicted_views'],
'predicted_viral_score': adjusted_score,
'confidence': confidence,
'alignment_score': alignment_score,
'trend_boost': trend_boost,
'decision': decision,
'reason': reason,
'breakdown': breakdown,
'recommended_profile': recommended
}
# ========================================================================
# CRITICAL: TREND-AWARE MULTIMODAL CONTEXT
# ========================================================================
def update_trend_context(self, pattern_id: str, trend_data: Dict[str, Any]):
"""
Update pattern with current platform trends and temporal signals.
Trend data includes: trending_sounds, viral_spike_windows, cultural_signals.
"""
if pattern_id not in self.discovered_patterns:
return
pattern = self.discovered_patterns[pattern_id]
# Update trend status based on data
if trend_data.get('is_trending', False):
pattern.weight *= 1.2
pattern.trend_status = 'rising'
if trend_data.get('sound_trending', False):
pattern.weight *= 1.15
# Temporal boost (hour of day, day of week)
hour_boost = trend_data.get('hour_boost', 1.0)
day_boost = trend_data.get('day_boost', 1.0)
pattern.weight *= (hour_boost * day_boost)
# Store trend context in pattern metadata
if not hasattr(pattern, 'trend_metadata'):
pattern.trend_metadata = {}
pattern.trend_metadata.update({
'last_trend_update': datetime.now(),
'trending_score': trend_data.get('trending_score', 0.0),
'platform_saturation': trend_data.get('saturation', 0.0),
'viral_window_active': trend_data.get('viral_window', False)
})
self.logger.info(f"Pattern {pattern_id} trend context updated: weight={pattern.weight:.2f}")
def get_similar_patterns(self, audio_features: AudioFeatures, niche: str,
platform: str, top_k: int = 5) -> List[Dict[str, Any]]:
"""
CRITICAL: Cross-video pattern learning via similarity search.
Returns top-K similar high-performing patterns.
"""
# Filter patterns by niche/platform
candidate_patterns = [
p for p in self.discovered_patterns.values()
if p.niche == niche and p.platform == platform and p.avg_views >= 1_000_000
]
if not candidate_patterns:
return []
# Compute similarity scores
similarities = []
for pattern in candidate_patterns:
# Simple similarity based on key features
pace_sim = 1.0 - abs(audio_features.pace_wpm - pattern.optimal_pace) / 100.0
pitch_sim = 1.0 - abs(audio_features.pitch_mean - pattern.optimal_pitch_range[0]) / 150.0
energy_sim = 1.0 - abs(audio_features.energy_mean - pattern.optimal_energy) / 0.5
# Weighted similarity
similarity = np.clip(
pace_sim * 0.35 + pitch_sim * 0.35 + energy_sim * 0.30,
0, 1
)
# Boost by viral efficacy and weight
weighted_similarity = similarity * pattern.viral_efficacy_score * pattern.weight
"""
audio_pattern_learner.py
Autonomous machine learning system for identifying and predicting viral audio patterns.
Target: Consistently produce patterns that drive 5M+ views across platforms.
Core Features:
- High-resolution audio feature ingestion & normalization
- Advanced feature engineering (beat, hook, emotion, spectral)
- Multi-model ML pipeline (XGBoost, LSTM, clustering)
- Pattern discovery & ranking by viral efficacy
- RL integration for continuous optimization
- Real-time API for TTS/voice-sync integration
- Cross-platform, cross-niche pattern learning
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime, timedelta
from collections import defaultdict, deque
import json
import pickle
from pathlib import Path
import logging
# ML/DL imports
try:
import xgboost as xgb
from sklearn.cluster import KMeans, HDBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.ensemble import IsolationForest
import torch
import torch.nn as nn
import torch.optim as optim
except ImportError:
print("Warning: Some ML libraries not installed. Install: xgboost, scikit-learn, torch")
# Audio processing
try:
import librosa
from scipy import signal, stats
except ImportError:
print("Warning: Audio libraries not installed. Install: librosa, scipy")
# ============================================================================
# DATA STRUCTURES
# ============================================================================
@dataclass
class AudioFeatures:
"""Comprehensive audio feature set for virality prediction"""
# Basic features
pace_wpm: float
pitch_mean: float
pitch_variance: float
energy_mean: float
energy_variance: float
tempo_bpm: float
# Hook & timing
hook_timing_seconds: List[float]
hook_emphasis_amplitude: List[float]
hook_pitch_jump: List[float]
pause_durations: List[float]
pause_positions: List[float]
beat_alignment_error: float
syllable_timing: List[float]
# Spectral & timbre
mfcc: np.ndarray
spectral_centroid: np.ndarray
spectral_rolloff: np.ndarray
zero_crossing_rate: np.ndarray
chroma: np.ndarray
harmonic_noise_ratio: float
# Emotion & dynamics
emotion_trajectory: List[str] # ['building', 'peak', 'sustain', 'release']
emotion_intensity: List[float]
voice_tone: str
phoneme_timing: Dict[str, float]
# Context
niche: str
platform: str
beat_type: str
voice_style: str
language: str
music_track: Optional[str]
is_trending_beat: bool
trend_timestamp: datetime
# Embeddings
audio_embedding: Optional[np.ndarray] = None
def to_feature_vector(self) -> np.ndarray:
"""Convert to flat feature vector for ML models"""
features = [
self.pace_wpm,
self.pitch_mean,
self.pitch_variance,
self.energy_mean,
self.energy_variance,
self.tempo_bpm,
np.mean(self.hook_emphasis_amplitude) if self.hook_emphasis_amplitude else 0,
np.mean(self.hook_pitch_jump) if self.hook_pitch_jump else 0,
np.mean(self.pause_durations) if self.pause_durations else 0,
self.beat_alignment_error,
self.harmonic_noise_ratio,
len(self.hook_timing_seconds),
len(self.pause_durations),
np.mean(self.emotion_intensity) if self.emotion_intensity else 0,
]
# Add aggregated spectral features
if self.mfcc is not None:
features.extend(np.mean(self.mfcc, axis=1).tolist()[:13])
else:
features.extend([0] * 13)
return np.array(features)
@dataclass
class PerformanceMetrics:
"""Video performance metrics for learning"""
video_id: str
views_total: int
retention_2s: float
retention_15s: float
completion_rate: float
replay_rate: float
velocity_per_hour: float
velocity_per_day: float
# Social engagement
likes: int
comments: int
shares: int
saves: int
# Platform
platform: str
upload_timestamp: datetime
# Derived metrics
viral_score: float = 0.0
velocity_score: float = 0.0
engagement_ratio: float = 0.0
def __post_init__(self):
"""Calculate derived metrics"""
self.viral_score = (
self.views_total / 1_000_000 * 0.3 +
self.completion_rate * 0.2 +
self.retention_2s * 0.15 +
self.replay_rate * 0.15 +
(self.shares / max(self.views_total, 1)) * 1000 * 0.2
)
self.velocity_score = (
self.velocity_per_hour * 0.4 +
self.velocity_per_day / 24 * 0.6
)
if self.views_total > 0:
self.engagement_ratio = (self.likes + self.comments * 2 + self.shares * 3) / self.views_total
@dataclass
class AudioPattern:
"""Discovered audio pattern with viral efficacy"""
pattern_id: str
niche: str
platform: str
# Pattern characteristics
optimal_pace: float
optimal_pitch_range: Tuple[float, float]
optimal_energy: float
hook_timings: List[float]
pause_pattern: List[Tuple[float, float]] # (position, duration)
beat_alignment_target: float
emotion_arc: List[str]
# Efficacy metrics
viral_efficacy_score: float
sample_count: int
avg_views: float
avg_completion: float
confidence: float
# Temporal
discovered_at: datetime
last_validated: datetime
trend_status: str # 'rising', 'peaked', 'declining', 'stable'
# Weights for RL
weight: float = 1.0
decay_rate: float = 0.95
@dataclass
class PatternRecommendation:
"""Recommendation for TTS/voice-sync engines"""
niche: str
platform: str
beat_type: str
# TTS parameters
pace_wpm: float
pitch_base: float
pitch_variance: float
energy_level: float
voice_style: str
# Timing patterns
hook_placements: List[float]
pause_placements: List[Tuple[float, float]]
emphasis_words: List[str]
# Beat sync
beat_alignment_rules: Dict[str, float]
syllable_timing_guide: Dict[str, float]
# Predicted performance
predicted_viral_score: float
confidence: float
# ============================================================================
# FEATURE ENGINEERING
# ============================================================================
class AudioFeatureEngineering:
"""Advanced feature engineering for virality prediction"""
@staticmethod
def extract_beat_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract beat-related viral signals"""
features = {}
# Beat alignment score
features['beat_alignment_score'] = 1.0 - audio_features.beat_alignment_error
# Off-beat ratio (syllables that don't align)
if audio_features.syllable_timing:
beat_interval = 60.0 / audio_features.tempo_bpm
off_beats = sum(1 for t in audio_features.syllable_timing
if (t % beat_interval) > beat_interval * 0.3)
features['off_beat_ratio'] = off_beats / len(audio_features.syllable_timing)
else:
features['off_beat_ratio'] = 0.0
# Hook-to-beat correlation
if audio_features.hook_timing_seconds:
features['hook_beat_sync'] = np.mean([
1.0 - (t % (60.0 / audio_features.tempo_bpm)) / (60.0 / audio_features.tempo_bpm)
for t in audio_features.hook_timing_seconds
])
else:
features['hook_beat_sync'] = 0.0
# Beat trend match (is this a trending beat pattern?)
features['beat_trend_match'] = 1.0 if audio_features.is_trending_beat else 0.0
# Beat innovation score (novelty detection)
features['beat_innovation'] = audio_features.beat_alignment_error * 0.5 # Placeholder
return features
@staticmethod
def extract_hook_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract hook-related viral signals"""
features = {}
if not audio_features.hook_timing_seconds:
return {k: 0.0 for k in ['hook_count', 'hook_early_placement', 'hook_amplitude_avg',
'hook_pitch_jump_avg', 'hook_spacing_variance']}
features['hook_count'] = len(audio_features.hook_timing_seconds)
features['hook_early_placement'] = 1.0 if audio_features.hook_timing_seconds[0] < 3.0 else 0.0
features['hook_amplitude_avg'] = np.mean(audio_features.hook_emphasis_amplitude)
features['hook_pitch_jump_avg'] = np.mean(audio_features.hook_pitch_jump)
# Hook spacing consistency
if len(audio_features.hook_timing_seconds) > 1:
spacings = np.diff(audio_features.hook_timing_seconds)
features['hook_spacing_variance'] = np.var(spacings)
else:
features['hook_spacing_variance'] = 0.0
return features
@staticmethod
def extract_emotion_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract emotion trajectory features"""
features = {}
if not audio_features.emotion_trajectory or not audio_features.emotion_intensity:
return {k: 0.0 for k in ['emotion_arc_score', 'emotion_peak_early',
'emotion_intensity_avg', 'emotion_variance']}
# Emotion arc score (building -> peak is viral)
arc_map = {'building': 0.3, 'peak': 1.0, 'sustain': 0.7, 'release': 0.5}
arc_scores = [arc_map.get(e, 0.5) for e in audio_features.emotion_trajectory]
features['emotion_arc_score'] = np.mean(arc_scores)
# Peak placement
if 'peak' in audio_features.emotion_trajectory:
peak_idx = audio_features.emotion_trajectory.index('peak')
features['emotion_peak_early'] = 1.0 if peak_idx < len(arc_scores) * 0.4 else 0.0
else:
features['emotion_peak_early'] = 0.0
features['emotion_intensity_avg'] = np.mean(audio_features.emotion_intensity)
features['emotion_variance'] = np.var(audio_features.emotion_intensity)
return features
@staticmethod
def extract_pause_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract pause placement patterns"""
features = {}
if not audio_features.pause_durations or not audio_features.pause_positions:
return {k: 0.0 for k in ['pause_count', 'pause_avg_duration',
'pause_placement_score', 'pause_rewatch_correlation']}
features['pause_count'] = len(audio_features.pause_durations)
features['pause_avg_duration'] = np.mean(audio_features.pause_durations)
# Strategic pause placement (after hooks, before reveals)
strategic_positions = [p for p in audio_features.pause_positions if 2 < p < 8]
features['pause_placement_score'] = len(strategic_positions) / max(len(audio_features.pause_positions), 1)
# Pause-rewatch correlation (placeholder - would need replay timestamp data)
features['pause_rewatch_correlation'] = performance.replay_rate * 0.5
return features
@staticmethod
def extract_spectral_features(audio_features: AudioFeatures) -> Dict[str, float]:
"""Extract spectral/timbre quality features"""
features = {}
if audio_features.spectral_centroid is not None:
features['spectral_centroid_mean'] = np.mean(audio_features.spectral_centroid)
features['spectral_centroid_var'] = np.var(audio_features.spectral_centroid)
else:
features['spectral_centroid_mean'] = 0.0
features['spectral_centroid_var'] = 0.0
if audio_features.spectral_rolloff is not None:
features['spectral_rolloff_mean'] = np.mean(audio_features.spectral_rolloff)
else:
features['spectral_rolloff_mean'] = 0.0
if audio_features.zero_crossing_rate is not None:
features['zero_crossing_mean'] = np.mean(audio_features.zero_crossing_rate)
else:
features['zero_crossing_mean'] = 0.0
features['harmonic_noise_ratio'] = audio_features.harmonic_noise_ratio
return features
@staticmethod
def extract_velocity_features(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Extract velocity and virality signals"""
features = {}
features['velocity_per_hour'] = performance.velocity_per_hour
features['velocity_per_day'] = performance.velocity_per_day
features['velocity_score'] = performance.velocity_score
# Retention correlation
features['retention_2s_rate'] = performance.retention_2s
features['retention_completion_ratio'] = performance.completion_rate / max(performance.retention_2s, 0.01)
# Hook to retention correlation (placeholder)
if audio_features.hook_timing_seconds:
first_hook = audio_features.hook_timing_seconds[0]
features['hook_retention_correlation'] = performance.retention_2s if first_hook < 2.0 else performance.retention_2s * 0.8
else:
features['hook_retention_correlation'] = 0.0
return features
@staticmethod
def compute_full_feature_set(audio_features: AudioFeatures, performance: PerformanceMetrics) -> Dict[str, float]:
"""Compute complete engineered feature set"""
all_features = {}
# Base features
all_features['pace_wpm'] = audio_features.pace_wpm
all_features['pitch_mean'] = audio_features.pitch_mean
all_features['pitch_variance'] = audio_features.pitch_variance
all_features['energy_mean'] = audio_features.energy_mean
all_features['energy_variance'] = audio_features.energy_variance
all_features['tempo_bpm'] = audio_features.tempo_bpm
# Engineered features
all_features.update(AudioFeatureEngineering.extract_beat_features(audio_features, performance))
all_features.update(AudioFeatureEngineering.extract_hook_features(audio_features, performance))
all_features.update(AudioFeatureEngineering.extract_emotion_features(audio_features, performance))
all_features.update(AudioFeatureEngineering.extract_pause_features(audio_features, performance))
all_features.update(AudioFeatureEngineering.extract_spectral_features(audio_features))
all_features.update(AudioFeatureEngineering.extract_velocity_features(audio_features, performance))
# Target metrics
all_features['views_total'] = performance.views_total
all_features['viral_score'] = performance.viral_score
all_features['engagement_ratio'] = performance.engagement_ratio
return all_features
# ============================================================================
# MACHINE LEARNING MODELS
# ============================================================================
class ViralityPredictor:
"""Multi-model ensemble for predicting viral performance with full training"""
def __init__(self):
self.xgb_views_model = None
self.xgb_retention_model = None
self.xgb_velocity_model = None
self.xgb_engagement_model = None
self.scaler = StandardScaler()
self.feature_importance = {}
self.is_trained = False
self.evaluation_metrics = {}
def train(self, X: np.ndarray, y_views: np.ndarray, y_viral: np.ndarray,
y_retention: np.ndarray, y_velocity: np.ndarray, y_engagement: np.ndarray):
"""Train XGBoost models on audio features with full multi-target prediction"""
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
# Scale features
X_scaled = self.scaler.fit_transform(X)
# Split for validation
X_train, X_val, y_v_train, y_v_val = train_test_split(
X_scaled, y_views, test_size=0.2, random_state=42
)
_, _, y_r_train, y_r_val = train_test_split(
X_scaled, y_retention, test_size=0.2, random_state=42
)
_, _, y_vel_train, y_vel_val = train_test_split(
X_scaled, y_velocity, test_size=0.2, random_state=42
)
_, _, y_eng_train, y_eng_val = train_test_split(
X_scaled, y_engagement, test_size=0.2, random_state=42
)
# Train view predictor
self.xgb_views_model = xgb.XGBRegressor(
n_estimators=300,
max_depth=10,
learning_rate=0.03,
subsample=0.8,
colsample_bytree=0.8,
objective='reg:squarederror',
tree_method='hist',
early_stopping_rounds=20
)
self.xgb_views_model.fit(
X_train, y_v_train,
eval_set=[(X_val, y_v_val)],
verbose=False
)
# Train retention predictor
self.xgb_retention_model = xgb.XGBRegressor(
n_estimators=200,
max_depth=8,
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8
)
self.xgb_retention_model.fit(X_train, y_r_train)
# Train velocity predictor
self.xgb_velocity_model = xgb.XGBRegressor(
n_estimators=200,
max_depth=8,
learning_rate=0.05
)
self.xgb_velocity_model.fit(X_train, y_vel_train)
# Train engagement predictor
self.xgb_engagement_model = xgb.XGBRegressor(
n_estimators=200,
max_depth=8,
learning_rate=0.05
)
self.xgb_engagement_model.fit(X_train, y_eng_train)
# Store feature importance
if hasattr(self.xgb_views_model, 'feature_importances_'):
self.feature_importance = dict(enumerate(self.xgb_views_model.feature_importances_))
# Compute evaluation metrics
y_v_pred = self.xgb_views_model.predict(X_val)
y_r_pred = self.xgb_retention_model.predict(X_val)
self.evaluation_metrics = {
'views_rmse': np.sqrt(mean_squared_error(y_v_val, y_v_pred)),
'views_mae': mean_absolute_error(y_v_val, y_v_pred),
'views_r2': r2_score(y_v_val, y_v_pred),
'retention_rmse': np.sqrt(mean_squared_error(y_r_val, y_r_pred)),
'retention_r2': r2_score(y_r_val, y_r_pred)
}
self.is_trained = True
def predict_views(self, X: np.ndarray) -> np.ndarray:
"""Predict view count"""
if not self.is_trained:
return np.zeros(X.shape[0])
X_scaled = self.scaler.transform(X)
return self.xgb_views_model.predict(X_scaled)
def predict_retention(self, X: np.ndarray) -> np.ndarray:
"""Predict completion rate"""
if not self.is_trained:
return np.zeros(X.shape[0])
X_scaled = self.scaler.transform(X)
return self.xgb_retention_model.predict(X_scaled)
def predict_velocity(self, X: np.ndarray) -> np.ndarray:
"""Predict velocity score"""
if not self.is_trained:
return np.zeros(X.shape[0])
X_scaled = self.scaler.transform(X)
return self.xgb_velocity_model.predict(X_scaled)
def predict_engagement(self, X: np.ndarray) -> np.ndarray:
"""Predict engagement ratio"""
if not self.is_trained:
return np.zeros(X.shape[0])
X_scaled = self.scaler.transform(X)
return self.xgb_engagement_model.predict(X_scaled)
def predict_viral_score(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Predict comprehensive viral score with confidence intervals"""
views = self.predict_views(X)
retention = self.predict_retention(X)
velocity = self.predict_velocity(X)
engagement = self.predict_engagement(X)
# Composite viral score
viral_score = (
(views / 1_000_000) * 0.35 +
retention * 20 * 0.25 +
velocity / 1000 * 0.20 +
engagement * 1000 * 0.20
)
# Confidence based on model agreement and prediction variance
# Use bootstrap-style confidence estimation
confidence = np.clip(
1.0 - (np.abs(views - np.median(views)) / (np.std(views) + 1e-6)) * 0.1,
0.3, 1.0
)
return viral_score, confidence
def get_top_features(self, n: int = 10) -> List[Tuple[int, float]]:
"""Get top N most important features"""
if not self.feature_importance:
return []
sorted_features = sorted(self.feature_importance.items(), key=lambda x: x[1], reverse=True)
return sorted_features[:n]
def get_evaluation_metrics(self) -> Dict[str, float]:
"""Get model performance metrics"""
return self.evaluation_metrics
class SequenceModel(nn.Module):
"""LSTM for temporal audio pattern learning"""
def __init__(self, input_size: int, hidden_size: int = 128, num_layers: int = 2):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.3)
self.fc = nn.Linear(hidden_size, 1)
def forward(self, x):
lstm_out, _ = self.lstm(x)
return self.fc(lstm_out[:, -1, :])
class PatternClusterer:
"""Discover audio pattern clusters using embeddings"""
def __init__(self, n_clusters: int = 20):
self.n_clusters = n_clusters
self.kmeans = None
self.cluster_profiles = {}
def fit(self, embeddings: np.ndarray, performance_scores: np.ndarray):
"""Cluster audio patterns and compute cluster performance profiles"""
self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
labels = self.kmeans.fit_predict(embeddings)
# Compute cluster profiles
for cluster_id in range(self.n_clusters):
mask = labels == cluster_id
cluster_scores = performance_scores[mask]
self.cluster_profiles[cluster_id] = {
'count': np.sum(mask),
'avg_score': np.mean(cluster_scores),
'std_score': np.std(cluster_scores),
'viral_probability': np.mean(cluster_scores > 5.0), # >5M views threshold
'centroid': self.kmeans.cluster_centers_[cluster_id]
}
return labels
def get_high_viral_clusters(self, threshold: float = 0.3) -> List[int]:
"""Get cluster IDs with high viral probability"""
return [cid for cid, profile in self.cluster_profiles.items()
if profile['viral_probability'] > threshold]
def predict_cluster(self, embedding: np.ndarray) -> int:
"""Predict cluster for new audio pattern"""
if self.kmeans is None:
return -1
return self.kmeans.predict(embedding.reshape(1, -1))[0]
class AnomalyDetector:
"""Detect unusual audio patterns (overperformers and underperformers)"""
def __init__(self):
self.model = IsolationForest(contamination=0.1, random_state=42)
def fit(self, X: np.ndarray):
"""Train anomaly detector"""
self.model.fit(X)
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict anomalies (-1 = anomaly, 1 = normal)"""
return self.model.predict(X)
def score(self, X: np.ndarray) -> np.ndarray:
"""Get anomaly scores (more negative = more anomalous)"""
return self.model.score_samples(X)
# ============================================================================
# REINFORCEMENT LEARNING COMPONENTS
# ============================================================================
class RLAudioPolicy:
"""
Reinforcement Learning policy for audio parameter optimization.
Maps audio features -> parameter adjustments that maximize viral score.
"""
def __init__(self, state_dim: int = 30, action_dim: int = 20, hidden_dim: int = 128):
self.state_dim = state_dim
self.action_dim = action_dim
# Simple feedforward policy network
self.policy_net = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim, action_dim),
nn.Tanh() # Actions in [-1, 1] range
)
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=0.001)
self.action_history = []
self.reward_history = []
def select_action(self, state: np.ndarray, explore: bool = True) -> np.ndarray:
"""Select audio parameter adjustments"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
action = self.policy_net(state_tensor).squeeze(0).numpy()
# Add exploration noise
if explore:
action += np.random.normal(0, 0.1, size=action.shape)
action = np.clip(action, -1, 1)
return action
def update_policy(self, states: np.ndarray, actions: np.ndarray, rewards: np.ndarray):
"""Update policy using REINFORCE-style gradient"""
states_tensor = torch.FloatTensor(states)
actions_tensor = torch.FloatTensor(actions)
rewards_tensor = torch.FloatTensor(rewards)
# Normalize rewards
rewards_tensor = (rewards_tensor - rewards_tensor.mean()) / (rewards_tensor.std() + 1e-8)
# Forward pass
predicted_actions = self.policy_net(states_tensor)
# Policy gradient loss
loss = -torch.mean(rewards_tensor.unsqueeze(1) * torch.log(
1 - torch.abs(predicted_actions - actions_tensor) + 1e-8
))
# Backward pass
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 1.0)
self.optimizer.step()
return loss.item()
def decode_action_to_params(self, action: np.ndarray) -> Dict[str, float]:
"""Convert action vector to TTS/voice-sync parameters"""
return {
'pace_adjustment': action[0] * 30, # ยฑ30 WPM
'pitch_adjustment': action[1] * 50, # ยฑ50 Hz
'energy_adjustment': action[2] * 0.2, # ยฑ0.2
'hook_timing_shift': action[3] * 2.0, # ยฑ2 seconds
'pause_duration_mult': 1.0 + action[4] * 0.3, # 0.7x - 1.3x
'emphasis_strength': 0.5 + action[5] * 0.5, # 0-1.0
'beat_alignment_target': 0.9 + action[6] * 0.1, # 0.9-1.0
'emotion_intensity': 0.5 + action[7] * 0.5, # 0-1.0
}
class EmbeddingNetwork(nn.Module):
"""Neural network for learning audio pattern embeddings"""
def __init__(self, input_dim: int, embedding_dim: int = 64):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.BatchNorm1d(128),
nn.Dropout(0.3),
nn.Linear(128, embedding_dim),
)
def forward(self, x):
return self.encoder(x)
# ============================================================================
# CORE PATTERN LEARNER (ENHANCED WITH MISSING FEATURES)
# ============================================================================
class AudioPatternLearner:
"""
Main orchestrator for autonomous viral audio pattern learning.
Continuously learns from video performance and adapts to trends.
"""
def __init__(self, storage_path: str = "./pattern_learner_data"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(exist_ok=True)
# ML models
self.virality_predictor = ViralityPredictor()
self.pattern_clusterer = PatternClusterer(n_clusters=25)
self.anomaly_detector = AnomalyDetector()
# Stratified models (per platform/niche)
self.stratified_models: Dict[str, ViralityPredictor] = {}
# Embedding network for similarity search
self.embedding_model = None
self.embedding_dim = 64
# RL components
self.rl_policy = RLAudioPolicy(action_dim=20)
self.rl_reward_history: deque = deque(maxlen=1000)
# Pattern storage
self.discovered_patterns: Dict[str, AudioPattern] = {}
self.pattern_history: deque = deque(maxlen=10000)
# Feature tracking
self.feature_names: List[str] = []
self.niche_performance: Dict[str, Dict] = defaultdict(lambda: {
'total_videos': 0,
'avg_views': 0.0,
'top_patterns': [],
'model': None
})
# Reinforcement learning components
self.pattern_weights: Dict[str, float] = {}
self.replay_buffer: deque = deque(maxlen=5000)
# Caching
self.embedding_cache: Dict[str, np.ndarray] = {}
# Configuration
self.config = {
'viral_threshold': 5_000_000,
'min_sample_size': 10,
'pattern_decay_rate': 0.95,
'trend_window_days': 30,
'confidence_threshold': 0.7,
'update_frequency_hours': 6
}
# Logging
self.setup_logging()
def setup_logging(self):
"""Setup logging system"""
log_file = self.storage_path / "pattern_learner.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('AudioPatternLearner')
def ingest_video_batch(self, audio_features_list: List[AudioFeatures],
performance_list: List[PerformanceMetrics]):
"""
Ingest batch of video audio records with performance metrics.
Main entry point for continuous learning.
"""
self.logger.info(f"Ingesting batch of {len(audio_features_list)} videos")
if len(audio_features_list) != len(performance_list):
raise ValueError("Audio features and performance lists must be same length")
# Process each video
for audio_feat, perf in zip(audio_features_list, performance_list):
# Compute full feature set
engineered_features = AudioFeatureEngineering.compute_full_feature_set(audio_feat, perf)
# Store in replay buffer
self.replay_buffer.append({
'audio_features': audio_feat,
'performance': perf,
'engineered_features': engineered_features,
'timestamp': datetime.now()
})
# Update niche stats
self._update_niche_stats(audio_feat, perf)
# Trigger learning pipeline
self._train_models()
self._discover_patterns()
self._update_pattern_weights()
self._update_rl_policy()
self._detect_anomalies()
self.logger.info(f"Batch processing complete. Total patterns: {len(self.discovered_patterns)}")
def _train_models(self):
"""Train/update all ML models with full implementation"""
if len(self.replay_buffer) < self.config['min_sample_size']:
self.logger.warning("Insufficient data for training")
return
self.logger.info(f"Training ML models on {len(self.replay_buffer)} samples...")
# Prepare training data with platform/niche stratification
X_list = []
y_views = []
y_viral = []
y_retention = []
y_velocity = []
y_engagement = []
platform_niche_encodings = []
for record in self.replay_buffer:
features = record['engineered_features']
feature_vector = [features.get(k, 0.0) for k in sorted(features.keys())
if k not in ['views_total', 'viral_score', 'engagement_ratio']]
X_list.append(feature_vector)
y_views.append(record['performance'].views_total)
y_viral.append(record['performance'].viral_score)
y_retention.append(record['performance'].completion_rate)
y_velocity.append(record['performance'].velocity_score)
y_engagement.append(record['performance'].engagement_ratio)
# Encode platform + niche for stratified learning
pn_key = f"{record['audio_features'].platform}_{record['audio_features'].niche}"
platform_niche_encodings.append(pn_key)
X = np.array(X_list)
y_views = np.array(y_views)
y_viral = np.array(y_viral)
y_retention = np.array(y_retention)
y_velocity = np.array(y_velocity)
y_engagement = np.array(y_engagement)
# Store feature names
sample_features = self.replay_buffer[0]['engineered_features']
self.feature_names = sorted([k for k in sample_features.keys()
if k not in ['views_total', 'viral_score', 'engagement_ratio']])
# Train virality predictor with multi-target
self.virality_predictor.train(X, y_views, y_viral, y_retention, y_velocity, y_engagement)
# Train platform/niche-specific models
self._train_stratified_models(X, y_views, platform_niche_encodings)
# Train embeddings for similarity search
self._train_embeddings(X, y_views)
# Train pattern clustering
if len(X) >= 50:
embeddings = self._compute_embeddings(X)
self.pattern_clusterer.fit(embeddings, y_views / 1_000_000) # Normalize to millions
self.logger.info(f"Discovered {len(self.pattern_clusterer.cluster_profiles)} clusters")
# Train anomaly detector
self.anomaly_detector.fit(X)
# Evaluate model performance
self._evaluate_models(X, y_views, y_viral)
self.logger.info("โœ… Model training complete")
def _train_stratified_models(self, X: np.ndarray, y_views: np.ndarray,
platform_niche_keys: List[str]):
"""Train separate models for each platform/niche combination"""
from collections import Counter
# Group samples by platform/niche
key_counts = Counter(platform_niche_keys)
for pn_key, count in key_counts.items():
if count < 20: # Need minimum samples
continue
# Extract samples for this platform/niche
indices = [i for i, k in enumerate(platform_niche_keys) if k == pn_key]
X_subset = X[indices]
y_subset = y_views[indices]
# Train specialized model
model = ViralityPredictor()
model.train(
X_subset, y_subset, y_subset / 1_000_000,
np.zeros(len(y_subset)), np.zeros(len(y_subset)), np.zeros(len(y_subset))
)
self.stratified_models[pn_key] = model
# Update niche stats
parts = pn_key.split('_')
if len(parts) >= 2:
niche_key = f"{parts[1]}_{parts[0]}" # niche_platform
self.niche_performance[niche_key]['model'] = model
self.logger.info(f"Trained {len(self.stratified_models)} stratified models")
def _train_embeddings(self, X: np.ndarray, y_views: np.ndarray):
"""Train embedding network for similarity search"""
if self.embedding_model is None:
self.embedding_model = EmbeddingNetwork(X.shape[1], self.embedding_dim)
# Prepare triplet training data (anchor, positive, negative)
optimizer = optim.Adam(self.embedding_model.parameters(), lr=0.001)
criterion = nn.TripletMarginLoss(margin=1.0)
# Simple training: high views = similar embeddings
threshold_5m = 5_000_000
high_performers = X[y_views >= threshold_5m]
low_performers = X[y_views < threshold_5m]
if len(high_performers) > 2 and len(low_performers) > 2:
for epoch in range(50):
# Sample triplets
n_triplets = min(32, len(high_performers) - 1)
anchor_idx = np.random.choice(len(high_performers), n_triplets)
positive_idx = np.random.choice(len(high_performers), n_triplets)
negative_idx = np.random.choice(len(low_performers), n_triplets)
anchors = torch.FloatTensor(high_performers[anchor_idx])
positives = torch.FloatTensor(high_performers[positive_idx])
negatives = torch.FloatTensor(low_performers[negative_idx])
optimizer.zero_grad()
anchor_emb = self.embedding_model(anchors)
positive_emb = self.embedding_model(positives)
negative_emb = self.embedding_model(negatives)
loss = criterion(anchor_emb, positive_emb, negative_emb)
loss.backward()
optimizer.step()
self.logger.info("Embedding model training complete")
def _compute_embeddings(self, X: np.ndarray) -> np.ndarray:
"""Compute embeddings for audio features"""
if self.embedding_model is None:
return X # Fallback to raw features
with torch.no_grad():
X_tensor = torch.FloatTensor(X)
embeddings = self.embedding_model(X_tensor).numpy()
return embeddings
def _evaluate_models(self, X: np.ndarray, y_views: np.ndarray, y_viral: np.ndarray):
"""Evaluate model performance and log metrics"""
predictions = self.virality_predictor.predict_views(X)
# Compute metrics
from sklearn.metrics import mean_absolute_error, r2_score
mae = mean_absolute_error(y_views, predictions)
r2 = r2_score(y_views, predictions)
# Check 5M+ prediction accuracy
threshold = 5_000_000
true_viral = y_views >= threshold
pred_viral = predictions >= threshold
accuracy = np.mean(true_viral == pred_viral)
metrics = {
'mae': float(mae),
'r2': float(r2),
'viral_accuracy': float(accuracy),
'timestamp': datetime.now().isoformat()
}
self.logger.info(f"Model Evaluation: MAE={mae:.0f}, Rยฒ={r2:.3f}, Viral Acc={accuracy:.2%}")
# Log to file
metrics_file = self.storage_path / "model_metrics.jsonl"
with open(metrics_file, 'a') as f:
f.write(json.dumps(metrics) + '\n')
def _update_rl_policy(self):
"""Update RL policy based on recent performance feedback"""
if len(self.replay_buffer) < 50:
return
# Collect recent experiences
recent = list(self.replay_buffer)[-100:]
states = []
actions = []
rewards = []
for record in recent:
# State = audio features
features = record['engineered_features']
state = [features.get(k, 0.0) for k in self.feature_names]
states.append(state)
# Action = deviation from optimal pattern
audio = record['audio_features']
optimal_pattern = self.get_recommended_audio_profile(
audio.niche, audio.platform, audio.beat_type
)
if optimal_pattern:
action = [
(audio.pace_wpm - optimal_pattern.pace_wpm) / 30.0,
(audio.pitch_mean - optimal_pattern.pitch_base) / 50.0,
(audio.energy_mean - optimal_pattern.energy_level) / 0.2,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
]
actions.append(action[:20])
else:
actions.append([0] * 20)
# Reward = viral score (normalized to 0-1)
reward = min(record['performance'].viral_score / 10.0, 1.0)
rewards.append(reward)
# Store for history
self.rl_reward_history.append(reward)
if len(states) > 10:
states = np.array(states[:, :self.rl_policy.state_dim] if states else states)
actions = np.array(actions)
rewards = np.array(rewards)
# Update policy
loss = self.rl_policy.update_policy(states, actions, rewards)
self.logger.info(f"RL policy updated: loss={loss:.4f}, avg_reward={np.mean(rewards):.3f}")
def _discover_patterns(self):
"""Discover viral audio patterns through clustering and analysis"""
if len(self.replay_buffer) < self.config['min_sample_size']:
return
self.logger.info("Discovering audio patterns...")
# Group by niche + platform
niche_platform_groups = defaultdict(list)
for record in self.replay_buffer:
audio = record['audio_features']
key = f"{audio.niche}_{audio.platform}_{audio.beat_type}"
niche_platform_groups[key].append(record)
# Analyze each group
for group_key, records in niche_platform_groups.items():
if len(records) < 5:
continue
# Filter for high performers (>5M views)
high_performers = [r for r in records if r['performance'].views_total >= self.config['viral_threshold']]
if len(high_performers) < 3:
continue
# Extract common patterns
pattern = self._extract_pattern_from_group(group_key, high_performers, records)
if pattern:
self.discovered_patterns[pattern.pattern_id] = pattern
self.logger.info(f"Discovered pattern: {pattern.pattern_id} (efficacy: {pattern.viral_efficacy_score:.3f})")
def _extract_pattern_from_group(self, group_key: str, high_performers: List[Dict],
all_records: List[Dict]) -> Optional[AudioPattern]:
"""Extract common audio pattern from high-performing videos"""
if not high_performers:
return None
# Compute median/mean characteristics
pace_vals = [r['audio_features'].pace_wpm for r in high_performers]
pitch_vals = [r['audio_features'].pitch_mean for r in high_performers]
energy_vals = [r['audio_features'].energy_mean for r in high_performers]
# Hook timing patterns
hook_timings = []
for r in high_performers:
if r['audio_features'].hook_timing_seconds:
hook_timings.extend(r['audio_features'].hook_timing_seconds[:3])
# Pause patterns
pause_patterns = []
for r in high_performers:
audio = r['audio_features']
if audio.pause_positions and audio.pause_durations:
for pos, dur in zip(audio.pause_positions[:5], audio.pause_durations[:5]):
pause_patterns.append((pos, dur))
# Emotion arcs
emotion_arcs = [r['audio_features'].emotion_trajectory for r in high_performers
if r['audio_features'].emotion_trajectory]
# Compute efficacy score
avg_views = np.mean([r['performance'].views_total for r in high_performers])
avg_completion = np.mean([r['performance'].completion_rate for r in high_performers])
viral_efficacy = (
(avg_views / 10_000_000) * 0.4 + # Normalize to 10M
avg_completion * 0.3 +
(len(high_performers) / len(all_records)) * 0.3
)
# Extract niche/platform from group key
parts = group_key.split('_')
niche = parts[0] if len(parts) > 0 else 'unknown'
platform = parts[1] if len(parts) > 1 else 'unknown'
# Create pattern
pattern = AudioPattern(
pattern_id=f"pattern_{group_key}_{int(datetime.now().timestamp())}",
niche=niche,
platform=platform,
optimal_pace=float(np.median(pace_vals)),
optimal_pitch_range=(float(np.percentile(pitch_vals, 25)), float(np.percentile(pitch_vals, 75))),
optimal_energy=float(np.median(energy_vals)),
hook_timings=hook_timings[:5] if hook_timings else [],
pause_pattern=pause_patterns[:5] if pause_patterns else [],
beat_alignment_target=0.95,
emotion_arc=emotion_arcs[0] if emotion_arcs else ['building', 'peak'],
viral_efficacy_score=viral_efficacy,
sample_count=len(high_performers),
avg_views=avg_views,
avg_completion=avg_completion,
confidence=min(len(high_performers) / 20, 1.0),
discovered_at=datetime.now(),
last_validated=datetime.now(),
trend_status='stable',
weight=1.0,
decay_rate=self.config['pattern_decay_rate']
)
return pattern
def _update_pattern_weights(self):
"""Update pattern weights based on recent performance (RL mechanism)"""
current_time = datetime.now()
for pattern_id, pattern in list(self.discovered_patterns.items()):
# Apply temporal decay
days_old = (current_time - pattern.last_validated).days
decay_factor = pattern.decay_rate ** days_old
pattern.weight *= decay_factor
# Remove patterns with very low weight
if pattern.weight < 0.1:
del self.discovered_patterns[pattern_id]
self.logger.info(f"Removed low-weight pattern: {pattern_id}")
continue
# Boost patterns that are still performing
recent_matches = self._find_recent_pattern_matches(pattern)
if recent_matches:
avg_recent_views = np.mean([m['performance'].views_total for m in recent_matches])
if avg_recent_views >= self.config['viral_threshold']:
pattern.weight *= 1.1
pattern.last_validated = current_time
def _find_recent_pattern_matches(self, pattern: AudioPattern, window_days: int = 7) -> List[Dict]:
"""Find recent videos matching this pattern"""
cutoff = datetime.now() - timedelta(days=window_days)
matches = []
for record in self.replay_buffer:
if record['timestamp'] < cutoff:
continue
audio = record['audio_features']
if audio.niche != pattern.niche or audio.platform != pattern.platform:
continue
# Check similarity
if abs(audio.pace_wpm - pattern.optimal_pace) < 20:
if pattern.optimal_pitch_range[0] <= audio.pitch_mean <= pattern.optimal_pitch_range[1]:
matches.append(record)
return matches
def _detect_anomalies(self):
"""Detect anomalous patterns (novel viral strategies)"""
if not self.anomaly_detector.model:
return
X_list = []
records = []
for record in list(self.replay_buffer)[-1000:]:
features = record['engineered_features']
feature_vector = [features.get(k, 0.0) for k in self.feature_names]
X_list.append(feature_vector)
records.append(record)
if not X_list:
return
X = np.array(X_list)
anomaly_scores = self.anomaly_detector.score(X)
# Find overperforming anomalies
for i, (score, record) in enumerate(zip(anomaly_scores, records)):
if score < -0.5 and record['performance'].views_total >= self.config['viral_threshold']:
self.logger.info(f"Anomalous high performer detected: {record['performance'].video_id}")
# Log for further analysis
self._log_anomaly(record, score)
def _log_anomaly(self, record: Dict, anomaly_score: float):
"""Log anomalous pattern for analysis"""
anomaly_log = {
'video_id': record['performance'].video_id,
'views': record['performance'].views_total,
'anomaly_score': float(anomaly_score),
'niche': record['audio_features'].niche,
'platform': record['audio_features'].platform,
'timestamp': datetime.now().isoformat()
}
log_path = self.storage_path / "anomalies.jsonl"
with open(log_path, 'a') as f:
f.write(json.dumps(anomaly_log) + '\n')
def _update_niche_stats(self, audio: AudioFeatures, perf: PerformanceMetrics):
"""Update performance statistics per niche"""
key = f"{audio.niche}_{audio.platform}"
stats = self.niche_performance[key]
stats['total_videos'] += 1
# Running average
n = stats['total_videos']
stats['avg_views'] = ((n - 1) * stats['avg_views'] + perf.views_total) / n
# ========================================================================
# PUBLIC API FOR TTS/VOICE-SYNC INTEGRATION
# ========================================================================
def get_recommended_audio_profile(self, niche: str, platform: str,
beat_type: str, use_rl: bool = True) -> Optional[PatternRecommendation]:
"""
Get recommended audio profile for TTS/voice-sync engines WITH RL optimization.
Returns optimal parameters for generating viral audio.
"""
# Find matching patterns
matching_patterns = [
p for p in self.discovered_patterns.values()
if p.niche == niche and p.platform == platform
]
if not matching_patterns:
self.logger.warning(f"No patterns found for {niche}/{platform}, using global patterns")
# Fallback to any patterns from this platform
matching_patterns = [p for p in self.discovered_patterns.values() if p.platform == platform]
if not matching_patterns:
return None
# Get highest efficacy pattern (weighted by recency)
best_pattern = max(matching_patterns, key=lambda p: p.viral_efficacy_score * p.weight * p.confidence)
# Base recommendation from pattern
base_recommendation = PatternRecommendation(
niche=niche,
platform=platform,
beat_type=beat_type,
pace_wpm=best_pattern.optimal_pace,
pitch_base=best_pattern.optimal_pitch_range[0],
pitch_variance=(best_pattern.optimal_pitch_range[1] - best_pattern.optimal_pitch_range[0]) / 2,
energy_level=best_pattern.optimal_energy,
voice_style='dynamic',
hook_placements=best_pattern.hook_timings[:5],
pause_placements=best_pattern.pause_pattern[:5],
emphasis_words=[],
beat_alignment_rules={'target_error': best_pattern.beat_alignment_target},
syllable_timing_guide={},
predicted_viral_score=best_pattern.viral_efficacy_score,
confidence=best_pattern.confidence
)
# Apply RL policy adjustments
if use_rl and self.rl_policy and len(self.rl_reward_history) > 20:
# Build state from pattern features
state = np.array([
best_pattern.optimal_pace / 200.0,
best_pattern.optimal_pitch_range[0] / 300.0,
best_pattern.optimal_energy,
len(best_pattern.hook_timings) / 10.0,
best_pattern.viral_efficacy_score / 10.0,
best_pattern.confidence,
best_pattern.weight,
*([0] * (self.rl_policy.state_dim - 7)) # Pad to state_dim
])[:self.rl_policy.state_dim]
# Get RL action
action = self.rl_policy.select_action(state, explore=False)
adjustments = self.rl_policy.decode_action_to_params(action)
# Apply adjustments
base_recommendation.pace_wpm += adjustments['pace_adjustment']
base_recommendation.pitch_base += adjustments['pitch_adjustment']
base_recommendation.energy_level += adjustments['energy_adjustment']
# Adjust hook timings
if base_recommendation.hook_placements:
base_recommendation.hook_placements = [
max(0, h + adjustments['hook_timing_shift'])
for h in base_recommendation.hook_placements
]
# Adjust pause durations
if base_recommendation.pause_placements:
base_recommendation.pause_placements = [
(pos, dur * adjustments['pause_duration_mult'])
for pos, dur in base_recommendation.pause_placements
]
# Update beat alignment target
base_recommendation.beat_alignment_rules['target_error'] = adjustments['beat_alignment_target']
self.logger.info(f"Applied RL adjustments: pace={adjustments['pace_adjustment']:.1f}, "
f"pitch={adjustments['pitch_adjustment']:.1f}")
# Clamp to reasonable ranges
base_recommendation.pace_wpm = np.clip(base_recommendation.pace_wpm, 120, 200)
base_recommendation.pitch_base = np.clip(base_recommendation.pitch_base, 100, 400)
base_recommendation.energy_level = np.clip(base_recommendation.energy_level, 0.4, 0.9)
self.logger.info(f"Generated recommendation for {niche}/{platform}: "
f"score={base_recommendation.predicted_viral_score:.3f}, "
f"confidence={base_recommendation.confidence:.2%}")
return base_recommendation
def find_similar_viral_patterns(self, audio_features: AudioFeatures, top_k: int = 5) -> List[Tuple[str, float]]:
"""
Find top K most similar historical viral patterns using embeddings.
Returns list of (pattern_id, similarity_score) tuples.
"""
if not self.discovered_patterns:
return []
# Compute embedding for query audio
dummy_perf = PerformanceMetrics(
video_id='query',
views_total=0,
retention_2s=0,
retention_15s=0,
completion_rate=0,
replay_rate=0,
velocity_per_hour=0,
velocity_per_day=0,
likes=0,
comments=0,
shares=0,
saves=0,
platform=audio_features.platform,
upload_timestamp=datetime.now()
)
query_features = AudioFeatureEngineering.compute_full_feature_set(audio_features, dummy_perf)
query_vector = np.array([query_features.get(k, 0.0) for k in self.feature_names]).reshape(1, -1)
query_embedding = self._compute_embeddings(query_vector)[0]
# Compute similarities to all patterns
similarities = []
for pattern_id, pattern in self.discovered_patterns.items():
# Build pattern feature vector
pattern_features = {
'pace_wpm': pattern.optimal_pace,
'pitch_mean': pattern.optimal_pitch_range[0],
'pitch_variance': (pattern.optimal_pitch_range[1] - pattern.optimal_pitch_range[0]) / 2,
'energy_mean': pattern.optimal_energy,
'beat_alignment_score': pattern.beat_alignment_target,
**{k: 0.0 for k in self.feature_names if k not in ['pace_wpm', 'pitch_mean', 'pitch_variance', 'energy_mean']}
}
pattern_vector = np.array([pattern_features.get(k, 0.0) for k in self.feature_names]).reshape(1, -1)
pattern_embedding = self._compute_embeddings(pattern_vector)[0]
# Cosine similarity
similarity = np.dot(query_embedding, pattern_embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(pattern_embedding) + 1e-8
)
similarities.append((pattern_id, float(similarity)))
# Sort by similarity
similarities.sort(key=lambda x: x[1], reverse=True)
return similarities[:top_k]
def get_optimization_suggestions(self, audio_features: AudioFeatures) -> Dict[str, Any]:
"""
Analyze audio features and provide specific optimization suggestions.
Returns actionable recommendations to improve viral potential.
"""
suggestions = {
'current_score': 0.0,
'potential_score': 0.0,
'improvements': [],
'warnings': [],
'similar_successful': []
}
# Get current prediction
current_score, confidence, breakdown = self.predict_viral_success(audio_features, return_confidence=True)
suggestions['current_score'] = current_score
suggestions['confidence'] = confidence
# Find similar successful patterns
similar = self.find_similar_viral_patterns(audio_features, top_k=3)
suggestions['similar_successful'] = [
{
'pattern_id': pid,
'similarity': sim,
'efficacy': self.discovered_patterns[pid].viral_efficacy_score
}
for pid, sim in similar if pid in self.discovered_patterns
]
# Get recommended profile
recommended = self.get_recommended_audio_profile(
audio_features.niche,
audio_features.platform,
audio_features.beat_type
)
if recommended:
# Compare and suggest improvements
if abs(audio_features.pace_wpm - recommended.pace_wpm) > 15:
suggestions['improvements'].append({
'parameter': 'pace',
'current': audio_features.pace_wpm,
'recommended': recommended.pace_wpm,
'impact': 'high',
'reason': f"Optimal pace for {audio_features.niche} is {recommended.pace_wpm:.0f} WPM"
})
if abs(audio_features.pitch_mean - recommended.pitch_base) > 30:
suggestions['improvements'].append({
'parameter': 'pitch',
'current': audio_features.pitch_mean,
'recommended': recommended.pitch_base,
'impact': 'medium',
'reason': f"Pitch should be around {recommended.pitch_base:.0f} Hz for better retention"
})
if abs(audio_features.energy_mean - recommended.energy_level) > 0.15:
suggestions['improvements'].append({
'parameter': 'energy',
'current': audio_features.energy_mean,
'recommended': recommended.energy_level,
'impact': 'medium',
'reason': f"Energy level should be {recommended.energy_level:.2f} for maximum engagement"
})
# Check hook timing
if audio_features.hook_timing_seconds and audio_features.hook_timing_seconds[0] > 3.0:
suggestions['warnings'].append({
'issue': 'late_first_hook',
'severity': 'high',
'message': f"First hook at {audio_features.hook_timing_seconds[0]:.1f}s - should be <2s for viral potential"
})
# Estimate potential score with improvements
suggestions['potential_score'] = current_score * 1.3 # Rough estimate
return suggestions
def predict_viral_success(self, audio_features: AudioFeatures,
return_confidence: bool = True) -> Union[float, Tuple[float, float, Dict]]:
"""
Predict viral success score for given audio features with confidence and breakdown.
Returns score 0-10+ (higher = more likely to hit 5M+ views).
"""
if not self.virality_predictor.is_trained:
self.logger.warning("Model not trained yet")
return 0.0 if not return_confidence else (0.0, 0.0, {})
# Create dummy performance for feature engineering
dummy_perf = PerformanceMetrics(
video_id='prediction',
views_total=0,
retention_2s=0.8,
retention_15s=0.5,
completion_rate=0.3,
replay_rate=0.1,
velocity_per_hour=1000,
velocity_per_day=10000,
likes=0,
comments=0,
shares=0,
saves=0,
platform=audio_features.platform,
upload_timestamp=datetime.now()
)
# Compute features
engineered_features = AudioFeatureEngineering.compute_full_feature_set(audio_features, dummy_perf)
feature_vector = [engineered_features.get(k, 0.0) for k in self.feature_names]
X = np.array(feature_vector).reshape(1, -1)
# Try stratified model first
pn_key = f"{audio_features.platform}_{audio_features.niche}"
if pn_key in self.stratified_models:
predicted_views = self.stratified_models[pn_key].predict_views(X)[0]
predicted_retention = self.stratified_models[pn_key].predict_retention(X)[0]
else:
predicted_views = self.virality_predictor.predict_views(X)[0]
predicted_retention = self.virality_predictor.predict_retention(X)[0]
predicted_velocity = self.virality_predictor.predict_velocity(X)[0]
predicted_engagement = self.virality_predictor.predict_engagement(X)[0]
# Compute composite viral score
viral_score = (
(predicted_views / 1_000_000) * 0.40 +
predicted_retention * 15 * 0.30 +
predicted_velocity / 500 * 0.15 +
predicted_engagement * 500 * 0.15
)
if not return_confidence:
return float(viral_score)
# Compute confidence based on pattern similarity
confidence = 0.5 # Default
# Check if similar patterns exist
matching_patterns = [
p for p in self.discovered_patterns.values()
if p.niche == audio_features.niche and p.platform == audio_features.platform
]
if matching_patterns:
# High confidence if features match known patterns
best_pattern = max(matching_patterns, key=lambda p: p.viral_efficacy_score)
pace_match = 1.0 - abs(audio_features.pace_wpm - best_pattern.optimal_pace) / 50.0
pitch_match = 1.0 if best_pattern.optimal_pitch_range[0] <= audio_features.pitch_mean <= best_pattern.optimal_pitch_range[1] else 0.5
confidence = np.clip((pace_match + pitch_match) / 2, 0.3, 0.95)
# Build detailed breakdown
breakdown = {
'predicted_views': float(predicted_views),
'predicted_retention': float(predicted_retention),
'predicted_velocity': float(predicted_velocity),
'predicted_engagement': float(predicted_engagement),
'model_used': 'stratified' if pn_key in self.stratified_models else 'global',
'similar_patterns_found': len(matching_patterns),
'confidence_level': 'high' if confidence > 0.7 else 'medium' if confidence > 0.5 else 'low'
}
return float(viral_score), float(confidence), breakdown
def get_top_patterns(self, n: int = 10, niche: Optional[str] = None) -> List[AudioPattern]:
"""Get top N patterns by viral efficacy"""
patterns = list(self.discovered_patterns.values())
if niche:
patterns = [p for p in patterns if p.niche == niche]
patterns.sort(key=lambda p: p.viral_efficacy_score * p.weight, reverse=True)
return patterns[:n]
def get_feature_importance(self) -> Dict[str, float]:
"""Get feature importance rankings"""
if not self.virality_predictor.feature_importance:
return {}
importance_dict = {}
for idx, importance in self.virality_predictor.feature_importance.items():
if idx < len(self.feature_names):
importance_dict[self.feature_names[idx]] = float(importance)
return dict(sorted(importance_dict.items(), key=lambda x: x[1], reverse=True))
def get_niche_performance_summary(self) -> Dict[str, Dict]:
"""Get performance summary for all niches"""
return dict(self.niche_performance)
def schedule_continuous_learning(self, check_interval_hours: int = 6):
"""
Schedule continuous learning updates.
In production, this would be called by a scheduler (cron, Airflow, etc.)
"""
from datetime import timedelta
last_update = datetime.now()
def check_and_update():
nonlocal last_update
current_time = datetime.now()
if (current_time - last_update).total_hours() >= check_interval_hours:
self.logger.info("๐Ÿ”„ Running scheduled model update...")
# Retrain models with latest data
if len(self.replay_buffer) >= self.config['min_sample_size']:
self._train_models()
self._discover_patterns()
self._update_pattern_weights()
self._update_rl_policy()
# Save updated state
self.save_state()
last_update = current_time
self.logger.info("โœ… Scheduled update complete")
return check_and_update
def evaluate_recent_predictions(self, hours_back: int = 24) -> Dict[str, float]:
"""
Evaluate how well recent predictions matched actual performance.
Critical for maintaining 5M+ baseline accuracy.
"""
cutoff = datetime.now() - timedelta(hours=hours_back)
recent_records = [r for r in self.replay_buffer if r['timestamp'] >= cutoff]
if not recent_records:
return {'error': 'No recent data'}
# Compare predictions vs actuals
predictions = []
actuals = []
for record in recent_records:
audio = record['audio_features']
perf = record['performance']
# Get prediction
predicted_score, confidence, _ = self.predict_viral_success(audio, return_confidence=True)
actual_score = perf.views_total / 1_000_000
predictions.append(predicted_score)
actuals.append(actual_score)
predictions = np.array(predictions)
actuals = np.array(actuals)
# Compute metrics
from sklearn.metrics import mean_absolute_error, r2_score
mae = mean_absolute_error(actuals, predictions)
r2 = r2_score(actuals, predictions)
# 5M+ classification accuracy
threshold = 5.0
pred_viral = predictions >= threshold
actual_viral = actuals >= threshold
viral_accuracy = np.mean(pred_viral == actual_viral)
metrics = {
'samples': len(recent_records),
'mae_millions': float(mae),
'r2_score': float(r2),
'viral_5m_accuracy': float(viral_accuracy),
'false_positives': int(np.sum((predictions >= threshold) & (actuals < threshold))),
'false_negatives': int(np.sum((predictions < threshold) & (actuals >= threshold))),
'time_window_hours': hours_back
}
self.logger.info(f"Prediction Evaluation ({hours_back}h): MAE={mae:.2f}M, Rยฒ={r2:.3f}, "
f"5M+ Acc={viral_accuracy:.2%}")
return metrics
# ========================================================================
# PERSISTENCE & STATE MANAGEMENT
# ========================================================================
def save_state(self):
"""Save learner state to disk"""
self.logger.info("Saving learner state...")
state = {
'discovered_patterns': {k: self._pattern_to_dict(v) for k, v in self.discovered_patterns.items()},
'niche_performance': dict(self.niche_performance),
'feature_names': self.feature_names,
'config': self.config,
'timestamp': datetime.now().isoformat()
}
with open(self.storage_path / 'learner_state.json', 'w') as f:
json.dump(state, f, indent=2)
# Save models
if self.virality_predictor.is_trained:
with open(self.storage_path / 'virality_predictor.pkl', 'wb') as f:
pickle.dump(self.virality_predictor, f)
self.logger.info("State saved successfully")
def load_state(self):
"""Load learner state from disk"""
state_file = self.storage_path / 'learner_state.json'
if not state_file.exists():
self.logger.warning("No saved state found")
return
self.logger.info("Loading learner state...")
with open(state_file, 'r') as f:
state = json.load(f)
self.discovered_patterns = {k: self._dict_to_pattern(v) for k, v in state['discovered_patterns'].items()}
self.niche_performance = defaultdict(lambda: {'total_videos': 0, 'avg_views': 0.0, 'top_patterns': []},
state['niche_performance'])
self.feature_names = state['feature_names']
self.config.update(state['config'])
# Load models
model_file = self.storage_path / 'virality_predictor.pkl'
if model_file.exists():
with open(model_file, 'rb') as f:
self.virality_predictor = pickle.load(f)
self.logger.info(f"State loaded: {len(self.discovered_patterns)} patterns")
def _pattern_to_dict(self, pattern: AudioPattern) -> Dict:
"""Convert AudioPattern to JSON-serializable dict"""
return {
'pattern_id': pattern.pattern_id,
'niche': pattern.niche,
'platform': pattern.platform,
'optimal_pace': pattern.optimal_pace,
'optimal_pitch_range': pattern.optimal_pitch_range,
'optimal_energy': pattern.optimal_energy,
'hook_timings': pattern.hook_timings,
'pause_pattern': pattern.pause_pattern,
'beat_alignment_target': pattern.beat_alignment_target,
'emotion_arc': pattern.emotion_arc,
'viral_efficacy_score': pattern.viral_efficacy_score,
'sample_count': pattern.sample_count,
'avg_views': pattern.avg_views,
'avg_completion': pattern.avg_completion,
'confidence': pattern.confidence,
'discovered_at': pattern.discovered_at.isoformat(),
'last_validated': pattern.last_validated.isoformat(),
'trend_status': pattern.trend_status,
'weight': pattern.weight,
'decay_rate': pattern.decay_rate
}
def _dict_to_pattern(self, d: Dict) -> AudioPattern:
"""Convert dict back to AudioPattern"""
return AudioPattern(
pattern_id=d['pattern_id'],
niche=d['niche'],
platform=d['platform'],
optimal_pace=d['optimal_pace'],
optimal_pitch_range=tuple(d['optimal_pitch_range']),
optimal_energy=d['optimal_energy'],
hook_timings=d['hook_timings'],
pause_pattern=[tuple(p) for p in d['pause_pattern']],
beat_alignment_target=d['beat_alignment_target'],
emotion_arc=d['emotion_arc'],
viral_efficacy_score=d['viral_efficacy_score'],
sample_count=d['sample_count'],
avg_views=d['avg_views'],
avg_completion=d['avg_completion'],
confidence=d['confidence'],
discovered_at=datetime.fromisoformat(d['discovered_at']),
last_validated=datetime.fromisoformat(d['last_validated']),
trend_status=d['trend_status'],
weight=d['weight'],
decay_rate=d['decay_rate']
)
# ============================================================================
# EXAMPLE USAGE & INTEGRATION
# ============================================================================
if __name__ == "__main__":
print("=" * 80)
print("๐ŸŽฏ AUDIO PATTERN LEARNER - Production Ready for 5M+ Views")
print("=" * 80)
# Initialize learner
learner = AudioPatternLearner(storage_path="./viral_audio_learner")
# Load previous state if exists
learner.load_state()
# === SIMULATION: Ingest batch of videos ===
print("\n๐Ÿ“Š Simulating video batch ingestion...")
# Create sample high-performer (7.5M views)
sample_audio_viral = AudioFeatures(
pace_wpm=165,
pitch_mean=220.0,
pitch_variance=50.0,
energy_mean=0.75,
energy_variance=0.15,
tempo_bpm=128,
hook_timing_seconds=[1.2, 7.8, 14.5],
hook_emphasis_amplitude=[0.95, 0.88, 0.82],
hook_pitch_jump=[55, 48, 42],
pause_durations=[0.35, 0.5, 0.4],
pause_positions=[4.8, 11.5, 19.2],
beat_alignment_error=0.03,
syllable_timing=[0.12, 0.28, 0.45, 0.62, 0.78],
mfcc=np.random.randn(13, 100),
spectral_centroid=np.random.randn(100) + 2000,
spectral_rolloff=np.random.randn(100) + 4000,
zero_crossing_rate=np.random.randn(100) * 0.1 + 0.15,
chroma=np.random.randn(12, 100),
harmonic_noise_ratio=0.88,
emotion_trajectory=['building', 'peak', 'sustain', 'peak'],
emotion_intensity=[0.65, 0.92, 0.78, 0.88],
voice_tone='energetic',
phoneme_timing={'a': 0.1, 'e': 0.15, 'i': 0.12},
niche='finance',
platform='tiktok',
beat_type='trap',
voice_style='male_young',
language='en',
music_track='trending_beat_001',
is_trending_beat=True,
trend_timestamp=datetime.now()
)
sample_perf_viral = PerformanceMetrics(
video_id='vid_viral_001',
views_total=7_500_000,
retention_2s=0.87,
retention_15s=0.58,
completion_rate=0.38,
replay_rate=0.14,
velocity_per_hour=18500,
velocity_per_day=225000,
likes=520000,
comments=15000,
shares=92000,
saves=135000,
platform='tiktok',
upload_timestamp=datetime.now()
)
# Create sample low-performer (100K views)
sample_audio_low = AudioFeatures(
pace_wpm=140,
pitch_mean=180.0,
pitch_variance=30.0,
energy_mean=0.55,
energy_variance=0.08,
tempo_bpm=100,
hook_timing_seconds=[5.2, 15.8],
hook_emphasis_amplitude=[0.65, 0.60],
hook_pitch_jump=[25, 20],
pause_durations=[0.2],
pause_positions=[10.0],
beat_alignment_error=0.15,
syllable_timing=[0.2, 0.4, 0.6],
mfcc=np.random.randn(13, 100),
spectral_centroid=np.random.randn(100) + 1500,
spectral_rolloff=np.random.randn(100) + 3000,
zero_crossing_rate=np.random.randn(100) * 0.1 + 0.12,
chroma=np.random.randn(12, 100),
harmonic_noise_ratio=0.72,
emotion_trajectory=['steady', 'building'],
emotion_intensity=[0.5, 0.6],
voice_tone='calm',
phoneme_timing={'a': 0.15, 'e': 0.18},
niche='finance',
platform='tiktok',
beat_type='lofi',
voice_style='male_mature',
language='en',
music_track=None,
is_trending_beat=False,
trend_timestamp=datetime.now()
)
sample_perf_low = PerformanceMetrics(
video_id='vid_low_001',
views_total=120_000,
retention_2s=0.62,
retention_15s=0.35,
completion_rate=0.18,
replay_rate=0.04,
velocity_per_hour=250,
velocity_per_day=3500,
likes=3200,
comments=150,
shares=280,
saves=420,
platform='tiktok',
upload_timestamp=datetime.now()
)
# Ingest batch (simulate 50 videos)
audio_batch = [sample_audio_viral] * 30 + [sample_audio_low] * 20
perf_batch = [sample_perf_viral] * 30 + [sample_perf_low] * 20
learner.ingest_video_batch(audio_batch, perf_batch)
# === TEST 1: Get recommendation ===
print("\n" + "=" * 80)
print("๐ŸŽค TEST 1: Get Recommended Audio Profile")
print("=" * 80)
recommendation = learner.get_recommended_audio_profile('finance', 'tiktok', 'trap', use_rl=True)
if recommendation:
print(f"\nโœ… Recommendation for finance/tiktok/trap:")
print(f" ๐Ÿ“ˆ Predicted Viral Score: {recommendation.predicted_viral_score:.2f}/10")
print(f" ๐ŸŽฏ Confidence: {recommendation.confidence:.1%}")
print(f" ๐Ÿ—ฃ๏ธ Pace: {recommendation.pace_wpm:.0f} WPM")
print(f" ๐ŸŽต Pitch: {recommendation.pitch_base:.0f} Hz ยฑ {recommendation.pitch_variance:.0f}")
print(f" โšก Energy: {recommendation.energy_level:.2f}")
print(f" ๐ŸŽฃ Hook Placements: {[f'{h:.1f}s' for h in recommendation.hook_placements[:3]]}")
print(f" โธ๏ธ Pause Placements: {[(f'{p:.1f}s', f'{d:.2f}s') for p, d in recommendation.pause_placements[:2]]}")
# === TEST 2: Predict viral success ===
print("\n" + "=" * 80)
print("๐Ÿ”ฎ TEST 2: Predict Viral Success for New Audio")
print("=" * 80)
test_audio = sample_audio_viral
viral_score, confidence, breakdown = learner.predict_viral_success(test_audio, return_confidence=True)
print(f"\nโœ… Prediction Results:")
print(f" ๐Ÿ“Š Viral Score: {viral_score:.2f}/10 (Target: 5+ for 5M+ views)")
print(f" ๐ŸŽฏ Confidence: {confidence:.1%} ({breakdown['confidence_level']})")
print(f" ๐Ÿ‘๏ธ Predicted Views: {breakdown['predicted_views']:,.0f}")
print(f" โฑ๏ธ Predicted Retention: {breakdown['predicted_retention']:.1%}")
print(f" ๐Ÿš€ Predicted Velocity: {breakdown['predicted_velocity']:.0f}/hour")
print(f" ๐Ÿ’ฌ Predicted Engagement: {breakdown['predicted_engagement']:.4f}")
print(f" ๐Ÿค– Model Used: {breakdown['model_used']}")
print(f" ๐Ÿ“‹ Similar Patterns: {breakdown['similar_patterns_found']}")
# === TEST 3: Get optimization suggestions ===
print("\n" + "=" * 80)
print("๐Ÿ’ก TEST 3: Get Optimization Suggestions")
print("=" * 80)
suggestions = learner.get_optimization_suggestions(sample_audio_low)
print(f"\n๐Ÿ“‰ Current Score: {suggestions['current_score']:.2f}/10")
print(f"๐Ÿ“ˆ Potential Score: {suggestions['potential_score']:.2f}/10 (with improvements)")
print(f"๐ŸŽฏ Confidence: {suggestions['confidence']:.1%}")
if suggestions['improvements']:
print(f"\n๐Ÿ”ง Recommended Improvements:")
for imp in suggestions['improvements']:
print(f" โ€ข {imp['parameter'].upper()}: {imp['current']:.1f} โ†’ {imp['recommended']:.1f}")
print(f" Impact: {imp['impact']} | {imp['reason']}")
if suggestions['warnings']:
print(f"\nโš ๏ธ Warnings:")
for warn in suggestions['warnings']:
print(f" โ€ข [{warn['severity'].upper()}] {warn['message']}")
if suggestions['similar_successful']:
print(f"\n๐ŸŽฏ Similar Successful Patterns:")
for sim in suggestions['similar_successful']:
print(f" โ€ข Pattern {sim['pattern_id']}: similarity={sim['similarity']:.2f}, efficacy={sim['efficacy']:.2f}")
# === TEST 4: Top patterns ===
print("\n" + "=" * 80)
print("๐Ÿ† TEST 4: Top Viral Patterns")
print("=" * 80)
top_patterns = learner.get_top_patterns(n=5, niche='finance')
print(f"\n๐Ÿ“Š Top 5 Patterns for Finance:")
for i, pattern in enumerate(top_patterns, 1):
print(f" {i}. {pattern.pattern_id}")
print(f" Efficacy: {pattern.viral_efficacy_score:.3f} | Samples: {pattern.sample_count}")
print(f" Pace: {pattern.optimal_pace:.0f} WPM | Pitch: {pattern.optimal_pitch_range}")
print(f" Weight: {pattern.weight:.2f} | Confidence: {pattern.confidence:.1%}")
# === TEST 5: Feature importance ===
print("\n" + "=" * 80)
print("๐Ÿ“Š TEST 5: Feature Importance Analysis")
print("=" * 80)
importance = learner.get_feature_importance()
print(f"\n๐ŸŽฏ Top 10 Features Driving Virality:")
for i, (feat, imp) in enumerate(list(importance.items())[:10], 1):
print(f" {i}. {feat}: {imp:.4f}")
# === TEST 6: Model evaluation ===
print("\n" + "=" * 80)
print("๐Ÿ“ˆ TEST 6: Model Performance Metrics")
print("=" * 80)
metrics = learner.virality_predictor.get_evaluation_metrics()
if metrics:
print(f"\nโœ… Model Performance:")
print(f" Views RMSE: {metrics['views_rmse']:,.0f}")
print(f" Views MAE: {metrics['views_mae']:,.0f}")
print(f" Views Rยฒ: {metrics['views_r2']:.3f}")
print(f" Retention RMSE: {metrics['retention_rmse']:.3f}")
print(f" Retention Rยฒ: {metrics['retention_r2']:.3f}")
# === Save state ===
learner.save_state()
print("\n" + "=" * 80)
print("โœ… ALL TESTS COMPLETE - Pattern Learner Ready for Production")
print("=" * 80)
print("\n๐Ÿš€ Integration Ready:")
print(" โ€ข Pull data from: audio_performance_store.py")
print(" โ€ข Push recommendations to: tts_engine.py, voice_sync.py")
print(" โ€ข Log anomalies to: audio_anomaly_logger.py")
print(" โ€ข Schedule continuous learning every 6 hours")
print("\n๐Ÿ’ก Key Features:")
print(" โœ“ Multi-target ML prediction (views, retention, velocity, engagement)")
print(" โœ“ Stratified learning per platform/niche")
print(" โœ“ RL policy for continuous optimization")
print(" โœ“ Embedding-based similarity search")
print(" โœ“ Confidence scoring & uncertainty quantification")
print(" โœ“ Real-time API for TTS/voice-sync")
print(" โœ“ Continuous pattern decay & reinforcement")
print(" โœ“ Anomaly detection for novel strategies")
print(" โœ“ Cross-platform normalization")
print("\n๐ŸŽฏ Target: Consistently predict and generate 5M+ view patterns")
print("=" * 80)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment