Skip to content

Instantly share code, notes, and snippets.

@NP-chaonay
Last active May 25, 2020 15:29
Show Gist options
  • Save NP-chaonay/bbe01d289ccc01c81d97ff081e52726f to your computer and use it in GitHub Desktop.
Save NP-chaonay/bbe01d289ccc01c81d97ff081e52726f to your computer and use it in GitHub Desktop.
1st code snippist for Data Science project on Audio Processing.
#### Get the most n highest-amplitude musical note using my custom algorithm of FFT (on FFT length 35999) on each period (4134 samples) in the arr.
### Metadata
+ Author : Nuttapong Punpipat (NP-chaonay)
+ Updated On : 25/05/2020 at 9:56 UTC
+ General Updating Frequency : Weekly
+ Code Snippet Completeness : N/A
+ Stability : Beta
### General Notices
- Thanks for the code snippet (from: https://intellipaat.com/community/32125/plotting-a-fast-fourier-transform-in-python)
- The result in array is the musical note level in numerical format (A4 note is transformed as 54), but :
- if result is 0 : no frequencies detected with amplitude.
<<< Below is the code zone >>>
### Import objects and define constants.
import scipy.io.wavfile as scp_io_wav
import numpy as np
import scipy.fft
import pandas as pd
import subprocess,os,time
import matplotlib.pyplot as plt
from math import log2
# Period size is set to 4134 (3/32 of 1 seconds), changing this seems to be complicated. (You need to change the method "forward-then-reverse array value looping" in code "Get the frequency-amplitude graph")
# Sample rate to use. (Changing this requires to change the others that should be changed (For example, k variable), which seem to be complicated.).
rate=44100
# Amount of freqencies bin (exluding 0) :
# = ( Maximum frequency of specific sample rate ) / ( Difference between (Lowest-frequency in standard tuning that not lower than 20 (E0 frequency) ) and ( The previous (E0 frequency) added by 1 semitone ).
# = 22050/((440*2**(-52/12))-(440*2**(-53/12)))
# = 17999
# Due to some of FFT stuff (FFT window?) is symmetrical by default (So multiplied by 2) :
# Use estimation of value "Amount of freqencies bin (exluding 0)" because of the above reason (the value required to be multiplied by 2).
# And +1 to including 0.
# >>> (17999*2)+1 = 35999 >>> Used as FFT length.
k=35999
### Import external functions and objects (from my libraries.)
# To prepare data from FFmpeg-compatible media file to 1-ch 44100Hz 16-bit PCM format WAVE file with some audio tweaks.
def prepare_data_ffmpeg(path,start=None,end=None,pitch=None,to_path=None):
print('1st Normalizing/Converting using ffmpeg-normalize...')
process=subprocess.run(executable='ffmpeg-normalize',args=['ffmpeg-normalize','-o',temp_path+'converted_phase1','-f','-vn','-ofmt','wav','-nt','peak','-t','0',path],stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.DEVNULL)
if process.returncode!=0:
print('[Error] ffmpeg-normalize process doesn\'t return to 0.')
if process.stdout is not None:
print('<< Start of Process stdout console output>>')
print(process.stdout.decode())
print('<< End of Process stdout console output>>')
else: print('[Note] process\'s stdout is not available.')
if process.stderr is not None:
print('<< Start of Process stderr console output>>')
print(process.stderr.decode())
print('<< End of Process stderr console output>>')
else: print('[Note] process\'s stderr is not available.')
raise DataError('Error occurred when pre-processing the audio data from local file.')
print('Converting/Tweaking using ffmpeg...')
arg_filters='highpass=f=500, lowpass=f=4000'
if pitch!=None: arg_filters+=', rubberband=pitch='+str(pitch)+':pitchq=quality:channels=together'
else: pass
args=['ffmpeg','-i',temp_path+'converted_phase1','-af',arg_filters,'-f','wav','-acodec','pcm_s16le','-ar','44100','-ac','1','-y',temp_path+'converted_phase2']
if start!=None and end!=None: args.insert(1,'-ss'); args.insert(2,str(start)); args.insert(3,'-to'); args.insert(4,str(end))
elif start==None and end!=None: args.insert(1,'-to'); args.insert(2,str(end))
elif end==None and start!=None: args.insert(1,'-ss'); args.insert(2,str(start))
else: pass
process=subprocess.run(executable='ffmpeg',args=args,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.DEVNULL)
if process.returncode!=0:
print('[Error] ffmpeg process doesn\'t return to 0.')
if process.stdout is not None:
print('<< Start of Process stdout console output>>')
print(process.stdout.decode())
print('<< End of Process stdout console output>>')
else: print('[Note] process\'s stdout is not available.')
if process.stderr is not None:
print('<< Start of Process stderr console output>>')
print(process.stderr.decode())
print('<< End of Process stderr console output>>')
else: print('[Note] process\'s stderr is not available.')
raise DataError('Error occurred when pre-processing the audio data from local file.')
print('2nd Normalizing using ffmpeg-normalize...')
process=subprocess.run(executable='ffmpeg-normalize',args=['ffmpeg-normalize','-o',temp_path+'processed_data','-f','-c:a','pcm_s16le','-ar','44100','-ofmt','wav','-nt','peak','-t','0',temp_path+'converted_phase2'],stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.DEVNULL)
if process.returncode!=0:
print('[Error] ffmpeg-normalize process doesn\'t return to 0.')
if process.stdout is not None:
print('<< Start of Process stdout console output>>')
print(process.stdout.decode())
print('<< End of Process stdout console output>>')
else: print('[Note] process\'s stdout is not available.')
if process.stderr is not None:
print('<< Start of Process stderr console output>>')
print(process.stderr.decode())
print('<< End of Process stderr console output>>')
else: print('[Note] process\'s stderr is not available.')
raise DataError('Error occurred when pre-processing the audio data from local file.')
if to_path is None: to_path=temp_path+'processed_data'
else:
file_byte=open(temp_path+'processed_data','rb').read()
open(to_path,'wb').write(file_byte)
return to_path
# To crop the audio array
def prepare_data_crop(path,to_path=None):
audio=scp_io_wav.read(path)
rate=audio[0]
snd=audio[1]
del audio
duration=rate*5
if snd.shape[0]<duration*5: raise DataError('Inputted audio has sample point less than '+str(duration*5)+'.')
side=round( (len(snd)-duration)/2 )
side_1=round( (len(snd)-duration)*0.25 )
side_2=round( (len(snd)-duration)*0.75 )
snd_f=np.array(snd[:duration])
snd_m=np.array(snd[side:side+duration])
snd_l=np.array(snd[-duration:])
snd_1=np.array(snd[side_1:side_1+duration])
snd_2=np.array(snd[side_2:side_2+duration])
snd=np.concatenate([snd_f,snd_1,snd_m,snd_2,snd_l])
if to_path is None:
to_path=temp_path+'processed_data'
scp_io_wav.write(to_path,rate,snd.astype('int16'))
return to_path
# To convert from frequency value array to musical note level value array. (A4 frequency=0).
def calculate_data_note(audio_haffot,offset=0):
note_array=[]
for freq in audio_haffot:
if freq!=0:
note_array+=[float(round(12*log2(freq/440)))]
else:
note_array+=[float('nan')]
return np.nan_to_num(np.array(note_array),nan=np.nanmean(note_array))+offset
### Simple Parameters (Adjust without much problem or tweakings.)
audio_path='/home/np-chaonay/Misc/MusicDataset-2/Songs/dsb-32'
temp_path='/tmp/np-chaonay.ML/'
# Define the amount of n highest-amplitude musical note to get. (1-120)
# Note: IMO, 1,2 isn't accurate enough, so set to 5 (Main Melody + 3 note of root-key's chord + Drums (which possibly loud nearly as vocal.))
display_num=5
# Amplitude threshold (In dB), any frequencies goes below than this will not be counted. (Referenced by Audacity settings, high-amplitude range is (-36)-0 dB.)
db_threshold=-36
### Initialization
# Create temporary directory
try: os.mkdir(temp_path)
except FileExistsError: pass
# Define xf
# Note: "[17:-1674]" means to exlude frequency 0 and any that out-of-range of human-hearable.
xf = np.linspace(0, rate/2, (k//2)+1)[17:-1674]
# Convert to nearest musical note level, using offset as 54 (to allow value "0" as the value for no amplitude on every frequency).
xf=calculate_data_note(xf,54)
### Load audio data.
# audio should be any 1-ch 16bit 44100Hz audio array.
# In this example, get the audio by using prepare_data_ffmpeg() on FFmpeg-supported media file for tweaking, then use scp_io_wav.read()[1].
audio=scp_io_wav.read(prepare_data_ffmpeg(audio_path))[1]
# Or get the audio directly
#audio=scp_io_wav.read(audio_path)[1]
# Check if the amount of samples is odd, if true then crop out 1 last sample from audio array.
if len(audio)%2!=0: audio=audio[:-1]
else: pass
# Crop out any bits (remainder from dividing by 4134).
cut_duration=(len(audio)%4134)//2
if cut_duration!=0: audio=audio[cut_duration:-cut_duration]
else: pass
### Loop thorugh the audio array for analysing.
# Define/Redefine note_array (requires if repeat the process).
note_array=[]
for i in range(len(audio)//4134):
# On end position, add by k for allow overlapping, else add by 4134.
# Divided by 32767 for normalizing.
arr=audio[4134*i:4134*i+4134]/32767
## Get the frequency-amplitude graph
# Use this when overlapping is used.
#yf = 10 * np.log10( (2/k) * np.abs(scipy.fft.fft(arr, k)[:(k//2)+1][17:-1674]) )
# Use this when overlapping isn't used.
yf = 10 * np.log10( (2/k) * np.abs(scipy.fft.fft(np.concatenate([arr,arr[::-1]]*4+[arr]), k)[:(k//2)+1][17:-1674]) )
## Get the most n highest-amplitude musical note.
note_df=pd.DataFrame(zip(xf,yf),columns=['freq','amp']).groupby('freq').max().sort_values('amp',ascending=False).head(display_num)
# If the any fields has value equal to 0 then blank out.
result=note_df.index.values[(note_df.values>=db_threshold).reshape(-1)]
# If there're any fields have been blanked out (on above), fill with the value -1*(Final-transformed value of frequency 0 : -144 (See above code that used to define xf.))
result=np.concatenate([result,[0]*(display_num-len(result))])
# Add to note_array for accumulated the data for futher processing.
note_array+=[result]
# Display to the output
# Note: (Use i+1 because when we hear the specific audio period, the sample point "i+1*(4134)" is passed and processed.)
# Note: Add cut_duration because to match the duration on actual audio.
#print('Notes='+str(result), 'Duration='+str(round((cut_duration+(i+1)*4134)/rate,2)))
# Result (Shape in (Top 4 musical notes, Timestamp))
note_array=np.concatenate(note_array).reshape(-1,display_num).T
# Get the Pandas dataframe from note_array.
df=pd.DataFrame(note_array.T,columns=np.array(range(1,display_num+1)).astype('str'),dtype='int')
### Data Analysing
# Show the most 10 highest-note-count's note from the 1st-loudest note on timestamp. (Most of the time, this can show the root key of the song.)
df.groupby('1').size().sort_values(ascending=False).head(10)
# Show mean,median,mode from the 1st-loudest note on timestamp.
df['1'].mean()
df['1'].median() # (Most of the time, this can match key in the root chord.)
df['1'].mode()
# Replace musical note level to keyname
df.replace(range(1,121),['E','F','F#','G','G#','A','A#','B','C','C#','D','D#']*10)
# OR into numerical keyname
df.replace(range(1,121),[1,2,3,4,5,6,7,8,9,10,11,0]*10)
# Groupby musical keyname instead of level. Then find the top count. (Most of the time, this can show the root key and the average mode (major-minor) of the song.)
top_keyname=(df[df>0]%12).fillna(-1).groupby('1').size().sort_values(ascending=False)
top_keyname=pd.DataFrame(zip(top_keyname.index,top_keyname.values),columns=['key','count'])
top_keyname.key.replace([0,1,2,3,4,5,6,7,8,9,10,11],['D#','E','F','F#','G','G#','A','A#','B','C','C#','D'],inplace=True)
top_keyname
### Visualization
## Visualize note_array like playing the audio.
for i,data in enumerate(note_array.T):
print('Notes='+str(data), 'Duration='+str(round((cut_duration+(i+1)*4134)/rate,2)))
# Slow down processing. (Try to match with sample rate)
time.sleep(4134/rate)
## Label ticks in x axis, useful when ploting, contains timestamp in seconds.
# Note: cut_duration is added because to match the actual audio duration.
# Note: On start position, 4134 is added because the time that first portion of audio be analyzed, that portion is passed and processed, so include the passed portion in the start duration.
x_axis=np.linspace(cut_duration+4134,cut_duration+(len(audio)//4134)*4134,len(audio)//4134)/rate
## Plot df. (from the 1st-loudest note on timestamp.)
plt.scatter(x_axis,df['1'],marker='x',s=4)
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment