Skip to content

Instantly share code, notes, and snippets.

@dmorrison42
Last active May 4, 2022 08:19
Show Gist options
  • Save dmorrison42/b815991e36ea652e37df9669ecf9e6d5 to your computer and use it in GitHub Desktop.
Save dmorrison42/b815991e36ea652e37df9669ecf9e6d5 to your computer and use it in GitHub Desktop.
Podcast Splitting Script
#!/usr/bin/env python
# Dependencies:
# ffmpeg: https://www.ffmpeg.org/download.html
# fpcalc: https://acoustid.org/chromaprint
from datetime import datetime
import os
import os.path
import json
import math
import shutil
import sqlite3
from subprocess import run
from statistics import mean
db_path = 'fingerprints.db'
in_dir = 'C:\\Users\\dan\\Documents\\gPodder\\Downloads'
out_dir = 'Output'
temp_dir = 'Temp'
min_length = 300
db = None
cursor = None
def clear_db():
safe_remove(db_path)
def init_db():
global db
global cursor
db = sqlite3.connect(db_path)
cursor = db.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS fingerprints
(name TEXT NOT NULL UNIQUE, fingerprint TEXT NOT NULL, fuzzy TEXT, match_id INTEGER)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS matches
(fingerprint TEXT)''')
db.commit()
def hex_encode_fingerprint(fingerprint):
return ''.join([hex(digit)[2:].zfill(8) for digit in fingerprint])
def hex_decode_fingerprint(fingerprint):
return [int(fingerprint[i:i+8], 16) for i in range(0, len(fingerprint), 8)]
def fuzz(fingerprint):
bits = []
for number in fingerprint:
avg = 0
for i in range(4):
line = round(mean([(number >> j) & 1 for j in range(4*i, 4*(i+1))])) << i
avg |= line
bits.append(str(avg))
return ''.join(bits)
def add_fingerprint(fingerprint, name):
# Try to insert row
fingerprint_str = hex_encode_fingerprint(fingerprint)
try:
fuzzy = fuzz(fingerprint)
cursor.execute('INSERT INTO fingerprints (fingerprint, fuzzy, name) VALUES (?,?,?)', [
fingerprint_str, fuzzy, name])
db.commit()
except sqlite3.IntegrityError as ex:
print('Already Recorded', ex)
return
# Get exact matches
match_name, match_id = get_exact_match(fingerprint, name)
if (match_name is not None):
if (match_id is None):
cursor.execute('INSERT INTO matches (fingerprint) VALUES (?)', [fingerprint_str])
cursor.execute('SELECT last_insert_rowid()')
(match_id,) = cursor.fetchone()
cursor.execute('UPDATE fingerprints SET match_id=? WHERE fingerprint=?', [match_id, fingerprint_str])
db.commit()
return
# Get fuzzy matches
match_name, match = _get_max_match(fingerprint, name, True)
if (match > .8):
cursor.execute('SELECT match_id, fingerprint from fingerprints where name=?', [match_name])
(match_id, match_fingerprint) = cursor.fetchone()
match_fingerprint = hex_decode_fingerprint(match_fingerprint)
if (match_id is None):
cursor.execute('INSERT INTO matches (fingerprint) VALUES (?)', [None])
cursor.execute('SELECT last_insert_rowid()')
(match_id,) = cursor.fetchone()
cursor.execute('UPDATE fingerprints SET match_id=? WHERE name=?', [match_id, match_name])
cursor.execute('UPDATE fingerprints SET match_id=? WHERE name=?', [match_id, name])
def get_exact_match(fingerprint, name):
fingerprint_str = hex_encode_fingerprint(fingerprint)
cursor.execute('SELECT name, match_id FROM fingerprints where fingerprint=? AND name<>?', [fingerprint_str, name])
match = cursor.fetchone()
if match is None:
return (None, None)
return tuple(match)
def get_max_match(fingerprint, name):
return _get_max_match(fingerprint, name)
def _get_max_match(fingerprint, name, skip_exact=False):
if not skip_exact:
exact_match = get_exact_match(fingerprint, name)
if exact_match[0] is not None:
return (exact_match[0], 1)
max_name = None
max_match = 0
cursor = db.cursor()
cursor.execute('''
SELECT fingerprint, name FROM (
SELECT COALESCE(matches.fingerprint, fingerprints.fingerprint) as fingerprint, name
FROM fingerprints
LEFT JOIN matches
ON match_id == matches.ROWID
WHERE name<>?
) group by fingerprint
''', [name])
rows = cursor.fetchall()
for line in rows:
other_fingerprint, other_name = line
other_fingerprint = hex_decode_fingerprint(other_fingerprint)
match = fingerprint_similarity(fingerprint, other_fingerprint)
if match > max_match:
max_match = match
max_name = other_name
return max_name, max_match
def has_prefix(prefix):
cursor = db.cursor()
cursor.execute('''
SELECT fingerprint, name FROM fingerprints
WHERE name LIKE ?
''', [prefix])
rows = cursor.fetchall()
return len(rows) > 0
def get_matching_names(name):
cursor = db.cursor()
cursor.execute('''
SELECT name FROM fingerprints
WHERE match_id IN (
SELECT match_id FROM fingerprints
WHERE name=?
)
''', [name])
matches = cursor.fetchall()
return [r[0] for r in matches]
popcnt_table_8bit = [
0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,
1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,
1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,
2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,
1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,
2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,
2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,
3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8,
]
def popcnt(x):
"""
Count the number of set bits in the given 32-bit integer.
"""
return (popcnt_table_8bit[(x >> 0) & 0xFF] +
popcnt_table_8bit[(x >> 8) & 0xFF] +
popcnt_table_8bit[(x >> 16) & 0xFF] +
popcnt_table_8bit[(x >> 24) & 0xFF])
def fingerprint_similarity(a, b):
error = 0
for x, y in zip(a, b):
error += popcnt(x ^ y)
return 1.0 - error / 32.0 / min(len(a), len(b))
def call(*args, **kwargs):
return run(*args, capture_output=True, **kwargs)
def get_duration(path):
txt = call([
'ffprobe', '-v', 'error', '-show_entries',
'format=duration', '-of',
'default=noprint_wrappers=1:nokey=1', path]).stdout
return float(txt)
def split(path, split_times, offset=0):
name = os.path.splitext(os.path.basename(path))[0]
out_path = os.path.join(temp_dir, name.replace('%', '') + ' %03d.mp3')
args = [
'ffmpeg', '-i', path, '-f', 'segment',
'-ss', str(offset),
'-reset_timestamps', '1',
'-c', 'copy', '-map', '0',
]
if len(split_times) > 0:
split_times = ','.join(str(l) for l in split_times)
args += ['-segment_times', split_times]
else:
args += ['-segment_time', '1000000']
args.append(out_path)
out = call(args)
if (out.returncode != 0):
print(out.stderr.decode('utf-8'))
def get_silences(path):
args = [
'ffmpeg', '-i', path, '-af',
'silencedetect=noise=-30dB:d=1.4',
'-f', 'null', '-',
]
out = call(args)
silences = []
if (out.returncode != 0):
print(out.stderr.decode('utf-8'))
return []
for line in out.stderr.split(b'\n'):
if (not line.startswith(b'[silencedetect')):
continue
if (b'silence_end' not in line):
continue
split = float(line.split()[4])
silences.append(math.floor(split))
return silences
def get_fingerprint(path):
args = ['fpcalc', '-raw', path]
out = call(args)
if (out.returncode != 0):
print(out.stderr.decode('utf-8'))
return None
return list(map(int, out.stdout.split()[1].split(b'=')[-1].split(b',')))
def safe_remove(path):
try:
os.remove(path)
except:
pass
def remove_duplicates(path):
for filename in os.listdir(path):
full_path = os.path.join(path, filename)
if (not has_prefix(filename)):
fingerprint = get_fingerprint(full_path)
if fingerprint is None:
print(f'Removing Empty file: {full_path}')
safe_remove(full_path)
continue
add_fingerprint(fingerprint, filename)
matches = get_matching_names(filename)
for match_name in matches:
try:
os.remove(os.path.join(path, match_name))
print(f'Removed duplicate: {match_name}')
except:
pass
# TODO: Consider not removing original
try:
os.remove(os.path.join(out_dir, match_name))
print(f'Removed Original: {match_name}')
except:
pass
if __name__ == '__main__':
init_db()
if os.path.isdir(out_dir):
shutil.rmtree(out_dir)
if os.path.isdir(temp_dir):
shutil.rmtree(temp_dir)
os.makedirs(os.path.abspath(out_dir))
for root, dirs, files in os.walk(in_dir, topdown=False):
for name in files:
if name.lower().endswith('.png'):
continue
os.makedirs(os.path.abspath(temp_dir))
full_path = os.path.abspath(os.path.join(root, name))
print(full_path)
try:
duration = get_duration(full_path)
splits = [str(i) for i in get_silences(full_path)]
split(full_path, splits, 0)
remove_duplicates(temp_dir)
for path in os.listdir(temp_dir):
shutil.move(os.path.join(temp_dir, path), out_dir)
finally:
shutil.rmtree(temp_dir)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment