# ============================================================
# RVC Speech to Speech Plugin for Whispering Tiger
# V1.0.9
# RVC WebUI: https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI
# Whispering Tiger: https://github.com/Sharrnah/whispering-ui
# ============================================================
#
import base64
import io
import json
import queue
import random
import shutil
import wave
from typing import Union, BinaryIO

import librosa
import numpy as np
import torch

import Plugins
from pathlib import Path
import os
import sys

from scipy.io.wavfile import write as write_wav

from Models.STS import DeepFilterNet
import audio_tools
import downloader
import settings
import websocket
rvc_sts_plugin_dir = Path(Path.cwd() / "Plugins" / "rvc_sts_plugin")
os.makedirs(rvc_sts_plugin_dir, exist_ok=True)
rvc_sts_plugin_weights_dir = Path(Path.cwd() / "Plugins" / "rvc_sts_plugin" / "weights")
os.makedirs(rvc_sts_plugin_weights_dir, exist_ok=True)
rvc_webui_dependency = {
    "urls": [
        "https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI/archive/f431f8fb3f13aa6dfedf33383f70de35fe07dfbd.zip",
        "https://eu2.contabostorage.com/bf1a89517e2643359087e5d8219c0c67:projects/rvc-plugin/Retrieval-based-Voice-Conversion-WebUI-f431f8fb3f13aa6dfedf33383f70de35fe07dfbd.zip",
        "https://usc1.contabostorage.com/8fcf133c506f4e688c7ab9ad537b5c18:projects/rvc-plugin2/Retrieval-based-Voice-Conversion-WebUI-f431f8fb3f13aa6dfedf33383f70de35fe07dfbd.zip"
    ],
    "sha256": "75eb5f3bcadf9bb56ef73415d56940ded6d3c2d1feae34b5252ae15266459d73",
    "zip_path": "Retrieval-based-Voice-Conversion-WebUI-f431f8fb3f13aa6dfedf33383f70de35fe07dfbd",
    "target_path": "Retrieval-based-Voice-Conversion-WebUI"
}
rvc_models = {
    "urls": [
        "https://eu2.contabostorage.com/bf1a89517e2643359087e5d8219c0c67:projects/rvc-plugin/rvc_models.zip",
        "https://usc1.contabostorage.com/8fcf133c506f4e688c7ab9ad537b5c18:projects/rvc-plugin2/rvc_models.zip"
    ],
    "sha256": "75df758e11605fde28f3d82cf7415503deee0fb4de95d838db8b320474823816"
}
rmvpe_model = {
    "urls": [
        "https://eu2.contabostorage.com/bf1a89517e2643359087e5d8219c0c67:projects/rvc-plugin/rmvpe_model.zip",
        "https://usc1.contabostorage.com/8fcf133c506f4e688c7ab9ad537b5c18:projects/rvc-plugin2/rmvpe_model.zip"
    ],
    "sha256": "63d9f0b001eb0749a0ec6a7f12d7b5193b1b54a1a259fcfc4201eb81d7dc0627"
}
rvc_infer_script = {
    "urls": [
        "https://eu2.contabostorage.com/bf1a89517e2643359087e5d8219c0c67:projects/rvc-plugin/rvc_infer.py",
        "https://usc1.contabostorage.com/8fcf133c506f4e688c7ab9ad537b5c18:projects/rvc-plugin2/rvc_infer.py"
    ],
    "sha256": "7d011acff3f730c01aa8ecb08918c77d8e0809c32a26c394ee92b4c07614a083"
}
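
# Each dependency above lists mirror URLs plus a pinned sha256; init() picks a
# mirror at random and the downloader verifies the archive against the checksum
# before extraction. A minimal sketch of that check, reusing the
# downloader.sha256_checksum helper that init() also calls below:
#   if downloader.sha256_checksum("rvc_models.zip") != rvc_models["sha256"]:
#       raise ValueError("checksum mismatch, re-download the archive")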
CONSTANTS = {
    "DISABLED": 'Disabled',
    "STS": 'Own Voice',
    #"STS_RT": 'Own Voice (Realtime)',
    "SILERO_TTS": 'Integrated Text-to-Speech (Silero TTS)',
    "PLUGIN_TTS": 'Plugin Text-to-Speech',
}
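
# voice_change_source selects which audio path gets converted, judging from the
# handlers below: "Own Voice" routes through sts() (microphone speech),
# "Integrated Text-to-Speech (Silero TTS)" through on_silero_tts_after_audio_call,
# and "Plugin Text-to-Speech" through on_plugin_tts_after_audio_call.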
#sys.path.append(str(rvc_sts_plugin_dir.resolve()))
#sys.path.append(os.path.join(rvc_sts_plugin_dir, "Retrieval-based-Voice-Conversion-WebUI"))
#
#from tools import rvc_for_realtime
#from configs.config import Config
#from tools.torchgate import TorchGate
#
#import torchaudio.transforms as tat
#import threading
#import time
#import sounddevice as sd
def printt(strr, *args):
    if len(args) == 0:
        print(strr)
    else:
        print(strr % args)
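
# printt behaves like print with optional %-style formatting, e.g.:
#   printt("Output device: %s:%s", "3", "Speakers")  # -> Output device: 3:Speakers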
class RVCStsPlugin(Plugins.Base):
    output_sample_rate = 16000
    audio_denoiser = None

    # audio_processing_queue = queue.Queue()
    #
    # device_index = None
    # audio_playback_queue = None
    # streaming_playback_thread = None
    #
    # audio_buffer = b''
    # audio_buffer_duration = 0  # Duration of audio in buffer in milliseconds
    # target_duration = 500  # Target duration in milliseconds for processing

    def model_file_valid(self, file_path: str):
        # True only if the configured model file exists on disk
        return os.path.exists(file_path)
    def on_enable(self):
        self.init()

    def on_disable(self):
        pass
    def init(self):
        self.init_plugin_settings(
            {
                # voice conversion settings
                "index_rate": {"type": "slider", "min": 0, "max": 1, "step": 0.01, "value": 0.75},
                "f0up_key": {"type": "slider", "min": -50.0, "max": 50.0, "step": 0.1, "value": -13.0},
                "f0up_key_info": {"label": "f0up_key (pitch setting): lower it (ca. -12) for female-to-male conversion, raise it (ca. +12) for male-to-female.", "type": "label", "style": "left"},
                "f0method": {"type": "select", "value": "harvest", "values": ["harvest", "pm", "crepe", "rmvpe"]},
                "filter_radius": {"type": "slider", "min": 1, "max": 10, "step": 1, "value": 3},
                "rms_mix_rate": {"type": "slider", "min": 0, "max": 1, "step": 0.01, "value": 0.25},
                "protect": {"type": "slider", "min": 0, "max": 1, "step": 0.01, "value": 0.33},

                # audio conversion
                "audio_file": {"type": "file_open", "accept": ".wav", "value": ""},
                "convert_btn": {"label": "convert audio file", "type": "button", "style": "primary"},
                "voice_change_source": {"type": "select", "value": CONSTANTS["STS"],
                                        "values": [value for key, value in CONSTANTS.items()]},

                # model settings
                "model_file": {"type": "file_open", "accept": ".pth", "value": ""},
                "index_file": {"type": "file_open", "accept": ".index", "value": ""},
                "model_load_btn": {"label": "Load model", "type": "button", "style": "primary"},
                "half_precision": False,
                "device": {"type": "select", "value": "cpu:0",
                           "values": ["cpu:0", "cpu:1", "cpu:2", "cuda:0", "cuda:1", "cuda:2"]},
                "result_noise_filter": False,
                "unload_on_finish": False,
            },
            settings_groups={
                "General": ["index_rate", "f0up_key", "f0up_key_info", "f0method", "filter_radius", "rms_mix_rate", "protect"],
                "Audio conversion": ["voice_change_source", "audio_file", "convert_btn"],
                "Model": ["model_file", "index_file", "model_load_btn", "half_precision", "device", "result_noise_filter", "unload_on_finish"],
            }
        )
        if self.is_enabled(False) and self.model_file_valid(self.get_plugin_setting("model_file")):
            # download the inference script (re-download if the checksum does not match)
            if not Path(rvc_sts_plugin_dir / "rvc_infer.py").is_file() or downloader.sha256_checksum(str(Path(rvc_sts_plugin_dir / "rvc_infer.py").resolve())) != rvc_infer_script["sha256"]:
                # delete rvc_infer.py if it already exists
                if Path(rvc_sts_plugin_dir / "rvc_infer.py").is_file():
                    os.remove(str(Path(rvc_sts_plugin_dir / "rvc_infer.py").resolve()))

                infer_script_url = random.choice(rvc_infer_script["urls"])
                downloader.download_extract([infer_script_url],
                                            str(rvc_sts_plugin_dir.resolve()),
                                            rvc_infer_script["sha256"],
                                            alt_fallback=True,
                                            title="RVC Inference script", extract_format="none")

            # download the RVC WebUI code
            if not Path(rvc_sts_plugin_dir / rvc_webui_dependency["target_path"] / "infer-web.py").is_file():
                print("rvc_webui downloading...")
                # download from a random url in the list
                voice_clone_url = random.choice(rvc_webui_dependency["urls"])
                downloader.download_extract([voice_clone_url],
                                            str(rvc_sts_plugin_dir.resolve()),
                                            rvc_webui_dependency["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_zip,
                                            fallback_extract_func_args=(
                                                str(rvc_sts_plugin_dir / os.path.basename(voice_clone_url)),
                                                str(rvc_sts_plugin_dir.resolve()),
                                            ),
                                            title="RVC", extract_format="zip")
                # rename the extracted folder to the expected target path
                shutil.move(str(rvc_sts_plugin_dir / rvc_webui_dependency["zip_path"]), str(rvc_sts_plugin_dir / rvc_webui_dependency["target_path"]))

            rvc_models_path = Path(rvc_sts_plugin_dir / rvc_webui_dependency["target_path"] / "assets")

            # download the RVC models (hubert etc.)
            if not Path(rvc_models_path / "hubert" / "hubert_base.pt").is_file():
                print("rvc models downloading...")
                # download from a random url in the list
                rvc_model_url = random.choice(rvc_models["urls"])
                downloader.download_extract([rvc_model_url],
                                            str(rvc_models_path.resolve()),
                                            rvc_models["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_zip,
                                            fallback_extract_func_args=(
                                                str(rvc_models_path / os.path.basename(rvc_model_url)),
                                                str(rvc_models_path.resolve()),
                                            ),
                                            title="RVC Models", extract_format="zip")

            # download the rmvpe pitch extraction model
            if not Path(rvc_models_path / "rmvpe" / "rmvpe.pt").is_file():
                print("rmvpe model downloading...")
                # download from a random url in the list
                rmvpe_model_url = random.choice(rmvpe_model["urls"])
                downloader.download_extract([rmvpe_model_url],
                                            str(rvc_models_path.resolve()),
                                            rmvpe_model["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_zip,
                                            fallback_extract_func_args=(
                                                str(rvc_models_path / os.path.basename(rmvpe_model_url)),
                                                str(rvc_models_path.resolve()),
                                            ),
                                            title="rmvpe model", extract_format="zip")

            sys.path.append(str(rvc_sts_plugin_dir.resolve()))
            sys.path.append(os.path.join(rvc_sts_plugin_dir, "Retrieval-based-Voice-Conversion-WebUI"))
            rvc_path = self.get_plugin_setting("model_file")
            self.index_path = self.get_plugin_setting("index_file")
            # device = "cuda:0"
            device = self.get_plugin_setting("device")
            # is_half = True
            is_half = self.get_plugin_setting("half_precision")

            from rvc_infer import get_vc, vc_single, release_model

            #############################
            ## realtime voice conversion
            #############################
            ##from multiprocessing import Queue, cpu_count
            #self.n_cpu = min(2, 8)
            #self.inp_q = queue.Queue()
            #self.opt_q = queue.Queue()
            #
            #self.gui_config = GUIConfig()
            #self.config = Config()
            #
            #self.gui_config.pitch = self.get_plugin_setting("f0up_key")
            #self.gui_config.pth_path = rvc_path
            #self.gui_config.index_path = self.index_path
            #
            #input_devices, output_devices, _, _ = self.get_devices()
            #(
            #    input_devices,
            #    output_devices,
            #    input_device_indices,
            #    output_device_indices,
            #) = self.get_devices()
            #sd.default.device[0] = input_device_indices[
            #    input_devices.index(settings.GetOption("audio_input_device")[:32] + " (MME)")
            #]
            #sd.default.device[1] = output_device_indices[
            #    output_devices.index(settings.GetOption("audio_output_device")[:32] + " (MME)")
            #]

            self.vc_single = vc_single
            self.release_model = release_model
            get_vc(rvc_path, device, is_half)
        if self.is_enabled(False) and not self.model_file_valid(self.get_plugin_setting("model_file")):
            websocket.BroadcastMessage(json.dumps({"type": "info",
                                                   "data": "No model file found. Please select a model file first."}))

        #if self.is_enabled(False):
        #    if self.get_plugin_setting("voice_change_source") == CONSTANTS["STS_RT"]:
        #        self.audio_playback_queue, self.streaming_playback_thread = audio_tools.start_streaming_audio_playback(
        #            self.device_index,
        #            channels=2,
        #            sample_rate=self.output_sample_rate,
        #        )
    def load_audio_file(self, wav_file_path):
        with wave.open(wav_file_path, 'rb') as wav_file:
            # extract the raw audio from the wav file
            signal = wav_file.readframes(-1)

            # get the number of channels, sample width and frame rate
            n_channels = wav_file.getnchannels()
            sample_width = wav_file.getsampwidth()
            frame_rate = wav_file.getframerate()

            # determine the appropriate numpy data type for the audio array
            dtype_map = {1: np.int8, 2: np.int16, 4: np.int32}

            if sample_width == 4 and wav_file.getcomptype() == 'IEEE_FLOAT':
                # 32-bit float WAV data is already float32. (This must be checked
                # before the integer map, because a sample width of 4 would
                # otherwise be treated as 32-bit integer PCM.)
                audio_normalized = np.frombuffer(signal, dtype=np.float32)
            elif sample_width in dtype_map:
                dtype = dtype_map[sample_width]
                max_value = np.iinfo(dtype).max
                audio = np.frombuffer(signal, dtype=dtype)
                audio_normalized = (audio / max_value).astype(np.float32)
            else:
                raise ValueError(f"Unsupported sample width: {sample_width}")

            # if the WAV file is stereo (2 channels), let the resampler down-mix to mono
            is_mono = n_channels != 2

            # resample the audio to the plugin's working sample rate
            audio = audio_tools.resample_audio(audio_normalized, frame_rate, self.output_sample_rate, target_channels=-1, is_mono=is_mono, dtype="float32")
        return audio
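
    # Usage sketch ("sample.wav" is a hypothetical file name):
    #   audio = self.load_audio_file("sample.wav")
    # returns a float32 numpy array resampled to output_sample_rate (16 kHz),
    # down-mixed to mono if the source was stereo.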
    def play_audio_on_device(self, wav, audio_device, source_sample_rate=22050, audio_device_channel_num=2,
                             target_channels=2, is_mono=True, dtype="int16"):
        secondary_audio_device = None
        if settings.GetOption("tts_use_secondary_playback") and (
                (settings.GetOption("tts_secondary_playback_device") == -1 and audio_device != settings.GetOption("device_default_out_index")) or
                (settings.GetOption("tts_secondary_playback_device") > -1 and audio_device != settings.GetOption("tts_secondary_playback_device"))):
            secondary_audio_device = settings.GetOption("tts_secondary_playback_device")
            if secondary_audio_device == -1:
                secondary_audio_device = settings.GetOption("device_default_out_index")

        audio_tools.play_audio(wav, audio_device,
                               source_sample_rate=source_sample_rate,
                               audio_device_channel_num=audio_device_channel_num,
                               target_channels=target_channels,
                               is_mono=is_mono,
                               dtype=dtype,
                               secondary_device=secondary_audio_device, tag="tts")
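
    # Example: with "tts_use_secondary_playback" enabled and
    # "tts_secondary_playback_device" set to -1, playback is mirrored to the
    # default output device in addition to `audio_device`.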
    # bytes_dtype is used if audio is passed as raw bytes
    def do_conversion(self, audio, sample_rate, bytes_dtype="int16"):
        index_rate = self.get_plugin_setting("index_rate")  # e.g. 0.75
        f0up_key = self.get_plugin_setting("f0up_key")  # e.g. -6
        filter_radius = self.get_plugin_setting("filter_radius")  # e.g. 3
        rms_mix_rate = self.get_plugin_setting("rms_mix_rate")  # e.g. 0.25
        protect = self.get_plugin_setting("protect")  # e.g. 0.33
        f0method = self.get_plugin_setting("f0method")  # harvest, pm, crepe or rmvpe

        if isinstance(audio, bytes):
            b_dtype = np.int16
            if bytes_dtype == "float32":
                b_dtype = np.float32
            audio = np.frombuffer(audio, dtype=b_dtype)

        if audio.dtype == np.float32:
            wav_data_float32 = audio
        else:
            # normalize int16 samples to float32 in [-1.0, 1.0]
            wav_data_int16 = np.frombuffer(audio, dtype=np.int16)
            wav_data_float32 = wav_data_int16.astype(np.float32) / np.iinfo(np.int16).max

        try:
            audio_array = self.vc_single(0, wav_data_float32, f0up_key, None, f0method, self.index_path, index_rate,
                                         filter_radius=filter_radius, resample_sr=sample_rate,
                                         rms_mix_rate=rms_mix_rate, protect=protect)
        except Exception as e:
            print("error. falling back to 'pm' f0 method:", e)
            audio_array = self.vc_single(0, wav_data_float32, f0up_key, None, 'pm', self.index_path, index_rate,
                                         filter_radius=filter_radius, resample_sr=sample_rate,
                                         rms_mix_rate=rms_mix_rate, protect=protect)

        if self.get_plugin_setting("result_noise_filter"):
            if self.audio_denoiser is None:
                self.audio_denoiser = DeepFilterNet.DeepFilterNet(post_filter=False)
            if self.audio_denoiser is not None:
                audio_array = self.audio_denoiser.enhance_audio(audio_array)

        if self.get_plugin_setting("unload_on_finish"):
            self.release_model()

        return audio_array
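
    # Usage sketch (assumes a model was loaded via get_vc() in init();
    # "speech.raw" is a hypothetical file of 16 kHz mono int16 samples):
    #   pcm_bytes = open("speech.raw", "rb").read()
    #   converted = self.do_conversion(pcm_bytes, 16000, bytes_dtype="int16")
    #   # `converted` is the voice-converted waveform returned by vc_single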
    def sts(self, wavefiledata, sample_rate):
        if self.is_enabled(False) and self.get_plugin_setting("voice_change_source") == CONSTANTS["STS"] and self.model_file_valid(self.get_plugin_setting("model_file")) and settings.GetOption("tts_answer"):
            audio_array = self.do_conversion(wavefiledata, sample_rate, bytes_dtype="int16")

            # create wav audio for playback
            buff = io.BytesIO()
            write_wav(buff, sample_rate, audio_array)
            buff.seek(0)

            device_index = settings.GetOption("device_out_index")
            if device_index is None or device_index == -1:
                device_index = settings.GetOption("device_default_out_index")

            self.play_audio_on_device(buff.getvalue(), device_index,
                                      source_sample_rate=sample_rate,
                                      audio_device_channel_num=2,
                                      target_channels=2,
                                      is_mono=True,
                                      dtype="int16"
                                      )
    @staticmethod
    def calculate_duration(num_bytes, sample_rate):
        # assuming 16-bit (2 bytes) samples and 2 channels
        sample_duration_ms = 1000 / sample_rate
        return (num_bytes / 2 / 2) * sample_duration_ms
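
    # Worked example: 32000 bytes at 16 kHz ->
    #   32000 / 2 / 2 = 8000 sample frames; 8000 * (1000 / 16000) = 500 ms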
    # def sts_chunk(self, wavefiledata, sample_rate):
    #     if self.is_enabled(False) and self.streaming_playback_thread is not None and self.get_plugin_setting("voice_change_source") == CONSTANTS["STS_RT"] and self.model_file_valid(self.get_plugin_setting("model_file")) and settings.GetOption("tts_answer"):
    #         # Convert to bytearray before extending
    #         buffer_data = wavefiledata.tobytes() if isinstance(wavefiledata, np.ndarray) else wavefiledata
    #         self.audio_buffer += buffer_data
    #         self.audio_buffer_duration += self.calculate_duration(len(buffer_data), sample_rate)
    #
    #         if self.audio_buffer_duration >= self.target_duration:
    #             audio_array = self.do_conversion(self.audio_buffer, sample_rate, bytes_dtype="int16")
    #             self.audio_playback_queue.put(audio_array)
    #
    #             # Reset buffer
    #             self.audio_buffer = b''
    #             self.audio_buffer_duration = 0
    def get_devices(self, update: bool = True):
        """Get the list of audio devices. (Used by the disabled realtime path;
        requires the sounddevice import that is commented out above.)"""
        if update:
            sd._terminate()
            sd._initialize()
        devices = sd.query_devices()
        hostapis = sd.query_hostapis()
        for hostapi in hostapis:
            for device_idx in hostapi["devices"]:
                devices[device_idx]["hostapi_name"] = hostapi["name"]
        input_devices = [
            f"{d['name']} ({d['hostapi_name']})"
            for d in devices
            if d["max_input_channels"] > 0
        ]
        output_devices = [
            f"{d['name']} ({d['hostapi_name']})"
            for d in devices
            if d["max_output_channels"] > 0
        ]
        input_devices_indices = [
            d["index"] if "index" in d else d["name"]
            for d in devices
            if d["max_input_channels"] > 0
        ]
        output_devices_indices = [
            d["index"] if "index" in d else d["name"]
            for d in devices
            if d["max_output_channels"] > 0
        ]
        return (
            input_devices,
            output_devices,
            input_devices_indices,
            output_devices_indices,
        )

    def set_devices(self, input_device, output_device):
        """Set the input and output devices."""
        (
            input_devices,
            output_devices,
            input_device_indices,
            output_device_indices,
        ) = self.get_devices()
        sd.default.device[0] = input_device_indices[
            input_devices.index(input_device)
        ]
        sd.default.device[1] = output_device_indices[
            output_devices.index(output_device)
        ]
        printt("Input device: %s:%s", str(sd.default.device[0]), input_device)
        printt("Output device: %s:%s", str(sd.default.device[1]), output_device)
    def start_vc(self):
        torch.cuda.empty_cache()
        self.flag_vc = True
        self.rvc = rvc_for_realtime.RVC(
            self.gui_config.pitch,
            self.gui_config.pth_path,
            self.gui_config.index_path,
            self.gui_config.index_rate,
            self.gui_config.n_cpu,
            self.inp_q,
            self.opt_q,
            self.config,
            self.rvc if hasattr(self, "rvc") else None,
        )
        self.gui_config.samplerate = self.rvc.tgt_sr
        self.zc = self.rvc.tgt_sr // 100
        self.block_frame = (
            int(
                np.round(
                    self.gui_config.block_time
                    * self.gui_config.samplerate
                    / self.zc
                )
            )
            * self.zc
        )
        self.block_frame_16k = 160 * self.block_frame // self.zc
        self.crossfade_frame = (
            int(
                np.round(
                    self.gui_config.crossfade_time
                    * self.gui_config.samplerate
                    / self.zc
                )
            )
            * self.zc
        )
        self.sola_search_frame = self.zc
        self.extra_frame = (
            int(
                np.round(
                    self.gui_config.extra_time
                    * self.gui_config.samplerate
                    / self.zc
                )
            )
            * self.zc
        )
        self.input_wav: torch.Tensor = torch.zeros(
            self.extra_frame
            + self.crossfade_frame
            + self.sola_search_frame
            + self.block_frame,
            device=self.config.device,
            dtype=torch.float32,
        )
        self.input_wav_res: torch.Tensor = torch.zeros(
            160 * self.input_wav.shape[0] // self.zc,
            device=self.config.device,
            dtype=torch.float32,
        )
        self.pitch: np.ndarray = np.zeros(
            self.input_wav.shape[0] // self.zc,
            dtype="int32",
        )
        self.pitchf: np.ndarray = np.zeros(
            self.input_wav.shape[0] // self.zc,
            dtype="float64",
        )
        self.sola_buffer: torch.Tensor = torch.zeros(
            self.crossfade_frame, device=self.config.device, dtype=torch.float32
        )
        self.nr_buffer: torch.Tensor = self.sola_buffer.clone()
        self.output_buffer: torch.Tensor = self.input_wav.clone()
        self.res_buffer: torch.Tensor = torch.zeros(
            2 * self.zc, device=self.config.device, dtype=torch.float32
        )
        self.valid_rate = 1 - (self.extra_frame - 1) / self.input_wav.shape[0]
        self.fade_in_window: torch.Tensor = (
            torch.sin(
                0.5
                * np.pi
                * torch.linspace(
                    0.0,
                    1.0,
                    steps=self.crossfade_frame,
                    device=self.config.device,
                    dtype=torch.float32,
                )
            )
            ** 2
        )
        self.fade_out_window: torch.Tensor = 1 - self.fade_in_window
        self.resampler = tat.Resample(
            orig_freq=self.gui_config.samplerate,
            new_freq=16000,
            dtype=torch.float32,
        ).to(self.config.device)
        self.tg = TorchGate(
            sr=self.gui_config.samplerate, n_fft=4 * self.zc, prop_decrease=0.9
        ).to(self.config.device)
        thread_vc = threading.Thread(target=self.soundinput)
        thread_vc.start()
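
    # Note on the (currently disabled) realtime path: zc is one 10 ms hop at
    # the model's target rate (tgt_sr // 100), so every buffer above is sized
    # in multiples of 10 ms, and input_wav holds a rolling window of
    # extra + crossfade + sola_search + block frames.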
    def soundinput(self):
        """
        Accept audio input (runs the realtime stream loop).
        """
        channels = 1 if sys.platform == "darwin" else 2
        with sd.Stream(
            channels=channels,
            callback=self.audio_callback,
            blocksize=self.block_frame,
            samplerate=self.gui_config.samplerate,
            dtype="float32",
        ) as stream:
            global stream_latency
            stream_latency = stream.latency[-1]
            while self.flag_vc:
                time.sleep(self.gui_config.block_time)
                printt("Audio block passed.")
        printt("Ending VC")
    def audio_callback(
        self, indata: np.ndarray, outdata: np.ndarray, frames, times, status
    ):
        threshold = -55
        self.zc = self.rvc.tgt_sr // 100
        indata = librosa.to_mono(indata.T)
        if threshold > -60:
            # gate 10 ms hops whose RMS level is below the configured threshold
            rms = librosa.feature.rms(
                y=indata, frame_length=4 * self.zc, hop_length=self.zc
            )
            db_threshold = (
                librosa.amplitude_to_db(rms, ref=1.0)[0] < self.gui_config.threshold
            )
            for i in range(db_threshold.shape[0]):
                if db_threshold[i]:
                    indata[i * self.zc : (i + 1) * self.zc] = 0
        self.input_wav[: -self.block_frame] = self.input_wav[
            self.block_frame :
        ].clone()
        self.input_wav[-self.block_frame :] = torch.from_numpy(indata).to(
            self.config.device
        )
        self.input_wav_res[: -self.block_frame_16k] = self.input_wav_res[
            self.block_frame_16k :
        ].clone()
    def on_event_received(self, message, websocket_connection=None):
        if self.is_enabled(False):
            if "type" not in message:
                return
            if message["type"] == "plugin_button_press":
                if message["value"] == "model_load_btn":
                    self.init()
                    websocket.BroadcastMessage(json.dumps({"type": "info",
                                                           "data": "Model loaded."}))
                if message["value"] == "convert_btn":
                    input_sample_rate = 16000
                    audio_file = self.get_plugin_setting("audio_file")
                    audio_data = self.load_audio_file(audio_file)

                    audio_array = self.do_conversion(audio_data, input_sample_rate)

                    buff = io.BytesIO()
                    write_wav(buff, input_sample_rate, audio_array)
                    buff.seek(0)

                    wav_data = base64.b64encode(buff.getvalue()).decode('utf-8')
                    websocket.AnswerMessage(websocket_connection, json.dumps({"type": "tts_save", "wav_data": wav_data}))
    def on_silero_tts_after_audio_call(self, data_obj):
        if self.is_enabled(False) and self.get_plugin_setting("voice_change_source") == CONSTANTS["SILERO_TTS"] and self.model_file_valid(self.get_plugin_setting("model_file")):
            audio = data_obj['audio']
            sample_rate = 48000

            # tensor to numpy
            audio_tmp = audio.detach().cpu().numpy()
            # from float32 to int16
            audio_tmp = audio_tools.convert_audio_datatype_to_integer(audio_tmp)
            # to wav bytes
            buff = io.BytesIO()
            write_wav(buff, sample_rate, audio_tmp)
            buff.seek(0)  # rewind before reading, otherwise read() returns nothing

            audio_tmp = audio_tools.resample_audio(buff.read(), sample_rate, self.output_sample_rate, target_channels=-1, is_mono=True, dtype="int16")

            audio_tmp = self.do_conversion(audio_tmp, sample_rate, bytes_dtype="int16")

            # back to float32
            audio_tmp = audio_tools.convert_audio_datatype_to_float(audio_tmp)
            # back to tensor
            audio = torch.from_numpy(audio_tmp)

            data_obj['audio'] = audio
            return data_obj
        return None
    def on_plugin_tts_after_audio_call(self, data_obj):
        if self.is_enabled(False) and self.get_plugin_setting("voice_change_source") == CONSTANTS["PLUGIN_TTS"] and self.model_file_valid(self.get_plugin_setting("model_file")):
            audio = data_obj['audio']
            sample_rate = data_obj['sample_rate']

            audiodata = audio
            if hasattr(audio, 'getvalue'):
                audiodata = audio.getvalue()

            loaded_audio = audio_tools.resample_audio(audiodata, sample_rate, self.output_sample_rate, target_channels=-1, is_mono=True, dtype="int16")

            wav_rvc = self.do_conversion(loaded_audio, sample_rate, bytes_dtype="int16")

            raw_data = audio_tools.numpy_array_to_wav_bytes(wav_rvc, sample_rate)

            if hasattr(audio, 'getvalue'):
                data_obj['audio'] = raw_data
            elif hasattr(raw_data, 'getvalue'):
                data_obj['audio'] = raw_data.getvalue()
            else:
                return None

            return data_obj
        return None
class GUIConfig:
    def __init__(self) -> None:
        self.pth_path: str = ""
        self.index_path: str = ""
        self.pitch: int = 0
        self.samplerate: int = 40000
        self.block_time: float = 1.0  # seconds
        self.buffer_num: int = 1
        self.threshold: int = -60
        self.crossfade_time: float = 0.05
        self.extra_time: float = 2.5
        self.I_noise_reduce = False
        self.O_noise_reduce = False
        self.rms_mix_rate = 0.0
        self.index_rate = 0.3
        self.n_cpu = min(2, 6)
        self.f0method = "rmvpe"
        self.sg_input_device = ""
        self.sg_output_device = ""