Created
May 12, 2025 10:19
-
-
Save tin2tin/6bcb861931fb8110ebfdbe96530fc6d5 to your computer and use it in GitHub Desktop.
f5-tts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ruff: noqa: F401, F821
# F401 (unused import): several imports below are conditional.
# F821 (undefined name): `bpy` only exists when the script runs inside Blender.

# Add-on metadata shown in Blender's add-on preferences. Kept as a plain
# literal dict on purpose; the values are constants only.
bl_info = {
    "name": "F5/E2 TTS Synthesis",
    "author": "Based on SWivid/F5-TTS Gradio Demo (Modified)",
    "version": (1, 0, 12),
    # Minimum Blender version this script targets (>= 4.1).
    "blender": (4, 1, 0),
    "location": "Sequence Editor > Sidebar > F5/E2 TTS",
    "description": "Synthesize speech using F5-TTS or E2-TTS models from text and reference audio. (Background process)",
    "category": "Sequencer",
}
# Standard library imports | |
import gc | |
import json | |
import tempfile | |
import os | |
import threading # Added for background processing | |
import traceback # Added for better error reporting | |
from datetime import datetime # Correctly import datetime module | |
import queue # Added for thread-safe communication | |
# Third-party imports - These must be installed in Blender's Python environment!
# Example (adjust path): C:\Program Files\Blender Foundation\Blender 4.1\4.1\python\bin\python.exe -m pip install f5-tts transformers torchaudio soundfile cached_path numpy torch torcheval
#
# The add-on must still register when the TTS stack is missing, so the imports
# are guarded and replaced by placeholders that raise only when actually used.
dependencies_loaded = False  # Flipped to True only after every import succeeds
try:
    import numpy as np
    import soundfile as sf
    import torch
    import torcheval  # Not used directly; imported so a missing package is reported here
    import torchaudio
    from cached_path import cached_path

    # Purely informational device report. It is wrapped separately so that a
    # failing CUDA query cannot prevent the add-on from loading.
    try:
        if torch.cuda.is_available():
            cuda_count = torch.cuda.device_count()
            print(f"CUDA available. Found {cuda_count} device(s). Using device 0: {torch.cuda.get_device_name(0)}")
        else:
            print("CUDA not available. Synthesis will run on CPU, which may be slow.")
    except Exception as cuda_err:
        print(f"Warning: Could not query CUDA devices: {cuda_err}")

    from f5_tts.infer.utils_infer import (
        infer_process,
        load_model,
        load_vocoder,
        preprocess_ref_audio_text,
        remove_silence_for_generated_wav,
    )
    from f5_tts.model import DiT, UNetT

    dependencies_loaded = True
except (ImportError, OSError) as e:
    # OSError is caught as well: a native library that fails to load
    # (e.g. a missing DLL) surfaces as OSError rather than ImportError.
    print("\n--------------------------------------------------")
    print(f"WARNING: TTS dependencies not found or failed to import: {e}")
    print("Please install required libraries in Blender's Python environment:")
    print(" Example: <Blender Install Dir>/4.1/python/bin/python.exe -m pip install f5-tts transformers torchaudio soundfile cached_path numpy torch torcheval")
    print("--------------------------------------------------\n")

    class DummyModule:
        """Placeholder for a missing module: any attribute is a function that raises."""

        def __getattr__(self, name):
            # Delay the error until synthesis is attempted.
            def dummy_func(*args, **kwargs):
                raise RuntimeError(f"TTS dependency missing. Cannot access '{name}'. Install f5-tts, torch, etc.")
            return dummy_func

    class DummyModel:
        """Placeholder for the DiT / UNetT model classes."""

    np = DummyModule()
    sf = DummyModule()
    torch = DummyModule()
    torch.cuda = DummyModule()  # Ensure torch.cuda.<anything>() also raises the helpful error
    torcheval = DummyModule()
    torchaudio = DummyModule()

    # Names that the rest of the file *calls* directly must be callables, so
    # they are bound to a raising function rather than to a DummyModule instance.
    cached_path = DummyModule().cached_path
    infer_process = DummyModule().infer_process
    load_model = DummyModule().load_model
    load_vocoder = DummyModule().load_vocoder
    preprocess_ref_audio_text = DummyModule().preprocess_ref_audio_text
    remove_silence_for_generated_wav = DummyModule().remove_silence_for_generated_wav
    DiT = DummyModel
    UNetT = DummyModel
    dependencies_loaded = False  # Ensure flag is False
# Blender imports | |
import bpy | |
from bpy.props import ( | |
StringProperty, | |
EnumProperty, | |
BoolProperty, | |
IntProperty, | |
FloatProperty, | |
PointerProperty, | |
) | |
from bpy.types import ( | |
Panel, | |
Operator, | |
PropertyGroup, | |
) | |
# --- Configuration and Model Loading ---

# Default F5-TTS model: [checkpoint reference, vocab reference, architecture as JSON text].
DEFAULT_F5TTS_CFG = [
    "hf://SWivid/F5-TTS/F5TTS_v1_Base/model_1250000.safetensors",
    "hf://SWivid/F5-TTS/F5TTS_v1_Base/vocab.txt",
    json.dumps(
        {
            "dim": 1024,
            "depth": 22,
            "heads": 16,
            "ff_mult": 2,
            "text_dim": 512,
            "conv_layers": 4,
        }
    ),
]

# E2-TTS model: checkpoint reference and architecture settings.
E2TTS_CKPT_PATH = "hf://SWivid/E2-TTS/E2TTS_Base/model_1200000.safetensors"
E2TTS_MODEL_CFG = {
    "dim": 1024,
    "depth": 24,
    "heads": 16,
    "ff_mult": 4,
    "text_mask_padding": False,
    "pe_attn_head": 1,
}

# Module-level cache: the vocoder and each model are loaded once and reused.
vocoder = F5TTS_ema_model = E2TTS_ema_model = custom_ema_model = None

# Checkpoint path of the custom model currently cached ("" when none is cached).
pre_custom_path = ""
# Model lookup: returns the requested model, loading and caching it on first use.
def _resolve_model_path(path: str) -> str:
    """Resolve ``hf://`` references through cached_path; return other paths unchanged."""
    return str(cached_path(path)) if path.startswith("hf://") else path


def _infer_custom_model_class(model_cfg: dict):
    """Guess from the config keys whether a custom model is UNetT (E2-style) or DiT (F5-style)."""
    unet_keys = ("text_mask_padding", "pe_attn_head")
    dit_keys = ("conv_layers",)
    if any(key in model_cfg for key in unet_keys):
        print("Info: Custom model config contains UNetT-like keys. Attempting to load with UNetT class.")
        return UNetT
    if any(key in model_cfg for key in dit_keys):
        print("Info: Custom model config contains DiT-like keys. Attempting to load with DiT class.")
        return DiT
    # No distinguishing key: fall back to a rough check on the E2-TTS depth.
    if model_cfg.get("depth") == 24:
        print("Info: Custom model config is similar to E2-TTS. Attempting to load with UNetT class.")
        return UNetT
    print("Info: Custom model config structure not clearly DiT or UNetT. Defaulting to DiT class.")
    return DiT


def get_model(model_type: str, custom_config: tuple = None):
    """
    Load or retrieve the model for ``model_type`` ('F5-TTS', 'E2-TTS' or 'Custom').

    The vocoder and each model are cached in module globals, so only the first
    request for a given model pays the loading cost.

    Args:
        model_type: Model selector, compared case-insensitively.
        custom_config: Required for 'Custom'; a tuple
            (ckpt_path: str, vocab_path: str, model_cfg_json: str).
            ``vocab_path`` may be empty.

    Returns:
        The loaded model object.

    Raises:
        RuntimeError: Dependencies are missing, or the vocoder/model failed to load.
        ValueError: Unknown model type, or an invalid custom configuration.
    """
    global vocoder, F5TTS_ema_model, E2TTS_ema_model, custom_ema_model, pre_custom_path

    if not dependencies_loaded:
        raise RuntimeError("TTS dependencies are not loaded. Cannot load models.")

    # The vocoder is shared by every model type.
    if vocoder is None:
        print("Loading vocoder...")
        try:
            vocoder = load_vocoder()
            print("Vocoder loaded successfully.")
        except Exception as e:
            raise RuntimeError(f"Vocoder failed to load: {e}") from e

    model_type_lower = model_type.lower()

    if model_type_lower == "f5-tts":
        if F5TTS_ema_model is None:
            print("Loading F5-TTS model...")
            try:
                ckpt_path_str = str(cached_path(DEFAULT_F5TTS_CFG[0]))
                F5TTS_model_cfg_dict = json.loads(DEFAULT_F5TTS_CFG[2])
                F5TTS_ema_model = load_model(DiT, F5TTS_model_cfg_dict, ckpt_path_str)
                print("F5-TTS model loaded.")
            except Exception as e:
                raise RuntimeError(f"Failed to load F5-TTS model: {e}") from e
        return F5TTS_ema_model

    if model_type_lower == "e2-tts":
        if E2TTS_ema_model is None:
            print("Loading E2-TTS model...")
            try:
                ckpt_path_str = str(cached_path(E2TTS_CKPT_PATH))
                E2TTS_ema_model = load_model(UNetT, E2TTS_MODEL_CFG, ckpt_path_str)
                print("E2-TTS model loaded.")
            except Exception as e:
                raise RuntimeError(f"Failed to load E2-TTS model: {e}") from e
        return E2TTS_ema_model

    if model_type_lower == "custom":
        if custom_config is None or len(custom_config) != 3:
            raise ValueError("Custom model requires a config tuple (ckpt_path, vocab_path, model_cfg_json).")
        ckpt_path_input, vocab_path_input, model_cfg_json_str = (part.strip() for part in custom_config)
        if not ckpt_path_input:
            raise ValueError("Custom model checkpoint path cannot be empty.")

        try:
            model_cfg = json.loads(model_cfg_json_str)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid Custom model config JSON: {e}") from e
        if not isinstance(model_cfg, dict):
            raise ValueError("Custom model config JSON must parse to a dictionary.")

        # Identity of the requested custom model. The config is re-serialized so
        # that whitespace-only edits of the JSON text do not force a reload.
        cache_key = (ckpt_path_input, vocab_path_input, json.dumps(model_cfg, sort_keys=True))
        needs_reload = (
            custom_ema_model is None
            or pre_custom_path != ckpt_path_input
            # Also reload when only the vocab or the config changed; otherwise a
            # model built from the previous settings would be returned.
            or getattr(get_model, "_custom_cache_key", None) != cache_key
        )

        if needs_reload:
            if custom_ema_model is not None:
                # Release the previous custom model before loading the next one.
                print("Unloading previous custom model...")
                try:
                    if hasattr(custom_ema_model, 'cpu'):
                        custom_ema_model.cpu()
                    custom_ema_model = None
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                    print("Previous custom model unloaded.")
                except Exception as e:
                    # Best effort only: a failed unload must not block the new load.
                    print(f"Warning: Error during previous custom model unload: {e}")
            try:
                print(f"Loading Custom TTS model from {ckpt_path_input}...")
                actual_ckpt_path_str = _resolve_model_path(ckpt_path_input)
                # An empty vocab path is passed as "" (not None): that is
                # load_model's own default for `vocab_file`, which is expected to
                # select the vocabulary bundled with f5-tts.
                actual_vocab_path_str = _resolve_model_path(vocab_path_input) if vocab_path_input else ""
                model_class = _infer_custom_model_class(model_cfg)
                custom_ema_model = load_model(
                    model_class, model_cfg, actual_ckpt_path_str, vocab_file=actual_vocab_path_str
                )
                pre_custom_path = ckpt_path_input
                get_model._custom_cache_key = cache_key
                print("Custom TTS model loaded.")
            except Exception as e:
                # Reset the cache state so the next call attempts a clean load.
                custom_ema_model = None
                pre_custom_path = ""
                get_model._custom_cache_key = None
                raise RuntimeError(f"Failed to load custom model from {ckpt_path_input}: {e}") from e
        return custom_ema_model

    # Normally prevented by the EnumProperty that feeds this function.
    raise ValueError(f"Internal Error: Unknown model type: {model_type}. Choose from 'F5-TTS', 'E2-TTS', 'Custom'.")
# --- Core Synthesis Function (Blocking) ---
def synthesize_speech(
    ref_audio_path: str,
    gen_text: str,
    output_audio_path: str,
    ref_text: str = "",
    model_type: str = "F5-TTS",
    custom_model_config: tuple = None,
    remove_silence: bool = False,
    seed: int = -1,
    cross_fade_duration: float = 0.15,
    nfe_step: int = 32,
    speed: float = 1.0,
    progress_callback=None,
    status_callback=None,
    completion_callback=None,
):
    """
    Synthesize speech from text using a reference audio clip and save it to a file.

    This function is blocking and is meant to run in a worker thread. It never
    lets an exception escape from the synthesis itself: the outcome is handed to
    ``completion_callback`` and also returned.

    Args:
        ref_audio_path (str): Absolute path to the reference audio file.
        gen_text (str): Text to generate speech from.
        output_audio_path (str): Absolute path to save the output audio file.
        ref_text (str, optional): Transcript for reference audio. Defaults to "".
        model_type (str, optional): Model type ('F5-TTS', 'E2-TTS', 'Custom'). Defaults to "F5-TTS".
        custom_model_config (tuple, optional): (ckpt_path, vocab_path, model_cfg_json_str) for Custom model.
        remove_silence (bool, optional): Attempt to remove silence. Defaults to False.
        seed (int, optional): Seed for reproducibility. Defaults to -1 (random).
        cross_fade_duration (float, optional): Cross-fade duration. Defaults to 0.15.
        nfe_step (int, optional): Denoising steps. Defaults to 32.
        speed (float, optional): Speech speed multiplier. Defaults to 1.0.
        progress_callback (callable, optional): Called as ``cb(value)`` with a stage value in 0.0..1.0.
        status_callback (callable, optional): Called as ``cb(msg, icon=...)``. Falls back to printing the message.
        completion_callback (callable, optional): Called exactly once as ``cb(result, exception)``.

    Returns:
        tuple: (output_audio_path, ref_text_used, used_seed) on success,
        None on failure.
    """
    def _print_status(msg, icon='INFO'):
        # Console fallback. A bare `print` cannot be used here because every
        # status call passes the `icon` keyword, which print() rejects.
        print(msg)

    _status_callback = status_callback if status_callback else _print_status
    _progress_callback = progress_callback if progress_callback else (lambda val: None)
    _completion_callback = completion_callback if completion_callback else (lambda res, exc: None)

    result = None
    exception = None
    try:
        _status_callback("Starting synthesis thread...", icon='INFO')
        if not dependencies_loaded:
            raise RuntimeError("TTS dependencies are not loaded. Cannot run synthesis.")
        if remove_silence:
            print("Warning: remove_silence is enabled. This feature may sometimes cut off the very end of the generated audio.")
            print("If the last syllable is clipped, try setting remove_silence=False.")

        # Basic input validation (the caller is expected to have checked these already).
        if not ref_audio_path or not os.path.exists(ref_audio_path):
            raise ValueError(f"Reference audio file not found at {ref_audio_path}")
        gen_text_stripped = gen_text.strip()
        if not gen_text_stripped:
            raise ValueError("Text to generate is empty.")

        # Seed selection. The random draw is converted to a plain int so the
        # value reported in the result is not a NumPy scalar.
        max_seed = 2**31 - 1
        used_seed = seed if seed != -1 else int(np.random.randint(0, max_seed))
        if not 0 <= used_seed <= max_seed:
            print(f"Warning: Seed {used_seed} out of valid range 0 ~ 2147483647. Using random seed instead.")
            used_seed = int(np.random.randint(0, max_seed))
        try:
            torch.manual_seed(used_seed)
            if torch.cuda.is_available():
                torch.cuda.manual_seed(used_seed)
                torch.cuda.manual_seed_all(used_seed)  # For multiple GPUs
            print(f"Using seed: {used_seed}")
        except Exception as e:
            # Not fatal: synthesis continues, it is just less reproducible.
            print(f"Warning: Could not set torch manual seed {used_seed}: {e}")

        # Load (or fetch the cached) model.
        _status_callback(f"Loading model: {model_type}...", icon='COLOR_GREEN')
        ema_model = get_model(model_type, custom_model_config)
        if ema_model is None:  # get_model should already have raised
            raise RuntimeError(f"Failed to load or get model '{model_type}'.")
        _status_callback("Model loaded. Preprocessing audio...", icon='FILE_SOUND')

        # Preprocess the reference clip. The returned text replaces ref_text,
        # since preprocessing may supply a transcript when none was given.
        _status_callback("Preprocessing reference audio...", icon='FILE_SOUND')
        ref_audio_processed, ref_text_used = preprocess_ref_audio_text(
            ref_audio_path,
            ref_text.strip(),
            show_info=print,
        )
        print(f"Reference text used by preprocess: \"{ref_text_used}\"")
        _status_callback("Preprocessing complete. Starting inference...", icon='PLAY')

        print(f"Starting inference for text: \"{gen_text_stripped[:70]}...\"")
        _progress_callback(0.1)  # Arbitrary stage value: preprocessing done
        # progress=None disables the library's own progress reporting; progress
        # is reported per stage through _progress_callback instead.
        final_wave, final_sample_rate, _ = infer_process(
            ref_audio_processed,
            ref_text_used,
            gen_text_stripped,
            ema_model,
            vocoder,
            cross_fade_duration=cross_fade_duration,
            nfe_step=nfe_step,
            speed=speed,
            show_info=print,
            progress=None,
        )
        print("Inference complete.")
        _progress_callback(0.9)  # Arbitrary stage value: inference done

        # Optional silence removal; on any failure the untrimmed audio is kept.
        if remove_silence and final_wave is not None and len(final_wave) > 0:
            _status_callback("Attempting to remove silence...", icon='FILE_REFRESH')
            tmp_wav_path = None
            try:
                # mkstemp + close: the file must exist and be closed before the
                # silence-removal helper reopens it by path.
                tmp_fd, tmp_wav_path = tempfile.mkstemp(suffix=".wav")
                os.close(tmp_fd)
                sf.write(tmp_wav_path, final_wave, final_sample_rate)
                remove_silence_for_generated_wav(tmp_wav_path)
                loaded_audio, loaded_sr = torchaudio.load(tmp_wav_path)
                final_wave = loaded_audio.squeeze().cpu().numpy()
                final_sample_rate = loaded_sr
                print("Silence removal successful.")
                _progress_callback(0.95)  # Arbitrary stage value: silence removal done
            except Exception as e:
                _status_callback(f"Warning during silence removal: {e}", icon='WARNING')
                print(f"Error during silence removal: {e}")
                traceback.print_exc()
            finally:
                if tmp_wav_path and os.path.exists(tmp_wav_path):
                    try:
                        os.remove(tmp_wav_path)
                    except OSError as e:
                        print(f"Warning: Could not remove temporary file {tmp_wav_path}: {e}")

        # Save the final audio.
        if final_wave is not None and len(final_wave) > 0:
            try:
                output_dir = os.path.dirname(output_audio_path)
                if output_dir and not os.path.exists(output_dir):
                    try:
                        # exist_ok avoids a spurious failure if the directory
                        # appears between the check above and this call.
                        os.makedirs(output_dir, exist_ok=True)
                        print(f"Created output directory: {output_dir}")
                    except OSError as e:
                        raise OSError(f"Error creating output directory {output_dir}: {e}") from e
                sf.write(output_audio_path, final_wave.astype(np.float32), final_sample_rate)
                print(f"Synthesized audio saved to {output_audio_path}")
                _progress_callback(1.0)
                result = (output_audio_path, ref_text_used, used_seed)
            except Exception as e:
                exception = e
                print(f"Error saving output audio to {output_audio_path}: {e}")
                traceback.print_exc()
        else:
            exception = RuntimeError("Synthesis failed, no audio data generated.")
            print("Synthesis failed, no audio data to save.")
    except Exception as e:
        # Anything raised during validation, model loading, preprocessing or inference.
        exception = e
        print(f"Exception caught in synthesis thread: {e}")
        traceback.print_exc()
    finally:
        print("Synthesis thread finally block entered.")
        # Always signal completion, whether the run succeeded or failed.
        _completion_callback(result, exception)
    return result
# --- Blender Add-on UI and Operator ---
class F5TTSSettings(PropertyGroup):
    """User-editable TTS settings plus the progress/status fields shown in the UI."""

    # --- Inputs ---
    ref_audio_path: StringProperty(
        name="Reference Audio",
        subtype='FILE_PATH',
        description="Path to the reference audio file (.wav, .mp3, etc.)",
    )
    gen_text: StringProperty(
        name="Text to Generate",
        subtype='NONE',
        description="The text to convert to speech",
    )
    ref_text: StringProperty(
        name="Reference Text (Optional)",
        subtype='NONE',
        description="Optional transcript for the reference audio. Leave empty for auto-transcription.",
    )

    # --- Model selection ---
    model_type: EnumProperty(
        name="Model",
        description="Choose the TTS model to use",
        default='F5-TTS',
        items=[
            ('F5-TTS', "F5-TTS", "Use the F5-TTS model"),
            ('E2-TTS', "E2-TTS", "Use the E2-TTS model"),
            ('Custom', "Custom", "Use a custom model"),
        ],
    )
    # FILE_PATH gives these two a file browser button in the UI.
    custom_ckpt_path: StringProperty(
        name="Custom Checkpoint",
        subtype='FILE_PATH',
        description="Path or HuggingFace ID for the custom model checkpoint (e.g., hf://user/repo/model.safetensors)",
    )
    custom_vocab_path: StringProperty(
        name="Custom Vocab (Optional)",
        subtype='FILE_PATH',
        description="Path or HuggingFace ID for the custom model vocabulary file (e.g., hf://user/repo/vocab.txt)",
    )
    # Pre-filled with the default F5-TTS architecture as an example.
    custom_model_cfg_json: StringProperty(
        name="Custom Config (JSON)",
        subtype='NONE',
        default=DEFAULT_F5TTS_CFG[2],
        description='Model configuration as a JSON string (e.g., {"dim": 1024, ...})',
    )

    # --- Generation options ---
    remove_silence: BoolProperty(
        name="Remove Silences",
        default=False,
        description="If enabled, automatically remove leading/trailing silence. *Warning: May sometimes clip the very end of audio.*",
    )
    randomize_seed: BoolProperty(
        name="Randomize Seed",
        default=True,
        description="If enabled, use a random seed for each synthesis. If disabled, use the Seed value below.",
    )
    seed: IntProperty(
        name="Seed",
        description="Seed for reproducible generation (used if Randomize Seed is off)",
        default=0,
        min=0,
        max=2_147_483_647,  # 2**31 - 1, largest signed 32-bit integer
        step=1,
    )
    cross_fade_duration: FloatProperty(
        name="Cross-Fade (s)",
        description="Duration of cross-fade between internal audio segments (if applicable)",
        default=0.15,
        min=0.0,
        max=1.0,
        step=0.01,
        precision=2,
    )
    nfe_step: IntProperty(
        name="NFE Steps",
        description="Number of denoising steps for the diffusion process",
        default=32,
        min=4,
        max=64,
        step=1,
    )
    speed: FloatProperty(
        name="Speed",
        description="Adjust the speed of the generated speech",
        default=1.0,
        min=0.3,
        max=2.0,
        step=0.1,
        precision=2,
    )

    # --- Output ---
    output_dir: StringProperty(
        name="Output Directory",
        subtype='DIR_PATH',
        default="//generated_audio/",  # "//" prefix: relative path
        description="Directory to save the synthesized audio file(s)",
    )
    last_output_file: StringProperty(
        name="Last Output",
        subtype='FILE_PATH',
        description="Path to the last generated audio file",
    )

    # --- Run state displayed in the panel ---
    synthesis_progress: FloatProperty(
        name="Progress",
        description="Synthesis Progress (0.0 to 1.0)",
        subtype='FACTOR',
        default=0.0,
        min=0.0,
        max=1.0,
        options={'ANIMATABLE'},
    )
    status_message: StringProperty(
        name="Status",
        description="Current status or error message",
        default="",
        options={'SKIP_SAVE'},  # Not stored in the .blend file
    )
# Operator to perform the synthesis in a background thread | |
class SEQUENCER_OT_synthesize_tts(Operator): | |
bl_idname = "sequencer.synthesize_tts" | |
bl_label = "Synthesize Speech" | |
bl_description = "Synthesize speech using F5/E2 TTS model from text and reference audio (Runs blocking in thread)" | |
# Operator is NOT modal | |
# bl_options = {'REGISTER'} # REGISTER is default | |
# Class variables for managing the background task state | |
_thread = None # Reference to the background thread | |
_settings_bpy_pointer = None # Store pointer to settings property group for thread updates | |
# Class variable to hold the result from the thread | |
# Accessed and processed ONLY on the main thread | |
_thread_result = None # Stores (output_path, ref_text_used, used_seed) or None | |
_thread_exception = None # Stores Exception object or None | |
_thread_completed = False # Flag to indicate thread has finished and result is ready | |
# Timer for checking the thread completion state periodically | |
_completion_timer = None | |
@staticmethod | |
def update_progress_static(value, settings_pointer):
    """Store ``value`` in ``settings_pointer.synthesis_progress``; failures are only logged.

    Intended to be called from the worker thread. Assigning the property from
    there is not strictly safe (it should happen on the main thread), so any
    error is swallowed and reported on the console. No redraw is requested.
    """
    if not settings_pointer:
        return
    try:
        setattr(settings_pointer, "synthesis_progress", value)
    except Exception as err:
        print(f"Warning: Failed to update UI progress property from thread: {err}")
@staticmethod | |
def update_status_static(msg, icon, settings_pointer):
    """Store ``msg`` in ``settings_pointer.status_message`` and echo it to the console.

    Intended to be called from the worker thread. Assigning the property from
    there is not strictly safe (it should happen on the main thread), so any
    error is swallowed and reported on the console. ``icon`` is accepted but
    not used here. No redraw is requested.
    """
    has_pointer = bool(settings_pointer)
    if has_pointer:
        try:
            setattr(settings_pointer, "status_message", msg)
        except Exception as err:
            print(f"Warning: Failed to update Blender status property from thread: {err}")
    # The console echo happens whether or not a settings pointer was given.
    print(f"Status (Thread): {msg}")
@staticmethod
def check_completion_timer_callback():
    """Poll the worker thread and, once it is done, publish its outcome.

    Intended to be driven by ``bpy.app.timers`` so that it runs on Blender's
    main thread, where touching the sequencer is allowed. While the worker is
    still running it asks to be called again; once the worker has signalled
    completion it consumes the shared result/exception slots, resets the
    shared state, and either reports the failure or adds the generated audio
    file to the sequencer as a sound strip.

    Returns:
        float | None: ``0.05`` (seconds until the next poll) while the worker
        is running, ``None`` to stop the timer once the result is processed.
    """
    op_cls = SEQUENCER_OT_synthesize_tts

    if not op_cls._thread_completed:
        # Worker still running: ask the timer system to call us again.
        return 0.05

    if op_cls._completion_timer:
        try:
            bpy.app.timers.unregister(op_cls._completion_timer)
            print("Unregistered completion timer.")
        except ValueError:
            pass  # Already gone; returning None below stops the timer anyway.
        op_cls._completion_timer = None

    # Snapshot the hand-over slots, then reset the shared state FIRST so a new
    # job can be started even if the processing below raises.
    result = op_cls._thread_result
    exception = op_cls._thread_exception
    settings = op_cls._settings_bpy_pointer
    op_cls._thread = None
    op_cls._thread_result = None
    op_cls._thread_exception = None
    op_cls._thread_completed = False
    op_cls._settings_bpy_pointer = None

    def publish(progress, message, keep_existing=True):
        """Write progress/status; optionally keep a status set by the worker."""
        if not settings:
            return
        settings.synthesis_progress = progress
        if keep_existing:
            settings.status_message = settings.status_message or message
        else:
            settings.status_message = message

    # `is not None` rather than truthiness: an exception object is a failure
    # regardless of how its class defines __bool__/__len__.
    if exception is not None:
        error_msg = f"Synthesis failed: {exception}"
        # self.report is unavailable in a static timer callback; log instead.
        print(f"ERROR: {error_msg}")
        publish(0.0, f"Error: {error_msg}")
        return None

    if not result:
        # Should not happen if the worker always sets a result or an exception.
        unknown_error_msg = "Synthesis thread finished without result or error reported."
        print(f"ERROR: {unknown_error_msg}")
        publish(0.0, f"Error: {unknown_error_msg}")
        return None

    output_path, ref_text_used, used_seed = result
    print(f"Synthesis thread completed successfully. Output: {output_path}")

    try:
        # Resolve the path up front so every branch below can refer to it.
        filepath_abs = bpy.path.abspath(output_path)

        if not os.path.exists(filepath_abs):
            error_msg_file = f"Synthesis reported success, but output file not found: {filepath_abs}"
            print(f"ERROR: {error_msg_file}")
            publish(0.0, f"Error: {error_msg_file}", keep_existing=False)
            return None

        # Timers receive no context argument, so the scene has to come from
        # the global context, which may not be the VSE when the timer fires.
        scene = bpy.context.scene
        if not (scene and scene.sequence_editor):
            info_msg = f"Synthesis complete. Output saved to {filepath_abs}. Could not add to VSE (VSE not active?)."
            print(f"INFO: {info_msg}")
            publish(1.0, info_msg)
            if settings:
                settings.last_output_file = output_path
            return None

        # Use the lowest channel that has no strip covering the current frame.
        start_frame = scene.frame_current
        used_channels = {
            strip.channel
            for strip in scene.sequence_editor.sequences
            if strip.frame_final_end > start_frame and strip.frame_start < (start_frame + 1)
        }
        channel = 1
        while channel in used_channels:
            channel += 1

        strip_name = os.path.basename(output_path)
        scene.sequence_editor.sequences.new_sound(
            name=strip_name,
            filepath=filepath_abs,
            channel=channel,
            frame_start=start_frame,
        )
        info_msg = f"Added '{strip_name}' to VSE at frame {start_frame}, channel {channel}."
        print(f"INFO: {info_msg}")
        publish(1.0, info_msg)
        if settings:
            settings.last_output_file = output_path
    except Exception as vse_e:
        error_msg_vse = f"Synthesis complete, but failed to add sound strip: {vse_e}"
        print(f"ERROR: {error_msg_vse}")
        traceback.print_exc()
        publish(0.0, f"Error: {error_msg_vse}", keep_existing=False)

    # None tells bpy.app.timers not to call this function again.
    return None
def execute(self, context):
    """Validate the panel settings and start speech synthesis in a worker thread.

    Runs on the main thread when the button is pressed. All values the worker
    needs are copied out of the settings first, so the worker only touches
    Blender data through the status/progress callbacks. A ``bpy.app.timers``
    callback is registered to pick up the result on the main thread.

    Args:
        context: Blender context; ``context.scene.f5tts_settings`` supplies
            the inputs and receives status updates.

    Returns:
        set[str]: ``{'FINISHED'}`` once the worker has been started, or
        ``{'CANCELLED'}`` when a job is already running, validation fails,
        or the worker/timer could not be started.
    """
    op_cls = SEQUENCER_OT_synthesize_tts
    settings = context.scene.f5tts_settings

    def abort(message):
        """Report a validation/start-up error and cancel the operator."""
        self.report({'ERROR'}, message)
        settings.status_message = message
        return {'CANCELLED'}

    # The job state lives in class variables, so two concurrent jobs would
    # overwrite each other's result slots. Refuse to start a second one.
    if op_cls._thread is not None and op_cls._thread.is_alive():
        self.report({'WARNING'}, "Synthesis is already running.")
        return {'CANCELLED'}

    if not dependencies_loaded:
        return abort("TTS dependencies are not loaded. Please check console.")

    # Copy every input out of the settings before the thread starts.
    ref_audio_path_input = settings.ref_audio_path.strip()
    gen_text_input = settings.gen_text.strip()
    output_dir_input = settings.output_dir.strip()
    ref_text_input = settings.ref_text.strip()
    model_type = settings.model_type
    remove_silence = settings.remove_silence
    seed = settings.seed
    randomize_seed = settings.randomize_seed
    cross_fade_duration = settings.cross_fade_duration
    nfe_step = settings.nfe_step
    speed = settings.speed

    # Resolve Blender-relative paths ("//...") to absolute paths early.
    ref_audio_path_abs = bpy.path.abspath(ref_audio_path_input)
    output_dir_abs = bpy.path.abspath(output_dir_input)

    if not ref_audio_path_abs or not os.path.exists(ref_audio_path_abs):
        return abort(f"Reference audio file not found or invalid: {ref_audio_path_input}")
    if not gen_text_input:
        return abort("Text to Generate cannot be empty.")

    custom_model_config = None
    if model_type == 'Custom':
        custom_ckpt_path = settings.custom_ckpt_path.strip()
        custom_vocab_path = settings.custom_vocab_path.strip()
        custom_model_cfg_json_str = settings.custom_model_cfg_json.strip()
        custom_model_config = (custom_ckpt_path, custom_vocab_path, custom_model_cfg_json_str)
        if not custom_ckpt_path:
            return abort("Custom model requires a checkpoint path.")
        # Only the JSON syntax is checked here; the parsed value is discarded.
        try:
            json.loads(custom_model_cfg_json_str)
        except json.JSONDecodeError as e:
            return abort(f"Invalid Custom Config JSON: {e}")
        except Exception as e:
            return abort(f"Error parsing Custom Config JSON: {e}")

    # Unique output name: model type + blend file name + frame + microsecond stamp.
    base_filename = f"tts_{model_type.lower().replace('-', '_')}"
    blend_file_name = os.path.splitext(os.path.basename(bpy.data.filepath))[0] or "untitled"
    timestamp = (
        f"{blend_file_name}_{int(context.scene.frame_current)}_"
        f"{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
    )
    output_filename = f"{base_filename}_{timestamp}.wav"
    output_audio_path_abs = os.path.join(output_dir_abs, output_filename)

    # -1 is passed on as the seed when randomisation is requested.
    final_seed = -1 if randomize_seed else seed

    # Reset the hand-over slots and expose the settings to the callbacks.
    op_cls._thread_result = None
    op_cls._thread_exception = None
    op_cls._thread_completed = False
    op_cls._settings_bpy_pointer = settings
    settings.synthesis_progress = 0.0
    settings.status_message = "Starting synthesis thread..."

    def thread_target_wrapper():
        """Worker body: run synthesis and always signal completion."""
        result = None
        exception = None

        # The callbacks read the settings pointer from the class at call time,
        # so clearing it on the main thread silences further updates.
        def thread_status_callback(msg, icon='INFO'):
            op_cls.update_status_static(msg, icon, op_cls._settings_bpy_pointer)

        def thread_progress_callback(value):
            op_cls.update_progress_static(value, op_cls._settings_bpy_pointer)

        def thread_completion_signal(res, exc):
            # Publish the payload before raising the flag the timer polls.
            op_cls._thread_result = res
            op_cls._thread_exception = exc
            op_cls._thread_completed = True

        try:
            result = synthesize_speech(
                ref_audio_path=ref_audio_path_abs,
                gen_text=gen_text_input,
                output_audio_path=output_audio_path_abs,
                ref_text=ref_text_input,
                model_type=model_type,
                custom_model_config=custom_model_config,
                remove_silence=remove_silence,
                seed=final_seed,
                cross_fade_duration=cross_fade_duration,
                nfe_step=nfe_step,
                speed=speed,
                progress_callback=thread_progress_callback,
                status_callback=thread_status_callback,
                completion_callback=thread_completion_signal,
            )
        except Exception as e:
            exception = e
            thread_status_callback(f"Error in thread: {e}", icon='ERROR')
            print(f"Exception caught in synthesis thread: {e}")
            traceback.print_exc()
        finally:
            print("Synthesis thread finally block entered.")
            # Signal completion on success and on failure alike.
            thread_completion_signal(result, exception)

    # Accessing a staticmethod through the class yields the same function
    # object every time, so it can be used to unregister the timer later.
    poll = op_cls.check_completion_timer_callback
    try:
        # Register the poller before starting the worker so a finished job
        # can never be left without anything to collect its result.
        if not bpy.app.timers.is_registered(poll):
            bpy.app.timers.register(poll, first_interval=0.05)
        # timers.register() returns None; keep the callable as the handle.
        op_cls._completion_timer = poll

        print("Starting TTS synthesis thread...")
        worker = threading.Thread(target=thread_target_wrapper)
        # Daemon thread: Blender may exit even if synthesis is still running.
        worker.daemon = True
        op_cls._thread = worker
        worker.start()
    except Exception as start_error:
        print(f"Error starting synthesis: {start_error}")
        traceback.print_exc()
        if bpy.app.timers.is_registered(poll):
            bpy.app.timers.unregister(poll)
        op_cls._completion_timer = None
        op_cls._thread = None
        op_cls._settings_bpy_pointer = None
        settings.synthesis_progress = 0.0
        return abort(f"Failed to start synthesis: {start_error}")

    self.report({'INFO'}, "Synthesis started in background. Waiting for completion...")
    # The operator itself is done; the timer callback finishes the job.
    return {'FINISHED'}
def cancel(self, context):
    """Abandon the current synthesis job from the UI's point of view.

    Stops the completion timer, clears the shared job state and resets the
    progress/status shown in the panel. The worker thread itself cannot be
    stopped; if it is still alive it keeps running to completion and its
    result is simply never collected.

    Args:
        context: Blender context; ``context.scene.f5tts_settings`` receives
            the reset progress and the cancellation message.
    """
    op_cls = SEQUENCER_OT_synthesize_tts
    print("TTS Synthesis cancelled by user.")

    if op_cls._completion_timer:
        try:
            bpy.app.timers.unregister(op_cls._completion_timer)
            print("Unregistered completion timer.")
        except ValueError:
            pass  # The timer may already have stopped on its own.
        op_cls._completion_timer = None

    # Python offers no safe way to kill a thread, and the synthesis call is
    # not given a cancellation flag here, so the worker is left to finish.
    if op_cls._thread and op_cls._thread.is_alive():
        print("Warning: Cannot stop the background synthesis thread. It will continue running.")

    # Clear the shared state. Do NOT join the thread here: that would block
    # the main thread for the remainder of the synthesis.
    op_cls._thread = None
    op_cls._thread_result = None
    op_cls._thread_exception = None
    op_cls._thread_completed = False
    op_cls._settings_bpy_pointer = None

    settings = context.scene.f5tts_settings
    settings.synthesis_progress = 0.0
    settings.status_message = "Synthesis cancelled."
    # 'CANCELLED' is an operator return value, not a report type; passing it
    # to report() raises TypeError, so an informational report is used.
    self.report({'INFO'}, "Synthesis operation cancelled.")
# VSE Sidebar Panel | |
class SEQUENCER_PT_f5tts_sidebar(Panel):
    """Sequence-editor sidebar panel exposing the F5/E2 TTS settings and button."""

    bl_label = "F5/E2 TTS Synthesis"
    bl_idname = "SEQUENCER_PT_f5tts_sidebar"
    bl_space_type = 'SEQUENCE_EDITOR'
    bl_region_type = 'UI'
    bl_category = 'F5/E2 TTS'  # Name of the sidebar tab

    def draw(self, context):
        """Lay out the panel: inputs, model, settings, button and status line."""
        layout = self.layout
        settings = context.scene.f5tts_settings

        if not dependencies_loaded:
            # Without the TTS packages only the warning is shown.
            layout.label(text="Dependencies Missing!", icon='ERROR')
            layout.label(text="Check console for install instructions.")
            layout.separator()
            layout.operator("wm.console_toggle", text="Toggle System Console", icon='CONSOLE')
            return

        layout.use_property_split = True
        layout.use_property_decorate = False  # Hide the animation dots

        self._draw_inputs(layout, settings)
        self._draw_model(layout, settings)
        self._draw_settings(layout, settings)
        self._draw_synthesize_button(layout, settings)
        self._draw_status(layout, settings)

    @staticmethod
    def _draw_inputs(layout, settings):
        """Reference audio, text to generate and optional reference text."""
        section = layout.box()
        section.label(text="Inputs", icon='SOUND')
        section.prop(settings, "ref_audio_path")
        section.prop(settings, "gen_text", text="Text", icon='TEXT')
        section.prop(settings, "ref_text", text="Ref Text (Optional)")

    @classmethod
    def _draw_model(cls, layout, settings):
        """Model selector plus the extra fields of a custom model."""
        section = layout.box()
        section.label(text="Model", icon='FILE_FONT')
        section.prop(settings, "model_type", text="Type")
        if settings.model_type != 'Custom':
            return
        section.prop(settings, "custom_ckpt_path", text="Checkpoint")
        section.prop(settings, "custom_vocab_path", text="Vocab")
        section.prop(settings, "custom_model_cfg_json", text="Config (JSON)")
        cls._draw_inferred_class(section, settings)

    @staticmethod
    def _draw_inferred_class(section, settings):
        """Show which model class the custom JSON config appears to describe."""
        hint_row = section.row()
        hint_row.label(text="Inferred Class:")
        try:
            config = json.loads(settings.custom_model_cfg_json)
            unet_keys = ["text_mask_padding", "pe_attn_head"]  # Typical of UNetT/E2 configs
            dit_keys = ["conv_layers"]  # Typical of DiT/F5 configs
            has_unet = any(key in config for key in unet_keys)
            has_dit = any(key in config for key in dit_keys)
            if has_unet and has_dit:
                # Keys of both families present: flag the config as ambiguous.
                hint, hint_icon = "UNetT (likely E2-style)" + " (has DiT keys too)", 'WARNING'
            elif has_unet:
                hint, hint_icon = "UNetT (likely E2-style)", 'INFO'
            elif has_dit:
                hint, hint_icon = "DiT (likely F5-style)", 'INFO'
            else:
                hint, hint_icon = "Unclear from keys (defaulting to DiT in loader)", 'QUESTION'
            hint_row.label(text=hint, icon=hint_icon)
        except json.JSONDecodeError:
            hint_row.label(text="Invalid JSON Config", icon='ERROR')
        except Exception:  # e.g. the parsed JSON does not support `in`
            hint_row.label(text="Error processing config", icon='ERROR')

    @staticmethod
    def _draw_settings(layout, settings):
        """Generation parameters, output directory and last output file."""
        section = layout.box()
        section.label(text="Settings", icon='SETTINGS')
        section.prop(settings, "remove_silence")
        seed_row = section.row(align=True)
        seed_row.prop(settings, "randomize_seed")
        if not settings.randomize_seed:
            # A fixed seed is only editable when randomisation is off.
            seed_row.prop(settings, "seed", text="Seed")
        for prop_name in ("speed", "nfe_step", "cross_fade_duration", "output_dir"):
            section.prop(settings, prop_name)
        if settings.last_output_file:
            section.prop(settings, "last_output_file", icon='FILE_SOUND')

    @staticmethod
    def _draw_synthesize_button(layout, settings):
        """Synthesize button, enabled only when the required inputs are set."""
        button_row = layout.row()
        button_row.operator(SEQUENCER_OT_synthesize_tts.bl_idname, text="Synthesize", icon='PLAY')
        has_inputs = bool(settings.ref_audio_path.strip()) and bool(settings.gen_text.strip())
        # A custom model additionally needs a checkpoint path.
        missing_ckpt = settings.model_type == 'Custom' and not settings.custom_ckpt_path.strip()
        button_row.enabled = has_inputs and not missing_ckpt

    @classmethod
    def _draw_status(cls, layout, settings):
        """Status line with an icon chosen from the message text."""
        if settings.status_message:
            layout.label(text=settings.status_message, icon=cls._status_icon(settings))

    @staticmethod
    def _status_icon(settings):
        """Pick an icon by keyword; checks run in priority order."""
        message = settings.status_message
        lowered = message.lower()
        if 'Error' in message or 'failed' in lowered:
            return 'ERROR'
        if 'Warning' in message:
            return 'WARNING'
        if 'complete' in lowered or 'successful' in lowered or settings.synthesis_progress >= 1.0:
            return 'CHECKMARK'
        in_progress_words = ('Loading', 'Starting', 'Preprocessing', 'Inference', 'Removing')
        if any(word in message for word in in_progress_words):
            return 'FILE_REFRESH'  # Work still in progress
        return 'INFO'
# --- Registration ---
# Listed in registration order: register() walks this tuple front to back,
# unregister() walks it in reverse.
classes = (F5TTSSettings, SEQUENCER_OT_synthesize_tts, SEQUENCER_PT_f5tts_sidebar)
def register():
    """Register the add-on's classes and attach its settings to every Scene.

    The classes are registered even when the TTS dependencies are missing, so
    that the panel can display its "Dependencies Missing!" warning.
    """
    from bpy.utils import register_class

    for cls in classes:
        register_class(cls)

    # Always (re)bind the property. Skipping the assignment when the attribute
    # already exists would leave Scene.f5tts_settings pointing at a previously
    # registered F5TTSSettings class if the script is re-run without a matching
    # unregister().
    bpy.types.Scene.f5tts_settings = PointerProperty(type=F5TTSSettings)

    print("F5/E2 TTS Add-on Registered.")
def unregister():
    """Tear the add-on down: job state, timer, Scene property, classes, models.

    Model cleanup is best-effort. It looks the model globals and ``torch`` up
    by name, so it also works when the optional TTS dependencies failed to
    import and some of those names were never defined.
    """
    from bpy.utils import unregister_class

    op_cls = SEQUENCER_OT_synthesize_tts

    if op_cls._thread is not None and op_cls._thread.is_alive():
        # A running worker cannot be stopped; it is left to finish by itself.
        print("Warning: Unregistering while synthesis is busy. Thread may still be running.")

    # Drop the shared job state.
    op_cls._thread = None
    op_cls._thread_result = None
    op_cls._thread_exception = None
    op_cls._thread_completed = False
    op_cls._settings_bpy_pointer = None

    # Stop the completion timer if one is registered.
    if op_cls._completion_timer:
        try:
            bpy.app.timers.unregister(op_cls._completion_timer)
            print("Unregistered completion timer during unregister.")
        except ValueError:
            pass  # Timer had already finished.
        op_cls._completion_timer = None

    # Remove the Scene property before the class it points to is unregistered.
    if hasattr(bpy.types.Scene, 'f5tts_settings'):
        del bpy.types.Scene.f5tts_settings

    for cls in reversed(classes):
        unregister_class(cls)

    # Release the cached models; this matters most for GPU memory.
    print("Attempting to clean up TTS models...")
    module_globals = globals()
    # (global variable name, label in warnings, label in the success message)
    model_slots = (
        ("vocoder", "vocoder", "Vocoder"),
        ("F5TTS_ema_model", "F5-TTS model", "F5-TTS model"),
        ("E2TTS_ema_model", "E2-TTS model", "E2-TTS model"),
        ("custom_ema_model", "custom model", "Custom model"),
    )
    try:
        model = None
        for global_name, warn_label, done_label in model_slots:
            # .get(): a global that was never created is treated as "nothing to free".
            model = module_globals.get(global_name)
            if model is None:
                continue
            try:
                if hasattr(model, 'cpu'):
                    model.cpu()  # Move off the accelerator before dropping it
            except Exception as e:
                print(f"Warning: Error cleaning up {warn_label}: {e}")
            else:
                print(f"{done_label} cleaned up.")
            finally:
                # Release the module-level reference even if .cpu() failed.
                module_globals[global_name] = None
                if global_name == "custom_ema_model":
                    module_globals["pre_custom_path"] = ""  # Forget the cached custom path
        model = None  # Drop the loop's own reference before collecting

        gc.collect()
        # torch is absent from the module when the dependency import failed.
        cuda = getattr(module_globals.get("torch"), "cuda", None)
        if cuda is not None and cuda.is_available():
            cuda.empty_cache()
            print("CUDA cache emptied.")
        print("TTS models cleanup process finished.")
    except Exception as e:
        print(f"Warning: Unexpected error during TTS model cleanup: {e}")

    print("F5/E2 TTS Add-on Unregistered.")
# Entry point when the file is executed directly as a script instead of being
# loaded as an add-on. Registration is intentionally left disabled here; to
# try the add-on this way, call register() and then, for example,
# bpy.ops.sequencer.synthesize_tts() from the console.
if __name__ == "__main__":
    pass
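# NOTE: the following is GitHub page footer text captured with the gist, not part of the script:
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment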