@roninhack
Created May 9, 2018 19:19
Removing known ambient music from a recording to enhance ambient sounds.
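The approach, roughly: the recording is modelled as the known music convolved with the room-and-speaker impulse response, plus the ambient sound we actually want, i.e. recorded ≈ music * h + ambient (with * denoting convolution). The script at the bottom estimates h in the frequency domain by dividing the cross-spectrum of the played and recorded signals by the power spectrum of the played signal (H ≈ conj(A)·B / (conj(A)·A), which reduces to B/A), convolves the original music with the estimated h, and subtracts the result from the recording. The audio_io module below is a small helper for reading and writing the .wav files involved.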
#!/usr/bin/python2.6
#
# Author: Olivier Gillet ([email protected])
"""Class and functions to read and write numpy array from and to audio files."""
import copy
import logging
import numpy
import struct
import sys
sys.path.append('.')
# Constant used when converting between unsigned char and float. A different
# value is used in both directions to avoid clipping.
_UNSIGNED_CHAR_TO_FLOAT_SCALE = 128.0
_FLOAT_TO_UNSIGNED_CHAR_SCALE = 127.0
_DATA_CHUNK_HEADER_SIZE = 8
_FMT_CHUNK_DATA_SIZE = 16
_FMT_CHUNK_HEADER_SIZE = 8
_RIFF_FORMAT_DESCRIPTOR_SIZE = 4


class AudioIoException(Exception):
  """An error indicating a failure in audio file reading/writing."""

  def __init__(self, message):
    """Initializes an AudioIoException object."""
    Exception.__init__(self, 'Audio IO error: %s' % message)


def _ReadBytesOrFail(file_object, num_bytes, error_message):
  """Read a given number of bytes from the file or raise an error.

  Args:
    file_object: file object.
    num_bytes: int. number of bytes to read.
    error_message: string. text message of the exception raised when the number
      of bytes could not be read (for example, identifying which section the
      caller attempted to read).

  Returns:
    String with the bytes read from the file.

  Raises:
    AudioIoException:
      - The required number of bytes could not be read from the file.
  """
  read = file_object.read(num_bytes)
  if len(read) < num_bytes:
    raise AudioIoException(error_message)
  return read


def _GoToIffChunk(file_object, iff_chunk_id):
  """Jump to a named chunk in a (R)IFF file.

  Args:
    file_object: file object.
    iff_chunk_id: 4 chars ID of the chunk.

  Returns:
    Length of the chunk in bytes, or -1 if the chunk has not been found.
    If the chunk is found, file_object is positioned at the beginning of the
    chunk. Otherwise, it is positioned at the end of the file.
  """
  while True:
    chunk_id = file_object.read(4)
    if len(chunk_id) < 4:
      return -1
    chunk_size = file_object.read(4)
    if len(chunk_size) < 4:
      return -1
    chunk_size = struct.unpack('<L', chunk_size)
    if iff_chunk_id == chunk_id:
      return chunk_size[0]
    else:
      file_object.seek(chunk_size[0], 1)
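# For reference, each chunk that _GoToIffChunk walks over uses the standard
# RIFF framing: a 4-byte ASCII chunk id (e.g. 'fmt ' or 'data'), a 4-byte
# little-endian payload size, then the payload itself, so seek(chunk_size, 1)
# lands on the next chunk header. (Strictly, RIFF pads odd-sized chunks to an
# even length, which this simple walker ignores; typical PCM files are fine.)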


def ReadWavFile(file_name, scale=True):
  """Read a .wav file into a numpy array.

  Note: the FFmpeg based AudioDecoder is more generic, use this only as a
  low-level alternative to AudioDecoder.

  Note: does not support GFile, as the fromfile function from numpy can only
  use native python file objects.

  Args:
    file_name: string. name of the local file to load.
    scale: boolean. if True, returns float data in the [-1, 1] range instead
      of integers.

  Returns:
    Tuple of (samples, sample_rate), where samples is a 2-dimensional numpy
    array of size (num_samples, num_channels) and sample_rate is an int in Hz.

  Raises:
    AudioIoException:
      - The file header is corrupted.
      - The file uses an unsupported sampling rate, bitdepth or codec.
  """
  f = file(file_name, 'rb')
  header = f.read(12)
  if len(header) < 12 or header[:4] != 'RIFF' or header[8:] != 'WAVE':
    raise AudioIoException('Corrupted header')
  format_header_size = _GoToIffChunk(f, 'fmt ')
  if format_header_size < 0 or format_header_size != 16:
    raise AudioIoException('Invalid header size')
  format_header = _ReadBytesOrFail(f, 16, 'Corrupted header')
  compression, num_channels, sample_rate, _, _, bitdepth = struct.unpack(
      '<HHLLHH', format_header)
  if compression != 1:
    raise AudioIoException('Unknown .wav codec: %d' % compression)
  if not num_channels:
    raise AudioIoException('Wrong number of channels')
  if sample_rate < 1000 or sample_rate > 96000:
    raise AudioIoException('Invalid sample rate')
  if bitdepth != 8 and bitdepth != 16:
    raise AudioIoException('Unsupported bit depth')
  sample_data_size = _GoToIffChunk(f, 'data')
  num_samples = sample_data_size / (bitdepth / 8)
  # Make sure we are reading a number of samples which is a multiple of the
  # number of channels. Some corrupted stereo .wav files may contain 5 samples!
  num_samples -= num_samples % num_channels
  if bitdepth == 8:
    samples = numpy.fromfile(f, dtype=numpy.ubyte, count=num_samples)
    if scale:
      samples = (samples / _UNSIGNED_CHAR_TO_FLOAT_SCALE) - 1.0
  else:
    bytes = bitdepth / 8
    samples = numpy.fromfile(f, dtype='<i%d' % bytes, count=num_samples)
    if scale:
      # Semantics of x = x / y and x /= y are different when x and y are
      # numpy arrays of a different type: x /= y divides in place and keeps
      # x's dtype, while x = x / y follows numpy's promotion rules and yields
      # a float array here, which is what we want.
      # pylint: disable-msg=C6407
      samples = samples / float(1 << (bitdepth - 1))
  return (samples.reshape(-1, num_channels), sample_rate)
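# A minimal usage sketch (hypothetical file name, not part of the module):
#
#   samples, sample_rate = ReadWavFile('tone.wav')
#   samples.shape   # (num_frames, num_channels), e.g. (220500, 2)
#   samples.dtype   # float64 when scale=True, uint8/int16 when scale=False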


def Quantize(signal, bitdepth, normalize=True):
  """Convert an array of float to an array of integers.

  Args:
    signal: numpy array. source signal.
    bitdepth: int. size of the integer in bits.
    normalize: boolean. whether samples should be scaled to use all the
      available dynamic range.

  Returns:
    Array of integers.
  """
  norm = numpy.abs(signal).max()
  # Normalization or clipping.
  if normalize and norm > 0:
    scaled_signal = signal / norm
  else:
    scaled_signal = copy.copy(signal)
    if norm > 1.0:
      logging.warning('Some samples will be clipped.')
    # Clip samples above 1 and below -1.
    scaled_signal[scaled_signal < -1] = -1
    scaled_signal[scaled_signal > 1] = 1
  if bitdepth == 8:
    scaled_signal = (scaled_signal + 1.0) * _FLOAT_TO_UNSIGNED_CHAR_SCALE
    scaled_signal = numpy.array(scaled_signal, dtype=numpy.uint8)
  else:
    scale = (1 << (bitdepth - 1)) - 1
    # pylint: disable-msg=C6407
    scaled_signal = scaled_signal * scale
    scaled_signal = numpy.array(scaled_signal, dtype='i%d' % (bitdepth / 8))
  return scaled_signal
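# A quick sketch of the Quantize scaling (hypothetical values, not part of the
# module):
#
#   x = 0.5 * numpy.sin(2 * numpy.pi * numpy.arange(22050) / 50.0)
#   Quantize(x, 16)                   # normalized first, peaks near +/-32767
#   Quantize(x, 16, normalize=False)  # raw scaling, peaks near +/-16383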


def WriteWavFile(signal, sample_rate, file_name, bitdepth=16, normalize=True):
  """Write a .wav file from a numpy array.

  Note: does not support GFile, as the tofile method of numpy arrays can only
  use native python file objects.

  Args:
    signal: 2-dimensional numpy array, of size (num_samples, num_channels).
    sample_rate: int. sample rate of the signal in Hz.
    file_name: string. name of the destination file.
    bitdepth: int. bitdepth in bits (default 16).
    normalize: boolean. if set to True, scale the data to the [-1, 1] range
      before writing.
  """
  if signal.dtype == numpy.uint8 or signal.dtype == numpy.int16:
    bitdepth = signal.dtype.itemsize * 8
    scaled_signal = signal
  else:
    scaled_signal = Quantize(signal, bitdepth, normalize=normalize)
  if scaled_signal.ndim == 1:
    num_channels = 1
  else:
    num_channels = scaled_signal.shape[1]
  # Compute the total size of the output .wav file, minus the size of the
  # first two fields of the RIFF header.
  # RIFF Format.
  total_size = _RIFF_FORMAT_DESCRIPTOR_SIZE
  # 'fmt ' chunk.
  total_size += _FMT_CHUNK_HEADER_SIZE + _FMT_CHUNK_DATA_SIZE
  # 'data' chunk.
  total_size += _DATA_CHUNK_HEADER_SIZE + scaled_signal.nbytes
  f = file(file_name, 'wb')
  try:
    f.write('RIFF')
    f.write(struct.pack('<L', total_size))
    f.write('WAVEfmt ')
    # The fmt chunk stores the byte rate (bytes per second) and the block
    # alignment (bytes per frame across all channels).
    byte_rate = sample_rate * num_channels * (bitdepth / 8)
    block_align = num_channels * (bitdepth / 8)
    f.write(struct.pack('<LHHLLHH', 16, 1, num_channels, sample_rate,
                        byte_rate, block_align, bitdepth))
    f.write('data')
    f.write(struct.pack('<L', scaled_signal.nbytes))
    scaled_signal.tofile(f)
  finally:
    f.close()
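A minimal round-trip check of the module above (hypothetical file names, assuming the module is saved as audio_io.py next to the calling script): write a short sine wave and read it back.

import numpy

import audio_io

rate = 22050
tone = 0.25 * numpy.sin(2 * numpy.pi * 440.0 * numpy.arange(rate) / float(rate))
audio_io.WriteWavFile(tone, rate, 'tone_check.wav', normalize=False)
tone_back, rate_back = audio_io.ReadWavFile('tone_check.wav')
assert rate_back == rate
assert tone_back.shape == (rate, 1)

The script below uses the same module to estimate the room response and subtract the known music from the recording.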
import audio_io
import numpy
original_audio, _ = audio_io.ReadWavFile('a_music.wav')
original_audio = original_audio.ravel()
recorded_signal, _ = audio_io.ReadWavFile('b_music_in_room_and_cat.wav')
recorded_signal = recorded_signal.ravel()
# Truncate all signals to the same length, then pad to avoid boundary effects.
n = min(
recorded_signal.shape[0],
original_audio.shape[0])
original_audio_padded = numpy.zeros((2 * n, ))
recorded_signal_padded = numpy.zeros((2 * n, ))
original_audio_padded[:n] = original_audio[:n]
recorded_signal_padded[:n] = recorded_signal[:n]
# Use cross-correlation to estimate the impulse response of the room
# and speakers.
A = numpy.fft.fft(original_audio_padded)
B = numpy.fft.fft(recorded_signal_padded)
Raa = numpy.conj(A) * A
Rab = numpy.conj(A) * B
Hab = Rab / Raa
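# Note: since Rab / Raa == (numpy.conj(A) * B) / (numpy.conj(A) * A), this is
# just the spectral division B / A, an estimate of the room-and-speaker
# transfer function. Frequency bins where A is close to zero can blow up; one
# possible hedge (not in the original) is to regularize the division, e.g.
# Hab = Rab / (Raa + 1e-8 * numpy.abs(Raa).max()).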
h_est = numpy.fft.ifft(Hab).real
# Truncate estimated impulse response
h_est = h_est[:7500]
audio_io.WriteWavFile(h_est, 22050, 'h_est.wav')
# Get the estimate of the original audio played through the room.
estimated_audio_in_room = numpy.convolve(original_audio, h_est)
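# Note: numpy.convolve here is O(len(original_audio) * len(h_est)); for long
# recordings scipy.signal.fftconvolve(original_audio, h_est) gives the same
# result much faster, assuming scipy is available.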
# Do the subtraction.
estimated_ambient_sound = recorded_signal[:n] - estimated_audio_in_room[:n]
audio_io.WriteWavFile(estimated_ambient_sound, 22050, 'c_cat_estimated.wav')