Enhanced audio analysis toolkit
#!/usr/bin/env python3
"""
enhanced_audio_toolkit.py

Advanced audio analysis toolkit for comprehensive batch processing of audio files.
Supports multiple formats and generates detailed statistics, visualizations, and reports.

Features:
- Multi-format support (MP3, WAV, FLAC, M4A, OGG, etc.)
- Advanced audio metrics and spectral analysis
- Multiple visualization types (waveform, spectrogram, frequency analysis)
- Batch processing with parallel execution
- Comprehensive reporting (JSON, CSV, HTML)
- Memory-efficient chunked processing
- Configurable analysis parameters

Author: Enhanced by ARCHITECT v6.2
"""
import sys
import subprocess
import logging
import json
import csv
import hashlib
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any

import warnings

warnings.filterwarnings("ignore")

# Dependency management with version pinning
REQUIRED_PACKAGES = [
    "pydub>=0.25.1",
    "numpy>=1.21.0",
    "matplotlib>=3.5.0",
    "scipy>=1.7.0",
    "librosa>=0.9.0",
    "tqdm>=4.62.0",
    "jinja2>=3.0.0",
    "seaborn>=0.11.0",
]

def install_dependencies():
    """Install required packages with error handling."""
    for pkg in REQUIRED_PACKAGES:
        module_name = pkg.split('>=')[0].replace('-', '_')
        try:
            __import__(module_name)
        except ImportError:
            print(f"Installing {pkg}...")
            try:
                subprocess.check_call([
                    sys.executable, "-m", "pip", "install", pkg, "--quiet"
                ])
            except subprocess.CalledProcessError as e:
                print(f"Failed to install {pkg}: {e}")
                sys.exit(1)


# Install dependencies before importing them
install_dependencies()
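
# Note: the check above assumes each pip package imports under its own name,
# which holds for every entry in REQUIRED_PACKAGES. If a package whose import
# name differs from its pip name (e.g. "Pillow" -> "PIL") is ever added, the
# check needs an explicit pip-name -> import-name mapping.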

# Import audio processing libraries (safe now that dependencies are installed)
from pydub import AudioSegment
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import signal
from scipy.fft import fft, fftfreq
import librosa
import librosa.display  # specshow lives here and must be imported explicitly
from tqdm import tqdm
from jinja2 import Template

# Configure matplotlib for better plots.
# The 'seaborn-v0_8-*' style aliases only exist in matplotlib >= 3.6;
# fall back to the pre-3.6 name on older installs.
try:
    plt.style.use('seaborn-v0_8-darkgrid')
except OSError:
    plt.style.use('seaborn-darkgrid')
sns.set_palette("husl")

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(funcName)s:%(lineno)d - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

class AudioAnalysisConfig:
    """Configuration class for audio analysis parameters."""

    def __init__(self):
        # Supported audio formats
        self.supported_formats = {'.mp3', '.wav', '.flac', '.m4a', '.aac', '.ogg', '.wma'}
        # Analysis parameters
        self.chunk_size = 1024 * 1024      # 1 MB chunks for memory efficiency
        self.spectral_bands = 10           # Number of frequency bands for analysis
        self.window_size = 2048            # FFT window size
        self.hop_length = 512              # Hop length for STFT
        self.fmax = 8000                   # Maximum frequency for analysis (Hz)
        # Visualization parameters
        self.figsize = (12, 8)
        self.dpi = 300
        self.waveform_downsample = 10000   # Max points for waveform plot
        # Processing parameters
        self.max_workers = min(mp.cpu_count(), 8)
        self.enable_parallel = True
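
# Example of tuning the configuration before a run (illustrative values):
#   config = AudioAnalysisConfig()
#   config.hop_length = 1024   # coarser STFT -> faster, lower time resolution
#   config.max_workers = 4     # cap parallelism on shared machines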

class AudioMetrics:
    """Class to compute comprehensive audio metrics."""

    @staticmethod
    def compute_basic_stats(audio: AudioSegment) -> Dict[str, Any]:
        """Compute basic audio statistics."""
        return {
            "duration_seconds": len(audio) / 1000.0,
            "channels": audio.channels,
            "frame_rate": audio.frame_rate,
            "sample_width": audio.sample_width,
            "rms": audio.rms,
            "average_loudness_dBFS": audio.dBFS,
            "max_loudness_dBFS": audio.max_dBFS,
            "file_size_mb": 0,  # Filled in by the caller
        }

    @staticmethod
    def compute_spectral_features(y: np.ndarray, sr: int, config: AudioAnalysisConfig) -> Dict[str, Any]:
        """Compute advanced spectral features using librosa."""
        try:
            # Spectral centroid
            spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
            # Spectral rolloff
            spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0]
            # Zero crossing rate
            zcr = librosa.feature.zero_crossing_rate(y)[0]
            # Spectral bandwidth
            spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0]
            # MFCC features
            mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
            # Chroma features
            chroma = librosa.feature.chroma_stft(y=y, sr=sr)
            # Tempo estimation (newer librosa returns tempo as an array, so unwrap it)
            tempo, beats = librosa.beat.beat_track(y=y, sr=sr)
            tempo = float(np.atleast_1d(tempo)[0])
            # RMS energy
            rms_energy = librosa.feature.rms(y=y)[0]
            # Dynamic range in dB: ratio of loudest to quietest RMS frame
            if len(rms_energy) > 0 and np.min(rms_energy) > 0:
                dynamic_range_db = float(20 * np.log10(np.max(rms_energy) / np.min(rms_energy)))
            else:
                dynamic_range_db = 0.0
            return {
                "spectral_centroid_mean": float(np.mean(spectral_centroids)),
                "spectral_centroid_std": float(np.std(spectral_centroids)),
                "spectral_rolloff_mean": float(np.mean(spectral_rolloff)),
                "spectral_rolloff_std": float(np.std(spectral_rolloff)),
                "zero_crossing_rate_mean": float(np.mean(zcr)),
                "zero_crossing_rate_std": float(np.std(zcr)),
                "spectral_bandwidth_mean": float(np.mean(spectral_bandwidth)),
                "spectral_bandwidth_std": float(np.std(spectral_bandwidth)),
                "tempo_bpm": tempo,
                "beat_count": len(beats),
                "rms_energy_mean": float(np.mean(rms_energy)),
                "rms_energy_std": float(np.std(rms_energy)),
                "mfcc_means": [float(np.mean(mfcc)) for mfcc in mfccs],
                "chroma_means": [float(np.mean(chroma_band)) for chroma_band in chroma],
                "dynamic_range_db": dynamic_range_db,
            }
        except Exception as e:
            logger.warning(f"Error computing spectral features: {e}")
            return {}
    @staticmethod
    def compute_frequency_analysis(samples: np.ndarray, sr: int, config: AudioAnalysisConfig) -> Dict[str, Any]:
        """Compute frequency-domain analysis."""
        try:
            # Compute FFT
            fft_vals = fft(samples)
            fft_freqs = fftfreq(len(samples), 1 / sr)
            # Only use positive frequencies
            positive_freqs = fft_freqs[:len(fft_freqs) // 2]
            positive_fft = np.abs(fft_vals[:len(fft_vals) // 2])
            # Find dominant frequencies (peaks above 10% of the maximum magnitude)
            peak_indices = signal.find_peaks(positive_fft, height=np.max(positive_fft) * 0.1)[0]
            dominant_freqs = positive_freqs[peak_indices]
            dominant_powers = positive_fft[peak_indices]
            # Sort by power and keep the top 10 frequencies
            sorted_indices = np.argsort(dominant_powers)[::-1]
            top_freqs = dominant_freqs[sorted_indices[:10]]
            # Frequency band analysis
            bands = {
                "sub_bass": (20, 60),
                "bass": (60, 250),
                "low_midrange": (250, 500),
                "midrange": (500, 2000),
                "upper_midrange": (2000, 4000),
                "presence": (4000, 6000),
                "brilliance": (6000, 20000),
            }
            band_powers = {}
            for band_name, (low, high) in bands.items():
                band_mask = (positive_freqs >= low) & (positive_freqs <= high)
                band_powers[f"{band_name}_power"] = float(np.sum(positive_fft[band_mask]))
            # Spectral centroid and spread of the raw FFT magnitudes
            total_power = np.sum(positive_fft)
            centroid = np.sum(positive_freqs * positive_fft) / total_power
            spread = np.sqrt(np.sum(((positive_freqs - centroid) ** 2) * positive_fft) / total_power)
            return {
                "fundamental_frequency": float(positive_freqs[np.argmax(positive_fft)]),
                "dominant_frequencies": [float(f) for f in top_freqs],
                "spectral_centroid_fft": float(centroid),
                "spectral_spread": float(spread),
                **band_powers,
            }
        except Exception as e:
            logger.warning(f"Error in frequency analysis: {e}")
            return {}
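
# The metrics can also be used standalone, outside the batch pipeline
# (a sketch; "track.wav" is a placeholder path):
#   y, sr = librosa.load("track.wav", sr=None, mono=True)
#   feats = AudioMetrics.compute_spectral_features(y, sr, AudioAnalysisConfig())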

class AudioVisualizer:
    """Class for generating audio visualizations."""

    def __init__(self, config: AudioAnalysisConfig):
        self.config = config

    def create_waveform_plot(self, samples: np.ndarray, sr: int, title: str, output_path: Path):
        """Create and save waveform visualization."""
        try:
            # Record the true duration before downsampling so the time axis stays correct
            duration = len(samples) / sr
            if len(samples) > self.config.waveform_downsample:
                step = len(samples) // self.config.waveform_downsample
                samples = samples[::step]
            time_axis = np.linspace(0, duration, len(samples))
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=self.config.figsize)
            # Full waveform
            ax1.plot(time_axis, samples, linewidth=0.5, alpha=0.8)
            ax1.set_title(f"Waveform: {title}", fontsize=14, fontweight='bold')
            ax1.set_xlabel("Time (seconds)")
            ax1.set_ylabel("Amplitude")
            ax1.grid(True, alpha=0.3)
            # Zoomed section (first 10 seconds, or the full duration if shorter)
            zoom_duration = min(10, duration)
            if zoom_duration < duration:
                zoom_mask = time_axis <= zoom_duration
                ax2.plot(time_axis[zoom_mask], samples[zoom_mask], linewidth=0.8)
                ax2.set_title(f"Waveform Detail (First {zoom_duration:.1f}s)", fontsize=12)
            else:
                ax2.plot(time_axis, samples, linewidth=0.8)
                ax2.set_title("Waveform Detail (Full Duration)", fontsize=12)
            ax2.set_xlabel("Time (seconds)")
            ax2.set_ylabel("Amplitude")
            ax2.grid(True, alpha=0.3)
            plt.tight_layout()
            plt.savefig(output_path, dpi=self.config.dpi, bbox_inches='tight')
            plt.close()
        except Exception as e:
            logger.error(f"Error creating waveform plot: {e}")
    def create_spectrogram(self, y: np.ndarray, sr: int, title: str, output_path: Path):
        """Create and save spectrogram visualization."""
        try:
            # Compute dB-scaled magnitude spectrogram
            D = librosa.amplitude_to_db(np.abs(librosa.stft(y, hop_length=self.config.hop_length)), ref=np.max)
            fig, ax = plt.subplots(figsize=self.config.figsize)
            img = librosa.display.specshow(D, y_axis='hz', x_axis='time', sr=sr,
                                           hop_length=self.config.hop_length, ax=ax)
            ax.set_title(f"Spectrogram: {title}", fontsize=14, fontweight='bold')
            fig.colorbar(img, ax=ax, format="%+2.0f dB")
            plt.tight_layout()
            plt.savefig(output_path, dpi=self.config.dpi, bbox_inches='tight')
            plt.close()
        except Exception as e:
            logger.error(f"Error creating spectrogram: {e}")
    def create_frequency_analysis_plot(self, samples: np.ndarray, sr: int, title: str, output_path: Path):
        """Create frequency-domain analysis visualization."""
        try:
            # Compute FFT
            fft_vals = fft(samples)
            fft_freqs = fftfreq(len(samples), 1 / sr)
            # Only positive frequencies
            positive_freqs = fft_freqs[:len(fft_freqs) // 2]
            positive_fft = np.abs(fft_vals[:len(fft_vals) // 2])
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=self.config.figsize)
            # Full spectrum (the epsilon avoids log(0) on silent bins)
            ax1.semilogx(positive_freqs[1:], 20 * np.log10(positive_fft[1:] + 1e-12))
            ax1.set_title(f"Frequency Spectrum: {title}", fontsize=14, fontweight='bold')
            ax1.set_xlabel("Frequency (Hz)")
            ax1.set_ylabel("Magnitude (dB)")
            ax1.grid(True, alpha=0.3)
            ax1.set_xlim([20, sr // 2])
            # Frequency bands visualization
            bands = {
                "Sub Bass": (20, 60),
                "Bass": (60, 250),
                "Low Mid": (250, 500),
                "Midrange": (500, 2000),
                "Upper Mid": (2000, 4000),
                "Presence": (4000, 6000),
                "Brilliance": (6000, 20000)
            }
            band_powers = []
            band_names = []
            colors = plt.cm.Set3(np.linspace(0, 1, len(bands)))
            for i, (band_name, (low, high)) in enumerate(bands.items()):
                band_mask = (positive_freqs >= low) & (positive_freqs <= high)
                if np.any(band_mask):
                    band_power = np.mean(positive_fft[band_mask])
                    band_powers.append(20 * np.log10(band_power) if band_power > 0 else -60)
                    band_names.append(band_name)
                    # Add colored regions to the spectrum plot
                    ax1.axvspan(low, min(high, sr // 2), alpha=0.2, color=colors[i], label=band_name)
            # Band power bar chart
            bars = ax2.bar(band_names, band_powers, color=colors[:len(band_powers)])
            ax2.set_title("Frequency Band Analysis", fontsize=12)
            ax2.set_ylabel("Average Power (dB)")
            ax2.tick_params(axis='x', rotation=45)
            ax2.grid(True, alpha=0.3)
            # Add value labels on bars
            for bar, power in zip(bars, band_powers):
                height = bar.get_height()
                ax2.text(bar.get_x() + bar.get_width() / 2., height + 1,
                         f'{power:.1f}', ha='center', va='bottom', fontsize=9)
            ax1.legend(loc='upper right', fontsize=8)
            plt.tight_layout()
            plt.savefig(output_path, dpi=self.config.dpi, bbox_inches='tight')
            plt.close()
        except Exception as e:
            logger.error(f"Error creating frequency analysis plot: {e}")

class AudioAnalyzer:
    """Main audio analysis class."""

    def __init__(self, config: AudioAnalysisConfig):
        self.config = config
        self.visualizer = AudioVisualizer(config)

    def analyze_single_file(self, file_path: Path, output_dir: Path) -> Optional[Dict[str, Any]]:
        """Analyze a single audio file comprehensively."""
        try:
            logger.info(f"Processing: {file_path.name}")
            # Create output directory for this file
            safe_name = "".join(c for c in file_path.stem if c.isalnum() or c in (' ', '-', '_')).rstrip()
            track_folder = output_dir / safe_name
            track_folder.mkdir(parents=True, exist_ok=True)
            # Load audio file
            audio = AudioSegment.from_file(str(file_path))
            # Convert to mono numpy array for analysis
            samples = np.array(audio.get_array_of_samples())
            if audio.channels == 2:
                samples = samples.reshape((-1, 2)).mean(axis=1)
            # Normalize samples to [-1, 1]
            if len(samples) > 0:
                samples = samples.astype(np.float32)
                if np.max(np.abs(samples)) > 0:
                    samples = samples / np.max(np.abs(samples))
            # Prefer librosa's decoder; fall back to the pydub samples on failure
            try:
                y_librosa, sr_librosa = librosa.load(str(file_path), sr=None, mono=True)
            except Exception:
                y_librosa, sr_librosa = samples, audio.frame_rate
            # Compute comprehensive metrics
            results = {
                "filename": file_path.name,
                "file_path": str(file_path),
                "analysis_timestamp": datetime.now().isoformat(),
                "file_hash": self._compute_file_hash(file_path),
                "file_size_mb": round(file_path.stat().st_size / (1024 * 1024), 2)
            }
            # Basic audio statistics
            basic_stats = AudioMetrics.compute_basic_stats(audio)
            basic_stats["file_size_mb"] = results["file_size_mb"]
            results.update(basic_stats)
            # Advanced spectral features
            spectral_features = AudioMetrics.compute_spectral_features(y_librosa, sr_librosa, self.config)
            results.update(spectral_features)
            # Frequency analysis
            freq_analysis = AudioMetrics.compute_frequency_analysis(samples, audio.frame_rate, self.config)
            results.update(freq_analysis)
            # Generate visualizations
            self._generate_visualizations(samples, y_librosa, audio.frame_rate, sr_librosa,
                                          file_path.stem, track_folder)
            # Save detailed results
            self._save_results(results, track_folder)
            logger.info(f"Completed analysis: {file_path.name}")
            return results
        except Exception as e:
            logger.error(f"Failed to analyze {file_path.name}: {e}")
            return None
    def _compute_file_hash(self, file_path: Path) -> str:
        """Compute SHA-256 hash of the file for integrity checking."""
        hash_sha256 = hashlib.sha256()
        try:
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_sha256.update(chunk)
            return hash_sha256.hexdigest()
        except OSError:
            return "unknown"
    def _generate_visualizations(self, samples: np.ndarray, y_librosa: np.ndarray,
                                 sr: int, sr_librosa: int, title: str, output_dir: Path):
        """Generate all visualizations for the audio file."""
        try:
            # Waveform plot
            waveform_path = output_dir / "waveform.png"
            self.visualizer.create_waveform_plot(samples, sr, title, waveform_path)
            # Spectrogram
            spectrogram_path = output_dir / "spectrogram.png"
            self.visualizer.create_spectrogram(y_librosa, sr_librosa, title, spectrogram_path)
            # Frequency analysis
            frequency_path = output_dir / "frequency_analysis.png"
            self.visualizer.create_frequency_analysis_plot(samples, sr, title, frequency_path)
        except Exception as e:
            logger.error(f"Error generating visualizations for {title}: {e}")

    def _save_results(self, results: Dict[str, Any], output_dir: Path):
        """Save analysis results in multiple formats."""
        # JSON format
        json_path = output_dir / "analysis.json"
        with open(json_path, 'w') as f:
            json.dump(results, f, indent=2, default=str)
        # Summary text file
        summary_path = output_dir / "summary.txt"
        with open(summary_path, 'w') as f:
            f.write("Audio Analysis Summary\n")
            f.write("=" * 50 + "\n\n")
            f.write(f"File: {results['filename']}\n")
            f.write(f"Duration: {results['duration_seconds']:.2f} seconds\n")
            f.write(f"Sample Rate: {results['frame_rate']} Hz\n")
            f.write(f"Channels: {results['channels']}\n")
            f.write(f"Average Loudness: {results['average_loudness_dBFS']:.2f} dBFS\n")
            f.write(f"Max Loudness: {results['max_loudness_dBFS']:.2f} dBFS\n")
            if 'tempo_bpm' in results:
                f.write(f"Tempo: {results['tempo_bpm']:.1f} BPM\n")
            if 'spectral_centroid_mean' in results:
                f.write(f"Spectral Centroid: {results['spectral_centroid_mean']:.1f} Hz\n")

class BatchProcessor:
    """Handles batch processing of multiple audio files."""

    def __init__(self, config: AudioAnalysisConfig):
        self.config = config
        self.analyzer = AudioAnalyzer(config)

    def process_directory(self, input_dir: Path, output_dir: Path) -> List[Dict[str, Any]]:
        """Process all supported audio files in a directory."""
        # Find all supported audio files
        audio_files = []
        for ext in self.config.supported_formats:
            audio_files.extend(input_dir.glob(f"*{ext}"))
            audio_files.extend(input_dir.glob(f"*{ext.upper()}"))
        audio_files = sorted(set(audio_files))  # Remove duplicates and sort
        if not audio_files:
            logger.warning(f"No supported audio files found in {input_dir}")
            return []
        logger.info(f"Found {len(audio_files)} audio files to process")
        # Process files
        all_results = []
        if self.config.enable_parallel and len(audio_files) > 1:
            # Parallel processing
            with ProcessPoolExecutor(max_workers=self.config.max_workers) as executor:
                future_to_file = {
                    executor.submit(self.analyzer.analyze_single_file, file_path, output_dir): file_path
                    for file_path in audio_files
                }
                for future in tqdm(as_completed(future_to_file), total=len(audio_files),
                                   desc="Processing files"):
                    result = future.result()
                    if result:
                        all_results.append(result)
        else:
            # Sequential processing
            for file_path in tqdm(audio_files, desc="Processing files"):
                result = self.analyzer.analyze_single_file(file_path, output_dir)
                if result:
                    all_results.append(result)
        # Generate batch report
        self._generate_batch_report(all_results, output_dir)
        return all_results
    def _generate_batch_report(self, results: List[Dict[str, Any]], output_dir: Path):
        """Generate comprehensive batch analysis report."""
        if not results:
            return
        # CSV summary
        csv_path = output_dir / "batch_summary.csv"
        fieldnames = set()
        for result in results:
            fieldnames.update(result.keys())
        with open(csv_path, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=sorted(fieldnames))
            writer.writeheader()
            writer.writerows(results)
        # HTML report
        html_path = output_dir / "batch_report.html"
        self._generate_html_report(results, html_path)
        # Statistical summary
        stats_path = output_dir / "batch_statistics.json"
        batch_stats = self._compute_batch_statistics(results)
        with open(stats_path, 'w') as f:
            json.dump(batch_stats, f, indent=2, default=str)
        logger.info(f"Batch report generated: {html_path}")

    def _compute_batch_statistics(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Compute a statistical summary of the batch."""
        numeric_fields = ['duration_seconds', 'average_loudness_dBFS', 'max_loudness_dBFS',
                          'spectral_centroid_mean', 'tempo_bpm', 'file_size_mb']
        summary = {
            "total_files": len(results),
            "total_duration_hours": sum(r.get('duration_seconds', 0) for r in results) / 3600,
            "analysis_timestamp": datetime.now().isoformat()
        }
        for field in numeric_fields:
            values = [r.get(field) for r in results if r.get(field) is not None]
            if values:
                summary[field] = {
                    "mean": float(np.mean(values)),
                    "median": float(np.median(values)),
                    "std": float(np.std(values)),
                    "min": float(np.min(values)),
                    "max": float(np.max(values))
                }
        return summary
    def _generate_html_report(self, results: List[Dict[str, Any]], output_path: Path):
        """Generate an HTML report summarizing the batch."""
        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Audio Analysis Batch Report</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }
                .header { background-color: #2c3e50; color: white; padding: 20px; border-radius: 5px; }
                .summary { background-color: white; padding: 15px; margin: 20px 0; border-radius: 5px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
                .file-entry { background-color: white; margin: 10px 0; padding: 15px; border-radius: 5px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
                .stats-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 10px; }
                .stat-item { background-color: #ecf0f1; padding: 10px; border-radius: 3px; text-align: center; }
                .stat-value { font-size: 1.5em; font-weight: bold; color: #2c3e50; }
                .stat-label { font-size: 0.9em; color: #7f8c8d; }
                table { width: 100%; border-collapse: collapse; margin-top: 10px; }
                th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
                th { background-color: #34495e; color: white; }
                .file-name { font-weight: bold; color: #2c3e50; }
            </style>
        </head>
        <body>
            <div class="header">
                <h1>Audio Analysis Batch Report</h1>
                <p>Generated on {{ timestamp }}</p>
            </div>
            <div class="summary">
                <h2>Summary Statistics</h2>
                <div class="stats-grid">
                    <div class="stat-item">
                        <div class="stat-value">{{ total_files }}</div>
                        <div class="stat-label">Total Files</div>
                    </div>
                    <div class="stat-item">
                        <div class="stat-value">{{ "%.1f"|format(total_duration) }}</div>
                        <div class="stat-label">Total Duration (hours)</div>
                    </div>
                    <div class="stat-item">
                        <div class="stat-value">{{ "%.1f"|format(avg_duration) }}</div>
                        <div class="stat-label">Avg Duration (min)</div>
                    </div>
                    <div class="stat-item">
                        <div class="stat-value">{{ "%.1f"|format(total_size) }}</div>
                        <div class="stat-label">Total Size (MB)</div>
                    </div>
                </div>
            </div>
            <div class="summary">
                <h2>File Details</h2>
                <table>
                    <thead>
                        <tr>
                            <th>File Name</th>
                            <th>Duration</th>
                            <th>Loudness (dBFS)</th>
                            <th>Tempo (BPM)</th>
                            <th>Size (MB)</th>
                        </tr>
                    </thead>
                    <tbody>
                        {% for file in files %}
                        <tr>
                            <td class="file-name">{{ file.filename }}</td>
                            <td>{{ "%.1f"|format(file.duration_seconds) }}s</td>
                            <td>{{ "%.1f"|format(file.average_loudness_dBFS) }}</td>
                            <td>{{ "%.1f"|format(file.get('tempo_bpm', 0)) }}</td>
                            <td>{{ "%.1f"|format(file.file_size_mb) }}</td>
                        </tr>
                        {% endfor %}
                    </tbody>
                </table>
            </div>
        </body>
        </html>
        """
        # Prepare template data
        total_duration = sum(r.get('duration_seconds', 0) for r in results) / 3600
        avg_duration = sum(r.get('duration_seconds', 0) for r in results) / len(results) / 60
        total_size = sum(r.get('file_size_mb', 0) for r in results)
        template = Template(html_template)
        html_content = template.render(
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            total_files=len(results),
            total_duration=total_duration,
            avg_duration=avg_duration,
            total_size=total_size,
            files=results
        )
        with open(output_path, 'w') as f:
            f.write(html_content)
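
# Programmatic use without the CLI (sketch; the paths are placeholders):
#   config = AudioAnalysisConfig()
#   processor = BatchProcessor(config)
#   results = processor.process_directory(Path("music/"), Path("analysis_out/"))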

def main():
    """Main entry point with enhanced argument parsing."""
    import argparse
    parser = argparse.ArgumentParser(
        description="Enhanced Audio Analysis Toolkit - Comprehensive batch audio file analysis",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s /path/to/music --output-dir results
  %(prog)s /path/to/music -o results --parallel --workers 4
  %(prog)s /path/to/music --no-parallel --verbose
"""
    )
    parser.add_argument("input_dir", help="Directory containing audio files to analyze")
    parser.add_argument("-o", "--output-dir", default="audio_analysis_output",
                        help="Output directory for analysis results (default: audio_analysis_output)")
    parser.add_argument("--parallel", action="store_true", default=True,
                        help="Enable parallel processing (default)")
    parser.add_argument("--no-parallel", action="store_false", dest="parallel",
                        help="Disable parallel processing")
    parser.add_argument("--workers", type=int, default=None,
                        help="Number of parallel workers (default: auto)")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Enable verbose logging")
    parser.add_argument("--formats", nargs="+", default=None,
                        help="Specific audio formats to process (e.g., --formats mp3 wav)")
    args = parser.parse_args()
    # Configure logging
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    # Validate input directory
    input_dir = Path(args.input_dir)
    if not input_dir.is_dir():
        logger.error(f"Input directory not found: {input_dir}")
        sys.exit(1)
    # Set up configuration
    config = AudioAnalysisConfig()
    config.enable_parallel = args.parallel
    if args.workers:
        config.max_workers = args.workers
    if args.formats:
        # Restrict to the requested (and supported) formats
        requested_formats = {f".{fmt.lower().lstrip('.')}" for fmt in args.formats}
        config.supported_formats = config.supported_formats.intersection(requested_formats)
        if not config.supported_formats:
            logger.error("No valid audio formats specified")
            sys.exit(1)
    # Create output directory
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Log configuration
    logger.info("Enhanced Audio Analysis Toolkit v2.0")
    logger.info(f"Input directory: {input_dir}")
    logger.info(f"Output directory: {output_dir}")
    logger.info(f"Supported formats: {', '.join(sorted(config.supported_formats))}")
    logger.info(f"Parallel processing: {'Enabled' if config.enable_parallel else 'Disabled'}")
    if config.enable_parallel:
        logger.info(f"Max workers: {config.max_workers}")
    # Process files
    try:
        processor = BatchProcessor(config)
        results = processor.process_directory(input_dir, output_dir)
        if results:
            logger.info(f"Analysis complete! Processed {len(results)} files")
            logger.info(f"Results saved to: {output_dir}")
            logger.info(f"View batch report: {output_dir / 'batch_report.html'}")
        else:
            logger.warning("No files were successfully processed")
    except KeyboardInterrupt:
        logger.info("Analysis interrupted by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()