print("Importing libraries...") | |
import numpy as np | |
import pyaudio | |
import wave | |
from flucoma import fluid | |
from scipy.io import wavfile | |
import simpleaudio as sa | |
from sklearn.neighbors import NearestNeighbors | |
import os | |
import random | |
import time | |
import logging | |
from sklearn.exceptions import NotFittedError | |
import threading | |
import time | |
from contextlib import contextmanager | |
import pickle, sys, subprocess, signal | |
print("Imported libraries") | |
# Leo Impett 2024
# Remember to add the FluCoMa binaries to the PATH environment variable in any terminal
# you run this script from, for instance:
#   export PATH=/Users/impett/Documentz/Code/dad_audio_lausanne/FluidCorpusManipulation/bin:$PATH
# This only has to be done once per terminal session.
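# A minimal alternative sketch (an assumption, not part of the original setup):
# the PATH could also be extended from within Python, provided it runs before
# the `from flucoma import fluid` line above, e.g.
#   os.environ["PATH"] = "/path/to/FluidCorpusManipulation/bin:" + os.environ["PATH"]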
# Tunable parameters
# ------------------

# Audio recording parameters
RATE = 44100  # Sample rate
CHUNK = 1024  # Buffer size for recording

# Segmentation and analysis parameters
MIN_SEGMENT_LENGTH = RATE  # Minimum length of a segment (1 second)
MFCC_COEFFS = 13  # Number of MFCC coefficients to use. Default 13
MFCC_BANDS = 40   # Number of mel bands for MFCC. Default 40

# Memory management parameters
MAX_SEGMENTS = 20      # Maximum number of segments to keep in memory. Default 100
MAX_OUTPUT_FILES = 10  # Maximum number of output files to keep. Default 10

# Listening parameters
MIN_RECORDING_LENGTH = 3  # Minimum recording length in seconds. Default 3
MAX_RECORDING_LENGTH = 6  # Maximum recording length in seconds. Default 20
REVERSE_CHANNELS = 0  # Swap the two input channels. Default 0 (false); set to 1 to swap
SIMILARITY_THRESHOLD = 0.1  # Similarity threshold for segment comparison. Default 0.1.
                            # 0 means only identical segments are considered similar,
                            # 1 means all segments are considered similar.

# Slicing parameters - using FluCoMa novelty slice
NOVELTY_THRESHOLD = 0.5  # Novelty threshold. Default 0.5
NOVELTY_FILTER = 1  # Novelty filter size. Default 1
NOVELTY_FFT = [1024, -1, -1]  # FFT settings for novelty slice. Default [1024, -1, -1]
NOVELTY_KERNEL = 3  # Kernel size for novelty slice. Default 3
# For info see FluidCorpusManipulation/docs/fluid-noveltyslice.html

# Playback parameters
CROSSFADE_DURATION = 0.05  # Crossfade duration in seconds. Default 0.05
FADE_DURATION = 0.05  # Fade in/out duration in seconds. Default 0.05

# Chinese whispers parameters
MIN_CHAIN_LENGTH = 3   # Minimum chain length. Default 3
MAX_CHAIN_LENGTH = 10  # Maximum chain length. Default 10
# NB a range of 3-10 means a minimum of 3 seconds, because each segment is at least 1 second long

# Probability of creating and playing concatenated audio
PLAY_PROBABILITY = 0.5
segment_counter = 0

# Paths
audio_segments_path = './audio_segments'
output_audio_path = './audio_output'
os.makedirs(audio_segments_path, exist_ok=True)
os.makedirs(output_audio_path, exist_ok=True)

# We now delete past outputs but not inputs
for file in os.listdir(output_audio_path):
    os.remove(os.path.join(output_audio_path, file))

# FluCoMa temporary directory
home_dir = os.getenv('HOME')
FLUCOMA_TEMP_DIR = os.path.join(home_dir, '.python-flucoma/')

# Set up logging
# Uncomment the desired logging level and comment out the others
logging_level = logging.INFO
# logging_level = logging.DEBUG
# logging_level = logging.WARNING
# logging_level = logging.ERROR
# logging_level = logging.CRITICAL

log_file = 'audio_processing.log'
if os.path.exists(log_file):
    os.remove(log_file)
logging.basicConfig(filename=log_file, level=logging_level,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Segment logger: records which input segments went into each output file
segment_logger = logging.getLogger('segment_logger')
segment_logger.setLevel(logging.INFO)
segment_handler = logging.FileHandler('segment_log.txt')
segment_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
segment_logger.addHandler(segment_handler)
print("Starting PyAudio") | |
# Print available audio devices | |
p = pyaudio.PyAudio() | |
print("Available audio input devices:") | |
input_devices = [] | |
for i in range(p.get_device_count()): | |
device_info = p.get_device_info_by_index(i) | |
if device_info['maxInputChannels'] > 0: | |
input_devices.append(f"Index {i}: {device_info['name']}") | |
print(f"Index {i}: {device_info['name']}") | |
print("\nAvailable audio output devices:") | |
output_devices = [] | |
for i in range(p.get_device_count()): | |
device_info = p.get_device_info_by_index(i) | |
if device_info['maxOutputChannels'] > 0: | |
output_devices.append(f"Index {i}: {device_info['name']}") | |
print(f"Index {i}: {device_info['name']}") | |
p.terminate() | |
print("\nNote: The script is currently set to use the default input and output devices.") | |
print("To change the input or output device, modify the 'record_audio' and 'play_audio_nonblocking' functions.") | |
# Audio format for PyAudio
FORMAT = pyaudio.paInt16
# Global variables for state
global_state = {
    'segment_data': [],
    'nn_tree': None,
    'segment_counter': 0,
    'feature_sum': None,
    'feature_count': 0
}

STATE_FILE = 'chirp_state.pkl'

def save_state():
    print("Saving state")
    with open(STATE_FILE, 'wb') as f:
        pickle.dump(global_state, f)

def load_state():
    global global_state
    try:
        with open(STATE_FILE, 'rb') as f:
            global_state = pickle.load(f)
        print("Loaded previous state")
    except FileNotFoundError:
        print("No previous state found. Starting fresh.")

def restart_coreaudio():
    subprocess.run(["sudo", "killall", "coreaudiod"])
    time.sleep(5)  # Wait for coreaudio to restart

def graceful_exit(signum, frame):
    print("Received signal to exit. Saving state and exiting...")
    save_state()
    sys.exit(0)

# Register the signal handlers
signal.signal(signal.SIGTERM, graceful_exit)
signal.signal(signal.SIGINT, graceful_exit)
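# E.g. pressing Ctrl-C (SIGINT) or running `kill <pid>` (SIGTERM) triggers
# graceful_exit(), which pickles global_state to chirp_state.pkl before exiting,
# so the segment library survives restarts.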
# PyAudio context manager
@contextmanager
def pyaudio_context():
    p = pyaudio.PyAudio()
    try:
        yield p
    finally:
        p.terminate()
def record_audio(duration):
    """
    Record audio for a specified duration.

    Args:
        duration (float): Recording duration in seconds.

    Returns:
        tuple: (bytes: Recorded audio data for channel 1,
                float: one minus the average amplitude of channel 2,
                       or 1.0 if only one channel is detected)
    """
    frames_ch1 = []
    ch2_amplitudes = []
    recording_complete = threading.Event()
    recording_error = threading.Event()

    def record_thread():
        try:
            with pyaudio_context() as p:
                default_input = p.get_default_input_device_info()
                channels = min(default_input['maxInputChannels'], 2)
                stream = p.open(format=FORMAT, channels=channels, rate=RATE, input=True, frames_per_buffer=CHUNK)
                start_time = time.time()
                while time.time() - start_time < duration:
                    try:
                        data = stream.read(CHUNK, exception_on_overflow=False)
                        audio_chunk = np.frombuffer(data, dtype=np.int16)
                        if channels == 2:
                            # Deinterleave the stereo frames
                            ch1_data = audio_chunk[0::2]
                            ch2_data = audio_chunk[1::2]
                            if REVERSE_CHANNELS:
                                ch1_data, ch2_data = ch2_data, ch1_data
                            frames_ch1.append(ch1_data.tobytes())
                            ch2_amplitudes.append(np.mean(np.abs(ch2_data)))
                        else:
                            frames_ch1.append(audio_chunk.tobytes())
                    except IOError as e:
                        logging.warning(f"IOError during recording: {e}")
                        # Wait for 2 seconds before trying again
                        time.sleep(2)
                        continue
                stream.stop_stream()
                stream.close()
        except Exception as e:
            logging.error(f"Error in recording thread: {e}")
            recording_error.set()
            # Wait for 2 seconds before trying again
            time.sleep(2)
        finally:
            recording_complete.set()

    recorder = threading.Thread(target=record_thread)  # Named to avoid shadowing the function
    recorder.start()

    # Wait for the recording to complete or timeout
    if not recording_complete.wait(timeout=duration + 5):  # 5 second grace period
        logging.error("Recording timed out")
        recording_error.set()
        # return None, None

    if recording_error.is_set():
        print("Error in recording thread. Saving state and exiting...")
        save_state()
        # sys.exit(2)
        print("Bye! Please restart me")
        os._exit(2)  # Exit the script immediately

    channel1 = b''.join(frames_ch1)
    if not channel1:
        logging.error("No audio data captured")
        return None, 1.0  # Return None for audio data and default amplitude

    if len(ch2_amplitudes) > 0:
        ch2_amplitude = np.mean(ch2_amplitudes)
        ch2_amplitude = ch2_amplitude / 32767.5
        ch2_amplitude = 1.0 - ch2_amplitude
        ch2_amplitude = np.clip(ch2_amplitude, 0, 1)
    else:
        ch2_amplitude = 1.00

    print(f"Detected {2 if len(ch2_amplitudes) > 0 else 1} channel(s)")
    print("Amplitude control:", ch2_amplitude)
    print("Duration:", duration)
    return channel1, ch2_amplitude
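# Worked example of the channel-2 control mapping above (illustrative numbers):
# a near-silent channel 2 (mean |x| close to 0) gives ch2_amplitude close to 1.0
# (full volume), while a full-scale channel 2 (mean |x| close to 32767) gives
# ch2_amplitude close to 0.0 (muted). Channel 2 therefore acts as an inverted
# volume control for the playback stage in main().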
def save_audio(audio_data, filename):
    """
    Save audio data to a WAV file.

    Args:
        audio_data (bytes): Audio data to save.
        filename (str): Path to save the WAV file.
    """
    if audio_data is None:
        logging.error(f"Cannot save audio to {filename}: No audio data")
        return
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(audio_data)
# Initialize global variables
feature_sum = None
feature_count = 0

def analyze_audio(audio_path):
    """
    Analyze an audio file using MFCC and pitch, then calculate statistics.

    Args:
        audio_path (str): Path to the audio file.

    Returns:
        np.array: Feature vector of the audio, including MFCC and pitch statistics,
                  normalized by the running mean of all features seen so far.
    """
    global feature_sum, feature_count
    mfcc = fluid.mfcc(audio_path, numcoeffs=MFCC_COEFFS, numbands=MFCC_BANDS)
    pitch = fluid.pitch(audio_path)
    feature = np.concatenate([np.mean(mfcc, axis=1), np.mean(pitch, axis=1)])
    if feature_sum is None:
        feature_sum = feature
    else:
        feature_sum += feature
    feature_count += 1
    # Avoid division by zero and handle NaN values
    with np.errstate(divide='ignore', invalid='ignore'):
        normalized_feature = np.where(
            feature_sum != 0,
            feature / (feature_sum / feature_count),
            0
        )
    # Replace any remaining NaN or inf values with 0
    normalized_feature = np.nan_to_num(normalized_feature, nan=0.0, posinf=0.0, neginf=0.0)
    return normalized_feature
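# Note on the running-mean normalization above: each dimension of the new feature
# is divided by the mean of that dimension across all segments analysed so far,
# so values hover around 1.0 for "typical" segments. E.g. if the running mean of
# the first MFCC coefficient is 50 and a new segment measures 60, its normalized
# value is 60 / 50 = 1.2. (Illustrative numbers only.)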
def crossfade(audio1, audio2, fade_length):
    """
    Apply a crossfade between two audio segments.

    Args:
        audio1, audio2 (np.array): Audio segments to crossfade.
        fade_length (int): Length of the crossfade in samples.

    Returns:
        np.array: Crossfaded audio.
    """
    audio1 = audio1.astype(np.float64)
    audio2 = audio2.astype(np.float64)
    fade_length = min(fade_length, len(audio1), len(audio2))
    fade_in = np.linspace(0, 1, fade_length)
    fade_out = np.linspace(1, 0, fade_length)
    audio1[-fade_length:] *= fade_out
    audio2[:fade_length] *= fade_in
    result = np.concatenate([audio1[:-fade_length], audio1[-fade_length:] + audio2[:fade_length], audio2[fade_length:]])
    # Normalize to prevent clipping
    max_val = np.max(np.abs(result))
    if max_val > 32767:
        result = result * (32767 / max_val)
    return result.astype(np.int16)
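# Design note (an alternative, not what the script does): the ramps above give a
# linear crossfade, which can dip in perceived loudness at the midpoint for
# uncorrelated material. An equal-power variant would use, e.g.,
#   fade_in = np.sin(np.linspace(0, np.pi / 2, fade_length))
#   fade_out = np.cos(np.linspace(0, np.pi / 2, fade_length))
# so that fade_in**2 + fade_out**2 == 1 at every sample.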
def apply_fade(audio, fade_length):
    """
    Apply fade in and fade out to an audio segment.

    Args:
        audio (np.array): Audio segment to apply fades to.
        fade_length (int): Length of the fade in samples.

    Returns:
        np.array: Audio with fades applied.
    """
    # If fade_length is greater than half the length of the audio, just return the audio
    if fade_length > len(audio) // 2:
        return audio
    audio = audio.astype(np.float64)
    fade_in = np.linspace(0, 1, fade_length)
    fade_out = np.linspace(1, 0, fade_length)
    audio[:fade_length] *= fade_in
    audio[-fade_length:] *= fade_out
    return audio
def play_audio_nonblocking(file_path):
    """
    Play an audio file without blocking the main thread.

    Args:
        file_path (str): Path to the audio file to play.

    Returns:
        simpleaudio.PlayObject or None: Play object if successful, None otherwise.
    """
    if not os.path.exists(file_path):
        logging.error(f"File {file_path} does not exist.")
        return None
    try:
        wave_obj = sa.WaveObject.from_wave_file(file_path)
        play_obj = wave_obj.play()
        return play_obj
    except Exception as e:
        logging.error(f"Error playing audio file: {e}")
        return None
def process_segments(audio_file, novelty_slice, min_length=MIN_SEGMENT_LENGTH):
    """
    Process audio segments based on novelty slices and minimum length.

    Args:
        audio_file (str): Path to the audio file.
        novelty_slice (list): List of novelty slice points.
        min_length (int): Minimum length of a segment in samples.

    Returns:
        list: List of processed audio segments.
    """
    with wave.open(audio_file, 'rb') as wf:
        audio_array = np.frombuffer(wf.readframes(-1), dtype=np.int16)

    # Create initial segments (the first and last slices are dropped, keeping only
    # the middle of the recording)
    segments = [audio_array[int(novelty_slice[i]):int(novelty_slice[i+1])] for i in range(1, len(novelty_slice) - 2)]

    # Concatenate short segments
    i = 0
    while i < len(segments):
        if len(segments[i]) < min_length:
            if i > 0 and len(segments[i-1]) + len(segments[i]) < 2 * min_length:
                segments[i-1] = np.concatenate((segments[i-1], segments[i]))
                segments.pop(i)
            elif i < len(segments) - 1:
                segments[i] = np.concatenate((segments[i], segments[i+1]))
                segments.pop(i+1)
            else:
                i += 1
        else:
            i += 1
    return segments
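# Worked example of the merging loop above (illustrative durations, RATE = 44100):
# with segments of [0.4s, 0.3s, 2.0s] and a min_length of 1 second, the 0.4s
# segment first merges with the 0.3s one (giving [0.7s, 2.0s]), then the 0.7s
# result merges with the 2.0s segment, leaving a single 2.7s segment.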
def cleanup_files():
    """
    Remove older output files and FluCoMa temporary files.
    """
    # Clean up output files, keeping only the MAX_OUTPUT_FILES most recent
    output_files = sorted(
        [f for f in os.listdir(output_audio_path) if f.startswith('output_') and f.endswith('.wav')],
        key=lambda x: os.path.getmtime(os.path.join(output_audio_path, x)),
        reverse=True
    )
    for old_file in output_files[MAX_OUTPUT_FILES:]:
        try:
            os.remove(os.path.join(output_audio_path, old_file))
            logging.info(f"Removed old output file: {old_file}")
        except Exception as e:
            logging.error(f"Error removing old file {old_file}: {e}")

    # Clean up FluCoMa temporary files
    try:
        for file in os.listdir(FLUCOMA_TEMP_DIR):
            if file.endswith('.wav'):
                os.remove(os.path.join(FLUCOMA_TEMP_DIR, file))
                logging.info(f"Removed FluCoMa temporary file: {file}")
    except Exception as e:
        logging.error(f"Error cleaning up FluCoMa temporary files: {e}")
def save_and_analyze_segments(segments):
    """
    Save each segment to disk, extract its features, and add it to segment_data,
    skipping segments that are too similar to existing ones.

    Args:
        segments (list): List of np.array audio segments.

    Returns:
        int: Number of new segments added.
    """
    global nn_tree, segment_counter
    new_samples = 0
    for segment in segments:
        segment_file = f"{audio_segments_path}/segment_{segment_counter}.wav"
        save_audio(segment.tobytes(), segment_file)
        normalized_feature = analyze_audio(segment_file)
        # Check for similar existing segments
        if len(segment_data) > 0:
            try:
                distances, indices = nn_tree.kneighbors([normalized_feature], n_neighbors=1)
                if normalize_distance(distances[0][0]) < SIMILARITY_THRESHOLD:
                    continue  # Skip this segment if it's too similar to an existing one
            except NotFittedError:
                # If nn_tree is not fitted yet, we can't check for similarity
                pass
        segment_data.append({'file': segment_file, 'feature': normalized_feature, 'id': segment_counter})
        segment_counter += 1
        new_samples += 1
    # Refit the nn_tree with the updated segment_data
    if len(segment_data) > 1:  # We need at least 2 points to fit the tree
        features = [seg['feature'] for seg in segment_data]
        nn_tree.fit(features)
    return new_samples
def normalize_distance(distance):
    """
    Scale a Euclidean distance to roughly [0, 1] by dividing by sqrt(d),
    where d is the feature dimensionality.
    """
    max_distance = np.sqrt(len(segment_data[0]['feature']))  # Maximum possible Euclidean distance for features in [0, 1]
    return distance / max_distance
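# Caveat (an observation, not from the original): sqrt(d) bounds the Euclidean
# distance only if every feature dimension lies in [0, 1]. The running-mean
# normalization in analyze_audio() centres values around 1.0 rather than
# bounding them, so normalize_distance() is best read as a heuristic scaling.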
def chinese_whispers_process():
    """
    Perform the Chinese Whispers process to create a chain of similar audio segments.

    Returns:
        list: List of dictionaries containing file paths and indices for the chain of audio segments.
    """
    chain_length = random.randint(MIN_CHAIN_LENGTH, min(MAX_CHAIN_LENGTH, len(segment_data)))
    chain = []
    used_ids = set()
    query_index = random.randint(0, len(segment_data) - 1)
    while len(chain) < chain_length:
        current_id = segment_data[query_index]['id']
        if current_id not in used_ids:
            chain.append({'file': segment_data[query_index]['file'], 'index': current_id})
            used_ids.add(current_id)
        distances, indices = nn_tree.kneighbors([segment_data[query_index]['feature']], n_neighbors=len(segment_data))
        for idx in indices[0]:
            if segment_data[idx]['id'] not in used_ids:
                query_index = idx
                break
        else:
            break  # No unused neighbors found
    return chain
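# How the walk above behaves: starting from a random seed segment, each step
# asks the fitted NearestNeighbors model for the closest segment not yet in the
# chain, so the result drifts through feature space one hop at a time, like a
# game of Chinese whispers.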
def main():
    """
    Main loop of the audio processing system.
    """
    global segment_data, nn_tree, segment_counter, feature_sum, feature_count

    # Load state at the beginning
    load_state()
    segment_data = global_state['segment_data']
    nn_tree = global_state['nn_tree']
    segment_counter = global_state['segment_counter']
    feature_sum = global_state['feature_sum']
    feature_count = global_state['feature_count']

    # Initialize nn_tree if it's None
    if nn_tree is None:
        nn_tree = NearestNeighbors(n_neighbors=(MAX_CHAIN_LENGTH + 5), algorithm='ball_tree')

    while True:
        global_state.update({
            'segment_data': segment_data,
            'nn_tree': nn_tree,
            'segment_counter': segment_counter,
            'feature_sum': feature_sum,
            'feature_count': feature_count
        })
        save_state()

        # 1. Record audio for a random duration
        duration = random.uniform(MIN_RECORDING_LENGTH, MAX_RECORDING_LENGTH)
        logging.info(f"Recording for {duration:.2f} seconds...")
        print(f"Recording for {duration:.2f} seconds...")
        audio_data, ch2_amplitude = record_audio(duration)
        if audio_data is None:
            logging.warning("Failed to record audio, skipping this iteration")
            continue
        temp_file = 'temp_recording_buffer.wav'
        save_audio(audio_data, temp_file)

        # 2. Segment the audio
        # If you want to change the parameters of the FluCoMa novelty slice, do it here.
        logging.info("Segmenting audio (sending to FluCoMa)...")
        novelty_slice = fluid.noveltyslice(temp_file, threshold=NOVELTY_THRESHOLD, filtersize=NOVELTY_FILTER, fftsettings=NOVELTY_FFT, kernelsize=NOVELTY_KERNEL)
        logging.info("FluCoMa novelty slice complete")
        if len(novelty_slice) <= 3:
            logging.info("Not enough segments, continuing...")
            continue

        # 3. Process middle segments
        logging.info("Processing segments...")
        processed_segments = process_segments(temp_file, novelty_slice)

        # Save and analyze new segments
        logging.info("Saving and analysing segments")
        new_samples = save_and_analyze_segments(processed_segments)
        logging.info(f"Processed {new_samples} new segments. Total samples: {len(segment_data)}")

        # Update the nearest-neighbour tree with all features
        if len(segment_data) > 3:
            # If segment_data exceeds MAX_SEGMENTS, drop the oldest segments
            if len(segment_data) > MAX_SEGMENTS:
                segment_data = segment_data[-MAX_SEGMENTS:]
            features = [seg['feature'] for seg in segment_data]
            nn_tree.fit(features)

        # 4-5. Chinese whispers process (with probability PLAY_PROBABILITY)
        if random.random() < PLAY_PROBABILITY and len(segment_data) > 2:
            chain = chinese_whispers_process()
            logging.info(f"Created chain of length: {len(chain)}")

            # 6. Concatenate with crossfade and apply overall fade
            output_file = f'{output_audio_path}/output_{int(time.time())}.wav'
            try:
                concatenated_audio = None
                for segment in chain:
                    rate, audio = wavfile.read(segment['file'])
                    if concatenated_audio is None:
                        concatenated_audio = audio
                    else:
                        fade_length = int(CROSSFADE_DURATION * rate)
                        concatenated_audio = crossfade(concatenated_audio, audio, fade_length)
                if concatenated_audio is not None:
                    overall_fade_length = int(FADE_DURATION * rate)
                    concatenated_audio = apply_fade(concatenated_audio, overall_fade_length)
                    # Scale the output volume based on channel 2 amplitude
                    scaling_factor = ch2_amplitude
                    concatenated_audio = (concatenated_audio * scaling_factor).astype(np.int16)
                    wavfile.write(output_file, rate, concatenated_audio)
                    logging.info(f"Saved audio to {output_file}")
                    # Log the input segments used for this output
                    segment_logger.info(f"Output: {output_file}")
                    for idx, segment in enumerate(chain):
                        segment_logger.info(f"  Input {idx + 1}: {segment['file']} (Index: {segment['index']})")
                    # 7. Play the wav file
                    play_audio_nonblocking(output_file)
                    logging.info(f"Playing {output_file} in the background")
                    # Clean up old files
                    cleanup_files()
                else:
                    logging.warning("No audio to concatenate")
            except Exception as e:
                logging.error(f"Error processing audio: {e}")
if __name__ == "__main__":
    main()