import sqlite3
import numpy as np
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
import pickle
import threading
from contextlib import contextmanager
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class AudioFeatures:
"""Comprehensive audio feature representation"""
# Temporal features
words_per_sec: float
syllable_density: float # syllables per second
avg_word_duration: float # seconds
# Pitch characteristics
pitch_mean: float # Hz
pitch_variance: float
pitch_range: Tuple[float, float] # (min, max) Hz
pitch_contour: List[float] # time-series of pitch values
pitch_jump_count: int # number of significant pitch changes
pitch_jump_magnitude: float # average jump size in Hz
# Energy/Intensity
energy_mean: float # dB
energy_variance: float
energy_curve: List[float] # time-series
dynamic_range: float # dB difference between loudest and quietest
# Beat alignment
beat_alignment_error_ms: float # avg deviation from beat
beat_alignment_variance: float
syllable_beat_match_score: float # 0-1, accuracy of syllable-to-beat timing
off_beat_ratio: float # percentage of syllables off-beat
# Hook characteristics
hook_emphasis_amplitude: float # dB increase at hook
hook_emphasis_pitch: float # Hz jump at hook
hook_timing_sec: float # when hook occurs
hook_duration_sec: float
# Pause analysis
pause_count: int
avg_pause_duration: float # seconds
pause_timing_pattern: List[float] # timestamps of pauses
pause_variance: float
strategic_pause_score: float # 0-1, effectiveness of pause placement
# Emotion & Expression
emotion_intensity_mean: float # 0-1
emotion_intensity_variance: float
emotion_per_word: List[float] # intensity scores per word
emotion_per_phoneme: List[float] # fine-grained emotion tracking
emotion_trajectory: str # e.g., "building", "steady", "declining"
# Spectral analysis
formant_f1_mean: float # Hz
formant_f2_mean: float # Hz
formant_f3_mean: float # Hz
frequency_bands: Dict[str, float] # energy in different bands
spectral_centroid: float # Hz, brightness measure
spectral_rolloff: float # Hz
zero_crossing_rate: float
harmonic_ratio: float # harmonics vs noise
timbre_signature: List[float] # MFCC-style features
# Voice characteristics
voice_style: str # "energetic", "calm", "dramatic", etc.
voice_tone: str # "warm", "crisp", "breathy", etc.
accent: str
language: str
speaking_register: str # "chest", "head", "mixed"
# Phoneme-level timing
phoneme_durations: List[Tuple[str, float]] # (phoneme, duration_ms)
phoneme_timing_accuracy: float # 0-1
# Music integration
music_track_id: Optional[str]
music_genre: str
music_tempo_bpm: float
music_key: str
music_energy: float # 0-1
trending_beat_signature: str # identifier for viral beat pattern
beat_drop_timing: float # seconds
music_vocal_balance: float # -1 to 1, negative = music louder
# Cross-beat correlation
beat_trend_match_score: float # 0-1, how well it matches trending patterns
popular_beat_correlation: float # correlation with top performers
beat_innovation_score: float # 0-1, uniqueness within trend
# Lip-sync (for avatar/character content)
lipsync_accuracy: Optional[float] # 0-1, if applicable
visual_audio_sync_error_ms: Optional[float]
# Metadata
total_duration_sec: float
feature_extraction_timestamp: str
feature_version: str # for tracking algorithm updates
@dataclass
class PerformanceMetrics:
"""Comprehensive performance tracking"""
# Core engagement
retention_2s: float # 0-1
completion_rate: float # 0-1
replay_rate: float # 0-1
avg_watch_time_sec: float
# Social engagement
likes: int
comments: int
shares: int
saves: int
profile_visits: int
# Platform-specific ratios
like_view_ratio: float
comment_view_ratio: float
share_view_ratio: float
save_view_ratio: float
# Retention analysis
retention_curve: List[float] # retention at each second
drop_off_points: List[float] # timestamps where viewers leave
rewind_points: List[float] # timestamps where viewers rewind
watch_loop_count: int # how many times avg viewer rewatches
# Viral indicators
views_24h: int
views_48h: int
views_7d: int
views_total: int
velocity_score: float # views per hour in first 24h
# Trend signals
trend_growth_rate: float # % growth rate
trend_decay_rate: float # % decay rate
peak_views_hour: int # which hour had peak views
longevity_score: float # 0-1, sustained performance
# Audience behavior
avg_rewatch_count: float
scroll_back_rate: float # how often viewers scroll back to rewatch
screenshot_rate: float # platform-specific
# Comparative metrics
percentile_rank: float # 0-100, vs similar content
category_rank: int
platform_rank: int
# Time-based performance
performance_by_hour: Dict[int, int] # views by hour of day
performance_by_day: Dict[str, int] # views by day of week
# Update tracking
last_updated: str
metrics_version: str
@dataclass
class VideoRecord:
"""Complete video record with audio features and performance"""
video_id: str
audio_features: AudioFeatures
performance_metrics: PerformanceMetrics
# Metadata & Tagging
platform: str # "tiktok", "youtube_shorts", "instagram_reels"
niche: str # "finance", "fitness", "education", etc.
vertical: str # sub-category within niche
beat_type: str # "drill", "trap", "pop", "electronic", etc.
tempo_class: str # "slow", "medium", "fast", "variable"
# Trend tracking
upload_timestamp: str
trend_timestamp: str # when it started trending (if applicable)
trend_status: str # "rising", "peaked", "declining", "stable"
# Campaign & Testing
campaign_id: Optional[str]
ab_test_group: Optional[str]
content_variation: Optional[str]
# Cross-video learning
similar_video_ids: List[str] # IDs of similar performing videos
anomaly_flag: bool # true if performance was unexpected
confidence_score: float # 0-1, confidence in feature extraction
# Feature embeddings (for similarity search)
audio_embedding: Optional[List[float]] # high-dimensional representation
performance_embedding: Optional[List[float]]
# Failure logging
failure_reasons: List[str] # if underperformed
success_factors: List[str] # if overperformed
# Record metadata
created_at: str
updated_at: str
record_version: int
class AudioPerformanceStore:
"""
Production-grade storage system for audio features and performance metrics.
Designed for 20k+ videos/day ingestion with real-time query capabilities.
"""
def __init__(self, db_path: str = "audio_performance.db", cache_size: int = 10000):
self.db_path = Path(db_path)
self.cache_size = cache_size
self._local = threading.local()
self._cache = {}
self._cache_lock = threading.Lock()
self._init_database()
logger.info(f"AudioPerformanceStore initialized at {db_path}")
def _get_connection(self):
"""Thread-safe connection management"""
if not hasattr(self._local, 'conn'):
self._local.conn = sqlite3.connect(
str(self.db_path),
check_same_thread=False,
timeout=30.0
)
self._local.conn.row_factory = sqlite3.Row
# Performance optimizations
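# WAL lets readers proceed concurrently with a single writer, and
# synchronous=NORMAL relaxes fsync frequency (a safe trade-off under WAL).
# Note: SQLite does not enforce the declared FOREIGN KEYs unless
# "PRAGMA foreign_keys=ON" is also issued per connection.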
self._local.conn.execute("PRAGMA journal_mode=WAL")
self._local.conn.execute("PRAGMA synchronous=NORMAL")
self._local.conn.execute("PRAGMA cache_size=-64000") # 64MB cache
self._local.conn.execute("PRAGMA temp_store=MEMORY")
return self._local.conn
@contextmanager
def _transaction(self):
"""Context manager for transactions"""
conn = self._get_connection()
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
logger.error(f"Transaction failed: {e}")
raise
def _init_database(self):
"""Initialize database schema with comprehensive indexing"""
conn = self._get_connection()
# Main videos table
conn.execute("""
CREATE TABLE IF NOT EXISTS videos (
video_id TEXT PRIMARY KEY,
platform TEXT NOT NULL,
niche TEXT NOT NULL,
vertical TEXT,
beat_type TEXT,
tempo_class TEXT,
upload_timestamp TEXT NOT NULL,
trend_timestamp TEXT,
trend_status TEXT,
campaign_id TEXT,
ab_test_group TEXT,
content_variation TEXT,
anomaly_flag INTEGER DEFAULT 0,
confidence_score REAL DEFAULT 1.0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
record_version INTEGER DEFAULT 1
)
""")
# Audio features table
conn.execute("""
CREATE TABLE IF NOT EXISTS audio_features (
video_id TEXT PRIMARY KEY,
words_per_sec REAL,
syllable_density REAL,
avg_word_duration REAL,
pitch_mean REAL,
pitch_variance REAL,
pitch_range_min REAL,
pitch_range_max REAL,
pitch_contour BLOB,
pitch_jump_count INTEGER,
pitch_jump_magnitude REAL,
energy_mean REAL,
energy_variance REAL,
energy_curve BLOB,
dynamic_range REAL,
beat_alignment_error_ms REAL,
beat_alignment_variance REAL,
syllable_beat_match_score REAL,
off_beat_ratio REAL,
hook_emphasis_amplitude REAL,
hook_emphasis_pitch REAL,
hook_timing_sec REAL,
hook_duration_sec REAL,
pause_count INTEGER,
avg_pause_duration REAL,
pause_timing_pattern BLOB,
pause_variance REAL,
strategic_pause_score REAL,
emotion_intensity_mean REAL,
emotion_intensity_variance REAL,
emotion_per_word BLOB,
emotion_per_phoneme BLOB,
emotion_trajectory TEXT,
formant_f1_mean REAL,
formant_f2_mean REAL,
formant_f3_mean REAL,
frequency_bands BLOB,
spectral_centroid REAL,
spectral_rolloff REAL,
zero_crossing_rate REAL,
harmonic_ratio REAL,
timbre_signature BLOB,
voice_style TEXT,
voice_tone TEXT,
accent TEXT,
language TEXT,
speaking_register TEXT,
phoneme_durations BLOB,
phoneme_timing_accuracy REAL,
music_track_id TEXT,
music_genre TEXT,
music_tempo_bpm REAL,
music_key TEXT,
music_energy REAL,
trending_beat_signature TEXT,
beat_drop_timing REAL,
music_vocal_balance REAL,
beat_trend_match_score REAL,
popular_beat_correlation REAL,
beat_innovation_score REAL,
lipsync_accuracy REAL,
visual_audio_sync_error_ms REAL,
total_duration_sec REAL,
feature_extraction_timestamp TEXT,
feature_version TEXT,
audio_embedding BLOB,
FOREIGN KEY (video_id) REFERENCES videos(video_id)
)
""")
# Performance metrics table
conn.execute("""
CREATE TABLE IF NOT EXISTS performance_metrics (
video_id TEXT PRIMARY KEY,
retention_2s REAL,
completion_rate REAL,
replay_rate REAL,
avg_watch_time_sec REAL,
likes INTEGER,
comments INTEGER,
shares INTEGER,
saves INTEGER,
profile_visits INTEGER,
like_view_ratio REAL,
comment_view_ratio REAL,
share_view_ratio REAL,
save_view_ratio REAL,
retention_curve BLOB,
drop_off_points BLOB,
rewind_points BLOB,
watch_loop_count INTEGER,
views_24h INTEGER,
views_48h INTEGER,
views_7d INTEGER,
views_total INTEGER,
velocity_score REAL,
trend_growth_rate REAL,
trend_decay_rate REAL,
peak_views_hour INTEGER,
longevity_score REAL,
avg_rewatch_count REAL,
scroll_back_rate REAL,
screenshot_rate REAL,
percentile_rank REAL,
category_rank INTEGER,
platform_rank INTEGER,
performance_by_hour BLOB,
performance_by_day BLOB,
last_updated TEXT,
metrics_version TEXT,
performance_embedding BLOB,
FOREIGN KEY (video_id) REFERENCES videos(video_id)
)
""")
# Similar videos mapping
conn.execute("""
CREATE TABLE IF NOT EXISTS similar_videos (
video_id TEXT,
similar_video_id TEXT,
similarity_score REAL,
PRIMARY KEY (video_id, similar_video_id)
)
""")
# Failure/success factors
conn.execute("""
CREATE TABLE IF NOT EXISTS performance_factors (
video_id TEXT,
factor_type TEXT, -- 'failure' or 'success'
factor TEXT,
PRIMARY KEY (video_id, factor_type, factor)
)
""")
# Create comprehensive indexes for fast queries
indexes = [
"CREATE INDEX IF NOT EXISTS idx_platform ON videos(platform)",
"CREATE INDEX IF NOT EXISTS idx_niche ON videos(niche)",
"CREATE INDEX IF NOT EXISTS idx_beat_type ON videos(beat_type)",
"CREATE INDEX IF NOT EXISTS idx_upload_timestamp ON videos(upload_timestamp)",
"CREATE INDEX IF NOT EXISTS idx_trend_status ON videos(trend_status)",
"CREATE INDEX IF NOT EXISTS idx_campaign ON videos(campaign_id)",
"CREATE INDEX IF NOT EXISTS idx_anomaly ON videos(anomaly_flag)",
"CREATE INDEX IF NOT EXISTS idx_retention_2s ON performance_metrics(retention_2s)",
"CREATE INDEX IF NOT EXISTS idx_completion_rate ON performance_metrics(completion_rate)",
"CREATE INDEX IF NOT EXISTS idx_views_total ON performance_metrics(views_total)",
"CREATE INDEX IF NOT EXISTS idx_velocity ON performance_metrics(velocity_score)",
"CREATE INDEX IF NOT EXISTS idx_percentile ON performance_metrics(percentile_rank)",
"CREATE INDEX IF NOT EXISTS idx_beat_alignment ON audio_features(beat_alignment_error_ms)",
"CREATE INDEX IF NOT EXISTS idx_beat_match ON audio_features(syllable_beat_match_score)",
"CREATE INDEX IF NOT EXISTS idx_beat_trend ON audio_features(beat_trend_match_score)",
"CREATE INDEX IF NOT EXISTS idx_music_track ON audio_features(music_track_id)",
"CREATE INDEX IF NOT EXISTS idx_trending_beat ON audio_features(trending_beat_signature)",
]
for idx_sql in indexes:
conn.execute(idx_sql)
conn.commit()
logger.info("Database schema initialized with comprehensive indexing")
def add_video(self, record: VideoRecord) -> bool:
"""
Add a new video record to the store.
Supports high-throughput ingestion (20k+ videos/day).
"""
try:
with self._transaction() as conn:
# Insert main video record
conn.execute("""
INSERT OR REPLACE INTO videos VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
""", (
record.video_id,
record.platform,
record.niche,
record.vertical,
record.beat_type,
record.tempo_class,
record.upload_timestamp,
record.trend_timestamp,
record.trend_status,
record.campaign_id,
record.ab_test_group,
record.content_variation,
int(record.anomaly_flag),
record.confidence_score,
record.created_at,
record.updated_at,
record.record_version
))
# Insert audio features
af = record.audio_features
conn.execute("""
INSERT OR REPLACE INTO audio_features VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ? -- 66 placeholders, one per audio_features column
)
""", (
record.video_id,
af.words_per_sec,
af.syllable_density,
af.avg_word_duration,
af.pitch_mean,
af.pitch_variance,
af.pitch_range[0],
af.pitch_range[1],
pickle.dumps(af.pitch_contour),
af.pitch_jump_count,
af.pitch_jump_magnitude,
af.energy_mean,
af.energy_variance,
pickle.dumps(af.energy_curve),
af.dynamic_range,
af.beat_alignment_error_ms,
af.beat_alignment_variance,
af.syllable_beat_match_score,
af.off_beat_ratio,
af.hook_emphasis_amplitude,
af.hook_emphasis_pitch,
af.hook_timing_sec,
af.hook_duration_sec,
af.pause_count,
af.avg_pause_duration,
pickle.dumps(af.pause_timing_pattern),
af.pause_variance,
af.strategic_pause_score,
af.emotion_intensity_mean,
af.emotion_intensity_variance,
pickle.dumps(af.emotion_per_word),
pickle.dumps(af.emotion_per_phoneme),
af.emotion_trajectory,
af.formant_f1_mean,
af.formant_f2_mean,
af.formant_f3_mean,
pickle.dumps(af.frequency_bands),
af.spectral_centroid,
af.spectral_rolloff,
af.zero_crossing_rate,
af.harmonic_ratio,
pickle.dumps(af.timbre_signature),
af.voice_style,
af.voice_tone,
af.accent,
af.language,
af.speaking_register,
pickle.dumps(af.phoneme_durations),
af.phoneme_timing_accuracy,
af.music_track_id,
af.music_genre,
af.music_tempo_bpm,
af.music_key,
af.music_energy,
af.trending_beat_signature,
af.beat_drop_timing,
af.music_vocal_balance,
af.beat_trend_match_score,
af.popular_beat_correlation,
af.beat_innovation_score,
af.lipsync_accuracy,
af.visual_audio_sync_error_ms,
af.total_duration_sec,
af.feature_extraction_timestamp,
af.feature_version,
pickle.dumps(record.audio_embedding) if record.audio_embedding else None
))
# Insert performance metrics
pm = record.performance_metrics
conn.execute("""
INSERT OR REPLACE INTO performance_metrics VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?
)
""", (
record.video_id,
pm.retention_2s,
pm.completion_rate,
pm.replay_rate,
pm.avg_watch_time_sec,
pm.likes,
pm.comments,
pm.shares,
pm.saves,
pm.profile_visits,
pm.like_view_ratio,
pm.comment_view_ratio,
pm.share_view_ratio,
pm.save_view_ratio,
pickle.dumps(pm.retention_curve),
pickle.dumps(pm.drop_off_points),
pickle.dumps(pm.rewind_points),
pm.watch_loop_count,
pm.views_24h,
pm.views_48h,
pm.views_7d,
pm.views_total,
pm.velocity_score,
pm.trend_growth_rate,
pm.trend_decay_rate,
pm.peak_views_hour,
pm.longevity_score,
pm.avg_rewatch_count,
pm.scroll_back_rate,
pm.screenshot_rate,
pm.percentile_rank,
pm.category_rank,
pm.platform_rank,
pickle.dumps(pm.performance_by_hour),
pickle.dumps(pm.performance_by_day),
pm.last_updated,
pm.metrics_version,
pickle.dumps(record.performance_embedding) if record.performance_embedding else None
))
# Insert similar videos
for sim_id in record.similar_video_ids:
conn.execute("""
INSERT OR REPLACE INTO similar_videos VALUES (?, ?, ?)
""", (record.video_id, sim_id, 0.0)) # similarity score calculated separately
# Insert performance factors
for failure in record.failure_reasons:
conn.execute("""
INSERT OR REPLACE INTO performance_factors VALUES (?, 'failure', ?)
""", (record.video_id, failure))
for success in record.success_factors:
conn.execute("""
INSERT OR REPLACE INTO performance_factors VALUES (?, 'success', ?)
""", (record.video_id, success))
# Invalidate cache
with self._cache_lock:
self._cache.pop(record.video_id, None)
logger.info(f"Added video record: {record.video_id}")
return True
except Exception as e:
logger.error(f"Failed to add video {record.video_id}: {e}")
return False
def update_performance(self, video_id: str, metrics: PerformanceMetrics) -> bool:
"""Update performance metrics for existing video"""
try:
with self._transaction() as conn:
pm = metrics
conn.execute("""
UPDATE performance_metrics SET
retention_2s = ?, completion_rate = ?, replay_rate = ?,
avg_watch_time_sec = ?, likes = ?, comments = ?, shares = ?,
saves = ?, profile_visits = ?, like_view_ratio = ?,
comment_view_ratio = ?, share_view_ratio = ?, save_view_ratio = ?,
retention_curve = ?, drop_off_points = ?, rewind_points = ?,
watch_loop_count = ?, views_24h = ?, views_48h = ?, views_7d = ?,
views_total = ?, velocity_score = ?, trend_growth_rate = ?,
trend_decay_rate = ?, peak_views_hour = ?, longevity_score = ?,
avg_rewatch_count = ?, scroll_back_rate = ?, screenshot_rate = ?,
percentile_rank = ?, category_rank = ?, platform_rank = ?,
performance_by_hour = ?, performance_by_day = ?,
last_updated = ?, metrics_version = ?
WHERE video_id = ?
""", (
pm.retention_2s, pm.completion_rate, pm.replay_rate,
pm.avg_watch_time_sec, pm.likes, pm.comments, pm.shares,
pm.saves, pm.profile_visits, pm.like_view_ratio,
pm.comment_view_ratio, pm.share_view_ratio, pm.save_view_ratio,
pickle.dumps(pm.retention_curve), pickle.dumps(pm.drop_off_points),
pickle.dumps(pm.rewind_points), pm.watch_loop_count,
pm.views_24h, pm.views_48h, pm.views_7d, pm.views_total,
pm.velocity_score, pm.trend_growth_rate, pm.trend_decay_rate,
pm.peak_views_hour, pm.longevity_score, pm.avg_rewatch_count,
pm.scroll_back_rate, pm.screenshot_rate, pm.percentile_rank,
pm.category_rank, pm.platform_rank,
pickle.dumps(pm.performance_by_hour),
pickle.dumps(pm.performance_by_day),
pm.last_updated, pm.metrics_version, video_id
))
# Update video timestamp
conn.execute("""
UPDATE videos SET updated_at = ?, record_version = record_version + 1
WHERE video_id = ?
""", (datetime.now().isoformat(), video_id))
with self._cache_lock:
self._cache.pop(video_id, None)
logger.info(f"Updated performance for: {video_id}")
return True
except Exception as e:
logger.error(f"Failed to update performance for {video_id}: {e}")
return False
def get_video(self, video_id: str) -> Optional[VideoRecord]:
"""Retrieve complete video record"""
# Check cache
with self._cache_lock:
if video_id in self._cache:
return self._cache[video_id]
try:
conn = self._get_connection()
# Get main record
video_row = conn.execute(
"SELECT * FROM videos WHERE video_id = ?", (video_id,)
).fetchone()
if not video_row:
return None
# Get audio features
audio_row = conn.execute(
"SELECT * FROM audio_features WHERE video_id = ?", (video_id,)
).fetchone()
# Get performance metrics
perf_row = conn.execute(
"SELECT * FROM performance_metrics WHERE video_id = ?", (video_id,)
).fetchone()
# Get similar videos
similar = conn.execute(
"SELECT similar_video_id FROM similar_videos WHERE video_id = ?",
(video_id,)
).fetchall()
similar_ids = [row[0] for row in similar]
# Get factors
failures = conn.execute(
"SELECT factor FROM performance_factors WHERE video_id = ? AND factor_type = 'failure'",
(video_id,)
).fetchall()
successes = conn.execute(
"SELECT factor FROM performance_factors WHERE video_id = ? AND factor_type = 'success'",
(video_id,)
).fetchall()
# Reconstruct record
record = self._reconstruct_record(
video_row, audio_row, perf_row, similar_ids,
[f[0] for f in failures], [s[0] for s in successes]
)
# Cache it
with self._cache_lock:
if len(self._cache) >= self.cache_size:
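# Evict the oldest-inserted entry (dicts preserve insertion order), i.e. FIFO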
self._cache.pop(next(iter(self._cache)))
self._cache[video_id] = record
return record
except Exception as e:
logger.error(f"Failed to get video {video_id}: {e}")
return None
def _reconstruct_record(self, video_row, audio_row, perf_row,
similar_ids, failures, successes) -> VideoRecord:
"""Reconstruct VideoRecord from database rows"""
# Reconstruct AudioFeatures
audio_features = AudioFeatures(
words_per_sec=audio_row['words_per_sec'],
syllable_density=audio_row['syllable_density'],
avg_word_duration=audio_row['avg_word_duration'],
pitch_mean=audio_row['pitch_mean'],
pitch_variance=audio_row['pitch_variance'],
pitch_range=(audio_row['pitch_range_min'], audio_row['pitch_range_max']),
pitch_contour=pickle.loads(audio_row['pitch_contour']),
pitch_jump_count=audio_row['pitch_jump_count'],
pitch_jump_magnitude=audio_row['pitch_jump_magnitude'],
energy_mean=audio_row['energy_mean'],
energy_variance=audio_row['energy_variance'],
energy_curve=pickle.loads(audio_row['energy_curve']),
dynamic_range=audio_row['dynamic_range'],
beat_alignment_error_ms=audio_row['beat_alignment_error_ms'],
beat_alignment_variance=audio_row['beat_alignment_variance'],
syllable_beat_match_score=audio_row['syllable_beat_match_score'],
off_beat_ratio=audio_row['off_beat_ratio'],
hook_emphasis_amplitude=audio_row['hook_emphasis_amplitude'],
hook_emphasis_pitch=audio_row['hook_emphasis_pitch'],
hook_timing_sec=audio_row['hook_timing_sec'],
hook_duration_sec=audio_row['hook_duration_sec'],
pause_count=audio_row['pause_count'],
avg_pause_duration=audio_row['avg_pause_duration'],
pause_timing_pattern=pickle.loads(audio_row['pause_timing_pattern']),
pause_variance=audio_row['pause_variance'],
strategic_pause_score=audio_row['strategic_pause_score'],
emotion_intensity_mean=audio_row['emotion_intensity_mean'],
emotion_intensity_variance=audio_row['emotion_intensity_variance'],
emotion_per_word=pickle.loads(audio_row['emotion_per_word']),
emotion_per_phoneme=pickle.loads(audio_row['emotion_per_phoneme']),
emotion_trajectory=audio_row['emotion_trajectory'],
formant_f1_mean=audio_row['formant_f1_mean'],
formant_f2_mean=audio_row['formant_f2_mean'],
formant_f3_mean=audio_row['formant_f3_mean'],
frequency_bands=pickle.loads(audio_row['frequency_bands']),
spectral_centroid=audio_row['spectral_centroid'],
spectral_rolloff=audio_row['spectral_rolloff'],
zero_crossing_rate=audio_row['zero_crossing_rate'],
harmonic_ratio=audio_row['harmonic_ratio'],
timbre_signature=pickle.loads(audio_row['timbre_signature']),
voice_style=audio_row['voice_style'],
voice_tone=audio_row['voice_tone'],
accent=audio_row['accent'],
language=audio_row['language'],
speaking_register=audio_row['speaking_register'],
phoneme_durations=pickle.loads(audio_row['phoneme_durations']),
phoneme_timing_accuracy=audio_row['phoneme_timing_accuracy'],
music_track_id=audio_row['music_track_id'],
music_genre=audio_row['music_genre'],
music_tempo_bpm=audio_row['music_tempo_bpm'],
music_key=audio_row['music_key'],
music_energy=audio_row['music_energy'],
trending_beat_signature=audio_row['trending_beat_signature'],
beat_drop_timing=audio_row['beat_drop_timing'],
music_vocal_balance=audio_row['music_vocal_balance'],
beat_trend_match_score=audio_row['beat_trend_match_score'],
popular_beat_correlation=audio_row['popular_beat_correlation'],
beat_innovation_score=audio_row['beat_innovation_score'],
lipsync_accuracy=audio_row['lipsync_accuracy'],
visual_audio_sync_error_ms=audio_row['visual_audio_sync_error_ms'],
total_duration_sec=audio_row['total_duration_sec'],
feature_extraction_timestamp=audio_row['feature_extraction_timestamp'],
feature_version=audio_row['feature_version']
)
# Reconstruct PerformanceMetrics
performance_metrics = PerformanceMetrics(
retention_2s=perf_row['retention_2s'],
completion_rate=perf_row['completion_rate'],
replay_rate=perf_row['replay_rate'],
avg_watch_time_sec=perf_row['avg_watch_time_sec'],
likes=perf_row['likes'],
comments=perf_row['comments'],
shares=perf_row['shares'],
saves=perf_row['saves'],
profile_visits=perf_row['profile_visits'],
like_view_ratio=perf_row['like_view_ratio'],
comment_view_ratio=perf_row['comment_view_ratio'],
share_view_ratio=perf_row['share_view_ratio'],
save_view_ratio=perf_row['save_view_ratio'],
retention_curve=pickle.loads(perf_row['retention_curve']),
drop_off_points=pickle.loads(perf_row['drop_off_points']),
rewind_points=pickle.loads(perf_row['rewind_points']),
watch_loop_count=perf_row['watch_loop_count'],
views_24h=perf_row['views_24h'],
views_48h=perf_row['views_48h'],
views_7d=perf_row['views_7d'],
views_total=perf_row['views_total'],
velocity_score=perf_row['velocity_score'],
trend_growth_rate=perf_row['trend_growth_rate'],
trend_decay_rate=perf_row['trend_decay_rate'],
peak_views_hour=perf_row['peak_views_hour'],
longevity_score=perf_row['longevity_score'],
avg_rewatch_count=perf_row['avg_rewatch_count'],
scroll_back_rate=perf_row['scroll_back_rate'],
screenshot_rate=perf_row['screenshot_rate'],
percentile_rank=perf_row['percentile_rank'],
category_rank=perf_row['category_rank'],
platform_rank=perf_row['platform_rank'],
performance_by_hour=pickle.loads(perf_row['performance_by_hour']),
performance_by_day=pickle.loads(perf_row['performance_by_day']),
last_updated=perf_row['last_updated'],
metrics_version=perf_row['metrics_version']
)
# Reconstruct VideoRecord
return VideoRecord(
video_id=video_row['video_id'],
audio_features=audio_features,
performance_metrics=performance_metrics,
platform=video_row['platform'],
niche=video_row['niche'],
vertical=video_row['vertical'],
beat_type=video_row['beat_type'],
tempo_class=video_row['tempo_class'],
upload_timestamp=video_row['upload_timestamp'],
trend_timestamp=video_row['trend_timestamp'],
trend_status=video_row['trend_status'],
campaign_id=video_row['campaign_id'],
ab_test_group=video_row['ab_test_group'],
content_variation=video_row['content_variation'],
similar_video_ids=similar_ids,
anomaly_flag=bool(video_row['anomaly_flag']),
confidence_score=video_row['confidence_score'],
audio_embedding=pickle.loads(audio_row['audio_embedding']) if audio_row['audio_embedding'] else None,
performance_embedding=pickle.loads(perf_row['performance_embedding']) if perf_row['performance_embedding'] else None,
failure_reasons=failures,
success_factors=successes,
created_at=video_row['created_at'],
updated_at=video_row['updated_at'],
record_version=video_row['record_version']
)
def get_top_performers(self,
metric: str = 'views_total',
limit: int = 100,
filters: Optional[Dict[str, Any]] = None) -> List[VideoRecord]:
"""
Get top performing videos based on specified metric.
Args:
metric: Performance metric to rank by
limit: Number of records to return
filters: Dictionary of filters (platform, niche, beat_type, etc.)
"""
try:
conn = self._get_connection()
# Build query
query = """
SELECT v.video_id FROM videos v
JOIN performance_metrics pm ON v.video_id = pm.video_id
"""
conditions = []
params = []
if filters:
if 'platform' in filters:
conditions.append("v.platform = ?")
params.append(filters['platform'])
if 'niche' in filters:
conditions.append("v.niche = ?")
params.append(filters['niche'])
if 'beat_type' in filters:
conditions.append("v.beat_type = ?")
params.append(filters['beat_type'])
if 'min_retention' in filters:
conditions.append("pm.retention_2s >= ?")
params.append(filters['min_retention'])
if 'days_ago' in filters:
cutoff = (datetime.now() - timedelta(days=filters['days_ago'])).isoformat()
conditions.append("v.upload_timestamp >= ?")
params.append(cutoff)
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += f" ORDER BY pm.{metric} DESC LIMIT ?"
params.append(limit)
rows = conn.execute(query, params).fetchall()
video_ids = [row[0] for row in rows]
return [record for vid in video_ids if (record := self.get_video(vid)) is not None]
except Exception as e:
logger.error(f"Failed to get top performers: {e}")
return []
def get_bottom_performers(self,
metric: str = 'views_total',
limit: int = 100,
filters: Optional[Dict[str, Any]] = None) -> List[VideoRecord]:
"""Get bottom performing videos (for failure analysis)"""
try:
conn = self._get_connection()
query = """
SELECT v.video_id FROM videos v
JOIN performance_metrics pm ON v.video_id = pm.video_id
"""
conditions = []
params = []
if filters:
if 'platform' in filters:
conditions.append("v.platform = ?")
params.append(filters['platform'])
if 'niche' in filters:
conditions.append("v.niche = ?")
params.append(filters['niche'])
if 'days_ago' in filters:
cutoff = (datetime.now() - timedelta(days=filters['days_ago'])).isoformat()
conditions.append("v.upload_timestamp >= ?")
params.append(cutoff)
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += f" ORDER BY pm.{metric} ASC LIMIT ?"
params.append(limit)
rows = conn.execute(query, params).fetchall()
video_ids = [row[0] for row in rows]
return [record for vid in video_ids if (record := self.get_video(vid)) is not None]
except Exception as e:
logger.error(f"Failed to get bottom performers: {e}")
return []
def get_recent_records(self, n: int = 1000) -> List[VideoRecord]:
"""Get N most recent records"""
try:
conn = self._get_connection()
rows = conn.execute("""
SELECT video_id FROM videos
ORDER BY upload_timestamp DESC LIMIT ?
""", (n,)).fetchall()
video_ids = [row[0] for row in rows]
return [record for vid in video_ids if (record := self.get_video(vid)) is not None]
except Exception as e:
logger.error(f"Failed to get recent records: {e}")
return []
def get_similar_audio_profiles(self,
audio_features: AudioFeatures,
limit: int = 50,
threshold: float = 0.8) -> List[Tuple[VideoRecord, float]]:
"""
Find videos with similar audio profiles using feature similarity.
Returns list of (VideoRecord, similarity_score) tuples.
"""
try:
# Key features for similarity
query_vector = np.array([
audio_features.words_per_sec,
audio_features.syllable_density,
audio_features.pitch_mean,
audio_features.pitch_variance,
audio_features.energy_mean,
audio_features.beat_alignment_error_ms,
audio_features.syllable_beat_match_score,
audio_features.hook_emphasis_amplitude,
audio_features.pause_count,
audio_features.emotion_intensity_mean,
audio_features.spectral_centroid,
audio_features.harmonic_ratio,
audio_features.music_tempo_bpm,
audio_features.beat_trend_match_score,
audio_features.popular_beat_correlation
])
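# These features live on very different scales (Hz, BPM, counts, 0-1 scores),
# so raw cosine similarity is dominated by large-magnitude dimensions such as
# tempo and pitch; z-scoring the vectors first would weight them more evenly.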
conn = self._get_connection()
rows = conn.execute("""
SELECT video_id, words_per_sec, syllable_density, pitch_mean,
pitch_variance, energy_mean, beat_alignment_error_ms,
syllable_beat_match_score, hook_emphasis_amplitude,
pause_count, emotion_intensity_mean, spectral_centroid,
harmonic_ratio, music_tempo_bpm, beat_trend_match_score,
popular_beat_correlation
FROM audio_features
""").fetchall()
similarities = []
for row in rows:
video_id = row[0]
feature_vector = np.array(row[1:])
# Cosine similarity, guarding against zero-norm vectors
denom = np.linalg.norm(query_vector) * np.linalg.norm(feature_vector)
similarity = float(np.dot(query_vector, feature_vector) / denom) if denom else 0.0
if similarity >= threshold:
similarities.append((video_id, similarity))
# Sort by similarity
similarities.sort(key=lambda x: x[1], reverse=True)
similarities = similarities[:limit]
# Get full records
results = []
for video_id, sim_score in similarities:
record = self.get_video(video_id)
if record:
results.append((record, sim_score))
return results
except Exception as e:
logger.error(f"Failed to find similar audio profiles: {e}")
return []
def get_statistics(self, filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Get aggregate statistics across stored videos"""
try:
conn = self._get_connection()
where_clause = ""
params = []
if filters:
conditions = []
if 'platform' in filters:
conditions.append("v.platform = ?")
params.append(filters['platform'])
if 'niche' in filters:
conditions.append("v.niche = ?")
params.append(filters['niche'])
if conditions:
where_clause = "WHERE " + " AND ".join(conditions)
stats = conn.execute(f"""
SELECT
COUNT(*) as total_videos,
AVG(pm.retention_2s) as avg_retention,
AVG(pm.completion_rate) as avg_completion,
AVG(pm.views_total) as avg_views,
MAX(pm.views_total) as max_views,
AVG(af.beat_alignment_error_ms) as avg_beat_error,
AVG(af.syllable_beat_match_score) as avg_beat_match,
AVG(pm.velocity_score) as avg_velocity
FROM videos v
JOIN performance_metrics pm ON v.video_id = pm.video_id
JOIN audio_features af ON v.video_id = af.video_id
{where_clause}
""", params).fetchone()
return {
'total_videos': stats[0],
'avg_retention_2s': stats[1],
'avg_completion_rate': stats[2],
'avg_views': stats[3],
'max_views': stats[4],
'avg_beat_alignment_error_ms': stats[5],
'avg_beat_match_score': stats[6],
'avg_velocity_score': stats[7]
}
except Exception as e:
logger.error(f"Failed to get statistics: {e}")
return {}
def bulk_add(self, records: List[VideoRecord]) -> int:
"""Bulk insert for high-throughput scenarios. Returns count of successful inserts."""
success_count = 0
for record in records:
if self.add_video(record):
success_count += 1
logger.info(f"Bulk added {success_count}/{len(records)} videos")
return success_count
def close(self):
"""Close database connections"""
if hasattr(self._local, 'conn'):
self._local.conn.close()
logger.info("AudioPerformanceStore closed")
# Convenience functions for RL integration
def get_top_audio_patterns(store: AudioPerformanceStore,
niche: str,
platform: str,
top_n: int = 20) -> List[Dict[str, Any]]:
"""Extract audio patterns from top performers for RL training"""
top_videos = store.get_top_performers(
metric='views_total',
limit=top_n,
filters={'niche': niche, 'platform': platform}
)
patterns = []
for video in top_videos:
af = video.audio_features
pm = video.performance_metrics
patterns.append({
'video_id': video.video_id,
'audio_features': {
'pace': af.words_per_sec,
'syllable_density': af.syllable_density,
'pitch_variance': af.pitch_variance,
'energy_mean': af.energy_mean,
'beat_alignment': af.beat_alignment_error_ms,
'beat_match_score': af.syllable_beat_match_score,
'hook_emphasis': af.hook_emphasis_amplitude,
'pause_pattern': af.pause_timing_pattern,
'emotion_intensity': af.emotion_intensity_mean,
'beat_trend_match': af.beat_trend_match_score
},
'performance': {
'retention_2s': pm.retention_2s,
'completion_rate': pm.completion_rate,
'views': pm.views_total,
'velocity': pm.velocity_score
}
})
return patterns
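# Example call (values mirror the sample record in __main__ below):
#   patterns = get_top_audio_patterns(store, niche="finance", platform="tiktok")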
if name == "main":
# Example usage and testing
store = AudioPerformanceStore("test_audio_performance.db")
# Create sample record
sample_audio = AudioFeatures(
words_per_sec=3.2,
syllable_density=4.8,
avg_word_duration=0.31,
pitch_mean=220.0,
pitch_variance=150.0,
pitch_range=(180.0, 300.0),
pitch_contour=[200, 220, 240, 230, 210],
pitch_jump_count=8,
pitch_jump_magnitude=25.0,
energy_mean=-20.0,
energy_variance=5.0,
energy_curve=[-22, -20, -18, -20, -21],
dynamic_range=15.0,
beat_alignment_error_ms=12.5,
beat_alignment_variance=8.0,
syllable_beat_match_score=0.92,
off_beat_ratio=0.08,
hook_emphasis_amplitude=6.0,
hook_emphasis_pitch=40.0,
hook_timing_sec=3.5,
hook_duration_sec=2.0,
pause_count=4,
avg_pause_duration=0.3,
pause_timing_pattern=[2.0, 5.5, 8.0, 11.0],
pause_variance=0.15,
strategic_pause_score=0.88,
emotion_intensity_mean=0.75,
emotion_intensity_variance=0.12,
emotion_per_word=[0.7, 0.8, 0.9, 0.75, 0.7],
emotion_per_phoneme=[0.7] * 20,
emotion_trajectory="building",
formant_f1_mean=700.0,
formant_f2_mean=1220.0,
formant_f3_mean=2600.0,
frequency_bands={'low': 0.3, 'mid': 0.5, 'high': 0.2},
spectral_centroid=1500.0,
spectral_rolloff=3000.0,
zero_crossing_rate=0.15,
harmonic_ratio=0.85,
timbre_signature=[0.5, 0.3, 0.2, 0.15, 0.1],
voice_style="energetic",
voice_tone="warm",
accent="neutral",
language="en-US",
speaking_register="mixed",
phoneme_durations=[("AH", 80), ("T", 40)],
phoneme_timing_accuracy=0.94,
music_track_id="track_12345",
music_genre="trap",
music_tempo_bpm=140.0,
music_key="C#m",
music_energy=0.88,
trending_beat_signature="drill_2024_v3",
beat_drop_timing=4.0,
music_vocal_balance=0.2,
beat_trend_match_score=0.91,
popular_beat_correlation=0.87,
beat_innovation_score=0.65,
lipsync_accuracy=None,
visual_audio_sync_error_ms=None,
total_duration_sec=15.0,
feature_extraction_timestamp=datetime.now().isoformat(),
feature_version="v2.1"
)
sample_performance = PerformanceMetrics(
retention_2s=0.92,
completion_rate=0.78,
replay_rate=0.45,
avg_watch_time_sec=12.5,
likes=125000,
comments=8500,
shares=32000,
saves=18000,
profile_visits=5000,
like_view_ratio=0.025,
comment_view_ratio=0.0017,
share_view_ratio=0.0064,
save_view_ratio=0.0036,
retention_curve=[0.95, 0.92, 0.88, 0.85, 0.80, 0.78],
drop_off_points=[2.5, 8.0],
rewind_points=[4.0, 9.5],
watch_loop_count=2,
views_24h=500000,
views_48h=1200000,
views_7d=5000000,
views_total=8500000,
velocity_score=20833.0,
trend_growth_rate=1.4,
trend_decay_rate=0.15,
peak_views_hour=6,
longevity_score=0.82,
avg_rewatch_count=1.8,
scroll_back_rate=0.25,
screenshot_rate=0.05,
percentile_rank=95.5,
category_rank=12,
platform_rank=450,
performance_by_hour={i: 50000 + i*10000 for i in range(24)},
performance_by_day={'Mon': 700000, 'Tue': 800000, 'Wed': 1200000},
last_updated=datetime.now().isoformat(),
metrics_version="v1.5"
)
sample_record = VideoRecord(
video_id="test_video_001",
audio_features=sample_audio,
performance_metrics=sample_performance,
platform="tiktok",
niche="finance",
vertical="stock_trading",
beat_type="drill",
tempo_class="fast",
upload_timestamp=datetime.now().isoformat(),
trend_timestamp=(datetime.now() + timedelta(hours=6)).isoformat(),
trend_status="rising",
campaign_id="campaign_q4_2024",
ab_test_group="variant_a",
content_variation="hook_test_1",
similar_video_ids=["video_002", "video_003"],
anomaly_flag=False,
confidence_score=0.96,
audio_embedding=None,
performance_embedding=None,
failure_reasons=[],
success_factors=["strong_hook", "perfect_beat_timing", "trending_sound"],
created_at=datetime.now().isoformat(),
updated_at=datetime.now().isoformat(),
record_version=1
)
# Test operations
print("Testing AudioPerformanceStore...")
# Add video
success = store.add_video(sample_record)
print(f"Add video: {'✓' if success else '✗'}")
# Retrieve video
retrieved = store.get_video("test_video_001")
print(f"Retrieve video: {'✓' if retrieved else '✗'}")
# Get statistics
stats = store.get_statistics()
print(f"Statistics: {stats}")
# Get top performers (will be empty in this test)
top = store.get_top_performers(limit=10)
print(f"Top performers: {len(top)} videos")
store.close()
print("Test complete!")