print("Importing libraries...")
import numpy as np
import pyaudio
import wave
from flucoma import fluid
from scipy.io import wavfile
import simpleaudio as sa
from sklearn.neighbors import NearestNeighbors
import os
import random
import time
import logging
from sklearn.exceptions import NotFittedError
import threading
from contextlib import contextmanager
import pickle, sys, subprocess, signal
print("Imported libraries")
# Leo Impett 2024
# Before running, add the FluCoMa command-line tools to the PATH environment variable,
# for instance: export PATH=/Users/impett/Documentz/Code/dad_audio_lausanne/FluidCorpusManipulation/bin:$PATH
# This only has to be done once per terminal session (in the terminal in which you run Python).
# Tunable parameters
# -----------------
# Audio recording parameters
RATE = 44100 # Sample rate
CHUNK = 1024 # Buffer size for recording
# Segmentation and analysis parameters
MIN_SEGMENT_LENGTH = RATE # Minimum length of a segment (1 second)
MFCC_COEFFS = 13 # Number of MFCC coefficients to use. Default 13
MFCC_BANDS = 40 # Number of mel bands for MFCC. Default 40
# MEMORY MANAGEMENT PARAMETERS
MAX_SEGMENTS = 20 # Maximum number of segments to keep in memory. Default 100
MAX_OUTPUT_FILES = 10 # Maximum number of output files to keep. Default 10
# LISTENING PARAMETERS
MIN_RECORDING_LENGTH = 3 # Minimum recording length in seconds. Default 3
MAX_RECORDING_LENGTH = 6 # Maximum recording length in seconds. Default 20
REVERSE_CHANNELS = 0 # Reverse the channels. Default 0 (false). Turn it to 1 to reverse the channels
SIMILARITY_THRESHOLD = 0.1 # Similarity threshold for segment comparison. Default 0.1. 0 means only identical segments are considered similar,
# 1 means all segments are considered similar.
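# (raw neighbour distances are scaled by normalize_distance() below before
#  being compared against this threshold)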
# Slicing parameters - using flucoma novelty slice
NOVELTY_THRESHOLD = 0.5 # Novelty threshold. Default 0.5
NOVELTY_FILTER = 1 # Novelty filter size. Default 1
NOVELTY_FFT = [1024, -1, -1] # FFT settings for novelty slice. Default [1024, -1, -1]
NOVELTY_KERNEL = 3 # Kernel size for novelty slice. Default 3
# For info see FluidCorpusManipulation/docs/fluid-noveltyslice.html
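# NB (per the FluCoMa docs): fftsettings follows the convention
# <windowSize, hopSize, fftSize>, where -1 means "derive from the window size"
# (hop = windowSize/2, fftSize = windowSize), so [1024, -1, -1] analyses
# with a 1024-sample window.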
# Playback parameters
CROSSFADE_DURATION = 0.05 # Crossfade duration in seconds. Default 0.05
FADE_DURATION = 0.05 # Fade in/out duration in seconds. Default 0.05
# Chinese whispers parameters
MIN_CHAIN_LENGTH = 3 # Minimum chain length. Default 3
MAX_CHAIN_LENGTH = 10 # Maximum chain length. Default 10
# NB a range of 3-10 means a minimum of 3 seconds because each segment is at least 1 second long
# Probability of creating and playing concatenated audio
PLAY_PROBABILITY = 0.5
segment_counter = 0
# Paths
audio_segments_path = './audio_segments'
output_audio_path = './audio_output'
os.makedirs(audio_segments_path, exist_ok=True)
os.makedirs(output_audio_path, exist_ok=True)
# We now delete past outputs but not inputs
for file in os.listdir(output_audio_path):
    os.remove(os.path.join(output_audio_path, file))
# FluCoMa temporary directory
home_dir = os.getenv('HOME')
FLUCOMA_TEMP_DIR = os.path.join(home_dir, '.python-flucoma/')
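# (presumably where the python-flucoma wrapper stores its intermediate .wav
#  files; cleanup_files() below empties this directory periodically)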
# Set up logging
# Uncomment the desired logging level and comment out the others
logging_level = logging.INFO
# logging_level = logging.DEBUG
# logging_level = logging.WARNING
# logging_level = logging.ERROR
# logging_level = logging.CRITICAL
log_file = 'audio_processing.log'
if os.path.exists(log_file):
    os.remove(log_file)
logging.basicConfig(filename=log_file, level=logging_level,
                    format='%(asctime)s - %(levelname)s - %(message)s')
# segment logger
segment_logger = logging.getLogger('segment_logger')
segment_logger.setLevel(logging.INFO)
segment_handler = logging.FileHandler('segment_log.txt')
segment_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
segment_logger.addHandler(segment_handler)
print("Starting PyAudio")
# Print available audio devices
p = pyaudio.PyAudio()
print("Available audio input devices:")
input_devices = []
for i in range(p.get_device_count()):
    device_info = p.get_device_info_by_index(i)
    if device_info['maxInputChannels'] > 0:
        input_devices.append(f"Index {i}: {device_info['name']}")
        print(f"Index {i}: {device_info['name']}")
print("\nAvailable audio output devices:")
output_devices = []
for i in range(p.get_device_count()):
    device_info = p.get_device_info_by_index(i)
    if device_info['maxOutputChannels'] > 0:
        output_devices.append(f"Index {i}: {device_info['name']}")
        print(f"Index {i}: {device_info['name']}")
p.terminate()
print("\nNote: The script is currently set to use the default input and output devices.")
print("To change the input or output device, modify the 'record_audio' and 'play_audio_nonblocking' functions.")
# Audio format for PyAudio
FORMAT = pyaudio.paInt16
# Global variables for state
global_state = {
    'segment_data': [],
    'nn_tree': None,
    'segment_counter': 0,
    'feature_sum': None,
    'feature_count': 0
}
STATE_FILE = 'chirp_state.pkl'
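# All mutable state lives in global_state so it can be pickled to disk and
# restored after a crash or restart (see save_state / load_state below).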

def save_state():
    print("Saving state")
    with open(STATE_FILE, 'wb') as f:
        pickle.dump(global_state, f)

def load_state():
    global global_state
    try:
        with open(STATE_FILE, 'rb') as f:
            global_state = pickle.load(f)
        print("Loaded previous state")
    except FileNotFoundError:
        print("No previous state found. Starting fresh.")

def restart_coreaudio():
    subprocess.run(["sudo", "killall", "coreaudiod"])
    time.sleep(5)  # Wait for coreaudio to restart

def graceful_exit(signum, frame):
    print("Received signal to exit. Saving state and exiting...")
    save_state()
    sys.exit(0)
# Register the signal handler
signal.signal(signal.SIGTERM, graceful_exit)
signal.signal(signal.SIGINT, graceful_exit)
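# SIGINT covers Ctrl-C and SIGTERM covers a plain `kill`, so either way the
# state is checkpointed before exit.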
# PyAudio context manager
@contextmanager
def pyaudio_context():
    p = pyaudio.PyAudio()
    try:
        yield p
    finally:
        p.terminate()

def record_audio(duration):
    """
    Record audio for a specified duration.

    Args:
        duration (float): Recording duration in seconds.

    Returns:
        tuple: (bytes: recorded audio data for channel 1,
                float: one minus the average amplitude of channel 2,
                or 1.0 if only one channel is detected)
    """
    frames_ch1 = []
    ch2_amplitudes = []
    recording_complete = threading.Event()
    recording_error = threading.Event()

    def record_thread():
        try:
            with pyaudio_context() as p:
                default_input = p.get_default_input_device_info()
                channels = min(default_input['maxInputChannels'], 2)
                stream = p.open(format=FORMAT, channels=channels, rate=RATE, input=True, frames_per_buffer=CHUNK)
                start_time = time.time()
                while time.time() - start_time < duration:
                    try:
                        data = stream.read(CHUNK, exception_on_overflow=False)
                        audio_chunk = np.frombuffer(data, dtype=np.int16)
                        if channels == 2:
                            ch1_data = audio_chunk[0::2]
                            ch2_data = audio_chunk[1::2]
                            if REVERSE_CHANNELS:
                                ch1_data, ch2_data = ch2_data, ch1_data
                            frames_ch1.append(ch1_data.tobytes())
                            ch2_amplitudes.append(np.mean(np.abs(ch2_data)))
                        else:
                            frames_ch1.append(audio_chunk.tobytes())
                    except IOError as e:
                        logging.warning(f"IOError during recording: {e}")
                        # wait for 2 seconds before trying again
                        time.sleep(2)
                        continue
                stream.stop_stream()
                stream.close()
        except Exception as e:
            logging.error(f"Error in recording thread: {e}")
            recording_error.set()
            # wait for 2 seconds before trying again
            time.sleep(2)
        finally:
            recording_complete.set()
    # Use a distinct name so the thread object does not shadow the function above
    recorder_thread = threading.Thread(target=record_thread)
    recorder_thread.start()
    # Wait for the recording to complete or timeout
    if not recording_complete.wait(timeout=duration + 5):  # 5 second grace period
        logging.error("Recording timed out")
        recording_error.set()
    if recording_error.is_set():
        print("Error in recording thread. Saving state and exiting...")
        save_state()
        print("Bye! Please restart me")
        os._exit(2)  # Exit the script immediately, bypassing normal teardown
    channel1 = b''.join(frames_ch1)
    if not channel1:
        logging.error("No audio data captured")
        return None, 1.0  # Return None for audio data and default amplitude
    if len(ch2_amplitudes) > 0:
        ch2_amplitude = np.mean(ch2_amplitudes)
        ch2_amplitude = ch2_amplitude / 32767.5  # Scale int16 amplitude into [0, 1]
        ch2_amplitude = 1.0 - ch2_amplitude      # Invert: louder control channel -> quieter output
        ch2_amplitude = np.clip(ch2_amplitude, 0, 1)
    else:
        ch2_amplitude = 1.0
    print(f"Detected {2 if len(ch2_amplitudes) > 0 else 1} channel(s)")
    print("Amplitude control:", ch2_amplitude)
    print("Duration:", duration)
    return channel1, ch2_amplitude

def save_audio(audio_data, filename):
    """
    Save audio data to a WAV file.

    Args:
        audio_data (bytes): Audio data to save.
        filename (str): Path to save the WAV file.
    """
    if audio_data is None:
        logging.error(f"Cannot save audio to {filename}: No audio data")
        return
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(audio_data)
# Initialize global variables
feature_sum = None
feature_count = 0
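
# Each feature vector is normalized by the running mean over all segments seen
# so far, so every dimension is expressed relative to its long-term average.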
def analyze_audio(audio_path):
    """
    Analyze audio file using MFCC and pitch, then calculate statistics.

    Args:
        audio_path (str): Path to the audio file.

    Returns:
        np.array: Feature vector of the audio, including MFCC and pitch statistics.
    """
    global feature_sum, feature_count
    mfcc = fluid.mfcc(audio_path, numcoeffs=MFCC_COEFFS, numbands=MFCC_BANDS)
    pitch = fluid.pitch(audio_path)
    feature = np.concatenate([np.mean(mfcc, axis=1), np.mean(pitch, axis=1)])
    if feature_sum is None:
        feature_sum = feature
    else:
        feature_sum += feature
    feature_count += 1
    # Avoid division by zero and handle NaN values
    with np.errstate(divide='ignore', invalid='ignore'):
        normalized_feature = np.where(
            feature_sum != 0,
            feature / (feature_sum / feature_count),
            0
        )
    # Replace any remaining NaN or inf values with 0
    normalized_feature = np.nan_to_num(normalized_feature, nan=0.0, posinf=0.0, neginf=0.0)
    return normalized_feature

def crossfade(audio1, audio2, fade_length):
    """
    Apply crossfade between two audio segments.

    Args:
        audio1, audio2 (np.array): Audio segments to crossfade.
        fade_length (int): Length of the crossfade in samples.

    Returns:
        np.array: Crossfaded audio.
    """
    audio1 = audio1.astype(np.float64)
    audio2 = audio2.astype(np.float64)
    fade_length = min(fade_length, len(audio1), len(audio2))
    fade_in = np.linspace(0, 1, fade_length)
    fade_out = np.linspace(1, 0, fade_length)
    audio1[-fade_length:] *= fade_out
    audio2[:fade_length] *= fade_in
    result = np.concatenate([audio1[:-fade_length], audio1[-fade_length:] + audio2[:fade_length], audio2[fade_length:]])
    # Normalize to prevent clipping
    max_val = np.max(np.abs(result))
    if max_val > 32767:
        result = result * (32767 / max_val)
    return result.astype(np.int16)
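
# Example: with CROSSFADE_DURATION = 0.05 and RATE = 44100, main() passes
# fade_length = int(0.05 * 44100) = 2205 samples to the linear crossfade above.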

def apply_fade(audio, fade_length):
    """
    Apply fade in and fade out to an audio segment.

    Args:
        audio (np.array): Audio segment to apply fades to.
        fade_length (int): Length of the fade in samples.

    Returns:
        np.array: Audio with fades applied.
    """
    # If fade_length is more than half the length of the audio, return it unchanged
    if fade_length > len(audio) // 2:
        return audio
    audio = audio.astype(np.float64)
    fade_in = np.linspace(0, 1, fade_length)
    fade_out = np.linspace(1, 0, fade_length)
    audio[:fade_length] *= fade_in
    audio[-fade_length:] *= fade_out
    return audio

def play_audio_nonblocking(file_path):
    """
    Play audio file without blocking the main thread.

    Args:
        file_path (str): Path to the audio file to play.

    Returns:
        simpleaudio.PlayObject or None: Play object if successful, None otherwise.
    """
    if not os.path.exists(file_path):
        logging.error(f"File {file_path} does not exist.")
        return None
    try:
        wave_obj = sa.WaveObject.from_wave_file(file_path)
        play_obj = wave_obj.play()
        return play_obj
    except Exception as e:
        logging.error(f"Error playing audio file: {e}")
        return None

def process_segments(audio_file, novelty_slice, min_length=MIN_SEGMENT_LENGTH):
    """
    Process audio segments based on novelty slices and minimum length.

    Args:
        audio_file (str): Path to the audio file.
        novelty_slice (list): List of novelty slice points.
        min_length (int): Minimum length of a segment in samples.

    Returns:
        list: List of processed audio segments.
    """
    with wave.open(audio_file, 'rb') as wf:
        audio_array = np.frombuffer(wf.readframes(-1), dtype=np.int16)
    # Create initial segments, dropping the first and last slices
    segments = [audio_array[int(novelty_slice[i]):int(novelty_slice[i+1])] for i in range(1, len(novelty_slice) - 2)]
    # Concatenate short segments
    i = 0
    while i < len(segments):
        if len(segments[i]) < min_length:
            if i > 0 and len(segments[i-1]) + len(segments[i]) < 2 * min_length:
                segments[i-1] = np.concatenate((segments[i-1], segments[i]))
                segments.pop(i)
            elif i < len(segments) - 1:
                segments[i] = np.concatenate((segments[i], segments[i+1]))
                segments.pop(i+1)
            else:
                i += 1
        else:
            i += 1
    return segments

def cleanup_files():
    """
    Remove older output files and FluCoMa temporary files.
    """
    # Clean up output files
    output_files = sorted(
        [f for f in os.listdir(output_audio_path) if f.startswith('output_') and f.endswith('.wav')],
        key=lambda x: os.path.getmtime(os.path.join(output_audio_path, x)),
        reverse=True
    )
    for old_file in output_files[MAX_OUTPUT_FILES:]:
        try:
            os.remove(os.path.join(output_audio_path, old_file))
            logging.info(f"Removed old output file: {old_file}")
        except Exception as e:
            logging.error(f"Error removing old file {old_file}: {e}")
    # Clean up FluCoMa temporary files
    try:
        for file in os.listdir(FLUCOMA_TEMP_DIR):
            if file.endswith('.wav'):
                os.remove(os.path.join(FLUCOMA_TEMP_DIR, file))
                logging.info(f"Removed FluCoMa temporary file: {file}")
    except Exception as e:
        logging.error(f"Error cleaning up FluCoMa temporary files: {e}")

def save_and_analyze_segments(segments):
    """
    Save each segment to disk, extract its features, and add it to the corpus,
    skipping segments that are too similar to existing ones.

    Returns:
        int: Number of new segments added.
    """
    global nn_tree, segment_counter
    new_samples = 0
    for segment in segments:
        segment_file = f"{audio_segments_path}/segment_{segment_counter}.wav"
        save_audio(segment.tobytes(), segment_file)
        normalized_feature = analyze_audio(segment_file)
        # Check for similar existing segments
        if len(segment_data) > 0:
            try:
                distances, indices = nn_tree.kneighbors([normalized_feature], n_neighbors=1)
                if normalize_distance(distances[0][0]) < SIMILARITY_THRESHOLD:
                    continue  # Skip this segment if it's too similar to an existing one
            except NotFittedError:
                # If nn_tree is not fitted yet, we can't check for similarity
                pass
        segment_data.append({'file': segment_file, 'feature': normalized_feature, 'id': segment_counter})
        segment_counter += 1
        new_samples += 1
    # Refit the nn_tree with the updated segment_data
    if len(segment_data) > 1:  # We need at least 2 points to fit the tree
        features = [seg['feature'] for seg in segment_data]
        nn_tree.fit(features)
    return new_samples

def normalize_distance(distance):
    max_distance = np.sqrt(len(segment_data[0]['feature']))  # Maximum possible Euclidean distance for normalized features
    return distance / max_distance
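
# NB: the sqrt(d) bound above assumes each feature dimension lies roughly in
# [0, 1]; ratios to a running mean can exceed 1, so treat it as approximate.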

def chinese_whispers_process():
    """
    Perform the Chinese Whispers process to create a chain of similar audio segments.

    Returns:
        list: List of dictionaries containing file paths and indices for the chain of audio segments.
    """
    chain_length = random.randint(MIN_CHAIN_LENGTH, min(MAX_CHAIN_LENGTH, len(segment_data)))
    chain = []
    used_ids = set()
    query_index = random.randint(0, len(segment_data) - 1)
    while len(chain) < chain_length:
        current_id = segment_data[query_index]['id']
        if current_id not in used_ids:
            chain.append({'file': segment_data[query_index]['file'], 'index': current_id})
            used_ids.add(current_id)
        distances, indices = nn_tree.kneighbors([segment_data[query_index]['feature']], n_neighbors=len(segment_data))
        for idx in indices[0]:
            if segment_data[idx]['id'] not in used_ids:
                query_index = idx
                break
        else:
            break  # No unused neighbors found
    return chain

def main():
    """
    Main loop of the audio processing system.
    """
    global segment_data, nn_tree, segment_counter, feature_sum, feature_count
    # Load state at the beginning
    load_state()
    segment_data = global_state['segment_data']
    nn_tree = global_state['nn_tree']
    segment_counter = global_state['segment_counter']
    feature_sum = global_state['feature_sum']
    feature_count = global_state['feature_count']
    # Initialize nn_tree if it's None
    if nn_tree is None:
        nn_tree = NearestNeighbors(n_neighbors=(MAX_CHAIN_LENGTH + 5), algorithm='ball_tree')

    while True:
        # Checkpoint state at the top of every iteration so a crash loses at most one cycle
        global_state.update({
            'segment_data': segment_data,
            'nn_tree': nn_tree,
            'segment_counter': segment_counter,
            'feature_sum': feature_sum,
            'feature_count': feature_count
        })
        save_state()
        # 1. Record audio for a random duration
        duration = random.uniform(MIN_RECORDING_LENGTH, MAX_RECORDING_LENGTH)
        logging.info(f"Recording for {duration:.2f} seconds...")
        print(f"Recording for {duration:.2f} seconds...")
        audio_data, ch2_amplitude = record_audio(duration)
        if audio_data is None:
            logging.warning("Failed to record audio, skipping this iteration")
            continue
        temp_file = 'temp_recording_buffer.wav'
        save_audio(audio_data, temp_file)

        # 2. Segment the audio
        # If you want to add parameters to the FluCoMa novelty slice, do it here.
        logging.info("Segmenting audio (sending to FluCoMa)...")
        novelty_slice = fluid.noveltyslice(temp_file, threshold=NOVELTY_THRESHOLD, filtersize=NOVELTY_FILTER, fftsettings=NOVELTY_FFT, kernelsize=NOVELTY_KERNEL)
        logging.info("FluCoMa novelty slice complete")
        if len(novelty_slice) <= 3:
            logging.info("Not enough segments, continuing...")
            continue
        # 3. Process middle segments
        logging.info("Processing segments...")
        processed_segments = process_segments(temp_file, novelty_slice)
        logging.info("Saving and analysing segments")
        # Save and analyze new segments
        new_samples = save_and_analyze_segments(processed_segments)
        logging.info(f"Processed {new_samples} new segments. Total samples: {len(segment_data)}")

        # Update the nearest-neighbour tree with all features
        if len(segment_data) > 3:
            # If segment_data exceeds MAX_SEGMENTS, drop the oldest segments
            if len(segment_data) > MAX_SEGMENTS:
                segment_data = segment_data[-MAX_SEGMENTS:]
            features = [seg['feature'] for seg in segment_data]
            nn_tree.fit(features)
        # 4-5. Chinese whispers process (with probability PLAY_PROBABILITY)
        if random.random() < PLAY_PROBABILITY and len(segment_data) > 2:
            chain = chinese_whispers_process()
            logging.info(f"Created chain of length: {len(chain)}")

            # 6. Concatenate with crossfade and apply overall fade
            output_file = f'{output_audio_path}/output_{int(time.time())}.wav'
            try:
                concatenated_audio = None
                for segment in chain:
                    rate, audio = wavfile.read(segment['file'])
                    if concatenated_audio is None:
                        concatenated_audio = audio
                    else:
                        fade_length = int(CROSSFADE_DURATION * rate)
                        concatenated_audio = crossfade(concatenated_audio, audio, fade_length)
                if concatenated_audio is not None:
                    overall_fade_length = int(FADE_DURATION * rate)
                    concatenated_audio = apply_fade(concatenated_audio, overall_fade_length)
                    # Scale the output volume based on channel 2 amplitude
                    scaling_factor = ch2_amplitude
                    concatenated_audio = (concatenated_audio * scaling_factor).astype(np.int16)
                    wavfile.write(output_file, rate, concatenated_audio)
                    logging.info(f"Saved audio to {output_file}")
                    # Log the input segments used for this output
                    segment_logger.info(f"Output: {output_file}")
                    for idx, segment in enumerate(chain):
                        segment_logger.info(f"  Input {idx + 1}: {segment['file']} (Index: {segment['index']})")
                    # 7. Play the wav file
                    play_audio_nonblocking(output_file)
                    logging.info(f"Playing {output_file} in the background")
                    # Clean up old files
                    cleanup_files()
                else:
                    logging.warning("No audio to concatenate")
            except Exception as e:
                logging.error(f"Error processing audio: {e}")

if __name__ == "__main__":
    main()