import numpy as np
import pyaudio
import wave
from flucoma import fluid
from scipy.io import wavfile
import simpleaudio as sa
from sklearn.neighbors import NearestNeighbors
import os
import random
import time
import logging
from collections import deque
# Leo Impett 2024
# Remember to add the FluCoMa binaries to the PATH environment variable before starting,
# for instance: export PATH=/Users/impett/Documentz/Code/dad_audio_lausanne/FluidCorpusManipulation/bin:$PATH
# This only has to be done once per terminal session (in the terminal in which you are running Python).
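# A quick sanity check (assuming the FluCoMa command-line tools live in that bin/ directory):
# `which fluid-noveltyslice` should print the binary's path once PATH is set. The exact
# binary name may differ between FluCoMa releases.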
# Tunable parameters
# -----------------
# Audio recording parameters
RATE = 44100  # Sample rate
CHUNK = 1024  # Buffer size for recording
# Segmentation and analysis parameters
MIN_SEGMENT_LENGTH = RATE  # Minimum length of a segment (1 second)
MFCC_COEFFS = 13  # Number of MFCC coefficients to use. Default 13
MFCC_BANDS = 40  # Number of mel bands for MFCC. Default 40
# Memory management parameters
MAX_SEGMENTS = 20  # Maximum number of segments to keep in memory. Default 100
MAX_OUTPUT_FILES = 10  # Maximum number of output files to keep. Default 10
# Listening parameters
MIN_RECORDING_LENGTH = 3  # Minimum recording length in seconds. Default 3
MAX_RECORDING_LENGTH = 20  # Maximum recording length in seconds. Default 20
REVERSE_CHANNELS = 0  # Reverse the channels. Default 0 (false). Set to 1 to swap channels 1 and 2
# Slicing parameters - using FluCoMa novelty slice
NOVELTY_THRESHOLD = 0.5  # Novelty threshold. Default 0.5
NOVELTY_FILTER = 1  # Novelty filter size. Default 1
NOVELTY_FFT = [1024, -1, -1]  # FFT settings for novelty slice. Default [1024, -1, -1]
NOVELTY_KERNEL = 3  # Kernel size for novelty slice. Default 3
# For info see FluidCorpusManipulation/docs/fluid-noveltyslice.html
# Playback parameters
CROSSFADE_DURATION = 0.05  # Crossfade duration in seconds. Default 0.05
FADE_DURATION = 0.05  # Fade in/out duration in seconds. Default 0.05
# Chinese whispers parameters
MIN_CHAIN_LENGTH = 3  # Minimum chain length. Default 3
MAX_CHAIN_LENGTH = 10  # Maximum chain length. Default 10
# NB a range of 3-10 means a minimum of 3 seconds, because each segment is at least 1 second long
# Probability of creating and playing concatenated audio
PLAY_PROBABILITY = 0.5
# Paths
audio_segments_path = './audio_segments'
output_audio_path = './audio_output'
os.makedirs(audio_segments_path, exist_ok=True)
os.makedirs(output_audio_path, exist_ok=True)
# Delete the contents of audio_segments_path and output_audio_path
for file in os.listdir(audio_segments_path):
    os.remove(os.path.join(audio_segments_path, file))
for file in os.listdir(output_audio_path):
    os.remove(os.path.join(output_audio_path, file))
# FluCoMa temporary directory (expanduser so that '~' resolves when the directory is listed later)
FLUCOMA_TEMP_DIR = os.path.expanduser('~/.python-flucoma')
# Set up logging
# Uncomment the desired logging level and comment out the others
# logging_level = logging.INFO
# logging_level = logging.DEBUG
# logging_level = logging.WARNING
# logging_level = logging.ERROR
logging_level = logging.CRITICAL
log_file = 'audio_processing.log'
if os.path.exists(log_file):
    os.remove(log_file)
logging.basicConfig(filename=log_file, level=logging_level,
                    format='%(asctime)s - %(levelname)s - %(message)s')
# Print available audio devices
p = pyaudio.PyAudio()
print("Available audio input devices:")
input_devices = []
for i in range(p.get_device_count()):
    device_info = p.get_device_info_by_index(i)
    if device_info['maxInputChannels'] > 0:
        input_devices.append(f"Index {i}: {device_info['name']}")
        print(f"Index {i}: {device_info['name']}")
print("\nAvailable audio output devices:")
output_devices = []
for i in range(p.get_device_count()):
    device_info = p.get_device_info_by_index(i)
    if device_info['maxOutputChannels'] > 0:
        output_devices.append(f"Index {i}: {device_info['name']}")
        print(f"Index {i}: {device_info['name']}")
p.terminate()
print("\nNote: The script is currently set to use the default input and output devices.")
print("To change the input or output device, modify the 'record_audio' and 'play_audio_nonblocking' functions.")
# Initialize data structures
segment_data = []
nn_tree = NearestNeighbors(n_neighbors=(MAX_CHAIN_LENGTH + 5), algorithm='ball_tree')
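# The default n_neighbors gives a little headroom over MAX_CHAIN_LENGTH, but note that
# chinese_whispers_process() below overrides it and queries all fitted samples anyway.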
# Audio format for PyAudio
FORMAT = pyaudio.paInt16
def record_audio(duration):
    """
    Record audio for a specified duration.

    Args:
        duration (float): Recording duration in seconds.

    Returns:
        tuple: (bytes: recorded audio data for channel 1,
                float: one minus the normalized average amplitude of channel 2)
    """
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=2, rate=RATE, input=True, frames_per_buffer=CHUNK)
    frames_ch1 = []
    ch2_amplitudes = []
    for _ in range(0, int(RATE / CHUNK * duration)):
        data = stream.read(CHUNK)
        # Convert bytes to numpy array
        audio_chunk = np.frombuffer(data, dtype=np.int16)
        # Samples are interleaved: channel 1 on even indices, channel 2 on odd indices
        ch1_data = audio_chunk[0::2]
        ch2_data = audio_chunk[1::2]
        # If REVERSE_CHANNELS=1 then swap the two channels
        if REVERSE_CHANNELS:
            ch1_data, ch2_data = ch2_data, ch1_data
        frames_ch1.append(ch1_data.tobytes())
        ch2_amplitudes.append(np.mean(np.abs(ch2_data)))
    stream.stop_stream()
    stream.close()
    p.terminate()
    # Combine all channel 1 frames
    channel1 = b''.join(frames_ch1)
    # Average amplitude of channel 2, normalized to the 16-bit full-scale range
    ch2_amplitude = np.mean(ch2_amplitudes) / 32767.5
    # Invert, so a loud channel 2 gives a low control value
    ch2_amplitude = 1.0 - ch2_amplitude
    # Make sure this is always in the range 0 to 1, just in case things get weird
    ch2_amplitude = np.clip(ch2_amplitude, 0, 1)
    print("Amplitude control:", ch2_amplitude)
    print("Duration:", duration)
    return channel1, ch2_amplitude
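# Example (hypothetical values): `audio, amp = record_audio(5.0)` captures roughly 5 s
# of stereo input from the default device, returning mono channel-1 bytes plus the
# inverted channel-2 level, which main() later uses as a playback volume control.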
def save_audio(audio_data, filename):
    """
    Save audio data to a WAV file.

    Args:
        audio_data (bytes): Audio data to save.
        filename (str): Path to save the WAV file.
    """
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(audio_data)
# Initialize global variables for running-mean feature normalization
feature_sum = None
feature_count = 0

def analyze_audio(audio_path):
    """
    Analyze an audio file using MFCC and pitch, then calculate statistics.

    Args:
        audio_path (str): Path to the audio file.

    Returns:
        np.array: Feature vector of the audio, including MFCC and pitch statistics.
    """
    global feature_sum, feature_count
    mfcc = fluid.mfcc(audio_path, numcoeffs=MFCC_COEFFS, numbands=MFCC_BANDS)
    pitch = fluid.pitch(audio_path)
    feature = np.concatenate([np.mean(mfcc, axis=1), np.mean(pitch, axis=1)])
    if feature_sum is None:
        feature_sum = feature.copy()  # copy so later in-place += doesn't mutate this segment's feature
    else:
        feature_sum += feature
    feature_count += 1
    # Avoid division by zero and handle NaN values
    with np.errstate(divide='ignore', invalid='ignore'):
        normalized_feature = np.where(
            feature_sum != 0,
            feature / (feature_sum / feature_count),
            0
        )
    # Replace any remaining NaN or inf values with 0
    normalized_feature = np.nan_to_num(normalized_feature, nan=0.0, posinf=0.0, neginf=0.0)
    return normalized_feature
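# NB: each feature vector is divided by the running mean of all features seen so far,
# so the normalization of early segments is based on very few samples, and the same
# segment can normalize differently depending on when it was analyzed.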
def crossfade(audio1, audio2, fade_length):
    """
    Apply a linear crossfade between two audio segments.

    Args:
        audio1, audio2 (np.array): Audio segments to crossfade.
        fade_length (int): Length of the crossfade in samples.

    Returns:
        np.array: Crossfaded audio.
    """
    audio1 = audio1.astype(np.float64)
    audio2 = audio2.astype(np.float64)
    fade_length = min(fade_length, len(audio1), len(audio2))
    fade_in = np.linspace(0, 1, fade_length)
    fade_out = np.linspace(1, 0, fade_length)
    audio1[-fade_length:] *= fade_out
    audio2[:fade_length] *= fade_in
    result = np.concatenate([audio1[:-fade_length],
                             audio1[-fade_length:] + audio2[:fade_length],
                             audio2[fade_length:]])
    # Normalize to prevent clipping
    max_val = np.max(np.abs(result))
    if max_val > 32767:
        result = result * (32767 / max_val)
    return result.astype(np.int16)
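# The result is len(audio1) + len(audio2) - fade_length samples long: the two segments
# overlap by fade_length samples, summed under complementary linear ramps.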
def apply_fade(audio, fade_length):
    """
    Apply fade in and fade out to an audio segment.

    Args:
        audio (np.array): Audio segment to apply fades to.
        fade_length (int): Length of the fade in samples.

    Returns:
        np.array: Audio with fades applied.
    """
    # If the fades would cover more than half the audio, skip them and return it unchanged
    if fade_length > len(audio) // 2:
        return audio
    audio = audio.astype(np.float64)
    fade_in = np.linspace(0, 1, fade_length)
    fade_out = np.linspace(1, 0, fade_length)
    audio[:fade_length] *= fade_in
    audio[-fade_length:] *= fade_out
    return audio
def play_audio_nonblocking(file_path):
    """
    Play an audio file without blocking the main thread.

    Args:
        file_path (str): Path to the audio file to play.

    Returns:
        simpleaudio.PlayObject or None: Play object if successful, None otherwise.
    """
    if not os.path.exists(file_path):
        logging.error(f"File {file_path} does not exist.")
        return None
    try:
        wave_obj = sa.WaveObject.from_wave_file(file_path)
        play_obj = wave_obj.play()
        return play_obj
    except Exception as e:
        logging.error(f"Error playing audio file: {e}")
        return None
def process_segments(audio_file, novelty_slice, min_length=MIN_SEGMENT_LENGTH):
    """
    Process audio segments based on novelty slices and minimum length.

    Args:
        audio_file (str): Path to the audio file.
        novelty_slice (list): List of novelty slice points.
        min_length (int): Minimum length of a segment in samples.

    Returns:
        list: List of processed audio segments.
    """
    with wave.open(audio_file, 'rb') as wf:
        audio_array = np.frombuffer(wf.readframes(wf.getnframes()), dtype=np.int16)
    # Create initial segments, dropping the first and last slices
    segments = [audio_array[int(novelty_slice[i]):int(novelty_slice[i + 1])]
                for i in range(1, len(novelty_slice) - 2)]
    # Merge segments shorter than min_length into a neighbour
    i = 0
    while i < len(segments):
        if len(segments[i]) < min_length:
            if i > 0 and len(segments[i - 1]) + len(segments[i]) < 2 * min_length:
                segments[i - 1] = np.concatenate((segments[i - 1], segments[i]))
                segments.pop(i)
            elif i < len(segments) - 1:
                segments[i] = np.concatenate((segments[i], segments[i + 1]))
                segments.pop(i + 1)
            else:
                i += 1
        else:
            i += 1
    return segments
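# A too-short segment is first appended to its left neighbour (if the merged pair stays
# under 2 * min_length), otherwise it absorbs its right neighbour; a short final segment
# with no mergeable neighbour is simply kept as-is.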
def cleanup_files():
    """
    Remove older output files and FluCoMa temporary files.
    """
    # Keep only the MAX_OUTPUT_FILES most recent output files
    output_files = sorted(
        [f for f in os.listdir(output_audio_path) if f.startswith('output_') and f.endswith('.wav')],
        key=lambda x: os.path.getmtime(os.path.join(output_audio_path, x)),
        reverse=True
    )
    for old_file in output_files[MAX_OUTPUT_FILES:]:
        try:
            os.remove(os.path.join(output_audio_path, old_file))
            logging.info(f"Removed old output file: {old_file}")
        except Exception as e:
            logging.error(f"Error removing old file {old_file}: {e}")
    # Clean up FluCoMa temporary files
    try:
        for file in os.listdir(FLUCOMA_TEMP_DIR):
            if file.endswith('.wav'):
                os.remove(os.path.join(FLUCOMA_TEMP_DIR, file))
                logging.info(f"Removed FluCoMa temporary file: {file}")
    except Exception as e:
        logging.error(f"Error cleaning up FluCoMa temporary files: {e}")
# Global counter so that segment filenames stay unique even after old entries are
# trimmed from segment_data (naming by len(segment_data) would reuse filenames and
# overwrite files still referenced by live entries)
segment_counter = 0

def save_and_analyze_segments(segments):
    """
    Save segments to disk and analyze them, adding to the segment data.

    Args:
        segments (list): List of audio segments to process.

    Returns:
        int: Number of new samples processed.
    """
    global segment_counter
    new_samples = 0
    for segment in segments:
        segment_file = f"{audio_segments_path}/segment_{segment_counter}.wav"
        segment_counter += 1
        save_audio(segment.tobytes(), segment_file)
        normalized_feature = analyze_audio(segment_file)
        segment_data.append({'file': segment_file, 'feature': normalized_feature})
        new_samples += 1
    return new_samples
def chinese_whispers_process():
    """
    Perform the Chinese whispers process: build a chain of audio segments by
    repeatedly stepping to the nearest unused neighbour in feature space.

    Returns:
        list: List of file paths for the chain of audio segments.
    """
    chain_length = random.randint(MIN_CHAIN_LENGTH, min(MAX_CHAIN_LENGTH, len(segment_data)))
    chain = []
    used_indices = set()
    # Start from a random segment
    query_index = random.randint(0, len(segment_data) - 1)
    while len(chain) < chain_length:
        chain.append(segment_data[query_index]['file'])
        used_indices.add(query_index)
        distances, indices = nn_tree.kneighbors([segment_data[query_index]['feature']],
                                                n_neighbors=len(segment_data))
        for idx in indices[0]:
            if idx not in used_indices:
                query_index = idx
                break
        else:
            break  # No unused neighbours found
    return chain
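# This is a greedy nearest-neighbour walk: each step queries the fitted tree over all
# segments and takes the closest one not already in the chain, so the chain drifts
# gradually through feature space rather than jumping at random.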
def main():
    """
    Main loop of the audio processing system.
    """
    global segment_data
    while True:
        # 1. Record audio for a random duration
        duration = random.uniform(MIN_RECORDING_LENGTH, MAX_RECORDING_LENGTH)
        logging.info(f"Recording for {duration:.2f} seconds...")
        print(f"Recording for {duration:.2f} seconds...")
        audio_data, ch2_amplitude = record_audio(duration)
        temp_file = 'temp_recording_buffer.wav'
        save_audio(audio_data, temp_file)
        # 2. Segment the audio
        # If you want to change the FluCoMa novelty slice parameters, do it here
        novelty_slice = fluid.noveltyslice(temp_file, threshold=NOVELTY_THRESHOLD,
                                           filtersize=NOVELTY_FILTER,
                                           fftsettings=NOVELTY_FFT,
                                           kernelsize=NOVELTY_KERNEL)
        if len(novelty_slice) <= 3:
            logging.info("Not enough segments, continuing...")
            continue
        # 3. Process middle segments
        processed_segments = process_segments(temp_file, novelty_slice)
        # Save and analyze new segments
        new_samples = save_and_analyze_segments(processed_segments)
        logging.info(f"Processed {new_samples} new segments. Total samples: {len(segment_data)}")
        # Refit the nearest-neighbour tree on all current features
        if len(segment_data) > 3:
            # If segment_data exceeds MAX_SEGMENTS, drop the oldest segments
            if len(segment_data) > MAX_SEGMENTS:
                segment_data = segment_data[-MAX_SEGMENTS:]
            features = [seg['feature'] for seg in segment_data]
            nn_tree.fit(features)
        # 4-5. Chinese whispers process (with probability PLAY_PROBABILITY)
        if random.random() < PLAY_PROBABILITY and len(segment_data) > 2:
            chain = chinese_whispers_process()
            logging.info(f"Created chain of length: {len(chain)}")
            # 6. Concatenate with crossfade and apply overall fade
            output_file = f'{output_audio_path}/output_{int(time.time())}.wav'
            try:
                concatenated_audio = None
                for file in chain:
                    rate, audio = wavfile.read(file)
                    if concatenated_audio is None:
                        concatenated_audio = audio
                    else:
                        fade_length = int(CROSSFADE_DURATION * rate)
                        concatenated_audio = crossfade(concatenated_audio, audio, fade_length)
                if concatenated_audio is not None:
                    overall_fade_length = int(FADE_DURATION * rate)
                    concatenated_audio = apply_fade(concatenated_audio, overall_fade_length)
                    # Scale the output volume based on the channel 2 amplitude control
                    scaling_factor = ch2_amplitude
                    concatenated_audio = (concatenated_audio * scaling_factor).astype(np.int16)
                    wavfile.write(output_file, rate, concatenated_audio)
                    logging.info(f"Saved audio to {output_file}")
                    # 7. Play the WAV file in the background
                    play_audio_nonblocking(output_file)
                    logging.info(f"Playing {output_file} in the background")
                    # Clean up old files
                    cleanup_files()
                else:
                    logging.warning("No audio to concatenate")
            except Exception as e:
                logging.error(f"Error processing audio: {e}")

if __name__ == "__main__":
    main()