Spaces:
Sleeping
Sleeping
| """Models module for Kokoro TTS Local""" | |
| from typing import Optional, Tuple, List | |
| import torch | |
| from kokoro import KPipeline | |
| import os | |
| import json | |
| import codecs | |
| from pathlib import Path | |
| import numpy as np | |
| import shutil | |
| import threading | |
# Process-wide environment tweaks applied at import time:
#   PYTHONIOENCODING                -> "utf-8"
#   HF_HUB_DISABLE_SYMLINKS_WARNING -> "1" (disables the symlinks warning)
os.environ.update({
    "PYTHONIOENCODING": "utf-8",
    "HF_HUB_DISABLE_SYMLINKS_WARNING": "1",
})
| # Setup for safer monkey-patching | |
| import atexit | |
| import signal | |
| import sys | |
# Bookkeeping for the monkey patches this module installs: each flag is
# True while the corresponding patch is active.
_patches_applied = dict.fromkeys(('json_load', 'load_voice'), False)
def _cleanup_monkey_patches():
    """Undo every monkey patch that is still active.

    The two patches (json.load and KPipeline.load_voice) are handled
    independently: a failure while restoring one is reported as a warning
    and does not prevent the other from being restored.
    """
    # --- json.load ---
    try:
        json_patch_active = _patches_applied['json_load']
        if json_patch_active and _original_json_load is not None:
            restore_json_load()
            _patches_applied['json_load'] = False
            print("Restored original json.load function")
    except Exception as exc:
        print(f"Warning: Error restoring json.load: {exc}")

    # --- KPipeline.load_voice ---
    try:
        voice_patch_active = _patches_applied['load_voice']
        if voice_patch_active:
            restore_original_load_voice()
            _patches_applied['load_voice'] = False
            print("Restored original KPipeline.load_voice function")
    except Exception as exc:
        print(f"Warning: Error restoring KPipeline.load_voice: {exc}")
# Undo the monkey patches when the interpreter exits normally
atexit.register(_cleanup_monkey_patches)


def _exit_on_signal(signum, frame):
    """Announce the signal, undo the monkey patches, then exit with status 1."""
    print(f"\nReceived signal {signum}, cleaning up...")
    _cleanup_monkey_patches()
    sys.exit(1)


# Route SIGINT and SIGTERM through the same cleanup path
for sig in (signal.SIGINT, signal.SIGTERM):
    try:
        signal.signal(sig, _exit_on_signal)
    except (ValueError, AttributeError):
        # Some signals might not be available on all platforms
        pass
# Voice names grouped by their two-letter prefix (language + gender).
# The group order and the order inside each group define the order of
# VOICE_FILES below.
_VOICE_NAMES_BY_PREFIX = (
    # American English: 11 female, 9 male
    ("af", ("heart", "alloy", "aoede", "bella", "jessica", "kore",
            "nicole", "nova", "river", "sarah", "sky")),
    ("am", ("adam", "echo", "eric", "fenrir", "liam", "michael",
            "onyx", "puck", "santa")),
    # British English: 4 female, 4 male
    ("bf", ("alice", "emma", "isabella", "lily")),
    ("bm", ("daniel", "fable", "george", "lewis")),
    # Japanese: 4 female, 1 male
    ("jf", ("alpha", "gongitsune", "nezumi", "tebukuro")),
    ("jm", ("kumo",)),
    # Mandarin Chinese: 4 female, 4 male
    ("zf", ("xiaobei", "xiaoni", "xiaoxiao", "xiaoyi")),
    ("zm", ("yunjian", "yunxi", "yunxia", "yunyang")),
    # Spanish: 1 female, 2 male
    ("ef", ("dora",)),
    ("em", ("alex", "santa")),
    # French: 1 female
    ("ff", ("siwis",)),
    # Hindi: 2 female, 2 male
    ("hf", ("alpha", "beta")),
    ("hm", ("omega", "psi")),
    # Italian: 1 female, 1 male
    ("if", ("sara",)),
    ("im", ("nicola",)),
    # Brazilian Portuguese: 1 female, 2 male
    ("pf", ("dora",)),
    ("pm", ("alex", "santa")),
)

# List of available voice files (54 voices across 8 languages)
VOICE_FILES = [
    f"{prefix}_{name}.pt"
    for prefix, names in _VOICE_NAMES_BY_PREFIX
    for name in names
]
# Single-letter language code -> human-readable language name
LANGUAGE_CODES = dict(
    a='American English',
    b='British English',
    j='Japanese',
    z='Mandarin Chinese',
    e='Spanish',
    f='French',
    h='Hindi',
    i='Italian',
    p='Brazilian Portuguese',
)
# KPipeline.load_voice is replaced further down with a variant that calls
# torch.load(weights_only=False). Keep a handle on the stock method first so
# restore_original_load_voice() can put it back.
original_load_voice = KPipeline.load_voice
def patched_load_voice(self, voice_path):
    """Load a voice file with ``weights_only=False`` and cache it on the pipeline.

    The loaded object is moved to ``self.device`` and stored in
    ``self.voices`` under the file name without its extension.

    Args:
        self: Pipeline instance the voice is attached to. ``device``
            defaults to ``'cpu'`` and ``voices`` to ``{}`` when missing.
        voice_path: Path to the ``.pt`` voice file.

    Returns:
        The loaded voice, already moved to ``self.device``.

    Raises:
        FileNotFoundError: If ``voice_path`` does not exist.
        ValueError: If the file deserializes to ``None``.
        Exception: Any other loading error is printed and re-raised as is.
    """
    if not os.path.exists(voice_path):
        raise FileNotFoundError(f"Voice file not found: {voice_path}")
    voice_name = Path(voice_path).stem
    try:
        # Make sure the attributes used below exist before touching the file
        if not hasattr(self, 'device'):
            self.device = 'cpu'
        if not hasattr(self, 'voices'):
            self.voices = {}
        # SECURITY: weights_only=False lets torch.load unpickle arbitrary
        # Python objects. Only load voice files from a source you trust.
        # map_location='cpu' keeps deserialization independent of the device
        # the file was saved from; the explicit .to() below does the move.
        voice_model = torch.load(voice_path, map_location='cpu', weights_only=False)
        if voice_model is None:
            raise ValueError(f"Failed to load voice model from {voice_path}")
        # Move model to device and store in voices dictionary
        self.voices[voice_name] = voice_model.to(self.device)
        return self.voices[voice_name]
    except Exception as e:
        print(f"Error loading voice {voice_name}: {e}")
        raise
# Swap the replacement in on the class itself, then record that the patch
# is live so the cleanup code knows there is something to undo.
KPipeline.load_voice = patched_load_voice
_patches_applied.update(load_voice=True)
def restore_original_load_voice():
    """Put the stock KPipeline.load_voice back if the patch is still active.

    Does nothing when the 'load_voice' flag in _patches_applied is already
    False, so it is safe to call more than once.
    """
    if not _patches_applied['load_voice']:
        return
    KPipeline.load_voice = original_load_voice
    _patches_applied['load_voice'] = False
def patch_json_load():
    """Replace ``json.load`` with a UTF-8-first, BOM-tolerant wrapper.

    The wrapper reads the raw bytes of the file when it can (``fp.buffer``),
    decodes them as UTF-8 while dropping a leading byte-order mark, and
    parses the text with ``json.loads``. Keyword arguments such as
    ``object_hook`` are forwarded to ``json.loads``.

    Calling this function while the patch is already active is a no-op, so
    the genuine ``json.load`` is never overwritten by the wrapper itself.

    Returns:
        The original ``json.load`` function, for restoration.
    """
    global _original_json_load

    # Already patched: json.load is our wrapper right now, so capturing it
    # again would lose the real function for good.
    if _patches_applied['json_load'] and _original_json_load is not None:
        return _original_json_load

    original_load = json.load
    _original_json_load = original_load  # Store for restoration

    def parse(content, options):
        # json.loads rejects text that starts with a BOM, so drop it first
        if isinstance(content, str) and content.startswith('\ufeff'):
            content = content[1:]
        try:
            return json.loads(content, **options)
        except json.JSONDecodeError as e:
            print(f"JSON parsing error: {e}")
            raise

    def custom_load(fp, *args, **kwargs):
        # json.load takes keyword-only options; stray positional arguments
        # are accepted and ignored, as before.
        try:
            if hasattr(fp, 'buffer'):
                # utf-8-sig decodes plain UTF-8 and also strips a BOM
                content = fp.buffer.read().decode('utf-8-sig')
            else:
                content = fp.read()
            return parse(content, kwargs)
        except UnicodeDecodeError:
            # Not valid UTF-8: re-read through the file object itself and,
            # for raw bytes, decode leniently instead of failing.
            fp.seek(0)
            content = fp.read()
            if isinstance(content, bytes):
                content = content.decode('utf-8-sig', errors='replace')
            return parse(content, kwargs)

    json.load = custom_load
    _patches_applied['json_load'] = True
    return original_load  # Return original for restoration
# Holds the genuine json.load while the patch is installed; None whenever
# no patch is active (patch_json_load sets it, restore_json_load clears it).
_original_json_load = None
def restore_json_load():
    """Reinstate the saved json.load and clear the patch bookkeeping.

    Does nothing unless a saved original exists and the 'json_load' flag
    in _patches_applied is set.
    """
    global _original_json_load
    nothing_to_restore = (
        _original_json_load is None or not _patches_applied['json_load']
    )
    if nothing_to_restore:
        return
    json.load = _original_json_load
    _original_json_load = None
    _patches_applied['json_load'] = False
def load_config(config_path: str) -> dict:
    """Load a JSON configuration file encoded as UTF-8, with or without a BOM.

    The file is decoded with ``utf-8-sig``, which reads plain UTF-8
    unchanged and strips a leading byte-order mark when one is present.
    A BOM never raises ``UnicodeDecodeError`` under plain ``utf-8`` (it
    surfaces later as a JSON parse error), so decoding this way up front is
    what actually makes BOM-prefixed files load.

    Args:
        config_path: Path to the JSON file.

    Returns:
        The parsed JSON content.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(config_path, 'r', encoding='utf-8-sig') as f:
        # Parse the decoded text directly so the result does not depend on
        # whatever json.load currently points at.
        return json.loads(f.read())
# Initialize espeak-ng
# phonemizer_available stays False unless the probe call below returns a
# non-empty result.
phonemizer_available = False
try:
    from phonemizer.backend.espeak.wrapper import EspeakWrapper
    from phonemizer import phonemize
    import espeakng_loader

    # Look up the bundled library/data locations and make the library
    # available before pointing the wrapper at them
    library_path = espeakng_loader.get_library_path()
    data_path = espeakng_loader.get_data_path()
    espeakng_loader.make_library_available()

    EspeakWrapper.library_path = library_path
    EspeakWrapper.data_path = data_path

    # Probe: phonemize a short word to confirm espeak-ng actually works
    try:
        test_phonemes = phonemize('test', language='en-us')
        if not test_phonemes:
            print("Note: Phonemization returned empty result")
            print("TTS will work, but phoneme visualization will be disabled")
        else:
            phonemizer_available = True
            print("Phonemizer successfully initialized")
    except Exception as e:
        # A failing probe is not fatal; carry on without espeak
        print(f"Note: Phonemizer not available: {e}")
        print("TTS will work, but phoneme visualization will be disabled")
except ImportError as e:
    print(f"Note: Phonemizer packages not installed: {e}")
    print("TTS will work, but phoneme visualization will be disabled")
    # Nothing is installed automatically; tell the user what to run
    print("If you want phoneme visualization, manually install required packages:")
    print("pip install espeakng-loader phonemizer-fork")
# Module-wide pipeline slot (None until one has been built) and the
# re-entrant lock used around access to it.
_pipeline, _pipeline_lock = None, threading.RLock()
def _download_voice_with_retries(fetch, voice_file, voice_path, temp_dir,
                                 repo_version, retry_count=3):
    """Fetch one voice file into ``voice_path``; return True on success.

    ``fetch`` is the hub download callable. Each attempt downloads into
    ``temp_dir`` and copies the result into place; a zero-byte result is
    deleted and counts as a failed attempt.
    """
    for attempt in range(retry_count):
        try:
            print(f"Downloading {voice_file}... (attempt {attempt+1}/{retry_count})")
            # Download to a temporary location first
            temp_path = fetch(
                repo_id="hexgrad/Kokoro-82M",
                filename=f"voices/{voice_file}",
                local_dir=temp_dir,
                force_download=True,
                revision=repo_version,
            )
            os.makedirs(os.path.dirname(str(voice_path)), exist_ok=True)
            shutil.copy2(temp_path, str(voice_path))
            # Verify file integrity
            if os.path.getsize(str(voice_path)) > 0:
                print(f"Successfully downloaded {voice_file}")
                return True
            print(f"Warning: Downloaded file {voice_file} has zero size, retrying...")
            os.remove(str(voice_path))  # Remove invalid file
        except (OSError, ValueError) as e:
            # OSError already covers IOError, FileNotFoundError and ConnectionError
            print(f"Warning: Failed to download {voice_file} (attempt {attempt+1}): {e}")
            if attempt == retry_count - 1:
                print(f"Error: Failed all {retry_count} attempts to download {voice_file}")
    return False


def download_voice_files(voice_files=None, repo_version="main", required_count=1):
    """Download voice files from Hugging Face.

    Files already present in the ``voices`` directory (relative to the
    current working directory) are reused. ``huggingface_hub`` is only
    imported when at least one file actually has to be downloaded.

    Args:
        voice_files: Optional list of voice files to download. If None, download all VOICE_FILES.
        repo_version: Version/tag of the repository to use (default: "main")
        required_count: Minimum number of voices required (default: 1)

    Returns:
        List of voice files that are available locally (already present or
        successfully downloaded)

    Raises:
        ValueError: If no voice file is available, or fewer than
            required_count are. This is checked whether or not a download
            was needed.
    """
    # Use absolute path for voices directory
    voices_dir = Path(os.path.abspath("voices"))
    voices_dir.mkdir(exist_ok=True)

    # If specific voice files are requested, use those. Otherwise use all.
    requested = list(voice_files if voice_files is not None else VOICE_FILES)
    print(f"\nDownloading voice files... ({len(requested)} total files)")

    # Split the request into files already on disk and files still missing
    downloaded_voices = []
    missing = []
    for voice_file in requested:
        if (voices_dir / voice_file).exists():
            print(f"Voice file {voice_file} already exists")
            downloaded_voices.append(voice_file)
        else:
            missing.append(voice_file)

    failed_voices = []
    if missing:
        # Import here to avoid a startup dependency when nothing is missing
        from huggingface_hub import hf_hub_download
        try:
            import tempfile
            with tempfile.TemporaryDirectory() as temp_dir:
                for voice_file in missing:
                    succeeded = _download_voice_with_retries(
                        hf_hub_download,
                        voice_file,
                        voices_dir / voice_file,
                        temp_dir,
                        repo_version,
                    )
                    if succeeded:
                        downloaded_voices.append(voice_file)
                    else:
                        failed_voices.append(voice_file)
        except Exception as e:
            # Best effort: report the problem and fall through to the
            # result checks with whatever has been collected so far.
            print(f"Error during voice download process: {e}")
            import traceback
            traceback.print_exc()
    elif downloaded_voices:
        print(f"All required voice files already exist ({len(downloaded_voices)} files)")

    # Report results
    if failed_voices:
        print(f"Warning: Failed to download {len(failed_voices)} voice files: {', '.join(failed_voices)}")
    if not downloaded_voices:
        error_msg = "No voice files could be downloaded. Please check your internet connection."
        print(f"Error: {error_msg}")
        raise ValueError(error_msg)
    if len(downloaded_voices) < required_count:
        error_msg = f"Only {len(downloaded_voices)} voice files could be downloaded, but {required_count} were required."
        print(f"Error: {error_msg}")
        raise ValueError(error_msg)
    if missing:
        print(f"Successfully processed {len(downloaded_voices)} voice files")
    return downloaded_voices
def build_model(model_path: str, device: str, repo_version: str = "main") -> KPipeline:
    """Build (once) and return the shared Kokoro pipeline.

    The first successful call creates the pipeline and caches it in the
    module-level ``_pipeline``; later calls return the cached instance and
    ignore their arguments. On the first call the function patches
    ``json.load``, downloads the model file, ``config.json`` and the voice
    files if they are missing, creates a ``KPipeline`` for American English
    and tries to load one voice into it.

    Args:
        model_path: Path to the model file or None to use default
        device: Device to use ('cuda' or 'cpu'); stored on the pipeline as
            its ``device`` attribute
        repo_version: Version/tag of the repository to use (default: "main")

    Returns:
        Initialized KPipeline instance

    Raises:
        ValueError: If the model, the config or the voice files cannot be
            downloaded.
        Exception: Any other initialization error is printed and re-raised
            after ``json.load`` has been restored.
    """
    global _pipeline
    # Use a lock for thread safety
    with _pipeline_lock:
        # Another caller may have finished building while we waited
        if _pipeline is not None:
            return _pipeline
        try:
            # Patch json loading before initializing pipeline
            patch_json_load()

            # Download model if it doesn't exist
            if model_path is None:
                model_path = 'kokoro-v1_0.pth'
            model_path = os.path.abspath(model_path)
            if not os.path.exists(model_path):
                print(f"Downloading model file {model_path}...")
                try:
                    from huggingface_hub import hf_hub_download
                    model_path = hf_hub_download(
                        repo_id="hexgrad/Kokoro-82M",
                        filename="kokoro-v1_0.pth",
                        local_dir=".",
                        force_download=True,
                        revision=repo_version
                    )
                    print(f"Model downloaded to {model_path}")
                except Exception as e:
                    print(f"Error downloading model: {e}")
                    raise ValueError(f"Could not download model: {e}") from e

            # Download config if it doesn't exist
            config_path = os.path.abspath("config.json")
            if not os.path.exists(config_path):
                print("Downloading config file...")
                try:
                    # Imported here as well: when the model file already
                    # exists, the import in the branch above never runs.
                    from huggingface_hub import hf_hub_download
                    config_path = hf_hub_download(
                        repo_id="hexgrad/Kokoro-82M",
                        filename="config.json",
                        local_dir=".",
                        force_download=True,
                        revision=repo_version
                    )
                    print(f"Config downloaded to {config_path}")
                except Exception as e:
                    print(f"Error downloading config: {e}")
                    raise ValueError(f"Could not download config: {e}") from e

            # Download voice files - require at least one voice
            try:
                downloaded_voices = download_voice_files(repo_version=repo_version, required_count=1)
            except ValueError as e:
                print(f"Error: Voice files download failed: {e}")
                raise ValueError("Voice files download failed") from e

            # The pipeline is always created for American English ('a')
            lang_code = 'a'
            pipeline_instance = KPipeline(lang_code=lang_code)

            # Store device parameter for reference in other operations
            pipeline_instance.device = device
            # Initialize voices dictionary if it doesn't exist
            if not hasattr(pipeline_instance, 'voices'):
                pipeline_instance.voices = {}

            # Load the first voice that loads cleanly; failures are only warnings
            voice_loaded = False
            for voice_file in downloaded_voices:
                voice_path = os.path.abspath(os.path.join("voices", voice_file))
                if not os.path.exists(voice_path):
                    continue
                try:
                    pipeline_instance.load_voice(voice_path)
                except Exception as e:
                    print(f"Warning: Failed to load voice {voice_file}: {e}")
                    continue
                print(f"Successfully loaded voice: {voice_file}")
                voice_loaded = True
                break
            if not voice_loaded:
                print("Warning: Could not load any voice models")

            # Set the global _pipeline only after successful initialization
            _pipeline = pipeline_instance
        except Exception as e:
            print(f"Error initializing pipeline: {e}")
            # Restore original json.load on error
            restore_json_load()
            raise
        return _pipeline
def list_available_voices() -> List[str]:
    """List the voice models found in the ``voices`` directory.

    The directory is resolved against the current working directory and is
    created when it does not exist yet.

    Returns:
        Names of all ``*.pt`` files in the directory without the extension,
        sorted case-insensitively. Empty when the directory was just
        created or contains no voice files.
    """
    # Always use absolute path for consistency
    voices_dir = Path(os.path.abspath("voices"))

    # Create voices directory if it doesn't exist
    if not voices_dir.exists():
        print(f"Creating voices directory at {voices_dir}")
        voices_dir.mkdir(exist_ok=True)
        return []

    voice_names = sorted(
        (voice_file.stem for voice_file in voices_dir.glob("*.pt")),
        key=str.lower,
    )
    if voice_names:
        return voice_names

    # The former "alternative location" fallback looked at ./voices, which is
    # this very directory, so it could never find additional files.
    print("No voice files found. Please run the application again to download voices.")
    return []
def get_language_code_from_voice(voice_name: str) -> str:
    """Derive the language code from a voice name's two-character prefix.

    The prefix is a language letter followed by 'f' or 'm' (for example
    'af', 'jm'); the language letter is the code that gets returned.

    Args:
        voice_name: Name of the voice (e.g., 'af_bella', 'jf_alpha')

    Returns:
        Language code for the voice; 'a' (American English) when the name
        is shorter than two characters or its prefix is not recognized
    """
    # Every known prefix is <language letter> + <'f' or 'm'>
    known_prefixes = {
        language + gender: language
        for language in "abjzefhip"
        for gender in "fm"
    }
    if len(voice_name) >= 2:
        prefix = voice_name[:2]
    else:
        prefix = 'af'
    return known_prefixes.get(prefix, 'a')
def load_voice(voice_name: str, device: str) -> torch.Tensor:
    """Load a voice into the shared pipeline, reusing it if already loaded.

    Args:
        voice_name: Name of the voice to load (with or without .pt extension)
        device: Device to use ('cuda' or 'cpu'); passed on to build_model

    Returns:
        Loaded voice model tensor

    Raises:
        ValueError: If the voice file does not exist in the ``voices``
            directory
    """
    pipeline = build_model(None, device)

    # Drop a trailing ".pt" only; a ".pt" elsewhere in the name is kept
    if voice_name.endswith('.pt'):
        voice_name = voice_name[:-len('.pt')]
    voice_path = os.path.abspath(os.path.join("voices", f"{voice_name}.pt"))
    if not os.path.exists(voice_path):
        raise ValueError(f"Voice file not found: {voice_path}")

    # Hold the pipeline lock while checking the cache and loading
    with _pipeline_lock:
        # Check if voice is already loaded
        if hasattr(pipeline, 'voices') and voice_name in pipeline.voices:
            return pipeline.voices[voice_name]
        # Load voice if not already loaded
        return pipeline.load_voice(voice_path)
def generate_speech(
    model: KPipeline,
    text: str,
    voice: str,
    lang: str = 'a',
    device: str = 'cpu',
    speed: float = 1.0
) -> Tuple[Optional[torch.Tensor], Optional[str]]:
    """Generate speech for ``text`` with the given pipeline and voice.

    Only the first segment that yields audio is returned. Errors are
    printed and reported through the return value rather than raised.

    Args:
        model: KPipeline instance
        text: Text to synthesize
        voice: Voice name (e.g. 'af_bella'), with or without .pt extension
        lang: Language code ('a' for American English, 'b' for British
            English); accepted for interface compatibility, not used here
        device: Device to use ('cuda' or 'cpu'); only applied when the
            model has no ``device`` attribute yet
        speed: Speech speed multiplier (default: 1.0)

    Returns:
        Tuple of (audio tensor, phonemes string) or (None, None) on error
        or when no segment produced audio
    """
    try:
        if model is None:
            raise ValueError("Model is None - pipeline not properly initialized")

        # Drop a trailing ".pt" only; a ".pt" elsewhere in the name is kept
        voice_name = voice[:-len('.pt')] if voice.endswith('.pt') else voice
        voice_path = os.path.abspath(os.path.join("voices", f"{voice_name}.pt"))
        # Check if voice file exists
        if not os.path.exists(voice_path):
            raise ValueError(f"Voice file not found: {voice_path}")

        # Model attributes and voice loading are set up under the pipeline lock
        with _pipeline_lock:
            # Initialize voices dictionary if it doesn't exist
            if not hasattr(model, 'voices'):
                model.voices = {}
            # Ensure device is set
            if not hasattr(model, 'device'):
                model.device = device
            # Ensure voice is loaded before generating
            if voice_name not in model.voices:
                print(f"Loading voice {voice_name}...")
                try:
                    model.load_voice(voice_path)
                    if voice_name not in model.voices:
                        raise ValueError("Voice load succeeded but voice not in model.voices dictionary")
                except Exception as e:
                    # Keep the underlying error attached as the cause
                    raise ValueError(f"Failed to load voice {voice_name}: {e}") from e

        # Generate speech (outside the lock for better concurrency)
        print(f"Generating speech with device: {model.device}")
        generator = model(
            text,
            voice=voice_path,
            speed=speed,
            split_pattern=r'\n+'
        )

        # Return the first segment that has audio, as a float tensor
        for gs, ps, audio in generator:
            if audio is None:
                continue
            if isinstance(audio, np.ndarray):
                audio = torch.from_numpy(audio).float()
            return audio, ps
        return None, None
    except (ValueError, FileNotFoundError, RuntimeError, KeyError, AttributeError, TypeError) as e:
        print(f"Error generating speech: {e}")
        return None, None
    except Exception as e:
        print(f"Unexpected error during speech generation: {e}")
        import traceback
        traceback.print_exc()
        return None, None