import numpy as np
import pyaudio
import wave
from flucoma import fluid
from scipy.io import wavfile
import simpleaudio as sa
from sklearn.neighbors import NearestNeighbors
import os
import random
import time
import logging
from collections import deque
# Leo Impett 2024
# Remember to add the FluCoMa binaries to the PATH environment variable before running,
# for instance: export PATH=/Users/impett/Documentz/Code/dad_audio_lausanne/FluidCorpusManipulation/bin:$PATH
# This only has to be done once per terminal session (in the terminal in which you are running python).
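# Alternatively, the PATH can be extended from inside Python before the first
# fluid call. This is a sketch, not part of the original setup; the path below
# is a placeholder you would need to point at your own install:
#
#   FLUCOMA_BIN = '/path/to/FluidCorpusManipulation/bin'  # hypothetical location
#   os.environ['PATH'] = FLUCOMA_BIN + os.pathsep + os.environ.get('PATH', '')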
# Tunable parameters
# -----------------
# Audio recording parameters
RATE = 44100 # Sample rate
CHUNK = 1024 # Buffer size for recording
# Segmentation and analysis parameters
MIN_SEGMENT_LENGTH = RATE # Minimum length of a segment (1 second)
MFCC_COEFFS = 13 # Number of MFCC coefficients to use. Default 13
MFCC_BANDS = 40 # Number of mel bands for MFCC. Default 40
# MEMORY MANAGEMENT PARAMETERS
MAX_SEGMENTS = 20 # Maximum number of segments to keep in memory. Default 100
MAX_OUTPUT_FILES = 10 # Maximum number of output files to keep. Default 10
# LISTENING PARAMETERS
MIN_RECORDING_LENGTH = 3 # Minimum recording length in seconds. Default 3
MAX_RECORDING_LENGTH = 20 # Maximum recording length in seconds. Default 20
REVERSE_CHANNELS = 0 # Swap input channels 1 and 2. Default 0 (false); set to 1 to swap them
# Slicing parameters - using flucoma novelty slice
NOVELTY_THRESHOLD = 0.5 # Novelty threshold. Default 0.5
NOVELTY_FILTER = 1 # Novelty filter size. Default 1
NOVELTY_FFT = [1024, -1, -1] # FFT settings for novelty slice. Default [1024, -1, -1]
NOVELTY_KERNEL = 3 # Kernel size for novelty slice. Default 3
# For info see FluidCorpusManipulation/docs/fluid-noveltyslice.html
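# (In FluCoMa, FFT settings are conventionally [window size, hop size, FFT size];
# -1 leaves hop and FFT size at defaults derived from the window size -- see the
# doc page above for the authoritative description.)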
# Playback parameters
CROSSFADE_DURATION = 0.05 # Crossfade duration in seconds. Default 0.05
FADE_DURATION = 0.05 # Fade in/out duration in seconds. Default 0.05
# Chinese whispers parameters
MIN_CHAIN_LENGTH = 3 # Minimum chain length. Default 3
MAX_CHAIN_LENGTH = 10 # Maximum chain length. Default 10
# NB a range of 3-10 means a minimum of 3 seconds, because each segment is at least 1 second long
# Probability of creating and playing concatenated audio
PLAY_PROBABILITY = 0.5
# Paths
audio_segments_path = './audio_segments'
output_audio_path = './audio_output'
os.makedirs(audio_segments_path, exist_ok=True)
os.makedirs(output_audio_path, exist_ok=True)
# Delete the contents of audio_segments_path and output_audio_path
for file in os.listdir(audio_segments_path):
    os.remove(os.path.join(audio_segments_path, file))
for file in os.listdir(output_audio_path):
    os.remove(os.path.join(output_audio_path, file))
# FluCoMa temporary directory
FLUCOMA_TEMP_DIR = os.path.expanduser('~/.python-flucoma')  # expanded, so os.listdir works in cleanup_files
# Set up logging
# Uncomment the desired logging level and comment out the others
# logging_level = logging.INFO
# logging_level = logging.DEBUG
# logging_level = logging.WARNING
# logging_level = logging.ERROR
logging_level = logging.CRITICAL
log_file = 'audio_processing.log'
if os.path.exists(log_file):
    os.remove(log_file)
logging.basicConfig(filename=log_file, level=logging_level,
                    format='%(asctime)s - %(levelname)s - %(message)s')
# Print available audio devices
p = pyaudio.PyAudio()
print("Available audio input devices:")
input_devices = []
for i in range(p.get_device_count()):
    device_info = p.get_device_info_by_index(i)
    if device_info['maxInputChannels'] > 0:
        input_devices.append(f"Index {i}: {device_info['name']}")
        print(f"Index {i}: {device_info['name']}")
print("\nAvailable audio output devices:")
output_devices = []
for i in range(p.get_device_count()):
    device_info = p.get_device_info_by_index(i)
    if device_info['maxOutputChannels'] > 0:
        output_devices.append(f"Index {i}: {device_info['name']}")
        print(f"Index {i}: {device_info['name']}")
p.terminate()
print("\nNote: The script is currently set to use the default input and output devices.")
print("To change the input or output device, modify the 'record_audio' and 'play_audio_nonblocking' functions.")
# Initialize data structures
segment_data = []
nn_tree = NearestNeighbors(n_neighbors=(MAX_CHAIN_LENGTH+5), algorithm='ball_tree')
# Audio format for PyAudio
FORMAT = pyaudio.paInt16
def record_audio(duration):
    """
    Record audio for a specified duration.
    Args:
        duration (float): Recording duration in seconds.
    Returns:
        tuple: (bytes: Recorded audio data for channel 1,
                float: (one minus) the average amplitude of channel 2)
    """
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=2, rate=RATE, input=True, frames_per_buffer=CHUNK)
    frames_ch1 = []
    ch2_amplitudes = []
    for _ in range(0, int(RATE / CHUNK * duration)):
        data = stream.read(CHUNK)
        # Convert bytes to numpy array
        audio_chunk = np.frombuffer(data, dtype=np.int16)
        # Extract channel 1 data (even indices)
        ch1_data = audio_chunk[0::2]
        # Extract channel 2 data (odd indices)
        ch2_data = audio_chunk[1::2]
        # If REVERSE_CHANNELS=1 then swap the two channels
        if REVERSE_CHANNELS:
            ch1_data, ch2_data = ch2_data, ch1_data
        frames_ch1.append(ch1_data.tobytes())
        # Cast to float before abs: np.abs(-32768) overflows in int16
        ch2_amplitudes.append(np.mean(np.abs(ch2_data.astype(np.float32))))
    stream.stop_stream()
    stream.close()
    p.terminate()
    # Combine all channel 1 frames
    channel1 = b''.join(frames_ch1)
    # Average amplitude of channel 2, normalized to the 16-bit range and
    # inverted: louder channel-2 input gives quieter playback
    ch2_amplitude = np.mean(ch2_amplitudes)
    ch2_amplitude = ch2_amplitude / 32767.5
    ch2_amplitude = 1.0 - ch2_amplitude
    # Make sure this is always in the range 0 to 1 just in case things get weird
    ch2_amplitude = np.clip(ch2_amplitude, 0, 1)
    print("Amplitude control:", ch2_amplitude)
    print("Duration:", duration)
    return channel1, ch2_amplitude
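# Example use (a sketch, not called anywhere in this script): record five
# seconds and write it out as a quick microphone check.
#
#   data, amp = record_audio(5.0)
#   save_audio(data, 'mic_check.wav')  # hypothetical filename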
def save_audio(audio_data, filename):
    """
    Save audio data to a WAV file.
    Args:
        audio_data (bytes): Audio data to save.
        filename (str): Path to save the WAV file.
    """
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(audio_data)
# Initialize global variables
feature_sum = None
feature_count = 0
def analyze_audio(audio_path):
    """
    Analyze audio file using MFCC and pitch, then calculate statistics.
    Args:
        audio_path (str): Path to the audio file.
    Returns:
        np.array: Feature vector of the audio, including MFCC and pitch statistics.
    """
    global feature_sum, feature_count
    mfcc = fluid.mfcc(audio_path, numcoeffs=MFCC_COEFFS, numbands=MFCC_BANDS)
    pitch = fluid.pitch(audio_path)
    feature = np.concatenate([np.mean(mfcc, axis=1), np.mean(pitch, axis=1)])
    if feature_sum is None:
        feature_sum = feature
    else:
        feature_sum += feature
    feature_count += 1
    # Avoid division by zero and handle NaN values
    with np.errstate(divide='ignore', invalid='ignore'):
        normalized_feature = np.where(
            feature_sum != 0,
            feature / (feature_sum / feature_count),
            0
        )
    # Replace any remaining NaN or inf values with 0
    normalized_feature = np.nan_to_num(normalized_feature, nan=0.0, posinf=0.0, neginf=0.0)
    return normalized_feature
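# Note: for the very first segment analyzed, feature_sum equals the feature
# itself, so the normalized vector comes out as all ones (zeros where the raw
# feature is zero); the normalization only becomes informative after a few segments.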
def crossfade(audio1, audio2, fade_length):
    """
    Apply crossfade between two audio segments.
    Args:
        audio1, audio2 (np.array): Audio segments to crossfade.
        fade_length (int): Length of the crossfade in samples.
    Returns:
        np.array: Crossfaded audio.
    """
    audio1 = audio1.astype(np.float64)
    audio2 = audio2.astype(np.float64)
    fade_length = min(fade_length, len(audio1), len(audio2))
    fade_in = np.linspace(0, 1, fade_length)
    fade_out = np.linspace(1, 0, fade_length)
    audio1[-fade_length:] *= fade_out
    audio2[:fade_length] *= fade_in
    result = np.concatenate([audio1[:-fade_length],
                             audio1[-fade_length:] + audio2[:fade_length],
                             audio2[fade_length:]])
    # Normalize to prevent clipping
    max_val = np.max(np.abs(result))
    if max_val > 32767:
        result = result * (32767 / max_val)
    return result.astype(np.int16)
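# The crossfade above uses linear (equal-gain) ramps, which can dip in perceived
# loudness for uncorrelated material. If that ever matters, an equal-power
# alternative would swap the two linspace ramps for (a suggestion, not part of
# the original design):
#
#   t = np.linspace(0, np.pi / 2, fade_length)
#   fade_in = np.sin(t)   # sin^2 + cos^2 = 1, so the summed power stays constant
#   fade_out = np.cos(t)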
def apply_fade(audio, fade_length):
    """
    Apply fade in and fade out to an audio segment.
    Args:
        audio (np.array): Audio segment to apply fades to.
        fade_length (int): Length of the fade in samples.
    Returns:
        np.array: Audio with fades applied.
    """
    # If the two fades would overlap (audio shorter than twice the fade), return the audio unchanged
    if fade_length > len(audio) // 2:
        return audio
    audio = audio.astype(np.float64)
    fade_in = np.linspace(0, 1, fade_length)
    fade_out = np.linspace(1, 0, fade_length)
    audio[:fade_length] *= fade_in
    audio[-fade_length:] *= fade_out
    return audio
def play_audio_nonblocking(file_path):
    """
    Play audio file without blocking the main thread.
    Args:
        file_path (str): Path to the audio file to play.
    Returns:
        simpleaudio.PlayObject or None: Play object if successful, None otherwise.
    """
    if not os.path.exists(file_path):
        logging.error(f"File {file_path} does not exist.")
        return None
    try:
        wave_obj = sa.WaveObject.from_wave_file(file_path)
        play_obj = wave_obj.play()
        return play_obj
    except Exception as e:
        logging.error(f"Error playing audio file: {e}")
        return None
def process_segments(audio_file, novelty_slice, min_length=MIN_SEGMENT_LENGTH):
    """
    Process audio segments based on novelty slices and minimum length.
    Args:
        audio_file (str): Path to the audio file.
        novelty_slice (list): List of novelty slice points.
        min_length (int): Minimum length of a segment in samples.
    Returns:
        list: List of processed audio segments.
    """
    with wave.open(audio_file, 'rb') as wf:
        audio_array = np.frombuffer(wf.readframes(-1), dtype=np.int16)
    # Create initial segments from the middle slice points only
    # (the first and last segments are likely cut off mid-sound)
    segments = [audio_array[int(novelty_slice[i]):int(novelty_slice[i+1])]
                for i in range(1, len(novelty_slice) - 2)]
    # Merge segments shorter than min_length into a neighbour
    i = 0
    while i < len(segments):
        if len(segments[i]) < min_length:
            if i > 0 and len(segments[i-1]) + len(segments[i]) < 2 * min_length:
                segments[i-1] = np.concatenate((segments[i-1], segments[i]))
                segments.pop(i)
            elif i < len(segments) - 1:
                segments[i] = np.concatenate((segments[i], segments[i+1]))
                segments.pop(i+1)
            else:
                i += 1
        else:
            i += 1
    return segments
def cleanup_files():
    """
    Remove older output files and FluCoMa temporary files.
    """
    # Clean up output files, keeping only the MAX_OUTPUT_FILES most recent
    output_files = sorted(
        [f for f in os.listdir(output_audio_path) if f.startswith('output_') and f.endswith('.wav')],
        key=lambda x: os.path.getmtime(os.path.join(output_audio_path, x)),
        reverse=True
    )
    for old_file in output_files[MAX_OUTPUT_FILES:]:
        try:
            os.remove(os.path.join(output_audio_path, old_file))
            logging.info(f"Removed old output file: {old_file}")
        except Exception as e:
            logging.error(f"Error removing old file {old_file}: {e}")
    # Clean up FluCoMa temporary files
    try:
        for file in os.listdir(FLUCOMA_TEMP_DIR):
            if file.endswith('.wav'):
                os.remove(os.path.join(FLUCOMA_TEMP_DIR, file))
                logging.info(f"Removed FluCoMa temporary file: {file}")
    except Exception as e:
        logging.error(f"Error cleaning up FluCoMa temporary files: {e}")
def save_and_analyze_segments(segments):
    """
    Save segments to disk and analyze them, adding to the segment data.
    Args:
        segments (list): List of audio segments to process.
    Returns:
        int: Number of new samples processed.
    """
    new_samples = 0
    for segment in segments:
        segment_file = f"{audio_segments_path}/segment_{len(segment_data)}.wav"
        save_audio(segment.tobytes(), segment_file)
        normalized_feature = analyze_audio(segment_file)
        segment_data.append({'file': segment_file, 'feature': normalized_feature})
        new_samples += 1
    return new_samples
def chinese_whispers_process():
    """
    Perform the Chinese Whispers process to create a chain of similar audio segments.
    Returns:
        list: List of file paths for the chain of audio segments.
    """
    chain_length = random.randint(MIN_CHAIN_LENGTH, min(MAX_CHAIN_LENGTH, len(segment_data)))
    chain = []
    used_indices = set()
    # Start from a random segment, then greedily hop to the nearest not-yet-used neighbour
    query_index = random.randint(0, len(segment_data) - 1)
    while len(chain) < chain_length:
        chain.append(segment_data[query_index]['file'])
        used_indices.add(query_index)
        distances, indices = nn_tree.kneighbors([segment_data[query_index]['feature']],
                                                n_neighbors=len(segment_data))
        for idx in indices[0]:
            if idx not in used_indices:
                query_index = idx
                break
        else:
            break  # No unused neighbors found
    return chain
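# A toy illustration of the chaining logic above, with made-up 1-D features
# (not part of the pipeline): starting from 0.0, the greedy walk visits its
# nearest unused neighbours in order 0.1, 0.3, 0.9.
#
#   toy = NearestNeighbors(n_neighbors=4).fit([[0.0], [0.1], [0.3], [0.9]])
#   _, idx = toy.kneighbors([[0.0]], n_neighbors=4)  # idx -> [[0, 1, 2, 3]]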
def main():
    """
    Main loop of the audio processing system.
    """
    global segment_data
    while True:
        # 1. Record audio for a random duration
        duration = random.uniform(MIN_RECORDING_LENGTH, MAX_RECORDING_LENGTH)
        logging.info(f"Recording for {duration:.2f} seconds...")
        print(f"Recording for {duration:.2f} seconds...")
        audio_data, ch2_amplitude = record_audio(duration)
        temp_file = 'temp_recording_buffer.wav'
        save_audio(audio_data, temp_file)
        # 2. Segment the audio
        # If you want to change the parameters of the FluCoMa novelty slice, do it here.
        novelty_slice = fluid.noveltyslice(temp_file, threshold=NOVELTY_THRESHOLD, filtersize=NOVELTY_FILTER,
                                           fftsettings=NOVELTY_FFT, kernelsize=NOVELTY_KERNEL)
        if len(novelty_slice) <= 3:
            logging.info("Not enough segments, continuing...")
            continue
        # 3. Process middle segments
        processed_segments = process_segments(temp_file, novelty_slice)
        # Save and analyze new segments
        new_samples = save_and_analyze_segments(processed_segments)
        logging.info(f"Processed {new_samples} new segments. Total samples: {len(segment_data)}")
        # Update the nearest-neighbour tree with all features
        if len(segment_data) > 3:
            # If segment_data exceeds MAX_SEGMENTS, drop the oldest segments
            if len(segment_data) > MAX_SEGMENTS:
                segment_data = segment_data[-MAX_SEGMENTS:]
            features = [seg['feature'] for seg in segment_data]
            nn_tree.fit(features)
        # 4-5. Chinese whispers process (with probability PLAY_PROBABILITY)
        if random.random() < PLAY_PROBABILITY and len(segment_data) > 2:
            chain = chinese_whispers_process()
            logging.info(f"Created chain of length: {len(chain)}")
            # 6. Concatenate with crossfade and apply overall fade
            output_file = f'{output_audio_path}/output_{int(time.time())}.wav'
            try:
                concatenated_audio = None
                for file in chain:
                    rate, audio = wavfile.read(file)
                    if concatenated_audio is None:
                        concatenated_audio = audio
                    else:
                        fade_length = int(CROSSFADE_DURATION * rate)
                        concatenated_audio = crossfade(concatenated_audio, audio, fade_length)
                if concatenated_audio is not None:
                    overall_fade_length = int(FADE_DURATION * rate)
                    concatenated_audio = apply_fade(concatenated_audio, overall_fade_length)
                    # Scale the output volume based on channel 2 amplitude
                    scaling_factor = ch2_amplitude
                    concatenated_audio = (concatenated_audio * scaling_factor).astype(np.int16)
                    wavfile.write(output_file, rate, concatenated_audio)
                    logging.info(f"Saved audio to {output_file}")
                    # 7. Play the wav file
                    play_audio_nonblocking(output_file)
                    logging.info(f"Playing {output_file} in the background")
                    # Clean up old files
                    cleanup_files()
                else:
                    logging.warning("No audio to concatenate")
            except Exception as e:
                logging.error(f"Error processing audio: {e}")

if __name__ == "__main__":
    main()