Last active
May 25, 2020 15:29
-
-
Save NP-chaonay/bbe01d289ccc01c81d97ff081e52726f to your computer and use it in GitHub Desktop.
1st code snippet for Data Science project on Audio Processing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #### Get the most n highest-amplitude musical note using my custom algorithm of FFT (on FFT length 35999) on each period (4134 samples) in the arr. | |
| ### Metadata | |
| + Author : Nuttapong Punpipat (NP-chaonay) | |
| + Updated On : 25/05/2020 at 9:56 UTC | |
| + General Updating Frequency : Weekly | |
| + Code Snippet Completeness : N/A | |
| + Stability : Beta | |
| ### General Notices | |
| - Thanks for the code snippet (from: https://intellipaat.com/community/32125/plotting-a-fast-fourier-transform-in-python) | |
| - The result in array is the musical note level in numerical format (A4 note is transformed as 54), but : | |
| - if result is 0 : no frequencies detected with amplitude. | |
| <<< Below is the code zone >>> | |
| ### Import objects and define constants. | |
| import scipy.io.wavfile as scp_io_wav | |
| import numpy as np | |
| import scipy.fft | |
| import pandas as pd | |
| import subprocess,os,time | |
| import matplotlib.pyplot as plt | |
| from math import log2 | |
# Period size is set to 4134 (3/32 of 1 seconds), changing this seems to be complicated. (You need to change the method "forward-then-reverse array value looping" in code "Get the frequency-amplitude graph")
# Sample rate to use. (Changing this requires to change the others that should be changed (For example, k variable), which seem to be complicated.).
rate=44100
# Amount of frequency bins (excluding 0) :
# = ( Maximum frequency of specific sample rate ) / ( Difference between (Lowest-frequency in standard tuning that not lower than 20 (E0 frequency) ) and ( The previous (E0 frequency) added by 1 semitone ).
# = 22050/((440*2**(-52/12))-(440*2**(-53/12)))
# = 17999
# Due to some of FFT stuff (FFT window?) is symmetrical by default (So multiplied by 2) :
# Use estimation of value "Amount of frequency bins (excluding 0)" because of the above reason (the value required to be multiplied by 2).
# And +1 to include 0.
# >>> (17999*2)+1 = 35999 >>> Used as FFT length.
k=35999
| ### Import external functions and objects (from my libraries.) | |
| # To prepare data from FFmpeg-compatible media file to 1-ch 44100Hz 16-bit PCM format WAVE file with some audio tweaks. | |
def _report_subprocess_failure(process, tool_name):
	"""Print a failed subprocess's stdout/stderr and raise DataError.

	Shared by the three external-tool invocations in prepare_data_ffmpeg();
	`process` is a completed subprocess.run() result with a non-zero
	return code, `tool_name` names the tool for the error header.
	"""
	print('[Error] '+tool_name+' process doesn\'t return to 0.')
	for stream_name, stream in (('stdout', process.stdout), ('stderr', process.stderr)):
		if stream is not None:
			print('<< Start of Process '+stream_name+' console output>>')
			print(stream.decode())
			print('<< End of Process '+stream_name+' console output>>')
		else:
			print('[Note] process\'s '+stream_name+' is not available.')
	# NOTE(review): DataError is not defined in this snippet — presumably it
	# comes from the author's library; confirm it is in scope at runtime.
	raise DataError('Error occurred when pre-processing the audio data from local file.')

def prepare_data_ffmpeg(path,start=None,end=None,pitch=None,to_path=None):
	"""Prepare data from an FFmpeg-compatible media file into a 1-ch 44100Hz
	16-bit PCM WAVE file with some audio tweaks.

	Pipeline: peak-normalize (ffmpeg-normalize) -> band-pass 500-4000Hz,
	optional pitch shift, downmix/resample (ffmpeg) -> peak-normalize again.
	Intermediate files are written under the module-level `temp_path`.

	Parameters:
		path: input media file readable by FFmpeg.
		start, end: optional trim positions (passed to ffmpeg -ss/-to).
		pitch: optional rubberband pitch factor; None = no pitch shift.
		to_path: destination for the processed file; defaults to
			temp_path+'processed_data'.
	Returns: the path of the processed WAVE file.
	Raises: DataError when any external tool exits non-zero.
	"""
	print('1st Normalizing/Converting using ffmpeg-normalize...')
	process=subprocess.run(executable='ffmpeg-normalize',args=['ffmpeg-normalize','-o',temp_path+'converted_phase1','-f','-vn','-ofmt','wav','-nt','peak','-t','0',path],stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.DEVNULL)
	if process.returncode!=0:
		_report_subprocess_failure(process,'ffmpeg-normalize')
	print('Converting/Tweaking using ffmpeg...')
	arg_filters='highpass=f=500, lowpass=f=4000'
	if pitch is not None:
		arg_filters+=', rubberband=pitch='+str(pitch)+':pitchq=quality:channels=together'
	args=['ffmpeg','-i',temp_path+'converted_phase1','-af',arg_filters,'-f','wav','-acodec','pcm_s16le','-ar','44100','-ac','1','-y',temp_path+'converted_phase2']
	# Build the trim options once and splice them in right after the program
	# name (position 1), preserving the original -ss-before--to ordering.
	trim=[]
	if start is not None:
		trim+=['-ss',str(start)]
	if end is not None:
		trim+=['-to',str(end)]
	args[1:1]=trim
	process=subprocess.run(executable='ffmpeg',args=args,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.DEVNULL)
	if process.returncode!=0:
		_report_subprocess_failure(process,'ffmpeg')
	print('2nd Normalizing using ffmpeg-normalize...')
	process=subprocess.run(executable='ffmpeg-normalize',args=['ffmpeg-normalize','-o',temp_path+'processed_data','-f','-c:a','pcm_s16le','-ar','44100','-ofmt','wav','-nt','peak','-t','0',temp_path+'converted_phase2'],stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.DEVNULL)
	if process.returncode!=0:
		_report_subprocess_failure(process,'ffmpeg-normalize')
	if to_path is None:
		to_path=temp_path+'processed_data'
	else:
		# Copy the processed file to the requested destination.
		# Context managers close both handles (the original leaked them).
		with open(temp_path+'processed_data','rb') as src, open(to_path,'wb') as dst:
			dst.write(src.read())
	return to_path
| # To crop the audio array | |
def prepare_data_crop(path,to_path=None):
	"""Crop five 5-second segments out of a WAVE file and concatenate them.

	The segments are taken from the start, the 1/4 point, the middle, the
	3/4 point, and the end of the audio, then written as int16 WAVE data.

	Parameters:
		path: source WAVE file path.
		to_path: destination path; defaults to temp_path+'processed_data'.
	Returns: the path the cropped audio was written to.
	Raises: DataError when the input is shorter than 25 seconds
		(5 segments x 5 seconds).
	"""
	sample_rate, samples = scp_io_wav.read(path)
	segment_len = sample_rate*5
	if samples.shape[0] < segment_len*5:
		raise DataError('Inputted audio has sample point less than '+str(segment_len*5)+'.')
	# Free play beyond one segment; anchor points are fractions of it.
	slack = len(samples)-segment_len
	offsets = [0, round(slack*0.25), round(slack/2), round(slack*0.75), slack]
	cropped = np.concatenate([np.array(samples[off:off+segment_len]) for off in offsets])
	if to_path is None:
		to_path = temp_path+'processed_data'
	scp_io_wav.write(to_path, sample_rate, cropped.astype('int16'))
	return to_path
| # To convert from frequency value array to musical note level value array. (A4 frequency=0). | |
def calculate_data_note(audio_haffot,offset=0):
	"""Convert a frequency array (Hz) to musical note levels (A4 = 0).

	Each non-zero frequency is mapped to its nearest equal-tempered
	semitone distance from A4 (440 Hz). Zero frequencies become NaN and
	are then replaced by the mean of the valid notes; `offset` is added
	to every resulting value.

	Returns: a numpy float array of note levels.
	"""
	semitones = [float(round(12*log2(freq/440))) if freq != 0 else float('nan')
	             for freq in audio_haffot]
	return np.nan_to_num(np.array(semitones), nan=np.nanmean(semitones)) + offset
### Simple Parameters (Adjust without much problem or tweakings.)
# Path of the input media file to analyse.
audio_path='/home/np-chaonay/Misc/MusicDataset-2/Songs/dsb-32'
# Working directory for intermediate/processed files (created below if missing).
temp_path='/tmp/np-chaonay.ML/'
# Define the amount of n highest-amplitude musical note to get. (1-120)
# Note: IMO, 1,2 isn't accurate enough, so set to 5 (Main Melody + 3 note of root-key's chord + Drums (which possibly loud nearly as vocal.))
display_num=5
# Amplitude threshold (In dB), any frequencies goes below than this will not be counted. (Referenced by Audacity settings, high-amplitude range is (-36)-0 dB.)
db_threshold=-36
### Initialization
# Create temporary directory; ignore if it already exists.
try: os.mkdir(temp_path)
except FileExistsError: pass
# Define xf: the frequency value of each retained FFT bin.
# Note: "[17:-1674]" means to exclude frequency 0 and any that are out of the human-hearable range.
xf = np.linspace(0, rate/2, (k//2)+1)[17:-1674]
# Convert to nearest musical note level, using offset as 54 (to allow value "0" as the value for no amplitude on every frequency).
xf=calculate_data_note(xf,54)
### Load audio data.
# audio should be any 1-ch 16bit 44100Hz audio array.
# In this example, get the audio by using prepare_data_ffmpeg() on FFmpeg-supported media file for tweaking, then use scp_io_wav.read()[1].
audio=scp_io_wav.read(prepare_data_ffmpeg(audio_path))[1]
# Or get the audio directly
#audio=scp_io_wav.read(audio_path)[1]
# Check if the amount of samples is odd, if true then crop out 1 last sample from audio array.
if len(audio)%2!=0: audio=audio[:-1]
else: pass
# Crop out any bits (remainder from dividing by the 4134-sample period), trimmed equally from both ends.
cut_duration=(len(audio)%4134)//2
if cut_duration!=0: audio=audio[cut_duration:-cut_duration]
else: pass
### Loop through the audio array for analysing.
# Define/Redefine note_array (required if repeating the process).
note_array=[]
for i in range(len(audio)//4134):
	# Take one 4134-sample period; divide by 32767 to normalize int16 samples into [-1,1].
	arr=audio[4134*i:4134*i+4134]/32767
	## Get the frequency-amplitude graph
	# Use this when overlapping is used (end position extended by k to allow overlapping).
	#yf = 10 * np.log10( (2/k) * np.abs(scipy.fft.fft(arr, k)[:(k//2)+1][17:-1674]) )
	# Use this when overlapping isn't used.
	# The period is tiled forward-then-reversed to fill the 35999-point FFT window,
	# converted to dB, and sliced to the same bins as xf.
	yf = 10 * np.log10( (2/k) * np.abs(scipy.fft.fft(np.concatenate([arr,arr[::-1]]*4+[arr]), k)[:(k//2)+1][17:-1674]) )
	## Get the most n highest-amplitude musical note.
	note_df=pd.DataFrame(zip(xf,yf),columns=['freq','amp']).groupby('freq').max().sort_values('amp',ascending=False).head(display_num)
	# If any fields have amplitude below db_threshold then blank them out.
	result=note_df.index.values[(note_df.values>=db_threshold).reshape(-1)]
	# If there're any fields that have been blanked out (above), fill with 0 ("no note detected").
	result=np.concatenate([result,[0]*(display_num-len(result))])
	# Add to note_array to accumulate the data for further processing.
	note_array+=[result]
	# Display to the output
	# Note: (Use i+1 because when we hear the specific audio period, the sample point "(i+1)*4134" is passed and processed.)
	# Note: Add cut_duration to match the duration on the actual audio.
	#print('Notes='+str(result), 'Duration='+str(round((cut_duration+(i+1)*4134)/rate,2)))
# Result (Shape in (Top n musical notes, Timestamp))
note_array=np.concatenate(note_array).reshape(-1,display_num).T
# Get the Pandas dataframe from note_array (one column per rank '1'..str(display_num)).
df=pd.DataFrame(note_array.T,columns=np.array(range(1,display_num+1)).astype('str'),dtype='int')
### Data Analysing
# Show the 10 notes with the highest counts among the 1st-loudest notes per timestamp. (Most of the time, this can show the root key of the song.)
df.groupby('1').size().sort_values(ascending=False).head(10)
# Show mean,median,mode from the 1st-loudest note on timestamp.
df['1'].mean()
df['1'].median() # (Most of the time, this can match key in the root chord.)
df['1'].mode()
# Replace musical note level to keyname
# NOTE(review): the results of these two replace() calls are discarded (no
# assignment and no inplace=True), so they have no lasting effect —
# presumably intended for interactive inspection; confirm intent.
df.replace(range(1,121),['E','F','F#','G','G#','A','A#','B','C','C#','D','D#']*10)
# OR into numerical keyname
df.replace(range(1,121),[1,2,3,4,5,6,7,8,9,10,11,0]*10)
# Groupby musical keyname instead of level. Then find the top count. (Most of the time, this can show the root key and the average mode (major-minor) of the song.)
top_keyname=(df[df>0]%12).fillna(-1).groupby('1').size().sort_values(ascending=False)
top_keyname=pd.DataFrame(zip(top_keyname.index,top_keyname.values),columns=['key','count'])
top_keyname.key.replace([0,1,2,3,4,5,6,7,8,9,10,11],['D#','E','F','F#','G','G#','A','A#','B','C','C#','D'],inplace=True)
top_keyname
### Visualization
## Visualize note_array like playing the audio.
for i,data in enumerate(note_array.T):
	print('Notes='+str(data), 'Duration='+str(round((cut_duration+(i+1)*4134)/rate,2)))
	# Slow down processing. (Try to match with sample rate)
	time.sleep(4134/rate)
## Label ticks in x axis, useful when plotting, contains timestamp in seconds.
# Note: cut_duration is added to match the actual audio duration.
# Note: On start position, 4134 is added because by the time the first portion of audio is analyzed, that portion is passed and processed, so include the passed portion in the start duration.
x_axis=np.linspace(cut_duration+4134,cut_duration+(len(audio)//4134)*4134,len(audio)//4134)/rate
## Plot df. (from the 1st-loudest note on timestamp.)
plt.scatter(x_axis,df['1'],marker='x',s=4)
plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment