Last active
May 29, 2023 16:54
-
-
Save Sharrnah/582b8a390e2462bcec77332cac2eb570 to your computer and use it in GitHub Desktop.
Volume and audio direction over OSC Whispering Tiger Plugin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ============================================================
# Sends volume and audio direction over OSC using Whispering Tiger
# Version 1.0.2
#
# See https://github.com/Sharrnah/whispering
# Inspired by https://github.com/Codel1417/VRC-OSC-Audio-Reaction
# ============================================================
import Plugins | |
import VRC_OSCLib | |
import audio_tools | |
import settings | |
import pyaudiowpatch as pyaudio | |
import numpy as np | |
import threading | |
class AudioDirectionPlugin(Plugins.Base):
    """Whispering Tiger plugin that measures the loudness and stereo direction
    of a loopback audio device and publishes both values to VRChat avatar
    parameters over OSC (``/avatar/parameters/audio_volume`` and
    ``/avatar/parameters/audio_direction``).
    """
    # Worker thread running audio_loop() (None while the plugin is disabled).
    thread = None
    # PyAudio input stream opened on the loopback device.
    audio_stream = None
    # Loop flag; set to False to ask the worker thread to exit.
    continue_recording = True
    # True when the capture device could not deliver the requested sample rate.
    needs_sample_rate_conversion = False
    # Shared PyAudio instance, created in init() when the plugin is enabled.
    py_audio = None
    # Previous EMA values for the left/right channel peaks (and their average).
    prev_ema_left = 0
    prev_ema_right = 0
    prev_ema_average = 0

    def exponential_moving_average(self, current_value, previous_ema, smoothing_factor):
        """Return the exponential moving average of current_value.

        smoothing_factor is the weight given to the *previous* EMA value, so a
        higher factor means MORE smoothing (0 = no smoothing, 1 = frozen).
        """
        inverted_smoothing_factor = 1 - smoothing_factor
        return (current_value * inverted_smoothing_factor) + (previous_ema * smoothing_factor)

    def get_audio_amplitude(self, audio_data):
        """Return the peak absolute amplitude of the given sample array.

        Returns 0 for an empty chunk (np.max would raise on an empty array).
        """
        if len(audio_data) == 0:
            return 0
        return np.max(np.abs(audio_data))

    def audio_to_mono(self, audio_data, channels, down_sample_method='average'):
        """Down-mix an interleaved multi-channel int16 array to mono.

        down_sample_method: 'average' mixes all channels; 'left'/'right'
        select a single channel. Unknown methods fall back to 'average'
        (previously they silently returned None). Mono input is returned
        unchanged.
        """
        if channels <= 1:
            return audio_data
        # Reshape the interleaved samples into one column per channel.
        audio_data = audio_data.reshape(-1, channels)
        if down_sample_method == 'left':
            return audio_data[:, 0]
        if down_sample_method == 'right':
            return audio_data[:, 1]
        # 'average' (and any unrecognized method): mix channels, keep int16.
        return audio_data.mean(axis=1, dtype=np.int16)

    def normalize_value(self, value, max_value=32767):
        """Scale an int16 peak value into 0..1 and apply the "gain" setting.

        Falls back to plain normalization when the gain setting is not numeric.
        """
        gain_setting = self.get_plugin_setting("gain", 0.8)
        if not isinstance(gain_setting, (float, int)):
            return value / max_value
        return (value / max_value) * gain_setting

    def clamp_float(self, value, min_value=0, max_value=1):
        """Clamp value into the inclusive range [min_value, max_value]."""
        return max(min(value, max_value), min_value)

    def calculate_audio_direction(self, left_amplitude, right_amplitude):
        """Return the stereo direction in [0, 1].

        0 = fully left, 0.5 = centered, 1 = fully right. Total silence is
        reported as centered.
        """
        if left_amplitude == 0 and right_amplitude == 0:
            return 0.5
        # Equivalent to the original ((-left) * 2) + (right * 2) + 0.5.
        return self.clamp_float((right_amplitude - left_amplitude) * 2 + 0.5)

    def start_audio_device(self):
        """Open a recording stream on the configured loopback device.

        Returns the stream, or None if it could not be opened.
        """
        FORMAT = pyaudio.paInt16
        CHANNELS = 2
        SAMPLE_RATE = 16000
        CHUNK = int(SAMPLE_RATE / 10)

        device_index = self.get_plugin_setting("loopback_device_index", settings.GetOption("device_out_index"))
        if device_index is None or device_index == -1:
            device_index = settings.GetOption("device_default_out_index")

        # Reuse the instance created in init() instead of leaking a second,
        # never-terminated PyAudio object.
        py_audio = self.py_audio if self.py_audio is not None else pyaudio.PyAudio()
        print("Using device index: " + str(device_index))
        try:
            stream, self.needs_sample_rate_conversion, recorded_sample_rate, is_mono = audio_tools.start_recording_audio_stream(
                device_index,
                sample_format=FORMAT,
                sample_rate=SAMPLE_RATE,
                channels=CHANNELS,
                chunk=CHUNK,
                py_audio=py_audio,
            )
            return stream
        except Exception as e:
            print(e)
            return None

    def lerp(self, a: float, b: float, t: float) -> float:
        """Linear interpolate on the scale given by a to b, using t as the point on that scale.

        Examples
        --------
        50 == lerp(0, 100, 0.5)
        4.2 == lerp(1, 5, 0.8)
        """
        return (1 - t) * a + t * b

    def inv_lerp(self, a: float, b: float, v: float) -> float:
        """Inverse Linear Interpolation, get the fraction between a and b on which v resides.

        Examples
        --------
        0.5 == inv_lerp(0, 100, 50)
        0.8 == inv_lerp(1, 5, 4.2)
        """
        if a == b:
            # Degenerate scale: avoid ZeroDivisionError (previously raised).
            return 0.0
        return (v - a) / (b - a)

    def remap(self, i_min: float, i_max: float, o_min: float, o_max: float, v: float) -> float:
        """Remap values from one linear scale to another, a combination of lerp and inv_lerp.

        i_min and i_max are the scale on which the original value resides,
        o_min and o_max are the scale to which it should be mapped.

        Examples
        --------
        45 == remap(0, 100, 40, 50, 50)
        6.2 == remap(1, 5, 3, 7, 4.2)
        """
        return self.lerp(o_min, o_max, self.inv_lerp(i_min, i_max, v))

    def audio_loop(self):
        """Worker thread: read audio chunks, compute smoothed per-channel
        peaks, and publish volume/direction over OSC until stopped."""
        print(self.__class__.__name__ + " thread is started.")
        osc_ip = settings.GetOption("osc_ip")
        osc_port = settings.GetOption("osc_port")
        # Resample target when the device cannot record at 16 kHz directly.
        # NOTE(review): 33100 looks like a typo for the common 44100, but it is
        # kept unchanged to preserve existing behavior — confirm with author.
        default_sample_rate = 33100

        device_index = self.get_plugin_setting("loopback_device_index", settings.GetOption("device_out_index"))
        if device_index is None or device_index == -1:
            device_index = settings.GetOption("device_default_out_index")
        dev_info = self.py_audio.get_device_info_by_index(device_index)
        recorded_sample_rate = int(dev_info['defaultSampleRate'])

        self.continue_recording = True
        while self.continue_recording:
            audio_chunk = self.audio_stream.read(self.get_plugin_setting("num_samples", 32),
                                                 exception_on_overflow=False)
            if self.needs_sample_rate_conversion:
                audio_chunk = audio_tools.resample_audio(audio_chunk, recorded_sample_rate, default_sample_rate, 2,
                                                         is_mono=False).tobytes()

            audio_int16 = np.frombuffer(audio_chunk, np.int16)

            # Per-channel peaks, smoothed with an EMA to avoid jitter.
            mono_left_audio = self.audio_to_mono(audio_int16, 2, 'left')
            peak_amplitude_left = self.get_audio_amplitude(mono_left_audio)
            mono_right_audio = self.audio_to_mono(audio_int16, 2, 'right')
            peak_amplitude_right = self.get_audio_amplitude(mono_right_audio)

            self.prev_ema_left = self.exponential_moving_average(peak_amplitude_left, self.prev_ema_left,
                                                                 self.get_plugin_setting("smoothing_factor", 0.3))
            self.prev_ema_right = self.exponential_moving_average(peak_amplitude_right, self.prev_ema_right,
                                                                  self.get_plugin_setting("smoothing_factor", 0.3))

            normalized_amplitude_left = self.normalize_value(self.prev_ema_left)
            normalized_amplitude_right = self.normalize_value(self.prev_ema_right)

            # Boost volume to usable level
            normalized_amplitude_left *= 10
            normalized_amplitude_right *= 10

            audio_volume = self.clamp_float((normalized_amplitude_left + normalized_amplitude_right) / 2)
            audio_direction = self.calculate_audio_direction(normalized_amplitude_left, normalized_amplitude_right)

            VRC_OSCLib.Float(audio_volume, "/avatar/parameters/audio_volume", osc_ip, osc_port)
            VRC_OSCLib.Float(audio_direction, "/avatar/parameters/audio_direction", osc_ip, osc_port)

            if self.get_plugin_setting("debug", False):
                print("Audio volume: " + str(audio_volume))
                print("Audio direction: " + str(audio_direction))

        # Always announce the stop (the original inner check could race with
        # the while condition and skip this message).
        print(self.__class__.__name__ + " thread is stopped.")

    def init(self):
        """Register plugin settings, then start or stop the capture thread
        depending on the plugin's enabled state."""
        # prepare all possible settings
        self.init_plugin_settings(
            {
                "loopback_device_index": settings.GetOption("device_out_index"),
                "debug": False,
                "gain": {"type": "slider", "min": 0.0, "max": 2.0, "step": 0.05, "value": 0.8},
                # Weight of the previous EMA sample: higher value means MORE smoothing.
                "smoothing_factor": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "value": 0.3},
                "num_samples": 128,  # Number of samples to read at a time
            }
        )

        if self.is_enabled(False):
            print(self.__class__.__name__ + " is enabled")
            self.py_audio = pyaudio.PyAudio()
            self.audio_stream = self.start_audio_device()
            if self.thread is None and self.audio_stream is not None:
                self.thread = threading.Thread(target=self.audio_loop)
                self.thread.start()
        else:
            print(self.__class__.__name__ + " is disabled")
            # Tear down in reverse order of creation.
            if self.thread is not None:
                self.continue_recording = False
                self.thread.join()
                self.thread = None
            if self.audio_stream is not None:
                self.audio_stream.stop_stream()
                self.audio_stream.close()
                self.audio_stream = None
            if self.py_audio is not None:
                # Release the PortAudio instance (previously leaked on disable).
                self.py_audio.terminate()
                self.py_audio = None

    def on_enable(self):
        """Plugin hook: (re)initialize when the plugin is switched on."""
        self.init()

    def on_disable(self):
        """Plugin hook: tear down when the plugin is switched off."""
        self.init()

    def timer(self):
        """Plugin hook: periodic timer — unused by this plugin."""
        pass

    def stt(self, text, result_obj):
        """Plugin hook: speech-to-text result — unused by this plugin."""
        return

    def tts(self, text, device_index, websocket_connection=None, download=False):
        """Plugin hook: text-to-speech request — unused by this plugin."""
        return
Author
Sharrnah
commented
Mar 29, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment