Last active
July 20, 2016 07:58
-
-
Save hetsch/b5e4f019a8cb4298f47dfe493bd67efa to your computer and use it in GitHub Desktop.
Replay gain processing with bs1770gain
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
## Usage: rate-music [0-5]
#
# Adds the currently playing song to the mpd playlist corresponding to the
# rating assigned. Any previous rating is removed. If 0 is given, the
# song's rating will be removed.
#
# From: https://bbs.archlinux.org/viewtopic.php?id=116113

## USER CONFIGURATION-----------------------------------------------------
## Path to playlists
MOUNTPOINT="/Volumes/tank_music"

# mount if not mounted
if ! mount | grep "on $MOUNTPOINT" > /dev/null; then
    echo "Mounting the music library"
    # Mounting with password and other stuff
    # see: http://apple.stackexchange.com/a/197608
    osascript -e 'tell application "Finder" to mount volume "smb://192.168.1.143/tank_music"'
fi

PLAYLISTDIR="$MOUNTPOINT/.mpd/playlists"
## END USER CONFIGURATION--------------------------------------------------

## Prefix and suffix strings for the playlist file name
PL_PREFIX='Rating_'
PL_SUFFIX='.m3u'

## Get the current song from mpd via mpc or throw an error.
# (ncmpcpp --current-song '%D/%f' did not work for me; console commands for
# ncmpcpp have been deprecated.)
SONG=$(mpc -h 192.168.1.143 -f '%file%' current) || { echo "Error: you need mpc installed to run this script. Aborting." >&2; exit 1; }

## Error cases
if [[ -z "$SONG" ]]; then
    echo 'No song is playing.'
    exit 1
fi

# Accept exactly one digit 0-5. A numeric comparison inside [[ ]] would
# evaluate a non-numeric argument as 0 and silently delete the rating.
if [[ ! "$1" =~ ^[0-5]$ ]]; then
    echo "Rating must be between 1 and 5. Or 0 (zero) to delete the current song's rating."
    exit 1
fi

## Path to lock file
LOCK="/tmp/rate-music.lock"

## Lock the file (lockfile is used here instead of flock)
if ! lockfile -r 0 "$LOCK"; then
    # see: http://apple.stackexchange.com/a/79504
    osascript -e "display notification \"Rating failed: Another instance is running\" with title \"MPD rating failed\""
    exit 1
fi
# Release the lock however the script ends, so an aborted run cannot block
# every later invocation.
trap 'rm -f "$LOCK"' EXIT

## Strip "file " from the output
SONG=${SONG/file \///}

## Temporary file for grepping and sorting
TMP="$PLAYLISTDIR/tmp.m3u"

## Remove the song from all rating playlists
for n in {1..5}; do
    f="$PLAYLISTDIR/${PL_PREFIX}$n${PL_SUFFIX}"
    if [[ -f "$f" ]]; then
        grep -vF "$SONG" "$f" > "$TMP"
        mv -f "$TMP" "$f"
    fi
done

## Append the song to the new rating playlist
if [[ $1 -ne 0 ]]; then
    f="$PLAYLISTDIR/${PL_PREFIX}$1${PL_SUFFIX}"
    mkdir -p "$PLAYLISTDIR"
    echo "$SONG" >> "$f"
    sort -u "$f" -o "$TMP"
    mv -f "$TMP" "$f"
fi

## The lock file is removed by the EXIT trap installed above
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import os | |
import sys | |
import locale | |
import socket | |
import copy | |
import traceback | |
import collections | |
import lxml.html | |
import taglib | |
import mutagen | |
from mutagen.mp3 import MP3 | |
from mutagen.id3 import RVA2, TXXX | |
from mutagen.apev2 import APEv2 | |
from colorama import init, Fore, Back | |
init(autoreset=True) | |
# Root of the music library: one path on the host named "r2d2", another
# path on every other machine.
MUSIC_LIBRARY_PATH = (
    "/mnt/tank/music" if socket.gethostname() == "r2d2" else "/Volumes/tank_music"
)
def log_success(msg):
    """Print ``msg`` prefixed with the green foreground color code."""
    colored = Fore.GREEN + msg
    print(colored)
def log_info(msg):
    """Print ``msg`` prefixed with the blue foreground color code."""
    colored = Fore.BLUE + msg
    print(colored)
def log_failure(msg):
    """Print ``msg`` prefixed with the red foreground color code."""
    colored = Fore.RED + msg
    print(colored)
RG_TAGS = ( | |
'REPLAYGAIN_ALBUM_GAIN', | |
'REPLAYGAIN_ALBUM_PEAK', | |
'REPLAYGAIN_ALBUM_RANGE', | |
'REPLAYGAIN_TRACK_GAIN', | |
'REPLAYGAIN_TRACK_PEAK', | |
'REPLAYGAIN_TRACK_RANGE', | |
'REPLAYGAIN_REFERENCE_LOUDNESS', | |
'REPLAYGAIN_ALGORITHM' | |
) | |
def check(filename):
    """Return True if the file has all ReplayGain track data.

    Non-MP3 files need ``replaygain_track_peak`` and ``replaygain_track_gain``.
    MP3 files additionally need the matching ``TXXX:`` frames, an
    ``RVA2:track`` frame and the same two keys in an APEv2 tag.

    Any error while reading the file (missing file, unsupported format,
    no APEv2 tag, ...) is treated as "no ReplayGain data" and yields False.
    """
    # NOTE(review): write() in this file stores upper-case TXXX descriptions
    # and has its RVA2/APEv2 output commented out, so MP3 files tagged by
    # write() will not satisfy this check -- confirm which side is intended.
    wanted = ('replaygain_track_peak', 'replaygain_track_gain')
    try:
        tags = mutagen.File(filename)
        if not isinstance(tags, MP3):
            return all(key in tags for key in wanted)
        if any('TXXX:' + key not in tags for key in wanted):
            return False
        if 'RVA2:track' not in tags:
            return False
        ape_tags = APEv2(filename)
        return all(key in ape_tags for key in wanted)
    except Exception:
        # Deliberately broad (best effort), but no longer a bare ``except``
        # that would also swallow KeyboardInterrupt and SystemExit.
        return False
def read(filename):
    """Return ``(peak, gain)`` for a file.

    Either element is None when the value could not be read. The tags
    returned by ``mutagen.File(filename, easy=True)`` are consulted first;
    the APEv2 tag is only used to fill in values that are still missing.
    """
    def parse_rg(tags):
        """Extract (peak, gain) as floats from a tag mapping."""
        p = g = None
        if 'replaygain_track_peak' in tags:
            p = float(tags['replaygain_track_peak'][0])
        if 'replaygain_track_gain' in tags:
            value = tags['replaygain_track_gain'][0]
            if value.endswith(' dB'):
                g = float(value[:-3])
            else:
                log_failure('Malformed track gain info: "%s" in %s' % (value, filename))
        return (p, g)

    peak = gain = None
    try:
        peak, gain = parse_rg(mutagen.File(filename, easy=True))
    except Exception:
        # Best effort: an unreadable primary tag just means "no data yet".
        pass

    # Prefer the first value because RVA2 is more precise than
    # APE, formatted as %.2f. The fallback therefore only fills gaps
    # instead of replacing a value that was already read.
    if peak is None or gain is None:
        try:
            ape_peak, ape_gain = parse_rg(APEv2(filename))
        except Exception:
            pass
        else:
            if peak is None:
                peak = ape_peak
            if gain is None:
                gain = ape_gain
    return (peak, gain)
def write(filename, peak, gain, range_, ref_loudness=-18.0, algorithm="ITU-R BS.1770"):
    """Write ReplayGain tags to ``filename``.

    ``peak``, ``gain`` and ``range_`` are each an ``(album, track)`` pair of
    numbers. ``ref_loudness`` and ``algorithm`` are stored alongside them.

    Returns True when the tags were saved, False when tagging failed (the
    traceback is logged). Raises ValueError if ``peak`` or ``gain`` is None.
    """
    if peak is None:
        raise ValueError('peak is None')
    if gain is None:
        raise ValueError('gain is None')
    try:
        tags = mutagen.File(filename)
        # album and track data
        # see: http://wiki.hydrogenaud.io/index.php?title=ReplayGain_2.0_specification#ID3v2
        # uppercase or lowercase tags (think that mpd uses lowercase)
        # - see: http://getmusicbee.com/forum/index.php?topic=10394.msg73234#msg73234
        # - see: http://mpd.wikia.com/wiki/Hack:rg.py
        # write the same tags as bs1770gain
        # Note: bs1770gain uses "LU" in gain tags, specification proposes "dB". Usually they can be
        # converted 1:1 (1 LU == 1 dB)
        a_gain, t_gain = ("{:.2f} dB".format(n) for n in gain)
        a_peak, t_peak = ("{:.6f}".format(n) for n in peak)
        a_range, t_range = ("{:.2f}".format(n) for n in range_)
        # Same order as RG_TAGS. The reference loudness is formatted as text
        # like every other value instead of being handed over as a float.
        data = (
            a_gain,
            a_peak,
            a_range,
            t_gain,
            t_peak,
            t_range,
            "{:.2f}".format(float(ref_loudness)),
            algorithm,
        )
        # delete existing ReplayGain tags, whatever prefix or case they use
        for key in list(tags.keys()):
            if key.upper().endswith(RG_TAGS):
                tags.pop(key, None)
        if isinstance(tags, MP3):
            # ID3v2.4 user-defined text frames. Writing RVA2 frames and an
            # additional APEv2 tag is intentionally not done here.
            for key, value in zip(RG_TAGS, data):
                tags['TXXX:{}'.format(key)] = TXXX(encoding=0, desc=key, text=[value])
        else:
            for key, value in zip(RG_TAGS, data):
                tags[key] = value
        tags.save(filename)
        return True
    except Exception:
        log_failure(traceback.format_exc())
        return False
if __name__ == "__main__":
    # Walk the library (or the path given as first argument) and tag every
    # track listed in a bs1770gain "RG-Results.xml" file.
    log_info("Lets go ...")
    path = MUSIC_LIBRARY_PATH
    if len(sys.argv) > 1:
        path = os.path.abspath(sys.argv[1])
    if not os.path.exists(path):
        log_failure("Root path <{}> does not exist".format(path))
        sys.exit(1)

    # EBU R128 has reference level of -23.0 LUFS we used -18 LUFS, the same as foobar
    ref_loudness = -18.0
    algorithm = "ITU-R BS.1770 (EBU R128)"

    for folder, _subfolders, files in os.walk(path):
        if "RG-Results.xml" not in files:
            continue
        # see: https://forums.mp3tag.de/lofiversion/index.php?t20881.htm
        # XMLParser is too strict and creates troubles with unescaped ampersands aso.
        # see: http://stackoverflow.com/a/26267496
        html = lxml.html.parse(os.path.join(folder, "RG-Results.xml"))
        a_gain = float(html.xpath("//album/summary/integrated/@lu")[0])
        a_peak = float(html.xpath("//album/summary/true-peak/@factor")[0])
        a_range = float(html.xpath("//album/summary/range/@lufs")[0])
        for track in html.xpath("//album/track"):
            filepath = os.path.join(folder, track.xpath(".//@file")[0])
            t_gain = float(track.xpath("./integrated/@lu")[0])
            t_peak = float(track.xpath("./true-peak/@factor")[0])
            t_range = float(track.xpath("./range/@lufs")[0])
            # Keyword arguments on purpose: write() takes ``peak`` before
            # ``gain``, so passing the pairs positionally in gain/peak order
            # stores the peak values as gains and vice versa.
            written = write(
                filepath,
                peak=(a_peak, t_peak),
                gain=(a_gain, t_gain),
                range_=(a_range, t_range),
                ref_loudness=ref_loudness,
                algorithm=algorithm,
            )
            if written:
                log_success("SUCCESS: Written tags for <{}>".format(filepath))
            else:
                log_failure("ERROR: Couldn't write replay gain tags for <{}>".format(filepath))
# examine result with | |
# - (bs1770gain): "bs1770gain ~/Downloads/Back\ in\ Black -l" | |
# - (pytaglib): "pyprinttags ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3" | |
# - (mutagen): "mid3v2 ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3"" | |
# - (beet); "beet info ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Explanation of the bs1770gain command used in the run_worker function: | |
1) PRIORITY OF THE PROCESS | |
"nice -n XX" sets the desired priority to the bs1770gain process. The | |
higher the parameter, the lower the system priority | |
2) LOUDNESS CALCULATIONS | |
- For a good general guide on loudness, see http://transom.org/2015/the-audio-producers-guide-to-loudness/ | |
- For the specs of ITU-R BS.1770 see: http://www.itu.int/rec/R-REC-BS.1770/en | |
The command calculates the integrated loudness (-i/--integrated), which is a decent predictor of consistency between pieces of audio. It additionally calculates the -s/--shortterm and -m/--momentary loudness because they are not costly and may be used for other purposes. | |
Additionally, it calculates the loudness range (-r/--range), which is a measure of the variation of loudness over the course of a song. | |
3) PEAK CALCULATIONS | |
There are two possibilities to find the peak value of the audio file. -p/--samplepeak and -t/--truepeak. Both are good values but a true peak respects that peaks can lie between samples and are often higher than the peaks shown/calculated by a sample peak meter/algorithm. In doing so, the calculation time is much higher compared to simpler samplepeak calculations. On a simple album -p/--samplepeak takes about 9secs, whereas -t/--truepeak takes 30secs. For more information see https://auphonic.com/blog/2012/08/02/loudness-measurement-and-normalization-ebu-r128-calm-act/. | |
If calculating the --truepeak of an audio file, the costs of computing the --samplepeak are not noteworthy - also all other | |
options don't count too much. | |
With -t/--truepeak: | |
time bs1770gain -t ~/Downloads/XXX | |
... | |
real 0m32.861s | |
user 0m32.550s | |
sys 0m0.302s | |
time bs1770gain -ismrpt ~/Downloads/XXX | |
... | |
real 0m34.068s | |
user 0m33.761s | |
sys 0m0.302s | |
Using -p/--samplepeak: | |
time bs1770gain -p ~/Downloads/XXX | |
... | |
real 0m9.322s | |
user 0m9.048s | |
sys 0m0.270s | |
time bs1770gain -ismrp ~/Downloads/XXX | |
... | |
real 0m10.233s | |
user 0m9.968s | |
sys 0m0.262s | |
4) NORMALIZATION AND LOUDNESS VALUE | |
Usually, the EBU R128 algorithm normalizes to -23.0 LUFS. This value is typical for TV and radio streams but is often experienced as
too low for podcasts and audio listening at home. See https://auphonic.com/blog/2013/01/07/loudness-targets-mobile-audio-podcasts-radio-tv/. The average program loudness revolves around -19 LUFS, ReplayGain2 is around -18 LUFS and that is what Foobar2000
uses, see http://forum.doom9.org/showpost.php?p=1701328&postcount=21 and other comments in this thread. Also see https://forum.dbpoweramp.com/showthread.php?29262-Replay-Gain-dbPoweramp-vs-Foobar&p=143105&viewfull=1#post143105. | |
By default this script goes with the foobar2000 settings by using "--norm -18.0". For example, this changes the "lu" value in the resulting XML from e.g. (-23.0 LUFS) <integrated lufs="-10.76" lu="-12.24" /> | |
to (-18.0 LUFS) <integrated lufs="-10.76" lu="-7.24" />. | |
""" | |
import os | |
import sys | |
import multiprocessing | |
import threading | |
import subprocess | |
import signal | |
import shlex | |
import time | |
import socket | |
import traceback | |
import unicodedata | |
import logging | |
import argparse | |
from time import perf_counter | |
from io import StringIO | |
import lxml.html | |
import taglib | |
import colorama | |
from colorama import init, Fore | |
init(autoreset=True) | |
########################################################### | |
# C O N F I G U R A T I O N | |
########################################################### | |
# Root of the music library: one path on the host named "r2d2" (compared
# case-insensitively), another path on every other machine.
MUSIC_LIBRARY_PATH = (
    "/mnt/tank/music"
    if socket.gethostname().lower() == "r2d2"
    else "/Volumes/tank_music"
)
# Utilize only half of the processors, but never fewer than one: on a
# single-core machine plain halving yields 0, and multiprocessing.Pool
# refuses to start with a process count of 0.
NUM_CPUS = max(1, multiprocessing.cpu_count() // 2)
# The linux "nice" priority number | |
WORKER_PROCESS_PRIORITY = 19 | |
# Algorithm and associated default normalization LUFS (reference loudness)
CALC_METHODS = ( | |
("ebu", -23.0), # EBU R128 (default) | |
("atsc", -24.0), # ATSC A/85 | |
("replaygain", -18.0) # ReplayGain 2.0 | |
) | |
CALC_ALGORITHM_FULL = { | |
"ebu": "EBU R128", | |
"atsc": "ATSC A/85", | |
"replaygain": "ReplayGain 2.0" | |
} | |
# Choose the algorithm | |
CALC_METHOD = CALC_METHODS[0] | |
# Override default normalization LUFS (reference loudness) | |
FORCE_REFERENCE_LOUDNESS = -18.0 | |
# The maximum time in seconds that the calculation process is allowed | |
# to take before terminating it. It sometimes happens that | |
# bs1770gain hangs at a specific album. 300 seconds (5min) should be | |
# good for most audio files | |
MAX_CALCULATION_DURATION = 300 | |
# Taken from Wikipedia | |
AUDIO_EXTS = ( | |
".3gp", | |
".aa", | |
".aac", | |
".aax", | |
".act", | |
".aiff", | |
".aif", | |
".amr", | |
".ape", | |
".au", | |
".awb", | |
".dct", | |
".dss", | |
".dvf", | |
".flac", | |
".gsm", | |
".iklax", | |
".ivs", | |
".m4a", | |
".m4b", | |
".m4p", | |
".mmf", | |
".mp3", | |
".mpc", | |
".msv", | |
".ogg", | |
".oga", | |
".opus", | |
".ra", | |
".rm", | |
".raw", | |
".sln", | |
".vox", | |
".wav", | |
".wma", | |
".wv", | |
".webm" | |
) | |
RG_TAGS = ( | |
'REPLAYGAIN_ALBUM_GAIN', | |
'REPLAYGAIN_ALBUM_PEAK', | |
'REPLAYGAIN_ALBUM_RANGE', | |
'REPLAYGAIN_TRACK_GAIN', | |
'REPLAYGAIN_TRACK_PEAK', | |
'REPLAYGAIN_TRACK_RANGE', | |
'REPLAYGAIN_REFERENCE_LOUDNESS', | |
#'QUODLIBET::REPLAYGAIN_REFERENCE_LOUDNESS', | |
'REPLAYGAIN_ALGORITHM' | |
) | |
RG_RESULT_FILE = "RG-Results.xml" | |
# Save the current processed folder for resuming ... | |
PROCESSED_STATE_FILE = 'RG-Processed' | |
# Exclude folders from processing | |
EXCLUDES = ( | |
os.path.join(MUSIC_LIBRARY_PATH, "Dessou's Club"), | |
) | |
# bs1770gain uses "LU" in its replaygain tags, | |
# whereas replaygain2 specification proposes "dB". | |
# Usually they can be converted 1:1 (1 LU == 1 dB) | |
# see: https://sourceforge.net/p/idjc/bugs/79/ | |
# MPD supports "LU" so we go with them | |
LOUDNESS_UNIT = "LU" # "dB" | |
# The root path from which all relative paths will be calculated | |
# Will be set in the __main__ function | |
ROOT_PATH = None | |
########################################################### | |
# L O G G I N G | |
########################################################### | |
# A new success level and colored logger | |
# see: https://gist.github.com/hit9/5635505 | |
# see: https://gist.github.com/kergoth/813057 | |
# between WARNING and INFO | |
logging.SUCCESS = 25 | |
logging.addLevelName(logging.SUCCESS, 'SUCCESS') | |
class ColorizingStreamHandler(logging.StreamHandler):
    """Stream handler that colors the first line of each record by level.

    Colors are only applied when the underlying stream reports that it is a
    terminal; continuation lines (e.g. a traceback) are left uncolored.
    """

    # log level number -> colorama foreground color code
    color_map = {
        logging.DEBUG: Fore.WHITE,
        logging.INFO: Fore.BLUE,
        logging.WARNING: Fore.YELLOW,
        logging.ERROR: Fore.RED,
        logging.CRITICAL: Fore.RED,
        logging.SUCCESS: Fore.GREEN
    }

    def __init__(self, stream, color_map=None):
        """Wrap ``stream`` with colorama and optionally override the colors."""
        wrapped = colorama.AnsiToWin32(stream).stream
        super().__init__(wrapped)
        if color_map is not None:
            self.color_map = color_map

    @property
    def is_tty(self):
        """Truthy when the stream has an ``isatty`` method that returns true."""
        isatty = getattr(self.stream, 'isatty', None)
        return isatty and isatty()

    def format(self, record):
        """Format the record and colorize its first line on a terminal."""
        message = super().format(record)
        if not self.is_tty:
            return message
        # Don't colorize a traceback: only the text before the first
        # newline gets the color codes.
        head, newline, rest = message.partition('\n')
        return self.colorize(head, record) + newline + rest

    def colorize(self, message, record):
        """Wrap ``message`` in the color of the record's level, if mapped."""
        try:
            prefix = self.color_map[record.levelno]
        except KeyError:
            return message
        return prefix + message + colorama.Style.RESET_ALL
logger = logging.getLogger("Core") | |
t_logger = logging.getLogger("Tags") | |
c_logger = logging.getLogger("Calc") | |
def setup_logging():
    """Attach console and file handlers to the three module loggers.

    Each logger gets a colorizing stdout handler and a ``replaygain.log``
    file handler, is set to DEBUG, and gains a ``success(message, *args)``
    method that logs at the custom ``logging.SUCCESS`` level.
    """
    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    handler_1 = ColorizingStreamHandler(sys.stdout)
    handler_1.setFormatter(logging.Formatter(log_format))
    handler_2 = logging.FileHandler("replaygain.log")
    handler_2.setFormatter(logging.Formatter(log_format))

    def make_success(target):
        """Build a ``success`` method bound to one specific logger."""
        def success(message, *args):
            if target.isEnabledFor(logging.SUCCESS):
                target._log(logging.SUCCESS, message, args)
        return success

    for l in (logger, t_logger, c_logger):
        l.addHandler(handler_1)
        l.addHandler(handler_2)
        l.setLevel(logging.DEBUG)
        # Bind the logger through a factory: a lambda that closes over the
        # loop variable would make every logger's ``success`` log through
        # the last logger of the loop.
        l.success = make_success(l)
setup_logging() | |
########################################################### | |
# E X C E P T I O N S
########################################################### | |
class WorkerException(Exception):
    """Base error raised by a worker; carries a message and the affected path."""

    def __init__(self, message, path):
        # Hand both values to Exception so they end up in ``args``; this
        # avoids an error with pickling exception classes.
        # see: http://stackoverflow.com/a/28335286
        super().__init__(message, path)
        self.message = message
        self.path = path


class TaggingException(WorkerException):
    """Raised when reading results or writing tags fails."""


class CalculatingException(WorkerException):
    """Raised when the replay gain calculation fails."""
########################################################### | |
# T A G G I N G | |
# | |
# examine result with | |
# - (bs1770gain): "bs1770gain ~/Downloads/Back\ in\ Black -l" | |
# - (pytaglib): "pyprinttags ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3" | |
# - (mutagen): "mid3v2 ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3"" | |
# - (beet); "beet info ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3" | |
########################################################### | |
class ReplayGainData(object):
    """Numeric gain, peak and range with tag-ready string accessors."""

    def __init__(self, gain, peak, range_):
        self._gain, self._peak, self._range = gain, peak, range_

    @property
    def gain(self):
        """Gain with two decimals followed by the loudness unit."""
        template = "{:.2f} {}"
        return template.format(self._gain, LOUDNESS_UNIT)

    @property
    def peak(self):
        """Peak with six decimals and no unit."""
        template = "{:.6f}"
        return template.format(self._peak)

    @property
    def range(self):
        """Range with two decimals followed by the loudness unit."""
        template = "{:.2f} {}"
        return template.format(self._range, LOUDNESS_UNIT)
class ReplayGainResult(object):
    """ReplayGain values of one track together with its album values.

    ``album`` and ``track`` are objects exposing ``gain``, ``peak`` and
    ``range`` attributes (see ``id3_tags``). ``ref_loudness`` and
    ``algorithm`` fall back to the module configuration when not given.
    """

    def __init__(self, path, album, track, ref_loudness=None, algorithm=None):
        self.path = path
        self.album = album
        self.track = track
        self._ref_loudness = ref_loudness
        self._algorithm = algorithm

    @property
    def ref_loudness(self):
        """Reference loudness formatted with two decimals."""
        ref_loudness = self._ref_loudness
        if not ref_loudness:
            # Nothing passed in: use the configured method's default,
            # unless a forced reference loudness is configured.
            _, ref_loudness = CALC_METHOD
            if isinstance(FORCE_REFERENCE_LOUDNESS, float):
                ref_loudness = FORCE_REFERENCE_LOUDNESS
        return "{:.2f}".format(ref_loudness)

    @property
    def algorithm(self):
        """Algorithm tag value, e.g. ``ITU-R BS.1770 (EBU R128)``."""
        algorithm = self._algorithm
        if not algorithm:
            algorithm, _ = CALC_METHOD
        # Short keys such as "ebu" are always expanded to their full name so
        # that the written tag matches the string validate() compares
        # against; a name that is not a known key is used unchanged.
        algorithm = CALC_ALGORITHM_FULL.get(algorithm, algorithm)
        return "ITU-R BS.1770 ({})".format(algorithm)

    @property
    def id3_tags(self):
        """Mapping of every RG_TAGS name to its formatted value."""
        values = (
            self.album.gain,
            self.album.peak,
            self.album.range,
            self.track.gain,
            self.track.peak,
            self.track.range,
            self.ref_loudness,
            self.algorithm,
        )
        return dict(zip(RG_TAGS, values))
def get_replaygain_xml_data(path, ref_loudness=None, algorithm=None):
    """Parse the bs1770gain result file of ``path`` into per-track results.

    Returns a list with one ``ReplayGainResult`` per ``<track>`` element;
    every result shares the album summary values.

    Raises ``TaggingException`` when the result file cannot be read, when an
    expected element/attribute is missing, or when a value is not numeric.
    """
    xml_path = os.path.join(path, RG_RESULT_FILE)

    # Unicode normalization between OSX and Linux (Ubuntu)
    # There is a special case if bs1770gain is executed on a
    # OSX box and both filesystems, on OSX and Linux, are UTF-8
    # and, therefore, unicode. Linux and Windows use NFC whereas
    # OSX uses NFD unicode.
    # see: https://en.wikipedia.org/wiki/Unicode_equivalence
    # see: http://nedbatchelder.com/blog/201106/filenames_with_accents.html
    # LXML does not handle this special case very well and does
    # not autoswitch between NFC and NFD. This is especially important
    # for file and directory names.
    #
    # Sidenote: If OSX has problems to display the correct filenames of
    # a SAMBA share, check if vfs_fruit is enabled and configured properly.
    # see: https://www.mankier.com/8/vfs_fruit
    # see: https://lists.samba.org/archive/samba/2014-September/184761.html
    # see: https://lists.samba.org/archive/samba/2014-December/187568.html
    #
    # Here are some possibilities
    # 1) Use rsync to transfer the file from OSX over ssh to the
    #    Linux box
    #    rsync -a --iconv=utf-8-mac,utf-8 localdir/ server:remotedir/
    #    see: http://serverfault.com/a/427200
    # 2) Convert the NFD unicode file on the OSX box to NFC
    #    iconv -f UTF-8-MAC -t UTF-8 RG-Results.xml > RG-Results.conv.xml
    #    see: http://stackoverflow.com/q/14682829
    #    uconv -f utf8 -t utf8 -x nfc RG-Results.xml -o RG-Results.conv.xml
    #    see: https://www.win.tue.nl/~aeb/linux/uc/nfc_vs_nfd.html
    # 3) Convert all NFD filenames on the Linux box to NFC
    #    convmv -f utf8 -t utf8 --nfc --replace --nosmart (--notest) -r uploads/
    #    see: https://gist.github.com/dessibelle/4685735
    # 4) Use the python solution below via unicodedata.normalize("NFC", value)
    #    and pass a StringIO to lxml.parse
    try:
        with open(xml_path, "r", encoding="utf-8") as f:
            raw = f.read()
    except IOError as e:
        raise TaggingException('<{}> could not be found'.format(RG_RESULT_FILE), path) from e

    form = "NFD" if sys.platform.startswith('darwin') else "NFC"
    # There are special characters like "&" in the bs1770gain result file that
    # are not escaped to conform XML standards. Parse the file with a not so
    # strict html parser.
    # see: http://stackoverflow.com/a/26267496
    xml = lxml.html.parse(StringIO(unicodedata.normalize(form, raw)))

    def get_value(node, value):
        """Return the first XPath match or raise a TaggingException."""
        try:
            return node.xpath(value)[0]
        except IndexError as e:
            raise TaggingException("<{}> seems to be malformed and missing the <{}> tag/attribute".format(RG_RESULT_FILE, value), path) from e

    def get_float(node, value):
        """Like get_value, but converted to float with a clear error."""
        text = get_value(node, value)
        try:
            return float(text)
        except ValueError as e:
            raise TaggingException("<{}> contains the non-numeric value <{}> for <{}>".format(RG_RESULT_FILE, text, value), path) from e

    # How to link the values of the result file to replaygain values?
    # see: https://forums.mp3tag.de/lofiversion/index.php?t20881.htm
    a_data = ReplayGainData(
        get_float(xml, "//album/summary/integrated/@lu"),
        get_float(xml, "//album/summary/true-peak/@factor"),
        get_float(xml, "//album/summary/range/@lufs")
    )
    results = []
    for track in xml.xpath("//album/track"):
        t_path = os.path.join(path, get_value(track, ".//@file"))
        t_data = ReplayGainData(
            get_float(track, "./integrated/@lu"),
            get_float(track, "./true-peak/@factor"),
            get_float(track, "./range/@lufs")
        )
        results.append(ReplayGainResult(t_path, a_data, t_data, ref_loudness, algorithm))
    return results
def clear_replaygain_id3():
    """Placeholder: not implemented yet, does nothing and returns None."""
    return None
def write_replaygain_id3(data):
    """Write the tags of ``data.id3_tags`` into the audio file ``data.path``.

    Files without a known audio extension are skipped with a warning.

    Raises ``TaggingException`` when the file does not exist or when
    reading/saving the tags fails.
    """
    if not os.path.exists(data.path):
        raise TaggingException("Audio file does not exist in folder", data.path)
    # sometimes bs1770gain also includes non audio files in its resulting xml
    # with nonsense data, of course. The extension is compared in lower
    # case, like the folder scan does, so "SONG.MP3" is not skipped.
    if os.path.splitext(data.path)[1].lower() not in AUDIO_EXTS:
        t_logger.warning("File <{}> has not a valid audio file extension. Skip tagging".format(data.path))
        return
    f = None
    try:
        f = taglib.File(data.path)
        # @todo: there are strange problems with m4a files and
        # tagging. It seems that taglib does not write tags for
        # m4a files. Maybe issue a github ticket?
        # remove unsupported tag names (like from itunes ...)
        if f.unsupported:
            t_logger.debug("Removing unsupported tags <{}>".format(f.unsupported))
            f.removeUnsupportedProperties(f.unsupported)
        new_tags = data.id3_tags
        # Drop every differently-cased duplicate of a tag we are about to
        # write; the exact upper-case key is overwritten below anyway.
        for existing in list(f.tags.keys()):
            if existing not in new_tags and existing.upper() in new_tags:
                del f.tags[existing]
        # then save new tags
        for key, value in new_tags.items():
            f.tags[key] = value
        f.save()
    except Exception as e:
        raise TaggingException("Failed saving tags of audio file", data.path) from e
    finally:
        # close the file anyways
        if f is not None:
            f.close()
def get_audio_files(folder):
    """Yield the names of the non-hidden audio files directly in ``folder``.

    Names are yielded as returned by ``os.listdir`` (without the folder).
    """
    for name in os.listdir(folder):
        if name.startswith("."):
            continue
        if os.path.splitext(name)[1].lower() not in AUDIO_EXTS:
            continue
        # listdir returns bare names: join with the folder before testing,
        # otherwise the check runs against the current working directory.
        if os.path.isfile(os.path.join(folder, name)):
            yield name
def calc_vars():
    """Return ``(ref_loudness, algorithm)`` from the module configuration.

    The reference loudness is the configured method's default unless
    FORCE_REFERENCE_LOUDNESS is a float, in which case that value wins.
    """
    algorithm, default_loudness = CALC_METHOD
    forced = isinstance(FORCE_REFERENCE_LOUDNESS, float)
    ref_loudness = FORCE_REFERENCE_LOUDNESS if forced else default_loudness
    return (ref_loudness, algorithm)
def start_tagging(folder, ref_loudness, algorithm):
    """Tag every track listed in the result file of ``folder``.

    Raises ``TaggingException`` when the number of audio files in the folder
    differs from the number of tracks in the result file.
    """
    t_logger.debug("Tagging audio files of folder <{}>".format(folder))
    began = perf_counter()
    results = get_replaygain_xml_data(folder, ref_loudness, algorithm)
    if results:
        # Assume that we have the same amount of audio files
        # in the folder and in the result XML
        found = list(get_audio_files(folder))
        if len(found) != len(results):
            raise TaggingException("The amount of audio files <{}> does not match the amout of files in the XML file <{}>".format(len(found), len(results)), folder)
        for entry in results:
            write_replaygain_id3(entry)
    elapsed = perf_counter() - began
    t_logger.debug("Finished tagging <{}> files in <{:.2f}> seconds, path <{}>".format(len(results), elapsed, folder))
########################################################### | |
# C A L C U L A T I N G | |
########################################################### | |
def start_calculating(folder, ref_loudness, algorithm):
    """Run bs1770gain on ``folder`` and write its XML result file there.

    Raises ``CalculatingException`` when bs1770gain exits with a non-zero
    status or runs longer than MAX_CALCULATION_DURATION seconds.
    """
    c_logger.debug("Calculating replay gain for folder <{}>".format(folder))
    start = perf_counter()
    # measure all -i/--integrated, -s/--shortterm, -m/--momentary, -r/--range, -p/--samplepeak, and -t/--truepeak
    # The arguments are passed as a list: quoting the folder into a command
    # string and splitting it again breaks on names that contain a double
    # quote or a backslash.
    command = [
        "nice", "-n", str(WORKER_PROCESS_PRIORITY),
        "bs1770gain", folder,
        "-ismrpt",
        "--{}".format(algorithm),
        "--norm", str(ref_loudness),
        "--xml",
        "-f", os.path.join(folder, RG_RESULT_FILE),
    ]
    try:
        # Run until maximum calculation time is reached
        subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            # if the process takes longer than this timeout (secs)
            # raise a TimeoutExpired exception
            timeout=MAX_CALCULATION_DURATION,
            # if return code is not zero raise a CalledProcessError exception
            check=True
        )
    except subprocess.CalledProcessError as e:
        raise CalculatingException("bs1770gain failed calculating replay gain data", folder) from e
    except subprocess.TimeoutExpired as e:
        raise CalculatingException("bs1770gain was terminated because it took longer than the maximum execution time", folder) from e
    c_logger.debug("Finished calculating in {:.2f} seconds, path <{}>".format(perf_counter() - start, folder))
########################################################### | |
# M U L T I P R O C E S S I N G | |
########################################################### | |
def init_worker():
    """Pool initializer: make the calling process ignore SIGINT."""
    ignore = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore)
def run_worker(folder, calculating, tagging):
    """Process one folder: optionally calculate, then optionally tag.

    Returns ``(folder, pid)`` of the process that did the work.
    """
    worker = multiprocessing.current_process()
    began = perf_counter()
    logger.debug("Running worker, folder <{}>, name <{}>, pid <{}>".format(folder, worker.name, worker.pid))

    # get normalization vars
    ref_loudness, algorithm = calc_vars()

    if calculating:
        logger.debug("Starting to calculate replay gain for folder <{}> ...".format(folder))
        start_calculating(folder, ref_loudness, algorithm)

    if tagging:
        logger.debug("Starting to tag folder <{}> ...".format(folder))
        start_tagging(folder, ref_loudness, algorithm)

    elapsed = perf_counter() - began
    logger.debug("Finished worker, folder <{}>, name <{}>, pid <{}>, secs <{:.2f}>".format(folder, worker.name, worker.pid, elapsed))
    return (folder, worker.pid)
def log_path(postfix):
    """Return ``<ROOT_PATH>/<PROCESSED_STATE_FILE>.<postfix>.txt``."""
    return "{}.{}.txt".format(
        os.path.join(ROOT_PATH, PROCESSED_STATE_FILE),
        postfix,
    )
def get_folders(path):
    """Return the set of folders below ``path`` that contain audio files.

    Folders are reported relative to ROOT_PATH. Excluded folders are
    skipped, as is the root folder itself when the whole library is scanned.
    """
    # find all file extension within a directory
    # see: http://stackoverflow.com/a/4998326/1230358
    # find /mnt/tank/music/ -type f -name '*.*' | sed 's|.*\.||' | sort -u
    # only the folders that contain audio files are collected
    # see: http://stackoverflow.com/a/9997442/1230358
    found = set()
    for folder, _subfolders, files in os.walk(path):
        # Skip special folders
        if folder.startswith(EXCLUDES):
            continue
        # Skip root folder
        if path == MUSIC_LIBRARY_PATH and folder == ROOT_PATH:
            continue
        # One audio file is enough to qualify the folder
        has_audio = any(
            os.path.splitext(name)[-1].lower() in AUDIO_EXTS for name in files
        )
        if has_audio:
            found.add(os.path.relpath(folder, ROOT_PATH))
    return found
def get_folders_processed(path):
    """Return the set of folders recorded in the "success" state file.

    Each line of that file has the form ``<mode>☢☢<folder>``. Blank lines
    and lines without the separator are ignored. When the file cannot be
    opened a warning is logged and an empty set is returned.
    """
    def folders_in(lines):
        for line in lines:
            # partition never raises, unlike indexing the result of split()
            # on a line that lacks the separator.
            _mode, separator, folder = line.partition("☢☢")
            folder = folder.strip()
            if separator and folder:
                yield folder

    try:
        with open(log_path("success"), 'r') as f:
            return set(folders_in(f))
    except IOError:
        logger.warning("Could not gather already processed file for folder <{}>".format(path))
        return set()
def is_tagged(path, ref_loudness, algorithm):
    """Return True if the file's stored reference loudness and algorithm match.

    A file whose loudness tag is missing, empty or not a number counts as
    not tagged instead of raising.
    """
    f = taglib.File(path)
    try:
        tagged_loudness = float(f.tags[RG_TAGS[6]][0])
        tagged_algorithm = f.tags[RG_TAGS[7]][0]
    except (KeyError, IndexError, ValueError):
        return False
    finally:
        f.close()
    return tagged_loudness == float(ref_loudness) and tagged_algorithm == algorithm
def validate(folders):
    """Log an error for every audio file in *folders* lacking valid tags.

    *folders* are paths relative to ``ROOT_PATH``. A folder without an
    ``RG_RESULT_FILE`` is reported and skipped. Otherwise each file below
    it whose extension is in ``AUDIO_EXTS`` is checked with ``is_tagged``.
    """
    logger.info("Start validating")
    started = perf_counter()
    ref_loudness, algorithm = calc_vars()
    # get the full algorithm name as in tags
    algorithm = "ITU-R BS.1770 ({})".format(CALC_ALGORITHM_FULL[algorithm])
    for folder in folders:
        folder_abs = os.path.join(ROOT_PATH, folder)
        if not os.path.exists(os.path.join(folder_abs, RG_RESULT_FILE)):
            logger.error("No <{}> file was found in <{}>".format(RG_RESULT_FILE, folder))
            continue
        for dirpath, _dirs, names in os.walk(folder_abs):
            audio_files = (
                os.path.join(dirpath, name)
                for name in names
                if os.path.splitext(name)[-1].lower() in AUDIO_EXTS
            )
            for audio_file in audio_files:
                if not is_tagged(audio_file, ref_loudness, algorithm):
                    logger.error("Audio file <{}> has no valid replay gain tags".format(audio_file))
    logger.info("Finished validating, secs <{:.2f}>".format(perf_counter() - started))
# http://stackoverflow.com/questions/21159103/what-kind-of-problems-if-any-would-there-be-combining-asyncio-with-multiproces | |
# http://chriskiehl.com/article/parallelism-in-one-line/ | |
# http://stackoverflow.com/a/11623718 | |
def main(folders, calculating=False, tagging=False):
    """Process *folders* in parallel on a pool of ``NUM_CPUS`` workers.

    One ``run_worker`` task is scheduled per folder (paths are taken
    relative to ``ROOT_PATH``). Each finished folder is appended to the
    "success" state log and each failed one to the "error" state log, as a
    line of the form ``<calculating>:<tagging>☢☢<relative folder>``.

    A ``KeyboardInterrupt`` while waiting terminates the pool; otherwise the
    pool is closed and joined. Afterwards leftover ``bs1770gain`` processes
    are killed with ``pkill``.
    """
    logger.debug("Initializing <{}> workers".format(NUM_CPUS))
    pool = multiprocessing.Pool(NUM_CPUS, init_worker)
    # calculating(True/False):tagging(True/False)
    mode = "{}:{}".format(calculating, tagging)

    def append_state(postfix, lock, folder):
        # The context managers release the lock and close the file even
        # when opening or writing fails, so later callbacks cannot deadlock.
        with lock, open(log_path(postfix), "a+") as f:
            f.write("{}☢☢{}\n".format(mode, os.path.relpath(folder, ROOT_PATH)))

    # These callbacks run in the main process not in the worker processes
    # but in their own threads. So care for thread safety in them!
    success_lock = threading.Lock()

    def worker_on_success(result):
        folder, _pid = result
        logger.success('Worker <{}> finished!'.format(folder))
        append_state("success", success_lock, folder)

    error_lock = threading.Lock()

    def worker_on_error(e):
        if isinstance(e, TaggingException):
            logger.error("Tagging failed for <{}> with message <{}>".format(e.path, e.message))
        elif isinstance(e, CalculatingException):
            logger.error("Calculating replay gain failed for <{}> with message <{}>".format(e.path, e.message))
        else:
            logger.error("An unknown exception happend")
            logger.exception(e)
        # Unknown exceptions carry no ``path``. Reading it blindly would
        # raise inside the callback itself, so there is nothing to record.
        failed_path = getattr(e, "path", None)
        if failed_path is None:
            return
        append_state("error", error_lock, failed_path)

    workers = [
        # apply_async returns an AsyncResult to wait on
        pool.apply_async(
            run_worker,
            (
                os.path.normpath(os.path.join(ROOT_PATH, folder)),
                calculating,
                tagging
            ),
            callback=worker_on_success,
            error_callback=worker_on_error
        )
        for folder in folders
    ]

    # Keyboard interrupts and multiprocessing
    # see: http://noswap.com/blog/python-multiprocessing-keyboardinterrupt
    # http://stackoverflow.com/questions/28674518/multiprocessing-pool-wait-for-all-results-but-process-individual-results-imme
    try:
        for worker in workers:
            worker.wait()
    except KeyboardInterrupt:
        logger.info("Caught KeyboardInterrupt, terminating workers ...")
        pool.terminate()
        pool.join()
    else:
        logger.info("Finished. Quitting normally ...")
        pool.close()
        pool.join()
    # kill all leftover zombie processes
    os.system('pkill bs1770gain')
if __name__ == "__main__":
    # Command line entry point: parse the options, work out which folders
    # still need processing, then either validate them or run the
    # calculating/tagging workers on them.
    parser = argparse.ArgumentParser(description='Process replaygain on a folder and its audio data')
    parser.add_argument('path', metavar='in-file', help='The folder to process')
    parser.add_argument('-t', '--tag', action='store_true', help='Just process id3 tagging of the files within the folder')
    parser.add_argument('-c', '--calc', action='store_true', help='Just calculate the replay gain data of the files within the folder')
    parser.add_argument('-f', '--force', action='store_true', help='Recalculate or tag already processed folders/files')
    parser.add_argument('-T', '--timeout', default=False, type=int, help='The duration (secs) the calculatioin process is allowed to take')
    parser.add_argument('-v', '--validate', action='store_true', help='Checks if all audio files within the folder have valid replay gain tags')
    args = parser.parse_args()

    # Set worker timeout
    if args.timeout:
        MAX_CALCULATION_DURATION = args.timeout
        logger.info("Setting worker timeout to <{}> seconds".format(args.timeout))

    logger.info("Lets go ...")
    path = MUSIC_LIBRARY_PATH
    if args.path:
        path = os.path.abspath(args.path)
        ROOT_PATH = path
    if not os.path.exists(path):
        logger.error("Root path <{}> does not exist".format(path))
        sys.exit(1)

    if args.force:
        # totally new run: take every folder again
        candidates = get_folders(path)
    else:
        # resume the last state: drop what was already processed
        candidates = get_folders(path) - get_folders_processed(path)
    # Same deterministic, case-insensitive order in both modes
    folders = sorted(candidates, key=str.lower)

    if not folders:
        # Stop here so an empty run neither starts the worker pool nor
        # runs the final pkill.
        logger.info("Nothing left to process")
        sys.exit(0)

    if args.validate:
        # just validate the audio files within the folder
        validate(folders)
    else:
        # calculate or tag or both replay gain data/tags
        calc = args.calc
        tag = args.tag
        if not tag and not calc:
            # neither requested explicitly: do all of them
            calc = True
            tag = True
        main(folders, calc, tag)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# see: http://stackoverflow.com/a/14879370 | |
import os | |
import sys | |
import socket | |
import traceback | |
import lxml.etree as et | |
from colorama import init, Fore, Back | |
init(autoreset=True) | |
# The music library location depends on the host this script runs on.
MUSIC_LIBRARY_PATH = (
    "/mnt/tank/music"
    if socket.gethostname() == "r2d2"
    else "/Volumes/tank_music"
)
def log_success(msg):
    """Print *msg* preceded by the green foreground colour code."""
    coloured = Fore.GREEN + msg
    print(coloured)
def log_info(msg):
    """Print *msg* preceded by the blue foreground colour code."""
    coloured = Fore.BLUE + msg
    print(coloured)
def log_failure(msg):
    """Print *msg* preceded by the red foreground colour code."""
    coloured = Fore.RED + msg
    print(coloured)
class hashabledict(dict):
    """A ``dict`` that is hashable, so it can be used inside dictionary keys.

    The hash is computed from the sorted ``(key, value)`` pairs, so equal
    contents hash equally regardless of insertion order. Items must
    therefore be both orderable and hashable.
    """

    def __hash__(self):
        ordered_items = sorted(self.items())
        return hash(tuple(ordered_items))
class XMLCombiner(object):
    """Merge several XML files into the tree parsed from the first one."""

    def __init__(self, filenames):
        """Parse every file in *filenames* and keep the roots in order."""
        assert len(filenames) > 0, 'No filenames!'
        # XMLParser is too strict and has trouble with unescaped ampersands
        # and the like, so the lenient HTMLParser is used instead
        # (see: http://stackoverflow.com/a/26267496). Blank text is dropped
        # so the result can be pretty printed afterwards.
        lenient_parser = et.HTMLParser(remove_blank_text=True)
        # Roots are kept in order; they are merged later by combine()
        self.roots = []
        for filename in filenames:
            self.roots.append(et.parse(filename, lenient_parser).getroot())

    def combine(self):
        """Fold all later roots into the first one and return it as a tree.

        Returns an ``ElementTree`` wrapping the (modified) first root.
        """
        base = self.roots[0]
        for extra in self.roots[1:]:
            self.combine_element(base, extra)
        return et.ElementTree(base)

    def combine_element(self, one, other):
        """Merge the children of *other* into *one*, recursively.

        Children are matched by tag name plus attributes. A child of
        *other* with no counterpart in *one* is appended to *one*. A
        matched leaf overwrites the counterpart's text, and a matched
        nested element is merged recursively. Children tagged
        ``integrated`` are ignored.
        """
        def identity(element):
            # Elements count as "the same" when tag and attributes agree
            return (element.tag, hashabledict(element.attrib))

        by_identity = {identity(child): child for child in one}
        for el in other:
            # skip old integrated
            if el.tag == 'integrated':
                continue
            key = identity(el)
            counterpart = by_identity.get(key)
            if counterpart is None:
                # Unknown so far: register it and add it to `one`
                by_identity[key] = el
                one.append(el)
            elif len(el) == 0:
                # Leaf element: take over the text
                counterpart.text = el.text
            else:
                # Nested element: merge its children the same way
                self.combine_element(counterpart, el)
if __name__ == '__main__':
    # For every folder listed in RG-Processed.txt, merge its RG-Results.xml
    # with the matching ".orig" file and write the result next to it as
    # ".comb". Folders that fail are appended to RG_Failure_1.txt.
    processed_list = os.path.join(MUSIC_LIBRARY_PATH, "RG-Processed.txt")
    try:
        with open(processed_list, 'r') as f:
            # Blank lines would otherwise turn into the folder "" and so
            # point at the library root itself.
            processed = {line.strip() for line in f if line.strip()}
    except (IOError, UnicodeError):
        # Only an unreadable list means "nothing to do"; anything else
        # (KeyboardInterrupt, programming errors) must not be swallowed.
        log_info("No readable list of processed folders at <{}>".format(processed_list))
        processed = set()

    for folder in processed:
        info_file = os.path.join(MUSIC_LIBRARY_PATH, folder, "RG-Results.xml")
        if not os.path.exists(info_file):
            continue
        try:
            r = XMLCombiner((info_file, "{}.orig".format(info_file))).combine()
            with open("{}.comb".format(info_file), "wb") as f:
                f.write(et.tostring(r.getroot(), pretty_print=True))
            log_success("SUCCESS: Combined info file <{}>".format(info_file))
        except Exception:
            # Best effort per folder: report, remember the failure, go on
            log_failure("FAILURE: Combined info file <{}>".format(info_file))
            traceback.print_exc(file=sys.stdout)
            with open(os.path.join(MUSIC_LIBRARY_PATH, "RG_Failure_1.txt"), 'a+') as f:
                f.write("{}\n".format(os.path.relpath(os.path.dirname(info_file), MUSIC_LIBRARY_PATH)))
Author
hetsch
commented
Jul 3, 2016
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment