Skip to content

Instantly share code, notes, and snippets.

@SWORDIntel
Created June 24, 2025 06:21
Show Gist options
  • Save SWORDIntel/de76c4e9b279628af083c3b005409bc6 to your computer and use it in GitHub Desktop.
Save SWORDIntel/de76c4e9b279628af083c3b005409bc6 to your computer and use it in GitHub Desktop.
Enhanced RAM-fucking toolkit
#!/usr/bin/env python3
"""
enhanced_audio_toolkit.py
Advanced audio analysis toolkit for comprehensive batch processing of audio files.
Supports multiple formats, generates detailed statistics, visualizations, and reports.
Features:
- Multi-format support (MP3, WAV, FLAC, M4A, OGG, etc.)
- Advanced audio metrics and spectral analysis
- Multiple visualization types (waveform, spectrogram, frequency analysis)
- Batch processing with parallel execution
- Comprehensive reporting (JSON, CSV, HTML)
- Memory-efficient chunked processing
- Configurable analysis parameters
Author: Enhanced by ARCHITECT v6.2
"""
import os
import sys
import subprocess
import logging
import json
import csv
import hashlib
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
import warnings
warnings.filterwarnings("ignore")
# Dependency management with version pinning
REQUIRED_PACKAGES = [
"pydub>=0.25.1",
"numpy>=1.21.0",
"matplotlib>=3.5.0",
"scipy>=1.7.0",
"librosa>=0.9.0",
"tqdm>=4.62.0",
"jinja2>=3.0.0",
"seaborn>=0.11.0"
]
def install_dependencies():
"""Install required packages with error handling."""
for pkg in REQUIRED_PACKAGES:
try:
__import__(pkg.split('>=')[0].replace('-', '_'))
except ImportError:
print(f"Installing {pkg}...")
try:
subprocess.check_call([
sys.executable, "-m", "pip", "install", pkg, "--quiet"
])
except subprocess.CalledProcessError as e:
print(f"Failed to install {pkg}: {e}")
sys.exit(1)
# Install dependencies before importing
install_dependencies()
# Import audio processing libraries
from pydub import AudioSegment
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.backends.backend_pdf import PdfPages
import seaborn as sns
from scipy import signal, stats
from scipy.fft import fft, fftfreq
import librosa
from tqdm import tqdm
from jinja2 import Template
# Configure matplotlib for better plots
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(funcName)s:%(lineno)d - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
class AudioAnalysisConfig:
"""Configuration class for audio analysis parameters."""
def __init__(self):
# Supported audio formats
self.supported_formats = {'.mp3', '.wav', '.flac', '.m4a', '.aac', '.ogg', '.wma'}
# Analysis parameters
self.chunk_size = 1024 * 1024 # 1MB chunks for memory efficiency
self.spectral_bands = 10 # Number of frequency bands for analysis
self.window_size = 2048 # FFT window size
self.hop_length = 512 # Hop length for STFT
self.fmax = 8000 # Maximum frequency for analysis
# Visualization parameters
self.figsize = (12, 8)
self.dpi = 300
self.waveform_downsample = 10000 # Max points for waveform plot
# Processing parameters
self.max_workers = min(mp.cpu_count(), 8)
self.enable_parallel = True
class AudioMetrics:
"""Class to compute comprehensive audio metrics."""
@staticmethod
def compute_basic_stats(audio: AudioSegment) -> Dict[str, Any]:
"""Compute basic audio statistics."""
return {
"duration_seconds": len(audio) / 1000.0,
"channels": audio.channels,
"frame_rate": audio.frame_rate,
"sample_width": audio.sample_width,
"rms": audio.rms,
"average_loudness_dBFS": audio.dBFS,
"max_loudness_dBFS": audio.max_dBFS,
"file_size_mb": 0 # Will be filled by caller
}
@staticmethod
def compute_spectral_features(y: np.ndarray, sr: int, config: AudioAnalysisConfig) -> Dict[str, Any]:
"""Compute advanced spectral features using librosa."""
try:
# Spectral centroid
spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
# Spectral rolloff
spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0]
# Zero crossing rate
zcr = librosa.feature.zero_crossing_rate(y)[0]
# Spectral bandwidth
spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0]
# MFCC features
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
# Chroma features
chroma = librosa.feature.chroma_stft(y=y, sr=sr)
# Tempo estimation
tempo, beats = librosa.beat.beat_track(y=y, sr=sr)
# RMS energy
rms_energy = librosa.feature.rms(y=y)[0]
return {
"spectral_centroid_mean": float(np.mean(spectral_centroids)),
"spectral_centroid_std": float(np.std(spectral_centroids)),
"spectral_rolloff_mean": float(np.mean(spectral_rolloff)),
"spectral_rolloff_std": float(np.std(spectral_rolloff)),
"zero_crossing_rate_mean": float(np.mean(zcr)),
"zero_crossing_rate_std": float(np.std(zcr)),
"spectral_bandwidth_mean": float(np.mean(spectral_bandwidth)),
"spectral_bandwidth_std": float(np.std(spectral_bandwidth)),
"tempo_bpm": float(tempo),
"beat_count": len(beats),
"rms_energy_mean": float(np.mean(rms_energy)),
"rms_energy_std": float(np.std(rms_energy)),
"mfcc_means": [float(np.mean(mfcc)) for mfcc in mfccs],
"chroma_means": [float(np.mean(chroma_band)) for chroma_band in chroma],
"dynamic_range_db": float(np.max(rms_energy) - np.min(rms_energy)) if len(rms_energy) > 0 else 0
}
except Exception as e:
logger.warning(f"Error computing spectral features: {e}")
return {}
@staticmethod
def compute_frequency_analysis(samples: np.ndarray, sr: int, config: AudioAnalysisConfig) -> Dict[str, Any]:
"""Compute frequency domain analysis."""
try:
# Compute FFT
fft_vals = fft(samples)
fft_freqs = fftfreq(len(samples), 1/sr)
# Only use positive frequencies
positive_freqs = fft_freqs[:len(fft_freqs)//2]
positive_fft = np.abs(fft_vals[:len(fft_vals)//2])
# Find dominant frequencies
peak_indices = signal.find_peaks(positive_fft, height=np.max(positive_fft)*0.1)[0]
dominant_freqs = positive_freqs[peak_indices]
dominant_powers = positive_fft[peak_indices]
# Sort by power
sorted_indices = np.argsort(dominant_powers)[::-1]
top_freqs = dominant_freqs[sorted_indices[:10]] # Top 10 frequencies
# Frequency band analysis
bands = {
"sub_bass": (20, 60),
"bass": (60, 250),
"low_midrange": (250, 500),
"midrange": (500, 2000),
"upper_midrange": (2000, 4000),
"presence": (4000, 6000),
"brilliance": (6000, 20000)
}
band_powers = {}
for band_name, (low, high) in bands.items():
band_mask = (positive_freqs >= low) & (positive_freqs <= high)
band_power = np.sum(positive_fft[band_mask])
band_powers[f"{band_name}_power"] = float(band_power)
return {
"fundamental_frequency": float(positive_freqs[np.argmax(positive_fft)]),
"dominant_frequencies": [float(f) for f in top_freqs],
"spectral_centroid_fft": float(np.sum(positive_freqs * positive_fft) / np.sum(positive_fft)),
"spectral_spread": float(np.sqrt(np.sum(((positive_freqs - np.sum(positive_freqs * positive_fft) / np.sum(positive_fft))**2) * positive_fft) / np.sum(positive_fft))),
**band_powers
}
except Exception as e:
logger.warning(f"Error in frequency analysis: {e}")
return {}
class AudioVisualizer:
"""Class for generating audio visualizations."""
def __init__(self, config: AudioAnalysisConfig):
self.config = config
def create_waveform_plot(self, samples: np.ndarray, sr: int, title: str, output_path: Path):
"""Create and save waveform visualization."""
try:
# Downsample for visualization if needed
if len(samples) > self.config.waveform_downsample:
step = len(samples) // self.config.waveform_downsample
samples = samples[::step]
time_axis = np.linspace(0, len(samples) / sr, len(samples))
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=self.config.figsize)
# Full waveform
ax1.plot(time_axis, samples, linewidth=0.5, alpha=0.8)
ax1.set_title(f"Waveform: {title}", fontsize=14, fontweight='bold')
ax1.set_xlabel("Time (seconds)")
ax1.set_ylabel("Amplitude")
ax1.grid(True, alpha=0.3)
# Zoomed section (first 10 seconds or full duration if shorter)
zoom_duration = min(10, len(samples) / sr)
zoom_samples = int(zoom_duration * sr)
if zoom_samples < len(samples):
ax2.plot(time_axis[:zoom_samples], samples[:zoom_samples], linewidth=0.8)
ax2.set_title(f"Waveform Detail (First {zoom_duration:.1f}s)", fontsize=12)
else:
ax2.plot(time_axis, samples, linewidth=0.8)
ax2.set_title("Waveform Detail (Full Duration)", fontsize=12)
ax2.set_xlabel("Time (seconds)")
ax2.set_ylabel("Amplitude")
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(output_path, dpi=self.config.dpi, bbox_inches='tight')
plt.close()
except Exception as e:
logger.error(f"Error creating waveform plot: {e}")
def create_spectrogram(self, y: np.ndarray, sr: int, title: str, output_path: Path):
"""Create and save spectrogram visualization."""
try:
# Compute spectrogram
D = librosa.amplitude_to_db(np.abs(librosa.stft(y, hop_length=self.config.hop_length)), ref=np.max)
fig, ax = plt.subplots(figsize=self.config.figsize)
img = librosa.display.specshow(D, y_axis='hz', x_axis='time', sr=sr,
hop_length=self.config.hop_length, ax=ax)
ax.set_title(f"Spectrogram: {title}", fontsize=14, fontweight='bold')
fig.colorbar(img, ax=ax, format="%+2.0f dB")
plt.tight_layout()
plt.savefig(output_path, dpi=self.config.dpi, bbox_inches='tight')
plt.close()
except Exception as e:
logger.error(f"Error creating spectrogram: {e}")
def create_frequency_analysis_plot(self, samples: np.ndarray, sr: int, title: str, output_path: Path):
"""Create frequency domain analysis visualization."""
try:
# Compute FFT
fft_vals = fft(samples)
fft_freqs = fftfreq(len(samples), 1/sr)
# Only positive frequencies
positive_freqs = fft_freqs[:len(fft_freqs)//2]
positive_fft = np.abs(fft_vals[:len(fft_vals)//2])
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=self.config.figsize)
# Full spectrum
ax1.semilogx(positive_freqs[1:], 20 * np.log10(positive_fft[1:]))
ax1.set_title(f"Frequency Spectrum: {title}", fontsize=14, fontweight='bold')
ax1.set_xlabel("Frequency (Hz)")
ax1.set_ylabel("Magnitude (dB)")
ax1.grid(True, alpha=0.3)
ax1.set_xlim([20, sr//2])
# Frequency bands visualization
bands = {
"Sub Bass": (20, 60),
"Bass": (60, 250),
"Low Mid": (250, 500),
"Midrange": (500, 2000),
"Upper Mid": (2000, 4000),
"Presence": (4000, 6000),
"Brilliance": (6000, 20000)
}
band_powers = []
band_names = []
colors = plt.cm.Set3(np.linspace(0, 1, len(bands)))
for i, (band_name, (low, high)) in enumerate(bands.items()):
band_mask = (positive_freqs >= low) & (positive_freqs <= high)
if np.any(band_mask):
band_power = np.mean(positive_fft[band_mask])
band_powers.append(20 * np.log10(band_power) if band_power > 0 else -60)
band_names.append(band_name)
# Add colored regions to spectrum plot
ax1.axvspan(low, min(high, sr//2), alpha=0.2, color=colors[i], label=band_name)
# Band power bar chart
bars = ax2.bar(band_names, band_powers, color=colors[:len(band_powers)])
ax2.set_title("Frequency Band Analysis", fontsize=12)
ax2.set_ylabel("Average Power (dB)")
ax2.tick_params(axis='x', rotation=45)
ax2.grid(True, alpha=0.3)
# Add value labels on bars
for bar, power in zip(bars, band_powers):
height = bar.get_height()
ax2.text(bar.get_x() + bar.get_width()/2., height + 1,
f'{power:.1f}', ha='center', va='bottom', fontsize=9)
ax1.legend(loc='upper right', fontsize=8)
plt.tight_layout()
plt.savefig(output_path, dpi=self.config.dpi, bbox_inches='tight')
plt.close()
except Exception as e:
logger.error(f"Error creating frequency analysis plot: {e}")
class AudioAnalyzer:
"""Main audio analysis class."""
def __init__(self, config: AudioAnalysisConfig):
self.config = config
self.visualizer = AudioVisualizer(config)
def analyze_single_file(self, file_path: Path, output_dir: Path) -> Optional[Dict[str, Any]]:
"""Analyze a single audio file comprehensively."""
try:
logger.info(f"Processing: {file_path.name}")
# Create output directory for this file
safe_name = "".join(c for c in file_path.stem if c.isalnum() or c in (' ', '-', '_')).rstrip()
track_folder = output_dir / safe_name
track_folder.mkdir(parents=True, exist_ok=True)
# Load audio file
audio = AudioSegment.from_file(str(file_path))
# Convert to mono numpy array for analysis
samples = np.array(audio.get_array_of_samples())
if audio.channels == 2:
samples = samples.reshape((-1, 2)).mean(axis=1)
# Normalize samples
if len(samples) > 0:
samples = samples.astype(np.float32)
if np.max(np.abs(samples)) > 0:
samples = samples / np.max(np.abs(samples))
# Use librosa for more accurate sample rate
try:
y_librosa, sr_librosa = librosa.load(str(file_path), sr=None, mono=True)
except:
y_librosa, sr_librosa = samples, audio.frame_rate
# Compute comprehensive metrics
results = {
"filename": file_path.name,
"file_path": str(file_path),
"analysis_timestamp": datetime.now().isoformat(),
"file_hash": self._compute_file_hash(file_path),
"file_size_mb": round(file_path.stat().st_size / (1024 * 1024), 2)
}
# Basic audio statistics
basic_stats = AudioMetrics.compute_basic_stats(audio)
basic_stats["file_size_mb"] = results["file_size_mb"]
results.update(basic_stats)
# Advanced spectral features
spectral_features = AudioMetrics.compute_spectral_features(y_librosa, sr_librosa, self.config)
results.update(spectral_features)
# Frequency analysis
freq_analysis = AudioMetrics.compute_frequency_analysis(samples, audio.frame_rate, self.config)
results.update(freq_analysis)
# Generate visualizations
self._generate_visualizations(samples, y_librosa, audio.frame_rate, sr_librosa,
file_path.stem, track_folder)
# Save detailed results
self._save_results(results, track_folder)
logger.info(f"βœ“ Completed analysis: {file_path.name}")
return results
except Exception as e:
logger.error(f"Failed to analyze {file_path.name}: {e}")
return None
def _compute_file_hash(self, file_path: Path) -> str:
"""Compute SHA-256 hash of file for integrity checking."""
hash_sha256 = hashlib.sha256()
try:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_sha256.update(chunk)
return hash_sha256.hexdigest()
except:
return "unknown"
def _generate_visualizations(self, samples: np.ndarray, y_librosa: np.ndarray,
sr: int, sr_librosa: int, title: str, output_dir: Path):
"""Generate all visualizations for the audio file."""
try:
# Waveform plot
waveform_path = output_dir / "waveform.png"
self.visualizer.create_waveform_plot(samples, sr, title, waveform_path)
# Spectrogram
spectrogram_path = output_dir / "spectrogram.png"
self.visualizer.create_spectrogram(y_librosa, sr_librosa, title, spectrogram_path)
# Frequency analysis
frequency_path = output_dir / "frequency_analysis.png"
self.visualizer.create_frequency_analysis_plot(samples, sr, title, frequency_path)
except Exception as e:
logger.error(f"Error generating visualizations for {title}: {e}")
def _save_results(self, results: Dict[str, Any], output_dir: Path):
"""Save analysis results in multiple formats."""
# JSON format
json_path = output_dir / "analysis.json"
with open(json_path, 'w') as f:
json.dump(results, f, indent=2, default=str)
# Summary text file
summary_path = output_dir / "summary.txt"
with open(summary_path, 'w') as f:
f.write(f"Audio Analysis Summary\n")
f.write(f"=" * 50 + "\n\n")
f.write(f"File: {results['filename']}\n")
f.write(f"Duration: {results['duration_seconds']:.2f} seconds\n")
f.write(f"Sample Rate: {results['frame_rate']} Hz\n")
f.write(f"Channels: {results['channels']}\n")
f.write(f"Average Loudness: {results['average_loudness_dBFS']:.2f} dBFS\n")
f.write(f"Max Loudness: {results['max_loudness_dBFS']:.2f} dBFS\n")
if 'tempo_bpm' in results:
f.write(f"Tempo: {results['tempo_bpm']:.1f} BPM\n")
if 'spectral_centroid_mean' in results:
f.write(f"Spectral Centroid: {results['spectral_centroid_mean']:.1f} Hz\n")
class BatchProcessor:
"""Handles batch processing of multiple audio files."""
def __init__(self, config: AudioAnalysisConfig):
self.config = config
self.analyzer = AudioAnalyzer(config)
def process_directory(self, input_dir: Path, output_dir: Path) -> List[Dict[str, Any]]:
"""Process all supported audio files in a directory."""
# Find all supported audio files
audio_files = []
for ext in self.config.supported_formats:
audio_files.extend(list(input_dir.glob(f"*{ext}")))
audio_files.extend(list(input_dir.glob(f"*{ext.upper()}")))
audio_files = sorted(set(audio_files)) # Remove duplicates and sort
if not audio_files:
logger.warning(f"No supported audio files found in {input_dir}")
return []
logger.info(f"Found {len(audio_files)} audio files to process")
# Process files
all_results = []
if self.config.enable_parallel and len(audio_files) > 1:
# Parallel processing
with ProcessPoolExecutor(max_workers=self.config.max_workers) as executor:
future_to_file = {
executor.submit(self.analyzer.analyze_single_file, file_path, output_dir): file_path
for file_path in audio_files
}
for future in tqdm(as_completed(future_to_file), total=len(audio_files),
desc="Processing files"):
result = future.result()
if result:
all_results.append(result)
else:
# Sequential processing
for file_path in tqdm(audio_files, desc="Processing files"):
result = self.analyzer.analyze_single_file(file_path, output_dir)
if result:
all_results.append(result)
# Generate batch report
self._generate_batch_report(all_results, output_dir)
return all_results
def _generate_batch_report(self, results: List[Dict[str, Any]], output_dir: Path):
"""Generate comprehensive batch analysis report."""
if not results:
return
# CSV summary
csv_path = output_dir / "batch_summary.csv"
fieldnames = set()
for result in results:
fieldnames.update(result.keys())
with open(csv_path, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=sorted(fieldnames))
writer.writeheader()
writer.writerows(results)
# HTML Report
html_path = output_dir / "batch_report.html"
self._generate_html_report(results, html_path)
# Statistical summary
stats_path = output_dir / "batch_statistics.json"
batch_stats = self._compute_batch_statistics(results)
with open(stats_path, 'w') as f:
json.dump(batch_stats, f, indent=2, default=str)
logger.info(f"Batch report generated: {html_path}")
def _compute_batch_statistics(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Compute statistical summary of the batch."""
numeric_fields = ['duration_seconds', 'average_loudness_dBFS', 'max_loudness_dBFS',
'spectral_centroid_mean', 'tempo_bpm', 'file_size_mb']
stats = {
"total_files": len(results),
"total_duration_hours": sum(r.get('duration_seconds', 0) for r in results) / 3600,
"analysis_timestamp": datetime.now().isoformat()
}
for field in numeric_fields:
values = [r.get(field) for r in results if r.get(field) is not None]
if values:
stats[field] = {
"mean": float(np.mean(values)),
"median": float(np.median(values)),
"std": float(np.std(values)),
"min": float(np.min(values)),
"max": float(np.max(values))
}
return stats
def _generate_html_report(self, results: List[Dict[str, Any]], output_path: Path):
"""Generate an HTML report with embedded visualizations."""
html_template = """
<!DOCTYPE html>
<html>
<head>
<title>Audio Analysis Batch Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }
.header { background-color: #2c3e50; color: white; padding: 20px; border-radius: 5px; }
.summary { background-color: white; padding: 15px; margin: 20px 0; border-radius: 5px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.file-entry { background-color: white; margin: 10px 0; padding: 15px; border-radius: 5px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.stats-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 10px; }
.stat-item { background-color: #ecf0f1; padding: 10px; border-radius: 3px; text-align: center; }
.stat-value { font-size: 1.5em; font-weight: bold; color: #2c3e50; }
.stat-label { font-size: 0.9em; color: #7f8c8d; }
table { width: 100%; border-collapse: collapse; margin-top: 10px; }
th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
th { background-color: #34495e; color: white; }
.file-name { font-weight: bold; color: #2c3e50; }
</style>
</head>
<body>
<div class="header">
<h1>🎡 Audio Analysis Batch Report</h1>
<p>Generated on {{ timestamp }}</p>
</div>
<div class="summary">
<h2>πŸ“Š Summary Statistics</h2>
<div class="stats-grid">
<div class="stat-item">
<div class="stat-value">{{ total_files }}</div>
<div class="stat-label">Total Files</div>
</div>
<div class="stat-item">
<div class="stat-value">{{ "%.1f"|format(total_duration) }}</div>
<div class="stat-label">Total Duration (hours)</div>
</div>
<div class="stat-item">
<div class="stat-value">{{ "%.1f"|format(avg_duration) }}</div>
<div class="stat-label">Avg Duration (min)</div>
</div>
<div class="stat-item">
<div class="stat-value">{{ "%.1f"|format(total_size) }}</div>
<div class="stat-label">Total Size (MB)</div>
</div>
</div>
</div>
<div class="summary">
<h2>πŸ“‹ File Details</h2>
<table>
<thead>
<tr>
<th>File Name</th>
<th>Duration</th>
<th>Loudness (dBFS)</th>
<th>Tempo (BPM)</th>
<th>Size (MB)</th>
</tr>
</thead>
<tbody>
{% for file in files %}
<tr>
<td class="file-name">{{ file.filename }}</td>
<td>{{ "%.1f"|format(file.duration_seconds) }}s</td>
<td>{{ "%.1f"|format(file.average_loudness_dBFS) }}</td>
<td>{{ "%.1f"|format(file.get('tempo_bpm', 0)) }}</td>
<td>{{ "%.1f"|format(file.file_size_mb) }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</body>
</html>
"""
# Prepare template data
total_duration = sum(r.get('duration_seconds', 0) for r in results) / 3600
avg_duration = sum(r.get('duration_seconds', 0) for r in results) / len(results) / 60
total_size = sum(r.get('file_size_mb', 0) for r in results)
template = Template(html_template)
html_content = template.render(
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
total_files=len(results),
total_duration=total_duration,
avg_duration=avg_duration,
total_size=total_size,
files=results
)
with open(output_path, 'w') as f:
f.write(html_content)
def main():
"""Main entry point with enhanced argument parsing."""
import argparse
parser = argparse.ArgumentParser(
description="Enhanced Audio Analysis Toolkit - Comprehensive batch audio file analysis",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s /path/to/music --output results
%(prog)s /path/to/music -o results --parallel --workers 4
%(prog)s /path/to/music --no-parallel --verbose
"""
)
parser.add_argument("input_dir", help="Directory containing audio files to analyze")
parser.add_argument("-o", "--output-dir", default="audio_analysis_output",
help="Output directory for analysis results (default: audio_analysis_output)")
parser.add_argument("--parallel", action="store_true", default=True,
help="Enable parallel processing (default)")
parser.add_argument("--no-parallel", action="store_false", dest="parallel",
help="Disable parallel processing")
parser.add_argument("--workers", type=int, default=None,
help="Number of parallel workers (default: auto)")
parser.add_argument("--verbose", "-v", action="store_true",
help="Enable verbose logging")
parser.add_argument("--formats", nargs="+", default=None,
help="Specific audio formats to process (e.g., --formats mp3 wav)")
args = parser.parse_args()
# Configure logging
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Validate input directory
input_dir = Path(args.input_dir)
if not input_dir.is_dir():
logger.error(f"Input directory not found: {input_dir}")
sys.exit(1)
# Setup configuration
config = AudioAnalysisConfig()
config.enable_parallel = args.parallel
if args.workers:
config.max_workers = args.workers
if args.formats:
# Filter supported formats
requested_formats = {f".{fmt.lower().lstrip('.')}" for fmt in args.formats}
config.supported_formats = config.supported_formats.intersection(requested_formats)
if not config.supported_formats:
logger.error("No valid audio formats specified")
sys.exit(1)
# Create output directory
output_dir = Path(args.output_dir)
output_dir.mkdir(exist_ok=True)
# Log configuration
logger.info(f"🎡 Enhanced Audio Analysis Toolkit v2.0")
logger.info(f"Input directory: {input_dir}")
logger.info(f"Output directory: {output_dir}")
logger.info(f"Supported formats: {', '.join(sorted(config.supported_formats))}")
logger.info(f"Parallel processing: {'Enabled' if config.enable_parallel else 'Disabled'}")
if config.enable_parallel:
logger.info(f"Max workers: {config.max_workers}")
# Process files
try:
processor = BatchProcessor(config)
results = processor.process_directory(input_dir, output_dir)
if results:
logger.info(f"βœ… Analysis complete! Processed {len(results)} files")
logger.info(f"πŸ“ Results saved to: {output_dir}")
logger.info(f"πŸ“Š View batch report: {output_dir / 'batch_report.html'}")
else:
logger.warning("No files were successfully processed")
except KeyboardInterrupt:
logger.info("Analysis interrupted by user")
sys.exit(1)
except Exception as e:
logger.error(f"Analysis failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment