Last active
January 14, 2021 08:42
-
-
Save cpmpercussion/1505b74b573d106df6b820b960951567 to your computer and use it in GitHub Desktop.
A script to parse lots of MIDI files into a simple melody-only 16th-note-only NumPy array format. Generates worker processes to use all available CPU power.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from music21 import converter, instrument, note, chord, stream, midi | |
import numpy as np | |
import pandas as pd | |
import os | |
import time | |
import h5py | |
import argparse | |
import multiprocessing | |
# Command-line interface: which directory to scan for MIDI files and how many
# worker processes to start. Parsed at import time; the results are published
# as the module-level names `midi_directory` and `nthreads`.
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--directory',
                    action='store',    # store the value that follows the flag
                    dest='directory',  # read back as `args.directory`
                    required=True,     # otherwise os.walk() in main() receives None
                    help='The directory to search for MIDI files.')
parser.add_argument('-n', '--nthreads',
                    action='store',
                    dest='nthreads',   # used as the size of the multiprocessing pool
                    help="Number of threads to start.",
                    type=int,
                    default=10)
args = parser.parse_args()
midi_directory = args.directory
nthreads = args.nthreads

# Raise NumPy's summarisation threshold so that arrays of up to ten million
# elements are printed in full rather than abbreviated with "...".
np.set_printoptions(threshold=10000000)

# Melody event codes. Values 0-127 mean "note on at that MIDI pitch".
MELODY_NOTE_OFF = 128  # (stop playing all previous notes)
MELODY_NO_EVENT = 129  # (no change from previous event)
def streamToNoteArray(stream):
    """
    Convert a Music21 stream into a 1-D NumPy int16 array in Melody-RNN format.

    Each array index is one 16th-note step (0.25 quarter lengths). Values:
        0-127 - note on at specified MIDI pitch
        128   - note off (MELODY_NOTE_OFF)
        129   - no event (MELODY_NO_EVENT)

    Only one pitch is kept per step: a chord is reduced to its highest pitch,
    and when several notes start on the same step the highest pitch wins.
    Durations are rounded to whole steps with a minimum of one step.

    Args:
        stream: object whose ``.flat`` attribute iterates over Music21
            elements; only ``note.Note`` and ``chord.Chord`` are used.
            (The parameter name shadows the imported ``stream`` module inside
            this function; it is kept so existing callers are unaffected.)

    Returns:
        numpy.ndarray of dtype int16 whose length is the last note-off step
        plus one, or an empty int16 array when the stream holds no notes or
        chords.
    """
    # Part one: collect [onset step, length in steps, MIDI pitch] per event.
    events = []
    for element in stream.flat:
        if isinstance(element, note.Note):
            pitch = element.pitch.midi
        elif isinstance(element, chord.Chord):
            pitch = max(p.midi for p in element.pitches)  # melody = top voice
        else:
            continue
        onset = np.round(element.offset / 0.25)
        length = max(np.round(element.quarterLength / 0.25), 1)
        events.append([onset, length, pitch])

    # Nothing to encode; the array construction below needs at least one row.
    if not events:
        return np.array([], dtype=np.int16)

    # `np.int` was removed in NumPy 1.24; the builtin `int` is what it aliased.
    onsets, lengths, pitches = np.array(events, dtype=int).T
    df = pd.DataFrame({'on': onsets, 'off': onsets + lengths, 'pitch': pitches})
    # Highest pitch first within each onset, then keep only that first row.
    df = df.sort_values(['on', 'pitch'], ascending=[True, False])
    df = df.drop_duplicates(subset=['on'])

    # Part two: start from "no event" everywhere and write the events in.
    output = np.full(df['off'].max() + 1, MELODY_NO_EVENT, dtype=np.int16)
    # Write all note-offs first, then all note-ons, so that a note starting on
    # the step where an earlier note ends is not erased by that note-off.
    output[df['off'].values] = MELODY_NOTE_OFF
    output[df['on'].values] = df['pitch'].values
    return output
def process_file_worker(q, counter, file_lock):
    """
    Worker loop: take MIDI paths from a queue, convert them, store the arrays.

    For each path the file is parsed with ``converter.parse``, its first part
    is converted with ``streamToNoteArray``, and the result is appended to
    'midi_arrays.h5' as dataset 'midi<N>', where N is the shared counter after
    incrementing. Files that fail to parse or convert are reported on stdout
    and skipped. An error while writing the HDF5 file is not caught, but the
    queue item is still acknowledged first.

    Args:
        q: joinable queue of file paths; a ``None`` item stops the loop.
            ``task_done()`` is called exactly once for every item taken,
            including the sentinel and failed files, so ``q.join()`` cannot
            block on an item this worker has already consumed.
        counter: shared integer exposing ``.value``; counts stored datasets.
        file_lock: lock usable as a context manager; guards both the HDF5
            file and the counter.
    """
    while True:
        midi_file = q.get()
        try:
            if midi_file is None:
                break
            start = time.time()
            try:
                s = converter.parse(midi_file)
                # Just extract the first voice.
                melody_array = streamToNoteArray(s.parts[0])
            except Exception as e:
                # Best effort: one unreadable or empty file must not stop the batch.
                print("exception while parsing midi")
                print(e)
                continue
            with file_lock:
                # The context manager closes the file even if the write fails.
                with h5py.File('midi_arrays.h5', 'a') as h5f_file:
                    counter.value += 1
                    h5f_file.create_dataset('midi'+str(counter.value), data=melody_array)
                print("{}: Converted: {} it took {:0.4f}".format(counter.value, midi_file, time.time() - start))
        finally:
            # Runs on break, continue and on errors alike.
            q.task_done()
def main():
    """
    Find every MIDI file under ``midi_directory`` and convert them in parallel.

    Walks the directory tree, collects files ending in '.mid' or '.midi'
    (case-insensitive), then starts ``nthreads`` worker processes that each
    run ``process_file_worker`` and feeds them paths through a joinable queue.
    Returns once every queued file has been acknowledged by a worker.
    """
    print("Going to search:", midi_directory)
    midi_files = []
    start = time.time()
    for root, _dirs, files in os.walk(midi_directory):
        for name in files:
            # Test the actual extension; a substring test would also accept
            # names such as "song.mid.bak" and would miss "SONG.MID".
            if name.lower().endswith(('.mid', '.midi')):
                midi_files.append(os.path.join(root, name))
    print("Found", len(midi_files), "midi files.")
    print("Search took", time.time() - start)

    file_lock = multiprocessing.Lock()  # Protects the file from concurrent access
    q = multiprocessing.JoinableQueue()
    counter = multiprocessing.Value('i', 0)
    # The worker loop runs as the pool's initializer, so every pool process
    # starts consuming from the queue as soon as it is created. Holding the
    # pool in a `with` block keeps it referenced while work is pending and
    # terminates the workers afterwards instead of leaving that to garbage
    # collection.
    with multiprocessing.Pool(nthreads, process_file_worker, (q, counter, file_lock)):
        for path in midi_files:
            q.put(path)
        q.join()  # block until tasks are done.
    print("Done.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment