Bark TTS Whispering Tiger Plugin
# ============================================================
# Bark Text to Speech Plugin for Whispering Tiger
# V0.3.24
# Bark: https://github.com/suno-ai/bark
# Whispering Tiger: https://github.com/Sharrnah/whispering-ui
# ============================================================
#
import base64
import io
import json
import random
import subprocess
import sys
from importlib import util
import importlib
import pkgutil
from io import BytesIO
import numpy as np
import torch
import torchaudio
import Plugins
from scipy.io.wavfile import write as write_wav
from pathlib import Path
import os
import audio_tools
import settings
import websocket
import downloader
# from Models.STS import DeepFilterNet
# from df.enhance import enhance, init_df, load_audio
def load_module(package_dir, recursive=False):
    package_dir = os.path.abspath(package_dir)
    package_name = os.path.basename(package_dir)
    # Add the parent directory of the package to sys.path
    parent_dir = os.path.dirname(package_dir)
    sys.path.insert(0, parent_dir)
    # Load the package
    spec = util.find_spec(package_name)
    if spec is None:
        raise ImportError(f"Cannot find package '{package_name}'")
    module = util.module_from_spec(spec)
    spec.loader.exec_module(module)
    if recursive:
        # Recursively load all submodules
        for _, name, _ in pkgutil.walk_packages(module.__path__, module.__name__ + '.'):
            importlib.import_module(name)
    # Remove the parent directory from sys.path
    sys.path.pop(0)
    return module
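
# Example usage (illustrative sketch): after a dependency has been extracted to
# Plugins/bark_plugin/einops, it can be imported without pip:
#
#   einops = load_module("Plugins/bark_plugin/einops")
#
# sys.path is only modified while the import runs, so vendored packages do not
# leak into the global import path.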
bark_plugin_dir = Path(Path.cwd() / "Plugins" / "bark_plugin")
os.makedirs(bark_plugin_dir, exist_ok=True)
einops_dependency_module = {
    "url": "https://files.pythonhosted.org/packages/68/24/b05452c986e8eff11f47e123a40798ae693f2fa1ed2f9546094997d2f6be/einops-0.6.1-py3-none-any.whl",
    "sha256": "99149e46cc808956b174932fe563d920db4d6e5dadb8c6ecdaa7483b7ef7cfc3",
    "path": "einops"
}
encodec_dependency = {
    "url": "https://files.pythonhosted.org/packages/62/59/e47bbd0542d0e6f4ce9983d5eb458a01d4b42c81e5c410cb9e159b1061ae/encodec-0.1.1.tar.gz",
    "sha256": "36dde98ccfe6c51a15576476cadfcb3b35a63507b8b8555abd69889a6fba6772",
    "path": "encodec-0.1.1/encodec"
}
funcy_dependency_module = {
    "url": "https://files.pythonhosted.org/packages/d5/08/c2409cb01d5368dcfedcbaffa7d044cc8957d57a9d0855244a5eb4709d30/funcy-2.0-py2.py3-none-any.whl",
    "sha256": "53df23c8bb1651b12f095df764bfb057935d49537a56de211b098f4c79614bb0",
    "path": "funcy"
}
bark_dependency_module = {
    "url": "https://github.com/Sharrnah/bark-with-voice-clone/archive/de9ca6c466be358b895706f3ff6332ca8a8dcd4a.zip",
    "sha256": "f7e5768adebd78c4eecfd384f52a8e80dcc9cf5be6e21264f16ce3ea2069a99a",
    "path": "bark-with-voice-clone-de9ca6c466be358b895706f3ff6332ca8a8dcd4a",
}
vocos_dependency_module = {
    "url": "https://files.pythonhosted.org/packages/98/b3/445694d1059688a76a997c61936fef938b7d90f905a00754b4a441e7fcbd/vocos-0.0.3-py3-none-any.whl",
    "sha256": "0578b20b4ba57533a9d9b3e5ec3f81982f6fabd07ef02eb175fa9ee5da1e3cac",
    "path": "vocos"
}
# used for optional audio normalization
pyloudnorm_dependency_module = {
    "url": "https://files.pythonhosted.org/packages/58/f5/6724805521ab4e723a12182f92374031032aff28a8a89dc8505c52b79032/pyloudnorm-0.1.1-py3-none-any.whl",
    "sha256": "d7f12ebdd097a464d87ce2878fc4d942f15f8233e26cc03f33fefa226f869a14",
    "path": "pyloudnorm"
}
# pyloudnorm dependency future
pyloudnorm_future_dependency_module = {
    "url": "https://files.pythonhosted.org/packages/9e/cf/95b17d4430942dbf291fa5411d8189374a2e6dba91d9ef077e7fb8e869bc/future-0.18.0-cp36-none-any.whl",
    "sha256": "3f9c52f6c3f4e287bdd9b13de6cfd72373fb694aa391b5e511deef3db15d6a62",
    "path": "future"
}
bark_voice_clone_tool = {
    "urls": [
        "https://eu2.contabostorage.com/bf1a89517e2643359087e5d8219c0c67:projects/voice-cloning-bark/barkVoiceClone_v0.0.2.zip",
        "https://usc1.contabostorage.com/8fcf133c506f4e688c7ab9ad537b5c18:projects/voice-cloning-bark/barkVoiceClone_v0.0.2.zip"
    ],
    "sha256": "a32afb7a2a9e4b706ecfeefb9a010e6e975096e7573ec8ab70fefbdfaabd4bd3",
    "path": "barkVoiceClone",
}
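
# Each descriptor above follows the same pattern, consumed further below in
# init(): download the archive from "url" (or one of the mirror "urls"),
# verify it against "sha256", extract it into bark_plugin_dir, and import it
# from "path" via load_module(). A minimal sketch of that flow:
#
#   if not (bark_plugin_dir / funcy_dependency_module["path"] / "__init__.py").is_file():
#       downloader.download_extract([funcy_dependency_module["url"]],
#                                   str(bark_plugin_dir.resolve()),
#                                   funcy_dependency_module["sha256"], ...)
#   funcy = load_module(str(bark_plugin_dir / funcy_dependency_module["path"]))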
class BarkTTSPlugin(Plugins.Base):
    bark_module = None
    sample_rate = 24000
    pyloudnorm_module = None
    encodec = None
    hubert_module = None
    hubert_model = None
    hubert_tokenizer = None
    vocos = None
    # split_character_goal_length = 110
    # split_character_max_length = 170
    # split_character_jitter = 0
    audio_enhancer = None

    def get_plugin(self, class_name):
        for plugin_inst in Plugins.plugins:
            if plugin_inst.__class__.__name__ == class_name:
                return plugin_inst  # return plugin instance
        return None

    def numpy_array_to_wav_bytes(self, audio: np.ndarray, sample_rate: int = 22050) -> BytesIO:
        buff = io.BytesIO()
        write_wav(buff, sample_rate, audio)
        buff.seek(0)
        return buff
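
    # Example (sketch): wrapping one second of silence at the plugin's sample
    # rate into an in-memory WAV file:
    #
    #   silence = np.zeros(24000, dtype=np.int16)
    #   wav_bytes = self.numpy_array_to_wav_bytes(silence, 24000).getvalue()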
    # Function to calculate LUFS
    def calculate_lufs(self, audio, sample_rate):
        meter = self.pyloudnorm_module.Meter(sample_rate)  # create BS.1770 meter
        loudness = meter.integrated_loudness(audio)
        return loudness

    # Function to normalize the audio based on LUFS
    def normalize_audio_lufs(self, audio, sample_rate, lower_threshold=-24.0, upper_threshold=-16.0, gain_factor=2.0):
        lufs = self.calculate_lufs(audio, sample_rate)
        print(f"LUFS: {lufs}")
        # If LUFS is lower than the lower threshold, increase volume
        if lufs < lower_threshold:
            print("audio is too quiet, increasing volume")
            gain = (lower_threshold - lufs) / gain_factor
            audio = audio * np.power(10.0, gain / 20.0)
        # If LUFS is higher than the upper threshold, decrease volume
        elif lufs > upper_threshold:
            print("audio is too loud, decreasing volume")
            gain = (upper_threshold - lufs) * gain_factor
            audio = audio * np.power(10.0, gain / 20.0)
        # Limit audio values to [-1, 1] (important to avoid clipping when converting to 16-bit PCM)
        audio = np.clip(audio, -1, 1)
        return audio, lufs
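
    # The gain computed above is a dB offset applied as a linear factor of
    # 10^(gain/20). Worked example: with lufs = -30, lower_threshold = -24 and
    # gain_factor = 2.0, gain = (-24 - (-30)) / 2.0 = 3 dB, so the samples are
    # multiplied by 10^(3/20) ≈ 1.41.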
    def trim_silence(self, audio, silence_threshold=0.01):
        # Compute absolute value of audio waveform
        audio_abs = np.abs(audio)
        # Find the first index where the absolute value of the waveform exceeds the threshold
        start_index = np.argmax(audio_abs > silence_threshold)
        # Reverse the audio waveform and do the same thing to find the end index
        end_index = len(audio) - np.argmax(audio_abs[::-1] > silence_threshold)
        # If start_index is not 0, some audio at the start has been trimmed
        if start_index > 0:
            print(f"Trimmed {start_index} samples from the start of the audio")
        # If end_index is not the length of the audio, some audio at the end has been trimmed
        if end_index < len(audio):
            print(f"Trimmed {len(audio) - end_index} samples from the end of the audio")
        # Return the trimmed audio
        return audio[start_index:end_index]
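
    # Example (sketch) with a float waveform in [-1, 1]:
    #
    #   audio = np.concatenate([np.zeros(100), 0.5 * np.ones(50), np.zeros(100)])
    #   trimmed = self.trim_silence(audio, silence_threshold=0.01)
    #   # len(trimmed) == 50 - both silent edges are removed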
    def remove_silence_parts(self, audio, sample_rate, silence_threshold=0.01, max_silence_length=1.1, keep_silence_length=0.06):
        audio_abs = np.abs(audio)
        above_threshold = audio_abs > silence_threshold
        # Convert length parameters to number of samples
        max_silence_samples = int(max_silence_length * sample_rate)
        keep_silence_samples = int(keep_silence_length * sample_rate)
        last_silence_end = 0
        silence_start = None
        chunks = []
        for i, sample in enumerate(above_threshold):
            if not sample:
                if silence_start is None:
                    silence_start = i
            else:
                if silence_start is not None:
                    silence_duration = i - silence_start
                    if silence_duration > max_silence_samples:
                        # Subtract keep_silence_samples from the start and add it to the end
                        start = max(last_silence_end - keep_silence_samples, 0)
                        end = min(silence_start + keep_silence_samples, len(audio))
                        chunks.append(audio[start:end])
                        last_silence_end = i
                    silence_start = None
        # Append the final chunk of audio after the last silence
        if last_silence_end < len(audio):
            start = max(last_silence_end - keep_silence_samples, 0)
            end = len(audio)
            chunks.append(audio[start:end])
        if len(chunks) == 0:
            print("No non-silent sections found in audio.")
            return np.array([])
        else:
            print(f"found {len(chunks)} non-silent sections in audio")
            return np.concatenate(chunks)
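
    # Example (sketch): with sample_rate=24000, max_silence_length=0.8 and
    # keep_silence_length=0.2 (the plugin's default settings below), any silent
    # stretch longer than 19200 samples is cut out, while 4800 samples of
    # silence are kept on each side of the cut so the speech does not sound
    # abruptly clipped.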
    def _load_vocos_model(self):
        # load the vocos module (optional vocoder)
        if self.get_plugin_setting("use_vocos", True) and self.vocos is None:
            if not Path(bark_plugin_dir / vocos_dependency_module["path"] / "__init__.py").is_file():
                downloader.download_extract([vocos_dependency_module["url"]],
                                            str(bark_plugin_dir.resolve()),
                                            vocos_dependency_module["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_zip,
                                            fallback_extract_func_args=(
                                                str(bark_plugin_dir / os.path.basename(vocos_dependency_module["url"])),
                                                str(bark_plugin_dir.resolve()),
                                            ),
                                            title="Bark - vocos module", extract_format="zip")
            vocos_module = load_module(str(Path(bark_plugin_dir / vocos_dependency_module["path"]).resolve()))
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            self.vocos = vocos_module.Vocos.from_pretrained("charactr/vocos-encodec-24khz").to(device)
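
    # Vocos can reconstruct audio directly from EnCodec tokens, which is what
    # generate_segments_vocos() below relies on: Bark's "fine_prompt" tokens go
    # through vocos.codes_to_features() and vocos.decode() instead of EnCodec's
    # own decoder. bandwidth_id=2 selects the 6 kbps bandwidth setting.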
    def init(self):
        # prepare all possible settings
        self.init_plugin_settings(
            {
                # "history_prompt": None,
                "history_prompt": {"type": "file_open", "accept": ".npz", "value": ""},
                "prompt_wrap": "##",
                "seed": -1,
                "long_text": False,
                "long_text_stable_frequency": {"type": "slider", "min": 0, "max": 10, "step": 1, "value": 1},
                "long_text_split_pause": {"type": "slider", "min": 0, "max": 5, "step": 0.01, "value": 0},
                "split_character_goal_length": {"type": "slider", "min": 1, "max": 300, "step": 1, "value": 130},
                "split_character_max_length": {"type": "slider", "min": 1, "max": 300, "step": 1, "value": 170},
                "split_character_jitter": {"type": "slider", "min": 0, "max": 100, "step": 1, "value": 0},
                "use_previous_history_for_last_segment": False,
                "long_text_stable_frequency_info": {"label": "stable_frequency\n0 = each continuation uses the history prompt of the previous segment.\n1 = each generation uses the same history prompt.\n2+ = every n-th generation uses the first history prompt.", "type": "label", "style": "left"},
                "use_offload_cpu": True,
                "use_small_models": True,
                "use_half_precision": False,
                "use_mps": False,
                "use_gpu": True,
                "use_vocos": True,
                "use_vocos_on_result": False,
                "temperature_text": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "value": 0.7},
                "temperature_waveform": {"type": "slider", "min": 0.0, "max": 1.0, "step": 0.01, "value": 0.7},
                "min_eos_p": {"type": "slider", "min": 0.01, "max": 1.0, "step": 0.01, "value": 0.05},
                "min_eos_p_info": {"label": "min_eos_p - min. end-of-sentence probability (default: 0.05).\nLower = speech generation ends earlier.\nCan reduce additional words at the end.", "type": "label", "style": "left"},
                "write_last_history_prompt": False,
                "write_last_history_prompt_file": {"type": "file_save", "accept": ".npz", "value": "bark_prompts/last_prompt.npz"},
                "batch_size": 1,
                "batch_prompts": {"type": "textarea", "rows": 5, "value": ""},
                "batch_folder": {"type": "folder_open", "accept": "", "value": "bark_prompts/multi_generations"},
                "zz_batch_button": {"label": "Batch Generate", "type": "button", "style": "primary"},
                "clone_voice_audio_filepath": {"type": "file_open", "accept": ".wav", "value": "bark_clone_voice/clone_voice.wav"},
                "clone_voice_prompt": "",
                "zz_clone_voice_button": {"label": "Start Voice Clone", "type": "button", "style": "primary"},
                "zz_clone_voice_better_button": {"label": "Start Voice Clone", "type": "button", "style": "primary"},
                "zzz_clone_voice_better_info": {"label": "To get a more stable voice, use the cloned *.npz file as history prompt\nand use Batch Processing with that to find a similar voice.", "type": "label", "style": "center"},
                "normalize": True,
                "normalize_lower_threshold": -24.0,
                "normalize_upper_threshold": -16.0,
                "normalize_gain_factor": 1.3,
                # "audio_denoise": True,
                "trim_silence": True,
                "remove_silence_parts": True,
                "silence_threshold": {"type": "slider", "min": 0.0, "max": 2.0, "step": 0.01, "value": 0.03},
                "max_silence_length": {"type": "slider", "min": 0.0, "max": 3.0, "step": 0.1, "value": 0.8},
                "keep_silence_length": {"type": "slider", "min": 0.0, "max": 3.0, "step": 0.01, "value": 0.20},
                "vocos_file": {"type": "file_open", "accept": ".wav", "value": "bark_clone_voice/clone_voice.wav"},
                "vocos_file_button": {"label": "apply Vocos to file", "type": "button", "style": "default"},
            },
            settings_groups={
                "General": ["history_prompt", "prompt_wrap", "temperature_text", "temperature_waveform", "min_eos_p", "min_eos_p_info", "seed"],
                "Long Text Gen.": ["long_text", "long_text_stable_frequency", "long_text_stable_frequency_info", "long_text_split_pause", "split_character_goal_length", "split_character_max_length", "split_character_jitter", "use_previous_history_for_last_segment"],
                "History Prompt": ["write_last_history_prompt", "write_last_history_prompt_file"],
                "Voice Cloning": ["clone_voice_audio_filepath", "clone_voice_prompt", "zz_clone_voice_button"],
                "Model Settings": ["use_offload_cpu", "use_small_models", "use_mps", "use_gpu", "use_vocos", "use_vocos_on_result", "use_half_precision"],
                "Voice Cloning Better": ["clone_voice_audio_filepath", "zz_clone_voice_better_button", "zzz_clone_voice_better_info"],
                "Batch Processing": ["batch_prompts", "batch_size", "batch_folder", "zz_batch_button"],
                "Audio Processing": ["normalize", "normalize_lower_threshold", "normalize_upper_threshold", "normalize_gain_factor", "vocos_file_button", "vocos_file"],
                "Audio Processing 2": ["trim_silence", "remove_silence_parts", "silence_threshold", "max_silence_length", "keep_silence_length"],
            }
        )
        if self.is_enabled(False):
            # disable default tts engine
            settings.SetOption("tts_enabled", False)

            # torch backend settings
            torch.backends.cuda.matmul.allow_tf32 = True

            seed = self.get_plugin_setting("seed")
            if seed is not None and seed >= 0:
                # make pytorch fully deterministic (disabling CuDNN benchmarking can slow down computations)
                torch.backends.cudnn.benchmark = False
                torch.backends.cudnn.deterministic = True

            os.environ["SUNO_OFFLOAD_CPU"] = str(self.get_plugin_setting("use_offload_cpu", True))
            os.environ["SUNO_ENABLE_MPS"] = str(self.get_plugin_setting("use_mps", False))
            use_gpu = self.get_plugin_setting("use_gpu", True)

            websocket.set_loading_state("bark_plugin_loading", True)

            # load the einops module
            einops_path = Path(bark_plugin_dir / einops_dependency_module["path"])
            if not Path(einops_path / "__init__.py").is_file():
                downloader.download_extract([einops_dependency_module["url"]],
                                            str(bark_plugin_dir.resolve()),
                                            einops_dependency_module["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_zip,
                                            fallback_extract_func_args=(
                                                str(bark_plugin_dir / os.path.basename(einops_dependency_module["url"])),
                                                str(bark_plugin_dir.resolve()),
                                            ),
                                            title="Bark - einops module", extract_format="zip")
            einops = load_module(str(Path(bark_plugin_dir / einops_dependency_module["path"]).resolve()))

            # load the encodec module
            encodec_path = Path(bark_plugin_dir / encodec_dependency["path"])
            if not Path(encodec_path / "__init__.py").is_file():
                downloader.download_extract([encodec_dependency["url"]],
                                            str(bark_plugin_dir.resolve()),
                                            encodec_dependency["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_tar_gz,
                                            fallback_extract_func_args=(
                                                str(bark_plugin_dir / os.path.basename(encodec_dependency["url"])),
                                                str(bark_plugin_dir.resolve()),
                                            ),
                                            title="Bark - encodec module", extract_format="tar.gz")
            self.encodec = load_module(str(Path(bark_plugin_dir / encodec_dependency["path"]).resolve()))

            # load the funcy module
            if not Path(bark_plugin_dir / funcy_dependency_module["path"] / "__init__.py").is_file():
                downloader.download_extract([funcy_dependency_module["url"]],
                                            str(bark_plugin_dir.resolve()),
                                            funcy_dependency_module["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_zip,
                                            fallback_extract_func_args=(
                                                str(bark_plugin_dir / os.path.basename(funcy_dependency_module["url"])),
                                                str(bark_plugin_dir.resolve()),
                                            ),
                                            title="Bark - funcy module", extract_format="zip")
            funcy = load_module(str(Path(bark_plugin_dir / funcy_dependency_module["path"]).resolve()))

            # load the vocos module (optional vocoder)
            self._load_vocos_model()

            # load the bark module
            if not Path(bark_plugin_dir / bark_dependency_module["path"] / "bark" / "__init__.py").is_file():
                downloader.download_extract([bark_dependency_module["url"]],
                                            str(bark_plugin_dir.resolve()),
                                            bark_dependency_module["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_zip,
                                            fallback_extract_func_args=(
                                                str(bark_plugin_dir / os.path.basename(bark_dependency_module["url"])),
                                                str(bark_plugin_dir.resolve()),
                                            ),
                                            title="Bark - bark module", extract_format="zip")
            self.bark_module = load_module(str(Path(bark_plugin_dir / bark_dependency_module["path"] / "bark").resolve()))

            # load the future module (pyloudnorm dependency)
            future_path = Path(bark_plugin_dir / pyloudnorm_future_dependency_module["path"])
            if not Path(future_path / "__init__.py").is_file():
                downloader.download_extract([pyloudnorm_future_dependency_module["url"]],
                                            str(bark_plugin_dir.resolve()),
                                            pyloudnorm_future_dependency_module["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_zip,
                                            fallback_extract_func_args=(
                                                str(bark_plugin_dir / os.path.basename(pyloudnorm_future_dependency_module["url"])),
                                                str(bark_plugin_dir.resolve()),
                                            ),
                                            title="Bark - pyloudnorm future module", extract_format="zip")
            future = load_module(str(Path(bark_plugin_dir / pyloudnorm_future_dependency_module["path"]).resolve()))

            # load the audio normalization module
            if not Path(bark_plugin_dir / pyloudnorm_dependency_module["path"] / "__init__.py").is_file():
                downloader.download_extract([pyloudnorm_dependency_module["url"]],
                                            str(bark_plugin_dir.resolve()),
                                            pyloudnorm_dependency_module["sha256"],
                                            alt_fallback=True,
                                            fallback_extract_func=downloader.extract_zip,
                                            fallback_extract_func_args=(
                                                str(bark_plugin_dir / os.path.basename(pyloudnorm_dependency_module["url"])),
                                                str(bark_plugin_dir.resolve()),
                                            ),
                                            title="Bark - pyloudnorm module", extract_format="zip")
            self.pyloudnorm_module = load_module(str(Path(bark_plugin_dir / pyloudnorm_dependency_module["path"]).resolve()))

            # download and load all models
            use_small_models = self.get_plugin_setting("use_small_models", True)
            print("download and load all bark models", ("small" if use_small_models else "large"))
            self.bark_module.preload_models(
                text_use_gpu=use_gpu,
                text_use_small=use_small_models,
                coarse_use_gpu=use_gpu,
                coarse_use_small=use_small_models,
                fine_use_gpu=use_gpu,
                fine_use_small=use_small_models,
                codec_use_gpu=use_gpu,
                path=bark_plugin_dir / "bark_models",
            )
            if use_gpu and self.get_plugin_setting("use_half_precision"):
                self.bark_module.models_to(torch.float16)
            print("bark models loaded")
            websocket.set_loading_state("bark_plugin_loading", False)
    def set_seed(self, seed: int):
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
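
    # Seeding all four RNG sources makes a generation reproducible, e.g.:
    #
    #   self.set_seed(1234)  # same text + same seed -> same waveform
    #
    # (full determinism also depends on torch.backends.cudnn.deterministic,
    # which init() enables when a fixed seed is configured).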
    def audio_processing(self, audio_data, skip_infinity_lufs=True):
        # Normalize audio
        if self.get_plugin_setting("normalize", True):
            lower_threshold = self.get_plugin_setting("normalize_lower_threshold", -24.0)
            upper_threshold = self.get_plugin_setting("normalize_upper_threshold", -16.0)
            gain_factor = self.get_plugin_setting("normalize_gain_factor", 1.3)
            audio_data, lufs = self.normalize_audio_lufs(audio_data, self.sample_rate, lower_threshold, upper_threshold, gain_factor)
            if lufs == float('-inf') and skip_infinity_lufs:
                print("Audio seems to be unusable. skipping")
                return None
        # Trim silence
        if self.get_plugin_setting("trim_silence", True):
            audio_data = self.trim_silence(audio_data)
        # Remove silence parts
        if self.get_plugin_setting("remove_silence_parts", False):
            silence_threshold = self.get_plugin_setting("silence_threshold")
            keep_silence_length = self.get_plugin_setting("keep_silence_length")
            max_silence_length = self.get_plugin_setting("max_silence_length")
            audio_data = self.remove_silence_parts(audio_data, self.sample_rate, silence_threshold=silence_threshold,
                                                   keep_silence_length=keep_silence_length,
                                                   max_silence_length=max_silence_length)
        # return early if no audio data
        if len(audio_data) == 0:
            return None
        return audio_data
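
    # Note the processing order: normalization runs first, so the silence
    # thresholds below operate on a roughly known loudness; edge trimming and
    # internal pause removal then work on the normalized signal.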
    def chunk_up_text(self, text_prompt, split_character_goal_length, split_character_max_length, split_character_jitter=0):
        if split_character_jitter > 0:
            split_character_goal_length = random.randint(split_character_goal_length - split_character_jitter, split_character_goal_length + split_character_jitter)
            split_character_max_length = random.randint(split_character_max_length - split_character_jitter, split_character_max_length + split_character_jitter)
        audio_segments = self.bark_module.split_general_purpose(text_prompt, split_character_goal_length=split_character_goal_length, split_character_max_length=split_character_max_length)
        print(f"Splitting long text, aiming for {split_character_goal_length} chars per segment (max. {split_character_max_length})")
        return audio_segments
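
    # Example (sketch): with a goal length of 130 and jitter of 20, each call
    # draws the goal length uniformly from [110, 150], so repeated generations
    # of the same text split at slightly different points.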
    def generate_segments_vocos(self, text, text_temp=0.7, waveform_temp=0.7, min_eos_p=0.05, history_prompt=None):
        semantic_tokens = self.bark_module.text_to_semantic(text, history_prompt=history_prompt,
                                                            temp=text_temp, min_eos_p=min_eos_p, silent=False)
        history_prompt_data = self.bark_module.semantic_to_audio_tokens(
            semantic_tokens, history_prompt=history_prompt, temp=waveform_temp, silent=False,
            output_full=True,
        )
        # reconstruct with Vocos
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        audio_tokens_torch = torch.from_numpy(history_prompt_data["fine_prompt"]).to(device)
        features = self.vocos.codes_to_features(audio_tokens_torch)
        audio_data_np_array = self.vocos.decode(features, bandwidth_id=torch.tensor([2], device=device)).cpu().numpy()  # 6 kbps
        audio_data_np_array = audio_tools.resample_audio(audio_data_np_array, 24000, 24000, target_channels=-1,
                                                         is_mono=True, dtype="float32")
        return history_prompt_data, audio_data_np_array
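
    # Example usage (sketch, mirroring the calls in generate_tts() below):
    #
    #   history_data, audio = self.generate_segments_vocos(
    #       "Hello there!", text_temp=0.7, waveform_temp=0.7,
    #       min_eos_p=0.05, history_prompt=None)
    #
    # history_data holds the full prompt data and can be passed as
    # history_prompt for the next segment or saved as an .npz file.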
    def apply_vocos_on_audio(self, audio_data):
        # check if audio_data is bytes
        wav_file = audio_data
        if isinstance(audio_data, bytes):
            wav_file = io.BytesIO(audio_data)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        y, sr = torchaudio.load(wav_file)
        if y.size(0) > 1:  # mix to mono
            y = y.mean(dim=0, keepdim=True)
        y = torchaudio.functional.resample(y, orig_freq=sr, new_freq=24000)
        y = y.to(device)
        bandwidth_id = torch.tensor([2]).to(device)  # 6 kbps
        y_hat = self.vocos(y, bandwidth_id=bandwidth_id)
        audio_data_np_array = audio_tools.resample_audio(y_hat, 24000, 24000, target_channels=-1,
                                                         is_mono=True, dtype="float32")
        audio_data_16bit = np.int16(audio_data_np_array * 32767)  # Convert to 16-bit PCM
        buff = io.BytesIO()
        write_wav(buff, self.sample_rate, audio_data_16bit)
        buff.seek(0)
        return buff
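
    # Unlike generate_segments_vocos(), this path applies Vocos to a finished
    # waveform (resampled to 24 kHz mono) rather than to Bark's audio tokens,
    # so it acts as a post-filter on already decoded audio.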
    def generate_tts(self, text, text_temp=0.7, waveform_temp=0.7, write_last_history_prompt=False, last_history_prompt_file=None, prompt_wrap="##", skip_infinity_lufs=True, long_text=False, long_text_stable_frequency=0, long_text_split_pause=0.0):
        history_prompt = self.get_plugin_setting("history_prompt", None)
        if history_prompt == "":
            history_prompt = None

        worker_seed = self.get_plugin_setting("seed", -1)
        if worker_seed is None or worker_seed <= -1:
            worker_seed = random.randint(0, 2**32 - 1)
            print("Bark: using seed %d" % worker_seed)
        self.set_seed(worker_seed)

        min_eos_p = self.get_plugin_setting("min_eos_p")

        self._load_vocos_model()
        use_vocos = self.get_plugin_setting("use_vocos", True) and self.vocos is not None

        if long_text:
            audio_arr_segments = []

            estimated_time = self.bark_module.estimate_spoken_time(text)
            print(f"estimated_time: {estimated_time}")

            split_character_goal_length = self.get_plugin_setting("split_character_goal_length")
            split_character_max_length = self.get_plugin_setting("split_character_max_length")
            split_character_jitter = self.get_plugin_setting("split_character_jitter")
            use_previous_history_for_last_segment = self.get_plugin_setting("use_previous_history_for_last_segment")

            audio_segments = self.chunk_up_text(text,
                                                split_character_goal_length=split_character_goal_length,
                                                split_character_max_length=split_character_max_length,
                                                split_character_jitter=split_character_jitter
                                                )
            print(f"audio_segments: {len(audio_segments)}")

            history_prompt_for_next_segment = history_prompt
            for i, segment_text in enumerate(audio_segments):
                estimated_time = self.bark_module.estimate_spoken_time(segment_text)
                print(f"segment: {i}")
                print(f"estimated_time: {estimated_time}")

                segment_text = prompt_wrap.replace("##", segment_text)
                if not use_vocos:
                    history_prompt_data, audio_data_np_array = self.bark_module.generate_audio(segment_text,
                                                                                               history_prompt=history_prompt_for_next_segment,
                                                                                               text_temp=text_temp,
                                                                                               waveform_temp=waveform_temp,
                                                                                               min_eos_p=min_eos_p,
                                                                                               output_full=True
                                                                                               )
                else:
                    history_prompt_data, audio_data_np_array = self.generate_segments_vocos(segment_text,
                                                                                            text_temp=text_temp,
                                                                                            waveform_temp=waveform_temp,
                                                                                            min_eos_p=min_eos_p,
                                                                                            history_prompt=history_prompt_for_next_segment)
                audio_data_np_array = self.audio_processing(audio_data_np_array, skip_infinity_lufs=skip_infinity_lufs)
                audio_arr_segments.append(audio_data_np_array)

                # set history_prompt when it's initially None
                if history_prompt is None and history_prompt_data is not None:
                    history_prompt = history_prompt_data

                # Check if it's the last segment and the setting is enabled
                if use_previous_history_for_last_segment and i == len(audio_segments) - 1:
                    history_prompt_for_next_segment = history_prompt
                # use history prompt in configured frequency
                elif long_text_stable_frequency > 0 and (i + 1) % long_text_stable_frequency == 0:
                    history_prompt_for_next_segment = history_prompt
                else:
                    history_prompt_for_next_segment = history_prompt_data

            # insert pauses between the segments
            if len(audio_arr_segments) > 0 and long_text_split_pause > 0.0:
                audio_with_pauses = []
                pause_samples = np.zeros(int(long_text_split_pause * self.sample_rate))
                # Iterate over each audio segment
                for segment in audio_arr_segments:
                    # Add the audio segment
                    audio_with_pauses.append(segment)
                    # Add a pause
                    audio_with_pauses.append(pause_samples)
                # Remove the last added pause as it's not needed after the last segment
                audio_arr_segments = audio_with_pauses[:-1]

            # put all audio together
            audio_data_np_array = np.concatenate(audio_arr_segments)
        else:
            text = prompt_wrap.replace("##", text)
            if write_last_history_prompt:
                if not use_vocos:
                    history_prompt_data, audio_data_np_array = self.bark_module.generate_audio(text,
                                                                                               history_prompt=history_prompt,
                                                                                               text_temp=text_temp,
                                                                                               waveform_temp=waveform_temp,
                                                                                               min_eos_p=min_eos_p,
                                                                                               output_full=write_last_history_prompt)
                else:
                    history_prompt_data, audio_data_np_array = self.generate_segments_vocos(text,
                                                                                            text_temp=text_temp,
                                                                                            waveform_temp=waveform_temp,
                                                                                            min_eos_p=min_eos_p,
                                                                                            history_prompt=history_prompt)
                self.bark_module.save_as_prompt(last_history_prompt_file, history_prompt_data)
            else:
                if not use_vocos:
                    audio_data_np_array = self.bark_module.generate_audio(text,
                                                                          history_prompt=history_prompt,
                                                                          text_temp=text_temp,
                                                                          waveform_temp=waveform_temp,
                                                                          min_eos_p=min_eos_p)
                else:
                    _, audio_data_np_array = self.generate_segments_vocos(text,
                                                                          text_temp=text_temp,
                                                                          waveform_temp=waveform_temp,
                                                                          min_eos_p=min_eos_p,
                                                                          history_prompt=history_prompt)
            audio_data_np_array = self.audio_processing(audio_data_np_array, skip_infinity_lufs=skip_infinity_lufs)

        # optional denoising (disabled; kept for reference)
        # if self.get_plugin_setting("audio_denoise", False):
        #     if self.audio_enhancer is None:
        #         self.audio_enhancer = DeepFilterNet.DeepFilterNet(post_filter=True)
        #     print("denoising audio")
        #     audio_data_np_array = self.audio_enhancer.simple_enhance(audio_data_np_array, audio_sample_rate=self.sample_rate, output_sample_rate=self.sample_rate)

        audio_data_16bit = np.int16(audio_data_np_array * 32767)  # Convert to 16-bit PCM
        buff = io.BytesIO()
        write_wav(buff, self.sample_rate, audio_data_16bit)
        buff.seek(0)

        if self.get_plugin_setting("use_vocos_on_result"):
            print("applying vocos on result")
            buff = self.apply_vocos_on_audio(buff)

        # call custom plugin event method
        plugin_audio = Plugins.plugin_custom_event_call('plugin_tts_after_audio', {'audio': buff, 'sample_rate': self.sample_rate})
        if plugin_audio is not None and 'audio' in plugin_audio and plugin_audio['audio'] is not None:
            buff = plugin_audio['audio']

        return buff.getvalue()
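
    # Example (sketch): a minimal call as used by stt() and tts() below:
    #
    #   wav_bytes = self.generate_tts("Hello world.", text_temp=0.7,
    #                                 waveform_temp=0.7, prompt_wrap="##")
    #
    # The return value is a complete 16-bit PCM WAV file as bytes, ready to be
    # played on a device or sent to the UI for download.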
    def timer(self):
        pass
    def play_audio_on_device(self, wav, audio_device, source_sample_rate=24000, audio_device_channel_num=2, target_channels=2, is_mono=True, dtype="int16"):
        secondary_audio_device = None
        if settings.GetOption("tts_use_secondary_playback") and (
                (settings.GetOption("tts_secondary_playback_device") == -1 and audio_device != settings.GetOption("device_default_out_index")) or
                (settings.GetOption("tts_secondary_playback_device") > -1 and audio_device != settings.GetOption("tts_secondary_playback_device"))):
            secondary_audio_device = settings.GetOption("tts_secondary_playback_device")
            if secondary_audio_device == -1:
                secondary_audio_device = settings.GetOption("device_default_out_index")
        audio_tools.play_audio(wav, audio_device,
                               source_sample_rate=source_sample_rate,
                               audio_device_channel_num=audio_device_channel_num,
                               target_channels=target_channels,
                               is_mono=is_mono,
                               dtype=dtype,
                               secondary_device=secondary_audio_device, tag="tts")
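
    # Secondary playback duplicates the TTS audio to a second output device
    # (for example headphones in addition to a virtual microphone). A device
    # index of -1 means "default output device", which is why both -1 and
    # explicit indexes are compared against audio_device above, to avoid
    # playing twice on the same device.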
    def stt(self, text, result_obj):
        if self.is_enabled(False) and settings.GetOption("tts_answer") and text.strip() != "":
            prompt_wrap = self.get_plugin_setting("prompt_wrap", "##")
            text_temp = self.get_plugin_setting("temperature_text")
            waveform_temp = self.get_plugin_setting("temperature_waveform")
            long_text = self.get_plugin_setting("long_text", False)
            long_text_stable_frequency = self.get_plugin_setting("long_text_stable_frequency")
            long_text_split_pause = self.get_plugin_setting("long_text_split_pause")
            write_last_history_prompt = self.get_plugin_setting("write_last_history_prompt", False)
            write_history_prompt_file = self.get_plugin_setting("write_last_history_prompt_file", "bark_prompts/last_prompt.npz")
            if write_last_history_prompt:
                os.makedirs(os.path.dirname(write_history_prompt_file), exist_ok=True)

            audio_device = settings.GetOption("device_out_index")
            if audio_device is None or audio_device == -1:
                audio_device = settings.GetOption("device_default_out_index")

            wav = self.generate_tts(text.strip(),
                                    text_temp=text_temp,
                                    waveform_temp=waveform_temp,
                                    write_last_history_prompt=write_last_history_prompt,
                                    last_history_prompt_file=write_history_prompt_file,
                                    prompt_wrap=prompt_wrap,
                                    long_text=long_text, long_text_stable_frequency=long_text_stable_frequency, long_text_split_pause=long_text_split_pause)
            if wav is not None:
                self.play_audio_on_device(wav, audio_device,
                                          source_sample_rate=self.sample_rate,
                                          audio_device_channel_num=2,
                                          target_channels=2,
                                          is_mono=True,
                                          dtype="int16"
                                          )
        return
    def clone_voice(self):
        clone_voice_prompt = ""
        if self.get_plugin_setting("clone_voice_prompt", "") != "":
            clone_voice_prompt = self.get_plugin_setting("clone_voice_prompt", "")
        if clone_voice_prompt == "":
            websocket.BroadcastMessage(json.dumps({"type": "info", "data": "Please enter a prompt for the voice cloning."}))
            return

        websocket.BroadcastMessage(json.dumps({"type": "info", "data": "Voice Cloning started..."}))

        clone_voice_audio_filepath = self.get_plugin_setting("clone_voice_audio_filepath", "bark_clone_voice/clone_voice.wav")
        # create the directory of the clone audio file if needed
        os.makedirs(os.path.dirname(clone_voice_audio_filepath), exist_ok=True)
        clone_history_prompt_save = os.path.splitext(clone_voice_audio_filepath)[0]

        # check if clone_voice_audio_filepath is a file and exists
        if not os.path.isfile(clone_voice_audio_filepath):
            websocket.BroadcastMessage(json.dumps({"type": "info", "data": "No clone audio file found. Please record a clone audio file first (between 4 and 8 seconds)."}))
            return

        self.bark_module.clone_voice(
            audio_filepath=clone_voice_audio_filepath,
            text=clone_voice_prompt,
            dest_filename=clone_history_prompt_save,
        )
        websocket.BroadcastMessage(json.dumps({"type": "info", "data": "Voice Cloning finished.\n\nLook in folder '" + clone_history_prompt_save + "'."}))
    def clone_voice_better(self):
        use_offload_cpu = self.get_plugin_setting("use_offload_cpu", True)
        use_gpu = self.get_plugin_setting("use_gpu", True)
        use_small_models = self.get_plugin_setting("use_small_models", True)

        websocket.BroadcastMessage(json.dumps({"type": "info", "data": "Better Voice Cloning started..."}))
        if not Path(bark_plugin_dir / bark_voice_clone_tool["path"] / "barkVoiceClone.exe").is_file():
            # download from random url in list
            voice_clone_url = random.choice(bark_voice_clone_tool["urls"])
            downloader.download_extract([voice_clone_url],
                                        str(bark_plugin_dir.resolve()),
                                        bark_voice_clone_tool["sha256"],
                                        alt_fallback=True,
                                        fallback_extract_func=downloader.extract_zip,
                                        fallback_extract_func_args=(
                                            str(bark_plugin_dir / os.path.basename(voice_clone_url)),
                                            str(bark_plugin_dir.resolve()),
                                        ),
                                        title="Bark - voiceclone app", extract_format="zip")

        if Path(bark_plugin_dir / bark_voice_clone_tool["path"] / "barkVoiceClone.exe").is_file():
            clone_voice_audio_filepath = self.get_plugin_setting("clone_voice_audio_filepath", "bark_clone_voice/clone_voice.wav")
            os.makedirs(os.path.dirname(clone_voice_audio_filepath), exist_ok=True)
            clone_history_prompt_save = os.path.splitext(clone_voice_audio_filepath)[0] + ".npz"

            # run command line tool with parameters
            try:
                process_arguments = [str(bark_plugin_dir / bark_voice_clone_tool["path"] / "barkVoiceClone.exe"), "--audio_file", clone_voice_audio_filepath, "--npz_file", clone_history_prompt_save]
                if use_offload_cpu:
                    process_arguments.append("--offload_cpu")
                if use_gpu:
                    process_arguments.append("--use_gpu")
                if use_small_models:
                    process_arguments.append("--small_model")
                # add min_eos_p setting
                process_arguments.append("--min_eos_p")
                process_arguments.append(str(self.get_plugin_setting("min_eos_p")))
                subprocess.run(process_arguments, check=True)
            except subprocess.CalledProcessError as e:
                websocket.BroadcastMessage(json.dumps({"type": "error", "data": "Better Voice Cloning failed: " + str(e)}))
                return

            websocket.BroadcastMessage(json.dumps({"type": "info", "data": "Better Voice Cloning finished.\n\nLook in folder '" + clone_history_prompt_save + "'."}))
        else:
            websocket.BroadcastMessage(json.dumps({"type": "error", "data": "Better Voice Cloning failed: barkVoiceClone.exe not found."}))
            return
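
    # The resulting command line looks like this (sketch, with default settings):
    #
    #   barkVoiceClone.exe --audio_file bark_clone_voice/clone_voice.wav \
    #       --npz_file bark_clone_voice/clone_voice.npz \
    #       --offload_cpu --use_gpu --small_model --min_eos_p 0.05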
    def batch_generate(self):
        # generate multiple voices in a batch
        write_last_history_prompt = True
        prompt_wrap = "##"
        text_temp = self.get_plugin_setting("temperature_text")
        waveform_temp = self.get_plugin_setting("temperature_waveform")

        batch_prompts = self.get_plugin_setting("batch_prompts")
        batch_size = self.get_plugin_setting("batch_size")
        batch_folder = self.get_plugin_setting("batch_folder")
        os.makedirs(batch_folder, exist_ok=True)

        text_list = batch_prompts.split("\n")
        # remove empty lines
        text_list = [x for x in text_list if x.strip() != ""]

        if batch_size > 0:
            websocket.BroadcastMessage(json.dumps({"type": "info", "data": "Batch Generating " + str(batch_size * len(text_list)) + " audios...\n(" + str(batch_size) + " per prompt)\nstarted.\n\nLook for them in the '" + batch_folder + "' directory."}))
            prompt_num = 0
            for text_line in text_list:
                if text_line.strip() != "":
                    prompt_dir = batch_folder + "/prompt-" + str(prompt_num)
                    os.makedirs(prompt_dir, exist_ok=True)
                    # write prompt text to file
                    with open(prompt_dir + "/prompt.txt", "w", encoding='utf-8') as f:
                        f.write(text_line.strip())
                    for i in range(batch_size):
                        file_name = prompt_dir + "/" + str(i)
                        # generate wav and history prompt
                        wav = self.generate_tts(text_line.strip(),
                                                text_temp=text_temp,
                                                waveform_temp=waveform_temp,
                                                write_last_history_prompt=write_last_history_prompt,
                                                last_history_prompt_file=file_name + ".npz",
                                                prompt_wrap=prompt_wrap,
                                                skip_infinity_lufs=False)
                        # write wav to file
                        if wav is not None:
                            wav_file_name = file_name + ".wav"
                            with open(wav_file_name, "wb") as f:
                                f.write(wav)
                    prompt_num += 1
            websocket.BroadcastMessage(json.dumps({"type": "info", "data": "Batch Generating finished.\n\nLook for them in the '" + batch_folder + "' directory."}))
        else:
            error_msg = "Invalid batch size. Must be the number of generations per prompt line."
            print(error_msg)
            websocket.BroadcastMessage(json.dumps({"type": "error", "data": error_msg}))
    def tts(self, text, device_index, websocket_connection=None, download=False):
        if self.is_enabled(False):
            prompt_wrap = self.get_plugin_setting("prompt_wrap", "##")
            text_temp = self.get_plugin_setting("temperature_text")
            waveform_temp = self.get_plugin_setting("temperature_waveform")
            long_text = self.get_plugin_setting("long_text", False)
            long_text_stable_frequency = self.get_plugin_setting("long_text_stable_frequency")
            long_text_split_pause = self.get_plugin_setting("long_text_split_pause")
            write_last_history_prompt = self.get_plugin_setting("write_last_history_prompt", False)
            last_history_prompt_file = self.get_plugin_setting("write_last_history_prompt_file", "bark_prompts/last_prompt.npz")
            if write_last_history_prompt:
                os.makedirs(os.path.dirname(last_history_prompt_file), exist_ok=True)

            if device_index is None or device_index == -1:
                device_index = settings.GetOption("device_default_out_index")

            wav = self.generate_tts(text.strip(),
                                    text_temp=text_temp,
                                    waveform_temp=waveform_temp,
                                    write_last_history_prompt=write_last_history_prompt,
                                    last_history_prompt_file=last_history_prompt_file,
                                    prompt_wrap=prompt_wrap,
                                    long_text=long_text, long_text_stable_frequency=long_text_stable_frequency, long_text_split_pause=long_text_split_pause)
            if wav is not None:
                if download and websocket_connection is not None:
                    wav_data = base64.b64encode(wav).decode('utf-8')
                    websocket.AnswerMessage(websocket_connection,
                                            json.dumps({"type": "tts_save", "wav_data": wav_data}))
                else:
                    self.play_audio_on_device(wav, device_index,
                                              source_sample_rate=self.sample_rate,
                                              audio_device_channel_num=2,
                                              target_channels=2,
                                              is_mono=True,
                                              dtype="int16"
                                              )
        return
    def on_event_received(self, message, websocket_connection=None):
        if self.is_enabled(False):
            if "type" not in message:
                return
            if message["type"] == "plugin_button_press":
                if message["value"] == "zz_clone_voice_better_button":
                    self.clone_voice_better()
                if message["value"] == "zz_clone_voice_button":
                    self.clone_voice()
                if message["value"] == "zz_batch_button":
                    self.batch_generate()
                if message["value"] == "vocos_file_button":
                    wav_file = self.get_plugin_setting("vocos_file")
                    wav = self.apply_vocos_on_audio(wav_file).getvalue()
                    if wav is not None and websocket.UI_CONNECTED["websocket"] is not None:
                        wav_data = base64.b64encode(wav).decode('utf-8')
                        websocket.AnswerMessage(websocket.UI_CONNECTED["websocket"],
                                                json.dumps({"type": "tts_save", "wav_data": wav_data}))
        else:
            websocket.BroadcastMessage(json.dumps({"type": "info", "data": "Plugin is disabled."}))
    def on_enable(self):
        self.init()

    def on_disable(self):
        pass
Comment by Sharrnah (Jul 14, 2023): attached sample recording bark-sample2.mp4