Last active
November 11, 2024 14:21
-
-
Save ZFTurbo/a98facf1406903e94a44791323e7cf0e to your computer and use it in GitHub Desktop.
MoisesDB to MUSDB18 format converter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import soundfile as sf | |
import numpy as np | |
from tqdm import tqdm | |
import glob | |
import operator | |
def _pad_to(data: np.ndarray, length: int) -> np.ndarray:
    """Zero-pad `data` with trailing silence along the time axis (axis 0) up to `length` frames.

    Works for mono (1-D) and multichannel (2-D) arrays alike.
    """
    pad_width = [(0, length - len(data))] + [(0, 0)] * (data.ndim - 1)
    return np.pad(data, pad_width, 'constant')


def combine_audio_files(files: list[str], prefered_length: int) -> tuple[np.ndarray, int]:
    """
    Combines multiple audio files into one by overlaying (summing) them.

    Parameters:
    - files (list[str]): Non-empty list of file paths to be combined.
    - prefered_length (int): Target number of frames; the result is truncated
      or zero-padded to exactly this length.

    Returns:
    - tuple[np.ndarray, int]: The combined audio data and the sample rate of
      the first file (later files are assumed to share it — TODO confirm).

    Raises:
    - ValueError: If `files` is empty.
    """
    if not files:
        raise ValueError("combine_audio_files() requires at least one file")
    combined_data, sample_rate = sf.read(files[0])
    for file in files[1:]:
        data, sr = sf.read(file)
        # Pad the shorter of the two with silence so both shapes match before summing.
        if len(data) > len(combined_data):
            combined_data = _pad_to(combined_data, len(data))
        elif len(combined_data) > len(data):
            data = _pad_to(data, len(combined_data))
        combined_data += data
    # Force the output to exactly prefered_length frames so all stems align.
    if len(combined_data) > prefered_length:
        combined_data = combined_data[:prefered_length]
    elif len(combined_data) < prefered_length:
        combined_data = _pad_to(combined_data, prefered_length)
    return combined_data, sample_rate
def files_to_categories(src_folder: str, categories: list[str]) -> dict[str, list[str]]:
    """
    Group every .wav file found in src_folder's subdirectories by stem category.

    A subdirectory whose lowercased name matches one of `categories` contributes
    its .wav files to that category; every other subdirectory contributes to the
    catch-all 'other' bucket.

    Parameters:
    - src_folder (str): Directory containing one subdirectory per stem group.
    - categories (list[str]): Lowercase folder names treated as named stems.

    Returns:
    - dict[str, list[str]]: Category name -> list of paths to .wav files found.
    """
    grouped: dict[str, list[str]] = {cat: [] for cat in categories}
    grouped['other'] = []
    for entry in os.listdir(src_folder):
        entry_path = os.path.join(src_folder, entry)
        if not os.path.isdir(entry_path):
            continue
        key = entry.lower() if entry.lower() in categories else 'other'
        grouped[key].extend(
            os.path.join(entry_path, name)
            for name in os.listdir(entry_path)
            if name.endswith('.wav')
        )
    return grouped
def sort_dict_by_values(a, reverse=True):
    """Return the (key, value) pairs of dict `a` ordered by value, descending by default."""
    pairs = list(a.items())
    pairs.sort(key=lambda item: item[1], reverse=reverse)
    return pairs
def find_required_length(all_files):
    """
    Return the most common frame count among every file in `all_files`.

    `all_files` maps category name -> list of .wav paths; each file is read
    and its length tallied, and the winning length becomes the canonical
    duration all stems are later aligned to.
    """
    counts = {}
    for file_list in all_files.values():
        for path in file_list:
            audio, _sr = sf.read(path)
            counts[len(audio)] = counts.get(len(audio), 0) + 1
    ranked = sort_dict_by_values(counts)
    return ranked[0][0]
def process_folder(src_folder: str, dest_folder: str) -> list[str]:
    """
    Processes a folder containing audio tracks, copying and combining necessary
    files into the MUSDB18 target structure (bass/drums/vocals/other/mixture .wav).

    Parameters:
    - src_folder (str): Path to the source track folder of MoisesDB.
    - dest_folder (str): Path to the target track folder for MUSDB18.

    Returns:
    - list[str]: Warning messages for stems missing from src_folder
      (silence of the standard length is written in their place).
    """
    logs = []
    os.makedirs(dest_folder, exist_ok=True)
    categories = ["bass", "drums", "vocals"]
    # If a required stem does not exist in src_folder, we later write silence
    # with the same duration as the standard file.
    problem_categories = []
    all_files = files_to_categories(src_folder, categories)
    # Some stems have different lengths — align them all to the most common one.
    prefered_length = find_required_length(all_files)
    # sample_rate is set by the first stem written below; by construction at
    # least one category (or 'other') has files, otherwise find_required_length
    # already failed on an empty tally.
    sample_rate = None
    for category in tqdm(categories, desc=f"Processing categories in {os.path.basename(src_folder)}"):
        files = all_files[category]
        if files:
            combined_data, sample_rate = combine_audio_files(files, prefered_length)
            sf.write(os.path.join(dest_folder, f"{category}.wav"), combined_data, sample_rate)
        else:
            logs.append(f"Warning: file for category '{category}' does not exist in {src_folder}")
            problem_categories.append(category)
    other_files = all_files['other']
    if other_files:
        other_combined_data, sample_rate = combine_audio_files(other_files, prefered_length)
        sf.write(os.path.join(dest_folder, "other.wav"), other_combined_data, sample_rate)
    else:
        logs.append(f"Warning: file for category 'other' does not exist in {src_folder}")
        problem_categories.append('other')
    # Write silent stand-ins for missing stems.
    # NOTE(review): assumes stereo (2 channels) — confirm against the dataset.
    for category in problem_categories:
        silence = np.zeros((prefered_length, 2), dtype=np.float32)
        sf.write(os.path.join(dest_folder, f"{category}.wav"), silence, sample_rate)
    # mixture.wav is the overlay of every source file across all categories.
    all_files_list = [file for sublist in all_files.values() for file in sublist]
    mixture_data, sample_rate = combine_audio_files(all_files_list, prefered_length)
    sf.write(os.path.join(dest_folder, "mixture.wav"), mixture_data, sample_rate)
    return logs
def process_folder_wrapper(args: tuple[str, str]) -> list[str]:
    """
    A wrapper for 'process_folder' that unpacks a single argument tuple,
    as required by multiprocessing Pool.map.

    Parameters:
    - args (tuple[str, str]): A tuple of (source folder, destination folder) paths.

    Returns:
    - list[str]: The warning log lines produced by process_folder.
    """
    src_folder, dest_folder = args
    return process_folder(src_folder, dest_folder)
def convert_dataset(src_root: str, dest_root: str, max_folders: int = 240, num_workers: int = 4) -> list[str]:
    """
    Converts the MoisesDB dataset to MUSDB18 format for up to max_folders folders.

    Parameters:
    - src_root (str): Root directory of the MoisesDB dataset.
    - dest_root (str): Root directory where the new dataset will be saved.
    - max_folders (int): Maximum number of track folders to process.
    - num_workers (int): Number of parallel worker processes.

    Returns:
    - list[str]: Accumulated warning messages from all processed folders.
    """
    from multiprocessing import Pool

    logs = []
    folders_to_process = []
    for folder in os.listdir(src_root):
        if len(folders_to_process) >= max_folders:
            break
        src_folder = os.path.join(src_root, folder)
        dest_folder = os.path.join(dest_root, folder)
        if os.path.isdir(src_folder):
            folders_to_process.append((src_folder, dest_folder))
        else:
            print(f"Skip {src_folder} — not dir")
    # Fan the per-track work out across worker processes.
    with Pool(num_workers) as pool:
        results = pool.map(process_folder_wrapper, folders_to_process)
    for log in results:
        if log:
            logs.extend(log)
    return logs
def check_length(dest_root):
    """
    Sanity check: for each converted track folder, print the folder, its file
    count, and the set of distinct audio shapes found — a single shape means
    the stems were aligned successfully.
    """
    for folder in glob.glob(dest_root + '/*'):
        print(folder)
        file_paths = glob.glob(folder + '/*.*')
        print(len(file_paths))
        shapes = {sf.read(path)[0].shape for path in file_paths}
        print(shapes)
if __name__ == '__main__':
    # Source: unpacked MoisesDB v0.1; destination: MUSDB18-style folder tree.
    src_root = r"D:/Music_Datasets/moisesdb_v0.1/"
    dest_root = r"D:/Music_Datasets/moisesdb_musdb_format/"
    # max_folders=5000 exceeds the dataset size, so effectively "convert everything".
    convert_dataset(src_root, dest_root, max_folders=5000, num_workers=8)
    # Verify every converted folder ended up with stems of a single shape.
    check_length(dest_root)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment