Last active
January 2, 2024 16:17
-
-
Save carlwilson/679d5166dce7cac33cc10af1eb85f156 to your computer and use it in GitHub Desktop.
Python Stereo convolution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing as mp | |
import os | |
import sys | |
# pip import soundfile | |
import soundfile as SF | |
# pip install numpy | |
import numpy | |
# pip install scipy | |
from scipy import signal | |
SRC = '/mnt/8TB-WD/music' | |
DEST = '/mnt/4TB-WD.3/ChordPoly/SE535' | |
def convolve_file(src_dest): | |
try: | |
# Read source file using soundfile, should be a WAV or FLAC | |
orig_data, orig_rate = SF.read(src_dest[0]) | |
print('src {} rate {}'.format(src_dest[0], orig_rate)) | |
except RuntimeError: | |
# Catch the horrible runtimes thrown by invalid files | |
print('ERROR: Reading {}'.format(src_dest[0])) | |
return None, 0 | |
# Don't overwrite files | |
if os.path.isfile(src_dest[1]): | |
print('Skipping {} as destination exists.'.format(src_dest[0])) | |
return None, 0 | |
# Read the filter data | |
filt_data, _ = SF.read('ShureSE535/Shure SE535 linear phase {}Hz.wav'.format(orig_rate)) | |
try: | |
# Apply 2 channel convolution | |
# TODO: beware, not all files are stereo | |
new_data = numpy.array([signal.convolve(orig_data[:,0], filt_data[:,0]), # Channel 1 | |
signal.convolve(orig_data[:,1], filt_data[:,1])]).T # Channel 2 | |
except Exception: | |
print('ERROR: Convolving {}'.format(src_dest[0])) | |
# Now write the data as a 24 bit FLAC to the destination, | |
SF.write(src_dest[1], new_data, orig_rate, subtype='PCM_24') | |
return new_data, orig_rate | |
if __name__ == '__main__': | |
to_process = set() | |
start_time = time.time() | |
# transcode all the found files | |
try: | |
for root, dirs, files in os.walk(SRC): | |
for file in files: | |
if not file.casefold().endswith('.flac') | file.casefold().endswith('.wav'): | |
continue | |
out_dir = os.path.join(DEST, ''.join(root[len(SRC)+1:].split())) | |
if not os.path.isdir(out_dir): | |
os.makedirs(out_dir) | |
to_process.add((os.path.join(root, file), | |
os.path.join(out_dir, '_'.join(file.split())))) | |
pool = mp.Pool(processes=8) # Set number of threads here | |
terminated = False | |
succeeded = False | |
result = pool.map_async(convolve_file, to_process) | |
while 1: | |
try: | |
# wait for the result to come in, and mark success once it does | |
result.get(0.1) | |
succeeded = True | |
break | |
except mp.TimeoutError: | |
continue | |
except KeyboardInterrupt: | |
terminated = True | |
pool.terminate() | |
pool.join() | |
except Exception as e: | |
# catch and log all other exceptions gracefully | |
log.exception(e) | |
overall_time = time.time() - start_time | |
if succeeded: | |
sys.exit(0) | |
elif terminated: | |
sys.exit(3) | |
else: | |
sys.exit(4) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Never use this for very long IRs (>1 second), as this code will be very slow and suboptimal.