Last active
August 29, 2015 14:00
-
-
Save notwa/11331477 to your computer and use it in GitHub Desktop.
personalized one-way music sync
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess as sp | |
import os | |
def ogg(fin, fout):
    """Transcode the audio file *fin* into an Ogg Vorbis file at *fout*.

    ``ffmpeg`` decodes *fin* into a FLAC stream on its stdout, which is piped
    straight into ``oggenc`` (quality 5, quiet mode) writing to *fout*.

    Returns a falsy value (0) when both processes succeeded, otherwise the
    first non-zero exit status (decoder first, then encoder).

    Raises OSError if either executable cannot be started.
    """
    decoder = sp.Popen(
        ["ffmpeg", "-loglevel", "error", "-i", fin, "-f", "flac", "-"],
        stdout=sp.PIPE)
    try:
        encoder = sp.Popen(
            ["oggenc", "-Q", "-q", "5", "-", "-o", fout],
            stdin=decoder.stdout, stdout=sp.PIPE)
    except OSError:
        # The encoder never started: do not leave the decoder running with
        # nobody reading its output.
        decoder.stdout.close()
        decoder.kill()
        decoder.wait()
        raise
    # Drop our copy of the pipe so the decoder gets SIGPIPE if the encoder
    # exits early.
    decoder.stdout.close()
    encoder.communicate()
    # wait() (not poll()) so the decoder is reaped and its real exit status is
    # known; poll() returns None while the process is still running.
    return decoder.wait() or encoder.returncode
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import MutableMapping | |
import mutagenx | |
import mutagenx.id3 | |
from mutagenx.easyid3 import EasyID3 | |
def popms(id3):
    """Yield ``(key, value)`` for every item of *id3* whose key starts with 'POPM'."""
    for name, frame in id3.items():
        if not name.startswith('POPM'):
            continue
        yield name, frame
def byte2rating(b):
    """Map a rating byte to a 0-5 star value.

    Each star level has a lower bound; the highest level whose bound is
    reached wins. Anything below 1 maps to 0.
    """
    thresholds = ((5, 224), (4, 160), (3, 96), (2, 32), (1, 1))
    for stars, lower_bound in thresholds:
        if b >= lower_bound:
            return stars
    return 0
def rating2byte(r):
    """Map a 1-5 star rating to the byte value stored in a rating frame.

    Any other value maps to 0. Every result fits in a single byte (0-255);
    five stars therefore maps to 255, not 256.
    """
    if r == 5: return 255
    if r == 4: return 192
    if r == 3: return 128
    if r == 2: return 64
    if r == 1: return 1
    return 0
def rating_get(id3, key):
    """Getter for the 'rating' key: return the rating as a list of strings.

    A 'TXXX:RATING' entry takes precedence and its text values are returned
    as-is. Otherwise the first POPM entry (if any) is converted to a star
    count. With neither present the result is an empty list.

    *key* is unused; it is part of the callback signature.
    """
    if 'TXXX:RATING' in id3:
        return list(id3['TXXX:RATING'].text)
    # Only the first POPM entry is consulted.
    for _, popm in popms(id3):
        return [str(byte2rating(popm.rating))]
    return []
def _canconv(r): | |
try: | |
ir = int(r) | |
if ir != str(ir): | |
return False | |
return ir >= 1 and ir <= 5 | |
except (ValueError, TypeError): | |
return False | |
def rating_set(id3, key, val):
    """Setter for the 'rating' key: replace any stored rating with *val*.

    All existing rating data (POPM entries and 'TXXX:RATING') is removed
    first. A value accepted by ``_canconv`` is stored as a POPM frame;
    anything else is stored as text in a TXXX frame described 'RATING'.

    *val* may be a single value or a list/tuple of values.
    NOTE(review): EasyID3 appears to hand setters a list even for a single
    string; confirm against the mutagenx version in use.

    *key* is unused; it is part of the callback signature.
    """
    rating_delete(id3, key)

    is_seq = isinstance(val, (list, tuple))
    if is_seq and len(val) == 0:
        # Nothing to store; the old rating has already been removed.
        return
    if is_seq and len(val) == 1:
        # Unwrap so the numeric check and str() see the value itself rather
        # than the list wrapper (str(['5']) would be stored as "['5']").
        val = val[0]
        is_seq = False

    if _canconv(val):
        id3.add(mutagenx.id3.POPM(
            email="Windows Media Player 9 Series",
            rating=rating2byte(int(val)),
            count=0))
    else:
        text = [str(v) for v in val] if is_seq else str(val)
        id3.add(mutagenx.id3.TXXX(encoding=3, desc='RATING', text=text))
def rating_delete(id3, key):
    """Deleter for the 'rating' key: remove every POPM entry and 'TXXX:RATING'.

    *key* is unused; it is part of the callback signature.
    """
    # Snapshot the keys first: popms() is a lazy generator over id3.items(),
    # and deleting from a mapping while it is being iterated is unsafe
    # (a plain dict raises RuntimeError).
    doomed = [k for k, _ in popms(id3)]
    for k in doomed:
        del id3[k]
    if 'TXXX:RATING' in id3:
        del id3['TXXX:RATING']
# Extra keys made available through EasyID3 by this module.
replaygain_tags = (
    'replaygain_album_gain',
    'replaygain_album_peak',
    'replaygain_track_gain',
    'replaygain_track_peak',
)
extra_tags = ('sync', 'totaltracks', 'totaldiscs')

# ReplayGain keys are registered under their own (lower-case) name ...
for tag in replaygain_tags:
    EasyID3.RegisterTXXXKey(tag, tag)

# ... while the extra keys are registered under their upper-cased name.
for tag in extra_tags:
    EasyID3.RegisterTXXXKey(tag, tag.upper())

EasyID3.RegisterTextKey('albumartist', 'TPE2')
# 'rating' is served by the custom getter/setter/deleter callbacks.
EasyID3.RegisterKey('rating', rating_get, rating_set, rating_delete)
class SyncFile(MutableMapping):
    """Single-valued mapping view over the tags of one audio file.

    The underlying tag object (``self.md``) is whatever
    ``mutagenx.File(path, easy=True)`` returns and stores a list per key.
    This wrapper exposes only the first element of each list, and hides keys
    whose list is empty.

    Attributes:
        md:   the underlying tag object (may be None; see __getitem__).
        path: the path the file was opened from.
        seen: False initially; a slot for callers to mark the file as handled.
    """

    def __init__(self, path):
        self.md = mutagenx.File(path, easy=True)
        self.path = path
        self.seen = False

    def __getitem__(self, key):
        """Return the first value stored under *key*.

        Raises KeyError if the key is present but holds no values (or if the
        underlying lookup itself raises KeyError).
        """
        if self.md is None:
            # Name the offending file before the lookup below fails.
            print(self.path)
        values = self.md[key]
        try:
            return values[0]
        except IndexError:
            raise KeyError(key)

    def __setitem__(self, key, value):
        """Store *value* as the only value of *key*; None is rejected."""
        if value is None:
            raise ValueError
        self.md[key] = [value]

    def __delitem__(self, key):
        del self.md[key]

    def __iter__(self):
        """Yield only the keys that can actually be read through __getitem__."""
        for k in self.md:
            try:
                self[k]
            except KeyError:
                continue
            yield k

    def __len__(self):
        # Count through __iter__ so unreadable keys are excluded.
        return sum(1 for _ in self)

    def save(self):
        """Write the tags back via the underlying object's save()."""
        return self.md.save()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
import os, os.path | |
import sys | |
from shutil import copy2 | |
from tempfile import mkstemp | |
from zlib import crc32 | |
import mutaext | |
import convert | |
from mutaext import SyncFile | |
# File extensions (lower-case, with dot) that are considered audio files.
goodexts = ('.mp3', '.flac', '.ogg')

# Tags that together identify a track when pairing source and output files.
matchtags = ['artist', 'album', 'title', 'tracknumber', 'discnumber']

# Every tag that is carried over to the output file.
alltags = (
    ['albumartist', 'composer', 'comment', 'genre', 'date']
    + list(mutaext.replaygain_tags)
    + list(mutaext.extra_tags)
    + matchtags
)
def lament(*args, **kwargs):
    """print() to stderr; accepts the same arguments as print()."""
    return print(*args, file=sys.stderr, **kwargs)


def walkfiles(w):
    """Given an os.walk()-style iterable, lazily produce every file path."""
    return (os.path.join(root, name) for root, _, names in w for name in names)


def extof(p):
    """Return the lower-cased extension of path *p*, including the dot."""
    _, ext = os.path.splitext(p)
    return ext.lower()


def filterext(ps, es):
    """Lazily keep only the paths in *ps* whose extension is in *es*."""
    return (path for path in ps if extof(path) in es)
def shouldsync(md):
    """Decide whether the track described by *md* should be synced.

    The 'sync' tag (case-insensitive) overrides everything: 'yes' forces the
    track in, 'no' and 'space' force it out. Otherwise the track is synced
    when its 'rating' tag is a whole number of at least 3.

    Returns a bool.
    """
    sync = md.get('sync', '')
    if sync:
        sync = sync.lower()
    if sync == 'yes':
        return True
    if sync in ('no', 'space'):
        return False

    rating = md.get('rating', '')
    # isdecimal() rather than isnumeric(): the latter accepts characters such
    # as '²' or '½' that int() cannot parse, which would raise ValueError.
    return rating.isdecimal() and int(rating) >= 3
import re

# First run of digits anywhere in a string.
re_digits = re.compile(r'\d+')


def tonumber(crap):
    """Return the first integer embedded in *crap*, or 0 if there is none.

    None and empty input also give 0.
    """
    if crap is None or not len(crap):
        return 0
    found = re_digits.search(crap)
    return int(found.group()) if found else 0
def fixmetadata(md):
    """Fill in and normalise the tags used for matching, in place.

    - 'artist' and 'album' get placeholder values when missing.
    - 'discnumber' and 'tracknumber' are reduced to a plain integer string
      ('0' when missing or unparsable).
    - 'title' defaults to the file name of ``md.path`` without its extension.
    """
    placeholders = (('artist', "Unknown Artist"), ('album', "Unknown Album"))
    for tag, fallback in placeholders:
        md[tag] = md.get(tag, fallback)

    for tag in ('discnumber', 'tracknumber'):
        md[tag] = str(tonumber(md.get(tag, '0')))

    if 'title' not in md:
        stem, _ = os.path.splitext(os.path.basename(md.path))
        md['title'] = str(stem)
def findmatching(haystack, needle):
    """Find the entry of *haystack* whose match tags equal those of *needle*.

    Only the first entry with equal tags is considered. If it has not been
    claimed yet, its ``seen`` attribute is set to ``needle.path`` and the
    entry is returned. If it was already claimed, "Duplicate" is reported on
    stderr and None is returned. None is also returned when nothing matches.

    The comparison is exact (case-sensitive).
    """
    wanted = [needle[t] for t in matchtags]
    for candidate in haystack:
        if not ([candidate[t] for t in matchtags] == wanted):
            continue
        if candidate.seen:
            # TODO: check other tags too?
            lament("Duplicate")
            return None
        candidate.seen = needle.path
        return candidate
    return None
def updatemetadata(mdold, mdnew):
    """Make *mdold* agree with *mdnew* on every tag in ``alltags``.

    Tags present in *mdnew* are copied over when missing or different; tags
    absent from *mdnew* are removed from *mdold*.

    Returns True if *mdold* was changed, False otherwise.
    """
    changed = False
    for tag in alltags:
        if tag not in mdnew:
            # Source no longer has this tag: drop it from the copy as well.
            if tag in mdold:
                del mdold[tag]
                changed = True
            continue
        if tag not in mdold or mdnew[tag] != mdold[tag]:
            mdold[tag] = mdnew[tag]
            changed = True
    return changed
def makefilename(md):
    """Derive the output file name for the track described by *md*.

    The disc, track, artist, album and title tags are joined into one string
    and hashed with CRC-32; the name is the checksum as eight upper-case hex
    digits followed by '.ogg'.
    """
    parts = (md['title'], md['artist'], md['album'],
             md['tracknumber'], md['discnumber'])
    title, artist, album, track, disc = parts
    described = "%s-%s - %s - %s - %s" % (disc, track, artist, album, title)
    # FAT is a pain to deal with so just use nondescript filenames
    checksum = crc32(described.encode('utf-8')) & 0xFFFFFFFF
    return '{:08X}.ogg'.format(checksum)
def run(args):
    """Entry point: list or perform a one-way sync.

    *args* is an argv-style list: ``[prog, indir]`` or ``[prog, indir, outdir]``.

    With only *indir*, the paths of all files that should be synced are
    printed and nothing is modified. With *outdir* as well:

    1. every audio file in *outdir* is paired with a selected source file by
       its match tags; unpaired files are deleted, paired ones get their tags
       refreshed;
    2. paired files are renamed to their expected name, and source files
       without a counterpart are converted to Ogg Vorbis and added.

    Returns a process exit status: 0 on success, 1 on a usage error or if any
    conversion failed.
    """
    if len(args) not in (2, 3):
        lament("I need a path or two!")
        return 1

    inonly = len(args) == 2
    indir = args[1]

    def paths(root):
        # All files below root that carry a known audio extension.
        return filterext(walkfiles(os.walk(root)), goodexts)

    tosync = []
    for p in paths(indir):
        md = SyncFile(p)
        if shouldsync(md):
            if inonly:
                print(p)
            else:
                fixmetadata(md)
                tosync.append(md)

    if inonly:
        return 0

    lament("Matching tags...")
    outdir = args[2]
    for p in paths(outdir):
        md = SyncFile(p)
        fixmetadata(md)
        match = findmatching(tosync, md)
        if match is None:
            print("DEL", p)
            print('was', md['title'], 'by', md['artist'])
            os.remove(p)
        elif updatemetadata(md, match):
            print("UPD", p)
            md.save()

    lament("Syncing files...")
    status = 0
    for md in tosync:
        fn = makefilename(md)
        fout = os.path.join(outdir, fn)

        if md.seen:
            # Already present in outdir (seen holds its current path); only
            # make sure it carries the expected name.
            if md.seen != fout:
                print("MOV", md.seen)
                os.rename(md.seen, fout)
            continue

        print("ADD", md.path)
        fd, ftemp = mkstemp()
        # mkstemp() returns an open descriptor; close it right away so it is
        # not leaked (one per added track) and the encoder can write the file.
        os.close(fd)
        try:
            if convert.ogg(md.path, ftemp):
                # Skip this track instead of crashing on an unusable temp file.
                lament("Conversion failed:", md.path)
                status = 1
                continue
            mdnew = SyncFile(ftemp)
            for tag in alltags:
                if tag in md:
                    mdnew[tag] = md[tag]
            fixmetadata(mdnew)  # redundant?
            mdnew.save()
            copy2(ftemp, fout)
        finally:
            os.remove(ftemp)

    return status
if __name__ == '__main__':
    # Ctrl-C exits quietly with status 1 instead of printing a traceback.
    try:
        ret = run(sys.argv)
    except KeyboardInterrupt:
        ret = 1
    sys.exit(ret)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment