Last active
May 4, 2022 08:19
-
-
Save dmorrison42/b815991e36ea652e37df9669ecf9e6d5 to your computer and use it in GitHub Desktop.
Podcast Splitting Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Dependencies: | |
# ffmpeg: https://www.ffmpeg.org/download.html | |
# fpcalc: https://acoustid.org/chromaprint | |
from datetime import datetime | |
import os | |
import os.path | |
import json | |
import math | |
import shutil | |
import sqlite3 | |
from subprocess import run | |
from statistics import mean | |
# SQLite file that holds the recorded fingerprints.
db_path = 'fingerprints.db'
# Directory tree that is walked for input files.
in_dir = 'C:\\Users\\dan\\Documents\\gPodder\\Downloads'
# Directory that receives the segments that were kept.
out_dir = 'Output'
# Scratch directory; created and removed once per input file.
temp_dir = 'Temp'
# NOTE(review): not referenced anywhere in the visible code; presumably a
# minimum segment length in seconds -- TODO confirm.
min_length = 300

# Connection and cursor shared by the database helpers; set by init_db().
db = cursor = None
def clear_db():
    """Delete the fingerprint database file at ``db_path`` (best effort)."""
    safe_remove(path=db_path)
def init_db():
    """Open the database at ``db_path`` and make sure both tables exist.

    Rebinds the module-level ``db`` (connection) and ``cursor`` that the
    other database helpers use.
    """
    global db, cursor

    db = sqlite3.connect(db_path)
    cursor = db.cursor()

    schema = (
        # One row per recorded file; match_id links rows to a matches row.
        '''CREATE TABLE IF NOT EXISTS fingerprints
        (name TEXT NOT NULL UNIQUE, fingerprint TEXT NOT NULL, fuzzy TEXT, match_id INTEGER)''',
        # One row per group of matching files.
        '''CREATE TABLE IF NOT EXISTS matches
        (fingerprint TEXT)''',
    )
    for statement in schema:
        cursor.execute(statement)
    db.commit()
def hex_encode_fingerprint(fingerprint):
    """Encode a fingerprint as a string of fixed-width hexadecimal words.

    Args:
        fingerprint: iterable of integers.

    Returns:
        A string containing exactly 8 lowercase hex digits per integer, so
        that ``hex_decode_fingerprint`` can slice it back apart.
    """
    # Mask to 32 bits so every word is exactly 8 digits: a negative value
    # is stored as its two's-complement bit pattern (instead of leaking the
    # sign/'x' characters of hex()), and an oversized value cannot shift
    # the alignment of the words that follow it.
    return ''.join(format(word & 0xFFFFFFFF, '08x') for word in fingerprint)
def hex_decode_fingerprint(fingerprint):
    """Decode a hex string into a list of integers, 8 hex digits per value.

    A trailing chunk shorter than 8 digits is parsed as its own value.
    """
    width = 8
    words = []
    for start in range(0, len(fingerprint), width):
        chunk = fingerprint[start:start + width]
        words.append(int(chunk, 16))
    return words
def fuzz(fingerprint):
    """Reduce each fingerprint integer to a small number and concatenate.

    Only the low 16 bits of each integer are looked at. They are taken as
    four 4-bit groups; group ``g`` contributes bit ``g`` of the reduced
    value when at least three of its four bits are set. The reduced values
    (0-15) are written in decimal and joined without a separator.
    """
    pieces = []
    for number in fingerprint:
        reduced = 0
        for group in range(4):
            nibble = (number >> (4 * group)) & 0xF
            # Rounding the mean of four bits gives 1 only for three or four
            # set bits: two set bits average 0.5, which round() sends to 0.
            if bin(nibble).count('1') >= 3:
                reduced |= 1 << group
        pieces.append(str(reduced))
    return ''.join(pieces)
def _create_match_group(fingerprint_str):
    """Insert a row into ``matches`` and return its rowid."""
    cursor.execute('INSERT INTO matches (fingerprint) VALUES (?)', [fingerprint_str])
    return cursor.lastrowid


def add_fingerprint(fingerprint, name):
    """Record a fingerprint under ``name`` and link it to any matching entry.

    The row is inserted first; if ``name`` is already recorded the function
    prints a notice and returns. Otherwise it looks for another entry with
    an identical fingerprint, and failing that for the most similar entry.
    When the similarity is above 0.8 both entries are given the same
    ``match_id`` (a new ``matches`` row is created if the other entry had
    none).

    Args:
        fingerprint: list of integers as returned by ``get_fingerprint``.
        name: unique name to store the fingerprint under.
    """
    # NOTE(review): the nesting below is reconstructed from a listing whose
    # indentation was lost -- confirm against the original file.
    fingerprint_str = hex_encode_fingerprint(fingerprint)

    # Try to insert row
    try:
        fuzzy = fuzz(fingerprint)
        cursor.execute('INSERT INTO fingerprints (fingerprint, fuzzy, name) VALUES (?,?,?)', [
            fingerprint_str, fuzzy, name])
        db.commit()
    except sqlite3.IntegrityError as ex:
        print('Already Recorded', ex)
        return

    # Get exact matches
    match_name, match_id = get_exact_match(fingerprint, name)
    if match_name is not None:
        if match_id is None:
            match_id = _create_match_group(fingerprint_str)
        # Tags every row with this fingerprint, including the new one.
        cursor.execute('UPDATE fingerprints SET match_id=? WHERE fingerprint=?', [match_id, fingerprint_str])
        db.commit()
        return

    # Get fuzzy matches
    match_name, match = _get_max_match(fingerprint, name, True)
    if match > .8:
        cursor.execute('SELECT match_id, fingerprint from fingerprints where name=?', [match_name])
        match_id, _ = cursor.fetchone()
        if match_id is None:
            # A group formed by similarity has no single fingerprint.
            match_id = _create_match_group(None)
            cursor.execute('UPDATE fingerprints SET match_id=? WHERE name=?', [match_id, match_name])
        cursor.execute('UPDATE fingerprints SET match_id=? WHERE name=?', [match_id, name])
        # Without this commit the link would only be persisted if a later
        # call happened to commit before the connection went away.
        db.commit()
def get_exact_match(fingerprint, name):
    """Look up another entry whose stored fingerprint is identical.

    Returns:
        ``(name, match_id)`` of one row with the same encoded fingerprint
        and a different name, or ``(None, None)`` when there is none.
    """
    encoded = hex_encode_fingerprint(fingerprint)
    cursor.execute('SELECT name, match_id FROM fingerprints where fingerprint=? AND name<>?', [encoded, name])
    row = cursor.fetchone()
    return (None, None) if row is None else tuple(row)
def get_max_match(fingerprint, name):
    """Return ``(name, similarity)`` of the best match, exact matches first."""
    return _get_max_match(fingerprint, name, skip_exact=False)
def _get_max_match(fingerprint, name, skip_exact=False):
    """Find the recorded entry most similar to ``fingerprint``.

    Args:
        fingerprint: list of integers to compare.
        name: entry name to exclude from the search.
        skip_exact: when false, an identical stored fingerprint is looked
            up first and reported with similarity ``1``.

    Returns:
        ``(name, similarity)``; ``(None, 0)`` when no candidate scores
        above zero.
    """
    if not skip_exact:
        exact_name = get_exact_match(fingerprint, name)[0]
        if exact_name is not None:
            return (exact_name, 1)

    # One candidate per distinct fingerprint; the fingerprint stored on the
    # matches row is used when it is not NULL.
    candidates = db.cursor()
    candidates.execute('''
        SELECT fingerprint, name FROM (
            SELECT COALESCE(matches.fingerprint, fingerprints.fingerprint) as fingerprint, name
            FROM fingerprints
            LEFT JOIN matches
            ON match_id == matches.ROWID
            WHERE name<>?
        ) group by fingerprint
    ''', [name])

    best_name, best_score = None, 0
    for encoded, candidate_name in candidates.fetchall():
        score = fingerprint_similarity(fingerprint, hex_decode_fingerprint(encoded))
        # Strict comparison: the first candidate wins a tie.
        if score > best_score:
            best_name, best_score = candidate_name, score
    return best_name, best_score
def has_prefix(prefix):
    """Report whether any recorded name matches ``prefix`` under SQL LIKE.

    ``prefix`` is handed to LIKE exactly as given; this function appends no
    wildcard, so any ``%`` or ``_`` it contains is treated as a wildcard.
    """
    lookup = db.cursor()
    lookup.execute('''
        SELECT fingerprint, name FROM fingerprints
        WHERE name LIKE ?
    ''', [prefix])
    return lookup.fetchone() is not None
def get_matching_names(name):
    """List every recorded name that shares a ``match_id`` with ``name``.

    Returns:
        A list of names; the query also selects ``name`` itself when it
        has a match_id.
    """
    lookup = db.cursor()
    lookup.execute('''
        SELECT name FROM fingerprints
        WHERE match_id IN (
            SELECT match_id FROM fingerprints
            WHERE name=?
        )
    ''', [name])
    names = []
    for (matched_name,) in lookup.fetchall():
        names.append(matched_name)
    return names
# Number of set bits for every possible byte value (index 0-255).
popcnt_table_8bit = [bin(byte).count('1') for byte in range(256)]


def popcnt(x):
    """
    Count the number of set bits in the given 32-bit integer.

    Bits above the low 32 are ignored.
    """
    return sum(popcnt_table_8bit[(x >> shift) & 0xFF] for shift in (0, 8, 16, 24))
def fingerprint_similarity(a, b):
    """Return the fraction of matching bits between two fingerprints.

    Only the overlapping prefix (the length of the shorter fingerprint) is
    compared, counting 32 bits per element.

    Args:
        a: sequence of integers.
        b: sequence of integers.

    Returns:
        A float where 1.0 means every compared bit is equal. 0.0 is
        returned when either fingerprint is empty, because there is nothing
        to compare (this used to raise ZeroDivisionError).
    """
    compared = min(len(a), len(b))
    if compared == 0:
        return 0.0
    error = sum(popcnt(x ^ y) for x, y in zip(a, b))
    return 1.0 - error / 32.0 / compared
def call(*args, **kwargs):
    """Run a subprocess with its stdout and stderr captured.

    Positional and keyword arguments are passed straight to
    ``subprocess.run``; the completed-process object is returned.
    """
    completed = run(*args, capture_output=True, **kwargs)
    return completed
def get_duration(path):
    """Ask ffprobe for the ``format=duration`` entry of ``path`` as a float.

    Raises:
        ValueError: if ffprobe's standard output is not a number.
    """
    command = [
        'ffprobe', '-v', 'error', '-show_entries',
        'format=duration', '-of',
        'default=noprint_wrappers=1:nokey=1', path,
    ]
    probe = call(command)
    return float(probe.stdout)
def split(path, split_times, offset=0):
    """Cut ``path`` into numbered mp3 segments inside ``temp_dir``.

    Args:
        path: input media file.
        split_times: sequence of split points handed to ffmpeg's
            ``-segment_times``; when empty a single very long segment
            time is used instead.
        offset: value passed to ffmpeg's ``-ss``.

    ffmpeg's stderr is printed when it exits with a non-zero status.
    """
    stem, _extension = os.path.splitext(os.path.basename(path))
    # '%' is dropped from the name because the output path is a pattern in
    # which '%03d' is the segment number.
    pattern = os.path.join(temp_dir, stem.replace('%', '') + ' %03d.mp3')

    command = [
        'ffmpeg', '-i', path, '-f', 'segment',
        '-ss', str(offset),
        '-reset_timestamps', '1',
        '-c', 'copy', '-map', '0',
    ]
    if len(split_times) > 0:
        command.extend(['-segment_times', ','.join(str(t) for t in split_times)])
    else:
        command.extend(['-segment_time', '1000000'])
    command.append(pattern)

    result = call(command)
    if result.returncode != 0:
        print(result.stderr.decode('utf-8'))
def get_silences(path):
    """Return the end time of each silence ffmpeg detects in ``path``.

    Runs the ``silencedetect`` filter (noise=-30dB, d=1.4) and collects the
    fifth whitespace-separated field of every ``silence_end`` report line,
    rounded down to a whole number.

    Returns:
        A list of integers, or ``[]`` (after printing stderr) when ffmpeg
        exits with a non-zero status.
    """
    result = call([
        'ffmpeg', '-i', path, '-af',
        'silencedetect=noise=-30dB:d=1.4',
        '-f', 'null', '-',
    ])
    if result.returncode != 0:
        print(result.stderr.decode('utf-8'))
        return []

    # The filter's report is read from stderr.
    return [
        math.floor(float(report.split()[4]))
        for report in result.stderr.split(b'\n')
        if report.startswith(b'[silencedetect') and b'silence_end' in report
    ]
def get_fingerprint(path):
    """Run ``fpcalc -raw`` on ``path`` and return its fingerprint integers.

    Returns:
        A list of ints parsed from the second whitespace-separated field of
        fpcalc's output (the text after its last ``=``), or ``None`` (after
        printing stderr) when fpcalc exits with a non-zero status.
    """
    result = call(['fpcalc', '-raw', path])
    if result.returncode != 0:
        print(result.stderr.decode('utf-8'))
        return None

    fingerprint_field = result.stdout.split()[1]
    raw_values = fingerprint_field.split(b'=')[-1]
    return [int(value) for value in raw_values.split(b',')]
def safe_remove(path):
    """Delete the file at ``path``, ignoring failures to do so.

    A missing file, a permission problem or any other ``OSError`` is
    silently ignored. Unlike a bare ``except``, this no longer swallows
    ``KeyboardInterrupt`` or ``SystemExit``.
    """
    try:
        os.remove(path)
    except OSError:
        pass
def remove_duplicates(path):
    """Fingerprint the files in ``path`` and delete those that have matches.

    For each file name not yet recorded (per ``has_prefix``) a fingerprint
    is computed and stored; files that yield no fingerprint are deleted.
    Every name sharing a match group with the file is then removed from
    both ``path`` and ``out_dir``.

    Args:
        path: directory containing the freshly split segments.
    """
    # NOTE(review): the nesting below is reconstructed from a listing whose
    # indentation was lost; in particular the match lookup is assumed to
    # run for already-recorded files too -- confirm against the original.
    for filename in os.listdir(path):
        full_path = os.path.join(path, filename)
        if not has_prefix(filename):
            fingerprint = get_fingerprint(full_path)
            if fingerprint is None:
                print(f'Removing Empty file: {full_path}')
                safe_remove(full_path)
                continue
            add_fingerprint(fingerprint, filename)

        for match_name in get_matching_names(filename):
            # Most matched names exist in only one of the two directories
            # (or in neither), so a failed removal is expected. Only
            # OSError is ignored so that Ctrl-C still interrupts the run.
            try:
                os.remove(os.path.join(path, match_name))
                print(f'Removed duplicate: {match_name}')
            except OSError:
                pass
            # TODO: Consider not removing original
            try:
                os.remove(os.path.join(out_dir, match_name))
                print(f'Removed Original: {match_name}')
            except OSError:
                pass
if __name__ == '__main__':
    init_db()
    try:
        # Start from empty output and scratch directories.
        if os.path.isdir(out_dir):
            shutil.rmtree(out_dir)
        if os.path.isdir(temp_dir):
            shutil.rmtree(temp_dir)
        os.makedirs(os.path.abspath(out_dir))

        for root, _dirs, files in os.walk(in_dir, topdown=False):
            for name in files:
                if name.lower().endswith('.png'):
                    continue
                os.makedirs(os.path.abspath(temp_dir))
                full_path = os.path.abspath(os.path.join(root, name))
                print(full_path)
                try:
                    # The value is unused, but the call is kept: it raises
                    # when ffprobe reports no numeric duration for the file.
                    duration = get_duration(full_path)
                    splits = [str(i) for i in get_silences(full_path)]
                    split(full_path, splits, 0)
                    remove_duplicates(temp_dir)
                    # Whatever survived de-duplication is kept.
                    for path in os.listdir(temp_dir):
                        shutil.move(os.path.join(temp_dir, path), out_dir)
                finally:
                    shutil.rmtree(temp_dir)
    finally:
        # Persist any changes still pending on the connection and release
        # the database file; previously the connection was simply dropped
        # at interpreter exit.
        db.commit()
        db.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment