Created
May 9, 2018 19:19
-
-
Save roninhack/ed16a9eebcc49e8c073c8094a7779a15 to your computer and use it in GitHub Desktop.
Removing known ambient music from a recording to enhance ambient sounds.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2.6 | |
# | |
# Author: Olivier Gillet ([email protected]) | |
"""Class and functions to read and write numpy array from and to audio files.""" | |
import copy | |
import logging | |
import numpy | |
import struct | |
import sys | |
sys.path.append('.') | |
# Constant used when converting between unsigned char and float. A different
# value is used in both directions to avoid clipping.
# Reading maps an 8-bit sample s to (s / 128.0) - 1.0.
_UNSIGNED_CHAR_TO_FLOAT_SCALE = 128.0
# Writing maps a float x in [-1, 1] to (x + 1.0) * 127.0.
_FLOAT_TO_UNSIGNED_CHAR_SCALE = 127.0

# Sizes, in bytes, of the pieces of the .wav file written by WriteWavFile.
# They are summed to compute the size field of the RIFF header.
# 'data' chunk header: 4-byte chunk ID followed by a 4-byte chunk size.
_DATA_CHUNK_HEADER_SIZE = 8
# Payload of the 'fmt ' chunk: the six fields packed as '<HHLLHH'.
_FMT_CHUNK_DATA_SIZE = 16
# 'fmt ' chunk header: 4-byte chunk ID followed by a 4-byte chunk size.
_FMT_CHUNK_HEADER_SIZE = 8
# The 'WAVE' format descriptor that follows the RIFF size field.
_RIFF_FORMAT_DESCRIPTOR_SIZE = 4
class AudioIoException(Exception):
    """Raised when an audio file cannot be read or written."""

    def __init__(self, message):
        """Builds the exception, prefixing message with 'Audio IO error: '."""
        prefixed_message = 'Audio IO error: %s' % message
        super(AudioIoException, self).__init__(prefixed_message)
def _ReadBytesOrFail(file_object, num_bytes, error_message):
    """Reads exactly num_bytes bytes from a file, or raises.

    Args:
      file_object: file object to read from.
      num_bytes: int. how many bytes the caller needs.
      error_message: string. message carried by the exception raised on a
        short read (typically naming the section being read.)

    Returns:
      The bytes read from the file, at least num_bytes long.

    Raises:
      AudioIoException:
        - Fewer than num_bytes bytes were available.
    """
    payload = file_object.read(num_bytes)
    if len(payload) >= num_bytes:
        return payload
    raise AudioIoException(error_message)
def _GoToIffChunk(file_object, iff_chunk_id):
    """Jump to a named chunk in a (R)IFF file.

    Chunks are scanned sequentially from the current position of file_object,
    which must be at the start of a chunk header (4-byte ID followed by a
    4-byte little-endian size).

    Args:
      file_object: file object opened in binary mode.
      iff_chunk_id: 4 chars ID of the chunk. Either a byte string, or a text
        string which is then encoded as ASCII.

    Returns:
      length of the chunk in bytes. -1 if the chunk has not been found.
      If the chunk is found, file_object is positioned at the beginning of the
      chunk data. Otherwise, it is positioned at (or past) the end of the file.
    """
    # The file yields bytes; a text ID would never compare equal on Python 3.
    if not isinstance(iff_chunk_id, bytes):
        iff_chunk_id = iff_chunk_id.encode('ascii')

    while True:
        chunk_id = file_object.read(4)
        if len(chunk_id) < 4:
            return -1
        raw_size = file_object.read(4)
        if len(raw_size) < 4:
            return -1
        (chunk_size,) = struct.unpack('<L', raw_size)
        if chunk_id == iff_chunk_id:
            return chunk_size
        # RIFF chunks are word-aligned: an odd-sized chunk is followed by one
        # pad byte which is not counted in its size field. Skipping only
        # chunk_size bytes would misalign every subsequent chunk header.
        file_object.seek(chunk_size + (chunk_size & 1), 1)
def ReadWavFile(file_name, scale=True):
    """Read a .wav file into a numpy array.

    Only uncompressed PCM data (codec 1) with 8 or 16 bits per sample is
    supported.

    Note: the FFmpeg based AudioDecoder is more generic, use this only as a
    low-level alternative to AudioDecoder.
    Note: does not support GFile, as the fromfile function from numpy can only
    use native python file objects.

    Args:
      file_name: string. name of the local file to load.
      scale: boolean. if True, returns float data in the [-1, 1] range instead
        of integers.

    Returns:
      A (samples, sample_rate) tuple. samples is a 2-dimensional numpy array
      of size (num_samples, num_channels); sample_rate is an int, in Hz.

    Raises:
      AudioIoException:
        - The file header is corrupted.
        - The file has no 'data' chunk.
        - The file uses an unsupported sampling rate, bitdepth or codec.
    """
    # open() replaces the Python 2-only file() builtin, and the with block
    # closes the handle on every exit path, including the raises below.
    with open(file_name, 'rb') as f:
        header = f.read(12)
        if len(header) < 12 or header[:4] != b'RIFF' or header[8:] != b'WAVE':
            raise AudioIoException('Corrupted header')

        format_header_size = _GoToIffChunk(f, b'fmt ')
        # Also covers the -1 returned when there is no 'fmt ' chunk at all.
        if format_header_size < _FMT_CHUNK_DATA_SIZE:
            raise AudioIoException('Invalid header size')
        format_header = _ReadBytesOrFail(
            f, _FMT_CHUNK_DATA_SIZE, 'Corrupted header')
        # Some encoders write an 18 or 40 byte 'fmt ' chunk even for plain PCM.
        # The first 16 bytes have the same layout, so skip the extension (and
        # the pad byte of an odd-sized chunk) instead of rejecting the file.
        extension_size = format_header_size - _FMT_CHUNK_DATA_SIZE
        if extension_size:
            f.seek(extension_size + (format_header_size & 1), 1)

        compression, num_channels, sample_rate, _, _, bitdepth = struct.unpack(
            '<HHLLHH', format_header)
        if compression != 1:
            raise AudioIoException('Unknown .wav codec: %d' % compression)
        if not num_channels:
            raise AudioIoException('Wrong number of channels')
        if sample_rate < 1000 or sample_rate > 96000:
            raise AudioIoException('Invalid sample rate')
        if bitdepth != 8 and bitdepth != 16:
            raise AudioIoException('Unsupported bit depth')

        sample_data_size = _GoToIffChunk(f, b'data')
        if sample_data_size < 0:
            raise AudioIoException('Missing data chunk')

        # Floor division: a sample count must stay an int on Python 3 too.
        bytes_per_sample = bitdepth // 8
        num_samples = sample_data_size // bytes_per_sample
        # Make sure we are reading a number of samples which is a multiple of
        # the number of channels. Some corrupted stereo .wav files may contain
        # 5 samples!
        num_samples -= num_samples % num_channels

        if bitdepth == 8:
            sample_type = numpy.ubyte
        else:
            sample_type = '<i%d' % bytes_per_sample
        samples = numpy.fromfile(f, dtype=sample_type, count=num_samples)

    # A truncated file yields fewer samples than announced; keep whole frames
    # only so that the reshape below cannot fail.
    samples = samples[:len(samples) - len(samples) % num_channels]

    if scale:
        # Semantics of x = x / y and x /= y are different when x and y are
        # numpy arrays of a different type, so the out-of-place form is used
        # to obtain a float result from integer samples.
        if bitdepth == 8:
            samples = (samples / _UNSIGNED_CHAR_TO_FLOAT_SCALE) - 1.0
        else:
            samples = samples / float(1 << (bitdepth - 1))
    return (samples.reshape(-1, num_channels), sample_rate)
def Quantize(signal, bitdepth, normalize=True):
    """Convert an array of float to an array of integers.

    8-bit output is unsigned (0..254, silence at 127); other bit depths are
    signed and symmetric around 0. Values are truncated, not rounded.

    Args:
      signal: numpy array (or array-like). source signal.
      bitdepth: int. size of the integer in bits.
      normalize: boolean. whether samples should be scaled to use all the
        available dynamic range. When False, samples outside [-1, 1] are
        clipped and a warning is logged.

    Returns:
      array of integers, with the same shape as signal.
    """
    signal = numpy.asarray(signal)
    if bitdepth == 8:
        output_type = numpy.uint8
    else:
        # Floor division keeps the dtype string valid on Python 3 as well.
        output_type = 'i%d' % (bitdepth // 8)

    # max() of a zero-size array raises; there is nothing to scale anyway.
    if signal.size == 0:
        return numpy.zeros(signal.shape, dtype=output_type)

    norm = numpy.abs(signal).max()
    # Normalization or clipping.
    if normalize and norm > 0:
        # float() forces a true division, even for integer input on Python 2.
        scaled_signal = signal / float(norm)
    else:
        # Work on a copy: the clipping below is done in place.
        scaled_signal = signal.copy()
        if norm > 1.0:
            logging.warning('Some samples will be clipped.')
    # Clip samples above 1 and below -1.
    scaled_signal[scaled_signal < -1] = -1
    scaled_signal[scaled_signal > 1] = 1

    if bitdepth == 8:
        scaled_signal = (scaled_signal + 1.0) * _FLOAT_TO_UNSIGNED_CHAR_SCALE
    else:
        scaled_signal = scaled_signal * ((1 << (bitdepth - 1)) - 1)
    return numpy.array(scaled_signal, dtype=output_type)
def WriteWavFile(signal, sample_rate, file_name, bitdepth=16, normalize=True):
    """Write a .wav file from a numpy array.

    Arrays of uint8 or int16 are written as is (bitdepth and normalize are
    then ignored); anything else goes through Quantize first.

    Note: does not support GFile, as the tofile method of numpy arrays can only
    use native python file objects.

    Args:
      signal: 2-dimensional numpy array, of size (num_samples, num_channels).
        A 1-dimensional array is written as a mono file.
      sample_rate: int. sample rate of the signal in Hz.
      file_name: string. name of the destination file.
      bitdepth: int. bitdepth in bits (default 16).
      normalize: boolean. if set to True, scale the data to the [-1, 1] range
        before writing.
    """
    signal = numpy.asarray(signal)
    if signal.dtype == numpy.uint8 or signal.dtype == numpy.int16:
        bitdepth = signal.dtype.itemsize * 8
        scaled_signal = signal
    else:
        scaled_signal = Quantize(signal, bitdepth, normalize=normalize)

    # .wav sample data is little-endian, whereas tofile() dumps the array in
    # its own byte order. This is a no-op on little-endian hosts.
    little_endian_type = scaled_signal.dtype.newbyteorder('<')
    if scaled_signal.dtype != little_endian_type:
        scaled_signal = scaled_signal.astype(little_endian_type)

    if scaled_signal.ndim == 1:
        num_channels = 1
    else:
        num_channels = scaled_signal.shape[1]

    # Floor division: struct.pack rejects the float that / gives on Python 3.
    bytes_per_sample = bitdepth // 8
    # Size of one frame (one sample for every channel), in bytes.
    block_align = num_channels * bytes_per_sample
    byte_rate = int(sample_rate) * block_align

    data_size = scaled_signal.nbytes
    # RIFF chunks are word-aligned: an odd-sized data chunk (e.g. 8-bit mono
    # with an odd number of samples) is followed by one pad byte, which counts
    # towards the RIFF size but not towards the chunk's own size.
    pad_size = data_size & 1

    # Compute the total size of the output .wav file, minus the size of the
    # first two fields of the RIFF header.
    total_size = (
        _RIFF_FORMAT_DESCRIPTOR_SIZE +
        _FMT_CHUNK_HEADER_SIZE + _FMT_CHUNK_DATA_SIZE +
        _DATA_CHUNK_HEADER_SIZE + data_size + pad_size)

    # Byte literals: writing text to a binary file fails on Python 3.
    with open(file_name, 'wb') as f:
        f.write(b'RIFF')
        f.write(struct.pack('<L', total_size))
        f.write(b'WAVEfmt ')
        # fmt chunk size, codec (1 = PCM), channels, sample rate, byte rate,
        # block align, bits per sample.
        f.write(struct.pack(
            '<LHHLLHH', _FMT_CHUNK_DATA_SIZE, 1, num_channels,
            int(sample_rate), byte_rate, block_align, bitdepth))
        f.write(b'data')
        f.write(struct.pack('<L', data_size))
        scaled_signal.tofile(f)
        if pad_size:
            f.write(b'\x00')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Removes a known piece of music from a recording made in a room, leaving an
# estimate of the ambient sounds:
#   1. estimate the impulse response of the room and speakers from the
#      original music and the recording;
#   2. play the original music through that estimated response;
#   3. subtract the result from the recording.
import audio_io
import numpy

# Number of samples kept from the estimated impulse response.
IMPULSE_RESPONSE_LENGTH = 7500

original_audio, _ = audio_io.ReadWavFile('a_music.wav')
original_audio = original_audio.ravel()
# Both files are assumed to share the same sample rate; the recording's rate
# is reused for the output files instead of a hard-coded value.
recorded_signal, sample_rate = audio_io.ReadWavFile(
    'b_music_in_room_and_cat.wav')
recorded_signal = recorded_signal.ravel()

# Truncate all signals same length, then pad to avoid boundary effects.
n = min(recorded_signal.shape[0], original_audio.shape[0])
original_audio_padded = numpy.zeros((2 * n, ))
recorded_signal_padded = numpy.zeros((2 * n, ))
original_audio_padded[:n] = original_audio[:n]
recorded_signal_padded[:n] = recorded_signal[:n]

# Use cross-correlation to estimate the impulse response of the room
# and speakers.
A = numpy.fft.fft(original_audio_padded)
# Both spectra must come from the padded, equal-length signals: transforming
# the raw recording gives a different length and cannot be combined with A.
B = numpy.fft.fft(recorded_signal_padded)
Raa = numpy.conj(A) * A
Rab = numpy.conj(A) * B
Hab = Rab / Raa
h_est = numpy.fft.ifft(Hab).real

# Truncate estimated impulse response
h_est = h_est[:IMPULSE_RESPONSE_LENGTH]
audio_io.WriteWavFile(h_est, sample_rate, 'h_est.wav')

# Get the estimate of the original audio played through the room. Only the
# first n output samples are used, and those depend on the first n input
# samples only, so there is no need to convolve the whole file.
estimated_audio_in_room = numpy.convolve(original_audio[:n], h_est)

# Do the subtraction.
estimated_ambient_sound = recorded_signal[:n] - estimated_audio_in_room[:n]
audio_io.WriteWavFile(
    estimated_ambient_sound, sample_rate, 'c_cat_estimated.wav')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment