This article describes in detail how to deploy NeuTTS Air locally on the Orion6 development board, covering the key steps of model quantization, dependency configuration, code adaptation, and inference testing.
Introduction to NeuTTS Air
NeuTTS Air is a lightweight on-device text-to-speech (TTS) system released by Neuphonic in 2025. It uses a hybrid "language model + in-house neural codec" architecture and can perform high-fidelity voice cloning from as little as 3 seconds of reference audio. Built on the lightweight Qwen2.5-0.5B language model and released in GGUF format, it runs fully offline on resource-constrained devices (phones, the Raspberry Pi, the Orion6, and the like) while balancing real-time performance, privacy, and audio quality.
Model Conversion
Neuphonic already provides a BF16-precision GGUF model for NeuTTS Air that can be used directly with the llama.cpp inference framework. To fit within the memory and compute limits of edge devices, we quantize it further to the Q4_0 format.
First, use git lfs to download the model from ModelScope or Hugging Face:
# Make sure git-lfs is installed (https://git-lfs.com)
git lfs install
git clone https://huggingface.co/neuphonic/neutts-air
Install the build tools:
sudo apt install cmake gcc g++ libcurl4-openssl-dev
Then clone and build llama.cpp:
git clone https://github.com/ggml-org/llama.cpp.git
cd llama.cpp
cmake -B build
cmake --build build --config Release
Next, use the llama-quantize tool to quantize the model:
cd build/bin
./llama-quantize /public/home/scntb3z6kw/SothisAI/model/ExternalSource/neutts-air/main/neutts-air/neutss-air-BF16.gguf ../../../../models/neutss-air-BF16.gguf Q4_0
(The output file above keeps the original name; it is the final Q4_0 argument that determines the quantization type.)
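As a quick sanity check that the quantized file loads, you can point the freshly built llama-cli (also in build/bin) at it. The backbone only ever emits <|speech_...|> tokens, so the generated text is not meaningful; the point is simply that loading and generation succeed:
./llama-cli -m ../../../../models/neutss-air-BF16.gguf -p "user: Convert the text to speech:" -n 16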
Model Deployment
First, clone the relevant repositories:
git clone https://github.com/neuphonic/neutts-air.git
git clone https://github.com/neuphonic/neucodec.git
The official implementation downloads the models from Hugging Face automatically by default. To support fully offline operation, we can modify the following code so that models are loaded from local paths instead.
neutts.py in neutts-air
from typing import Generator
from pathlib import Path
import librosa
import numpy as np
import torch
import re
import perth
from neucodec import NeuCodec, DistillNeuCodec
from phonemizer.backend import EspeakBackend
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from threading import Thread
def _linear_overlap_add(frames: list[np.ndarray], stride: int) -> np.ndarray:
# original impl --> https://github.com/facebookresearch/encodec/blob/main/encodec/utils.py
assert len(frames)
dtype = frames[0].dtype
shape = frames[0].shape[:-1]
total_size = 0
for i, frame in enumerate(frames):
frame_end = stride * i + frame.shape[-1]
total_size = max(total_size, frame_end)
sum_weight = np.zeros(total_size, dtype=dtype)
out = np.zeros(*shape, total_size, dtype=dtype)
offset: int = 0
for frame in frames:
frame_length = frame.shape[-1]
t = np.linspace(0, 1, frame_length + 2, dtype=dtype)[1:-1]
weight = np.abs(0.5 - (t - 0.5))
out[..., offset : offset + frame_length] += weight * frame
sum_weight[offset : offset + frame_length] += weight
offset += stride
assert sum_weight.min() > 0
return out / sum_weight
class NeuTTSAir:
def __init__(
self,
backbone_repo="neuphonic/neutts-air",
backbone_device="cpu",
codec_repo="neuphonic/neucodec",
codec_repo2="ntu-spml/distilhubert",
codec_device="cpu",
):
# Consts
self.sample_rate = 24_000
self.max_context = 2048
self.hop_length = 480
self.streaming_overlap_frames = 1
self.streaming_frames_per_chunk = 25
self.streaming_lookforward = 5
self.streaming_lookback = 50
self.streaming_stride_samples = self.streaming_frames_per_chunk * self.hop_length
# ggml & onnx flags
self._is_quantized_model = False
self._is_onnx_codec = False
# HF tokenizer
self.tokenizer = None
# Load phonemizer + models
print("Loading phonemizer...")
self.phonemizer = EspeakBackend(
language="en-us", preserve_punctuation=True, with_stress=True
)
self._load_backbone(backbone_repo, backbone_device)
self._load_codec(codec_repo, codec_repo2, codec_device)
# Load watermarker
self.watermarker = perth.PerthImplicitWatermarker()
def _load_backbone(self, backbone_repo, backbone_device):
print(f"Loading backbone from: {backbone_repo} on {backbone_device} ...")
# GGUF loading
if backbone_repo.endswith("gguf"):
try:
from llama_cpp import Llama
except ImportError as e:
raise ImportError(
"Failed to import `llama_cpp`. "
"Please install it with:\n"
" pip install llama-cpp-python"
) from e
# self.backbone = Llama.from_pretrained(
# repo_id=backbone_repo,
# filename="*.gguf",
# verbose=False,
# n_gpu_layers=-1 if backbone_device == "gpu" else 0,
# n_ctx=self.max_context,
# mlock=True,
# flash_attn=True if backbone_device == "gpu" else False,
# )
self.backbone = Llama(
model_path=backbone_repo,  # pass the local GGUF path directly
verbose=False,
n_gpu_layers=-1 if backbone_device == "gpu" else 0,
n_ctx=self.max_context,
mlock=True,
flash_attn=True if backbone_device == "gpu" else False,
)
self._is_quantized_model = True
else:
self.tokenizer = AutoTokenizer.from_pretrained(backbone_repo)
self.backbone = AutoModelForCausalLM.from_pretrained(backbone_repo).to(
torch.device(backbone_device)
)
def _load_codec(self, codec_repo, codec_repo2, codec_device):
print(f"Loading codec from: {codec_repo} on {codec_device} ...")
if "distill-neucodec" in codec_repo:
self.codec = DistillNeuCodec.from_pretrained(codec_repo, model_id2=codec_repo2)
self.codec.eval().to(codec_device)
elif "neucodec-onnx-decoder" in codec_repo:
if codec_device != "cpu":
raise ValueError("Onnx decoder only currently runs on CPU.")
try:
from neucodec import NeuCodecOnnxDecoder
except ImportError as e:
raise ImportError(
"Failed to import the onnx decoder."
" Ensure you have onnxruntime installed as well as neucodec >= 0.0.4."
) from e
self.codec = NeuCodecOnnxDecoder.from_pretrained(codec_repo)
self._is_onnx_codec = True
elif "neucodec" in codec_repo:
self.codec = NeuCodec.from_pretrained(codec_repo, model_id2=codec_repo2)
self.codec.eval().to(codec_device)
else:
raise ValueError(
"Invalid codec repo! Must be one of:"
" 'neuphonic/neucodec', 'neuphonic/distill-neucodec',"
" 'neuphonic/neucodec-onnx-decoder'."
)
def infer(self, text: str, ref_codes: np.ndarray | torch.Tensor, ref_text: str) -> np.ndarray:
"""
Perform inference to generate speech from text using the TTS model and reference audio.
Args:
text (str): Input text to be converted to speech.
ref_codes (np.ndarray | torch.tensor): Encoded reference.
ref_text (str): Reference text for reference audio. Defaults to None.
Returns:
np.ndarray: Generated speech waveform.
"""
# Generate tokens
if self._is_quantized_model:
output_str = self._infer_ggml(ref_codes, ref_text, text)
else:
prompt_ids = self._apply_chat_template(ref_codes, ref_text, text)
output_str = self._infer_torch(prompt_ids)
# Decode
wav = self._decode(output_str)
watermarked_wav = self.watermarker.apply_watermark(wav, sample_rate=24_000)
return watermarked_wav
def infer_stream(self, text: str, ref_codes: np.ndarray | torch.Tensor, ref_text: str) -> Generator[np.ndarray, None, None]:
"""
Perform streaming inference to generate speech from text using the TTS model and reference audio.
Args:
text (str): Input text to be converted to speech.
ref_codes (np.ndarray | torch.tensor): Encoded reference.
ref_text (str): Reference text for reference audio. Defaults to None.
Yields:
np.ndarray: Generated speech waveform.
"""
if self._is_quantized_model:
return self._infer_stream_ggml(ref_codes, ref_text, text)
else:
raise NotImplementedError("Streaming is not implemented for the torch backend!")
def encode_reference(self, ref_audio_path: str | Path):
wav, _ = librosa.load(ref_audio_path, sr=16000, mono=True)
wav_tensor = torch.from_numpy(wav).float().unsqueeze(0).unsqueeze(0) # [1, 1, T]
with torch.no_grad():
ref_codes = self.codec.encode_code(audio_or_path=wav_tensor).squeeze(0).squeeze(0)
return ref_codes
def _decode(self, codes: str):
# Extract speech token IDs using regex
speech_ids = [int(num) for num in re.findall(r"<\|speech_(\d+)\|>", codes)]
if len(speech_ids) > 0:
# Onnx decode
if self._is_onnx_codec:
codes = np.array(speech_ids, dtype=np.int32)[np.newaxis, np.newaxis, :]
recon = self.codec.decode_code(codes)
# Torch decode
else:
with torch.no_grad():
codes = torch.tensor(speech_ids, dtype=torch.long)[None, None, :].to(
self.codec.device
)
recon = self.codec.decode_code(codes).cpu().numpy()
return recon[0, 0, :]
else:
raise ValueError("No valid speech tokens found in the output.")
def _to_phones(self, text: str) -> str:
phones = self.phonemizer.phonemize([text])
phones = phones[0].split()
phones = " ".join(phones)
return phones
def _apply_chat_template(
self, ref_codes: list[int], ref_text: str, input_text: str
) -> list[int]:
input_text = self._to_phones(ref_text) + " " + self._to_phones(input_text)
speech_replace = self.tokenizer.convert_tokens_to_ids("<|SPEECH_REPLACE|>")
speech_gen_start = self.tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_START|>")
text_replace = self.tokenizer.convert_tokens_to_ids("<|TEXT_REPLACE|>")
text_prompt_start = self.tokenizer.convert_tokens_to_ids("<|TEXT_PROMPT_START|>")
text_prompt_end = self.tokenizer.convert_tokens_to_ids("<|TEXT_PROMPT_END|>")
input_ids = self.tokenizer.encode(input_text, add_special_tokens=False)
chat = """user: Convert the text to speech:<|TEXT_REPLACE|>\nassistant:<|SPEECH_REPLACE|>"""
ids = self.tokenizer.encode(chat)
text_replace_idx = ids.index(text_replace)
ids = (
ids[:text_replace_idx]
+ [text_prompt_start]
+ input_ids
+ [text_prompt_end]
+ ids[text_replace_idx + 1 :] # noqa
)
speech_replace_idx = ids.index(speech_replace)
codes_str = "".join([f"<|speech_{i}|>" for i in ref_codes])
codes = self.tokenizer.encode(codes_str, add_special_tokens=False)
ids = ids[:speech_replace_idx] + [speech_gen_start] + list(codes)
return ids
def _infer_torch(self, prompt_ids: list[int]) -> str:
prompt_tensor = torch.tensor(prompt_ids).unsqueeze(0).to(self.backbone.device)
speech_end_id = self.tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_END|>")
with torch.no_grad():
output_tokens = self.backbone.generate(
prompt_tensor,
max_length=self.max_context,
eos_token_id=speech_end_id,
do_sample=True,
temperature=1.0,
top_k=50,
use_cache=True,
min_new_tokens=50,
)
input_length = prompt_tensor.shape[-1]
output_str = self.tokenizer.decode(
output_tokens[0, input_length:].cpu().numpy().tolist(), add_special_tokens=False
)
return output_str
def _infer_ggml(self, ref_codes: list[int], ref_text: str, input_text: str) -> str:
ref_text = self._to_phones(ref_text)
input_text = self._to_phones(input_text)
codes_str = "".join([f"<|speech_{idx}|>" for idx in ref_codes])
prompt = (
f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text} {input_text}"
f"<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}"
)
output = self.backbone(
prompt,
max_tokens=self.max_context,
temperature=1.0,
top_k=50,
stop=["<|SPEECH_GENERATION_END|>"],
)
output_str = output["choices"][0]["text"]
return output_str
def _infer_stream_ggml(self, ref_codes: torch.Tensor, ref_text: str, input_text: str) -> Generator[np.ndarray, None, None]:
ref_text = self._to_phones(ref_text)
input_text = self._to_phones(input_text)
codes_str = "".join([f"<|speech_{idx}|>" for idx in ref_codes])
prompt = (
f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text} {input_text}"
f"<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}"
)
audio_cache: list[np.ndarray] = []
token_cache: list[str] = [f"<|speech_{idx}|>" for idx in ref_codes]
n_decoded_samples: int = 0
n_decoded_tokens: int = len(ref_codes)
for item in self.backbone(
prompt,
max_tokens=self.max_context,
temperature=1.0,
top_k=50,
stop=["<|SPEECH_GENERATION_END|>"],
stream=True
):
output_str = item["choices"][0]["text"]
token_cache.append(output_str)
if len(token_cache[n_decoded_tokens:]) >= self.streaming_frames_per_chunk + self.streaming_lookforward:
# decode chunk
tokens_start = max(
n_decoded_tokens
- self.streaming_lookback
- self.streaming_overlap_frames,
0
)
tokens_end = (
n_decoded_tokens
+ self.streaming_frames_per_chunk
+ self.streaming_lookforward
+ self.streaming_overlap_frames
)
sample_start = (
n_decoded_tokens - tokens_start
) * self.hop_length
sample_end = (
sample_start
+ (self.streaming_frames_per_chunk + 2 * self.streaming_overlap_frames) * self.hop_length
)
curr_codes = token_cache[tokens_start:tokens_end]
recon = self._decode("".join(curr_codes))
recon = self.watermarker.apply_watermark(recon, sample_rate=24_000)
recon = recon[sample_start:sample_end]
audio_cache.append(recon)
# postprocess
processed_recon = _linear_overlap_add(
audio_cache, stride=self.streaming_stride_samples
)
new_samples_end = len(audio_cache) * self.streaming_stride_samples
processed_recon = processed_recon[
n_decoded_samples:new_samples_end
]
n_decoded_samples = new_samples_end
n_decoded_tokens += self.streaming_frames_per_chunk
yield processed_recon
# final decoding handled separately as non-constant chunk size
remaining_tokens = len(token_cache) - n_decoded_tokens
if len(token_cache) > n_decoded_tokens:
tokens_start = max(
len(token_cache)
- (self.streaming_lookback + self.streaming_overlap_frames + remaining_tokens),
0
)
sample_start = (
len(token_cache)
- tokens_start
- remaining_tokens
- self.streaming_overlap_frames
) * self.hop_length
curr_codes = token_cache[tokens_start:]
recon = self._decode("".join(curr_codes))
recon = self.watermarker.apply_watermark(recon, sample_rate=24_000)
recon = recon[sample_start:]
audio_cache.append(recon)
processed_recon = _linear_overlap_add(audio_cache, stride=self.streaming_stride_samples)
processed_recon = processed_recon[n_decoded_samples:]
yield processed_recon
model.py in neucodec
from typing import Optional, Dict
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from torchaudio import transforms as T
from huggingface_hub import PyTorchModelHubMixin, ModelHubMixin, hf_hub_download
from transformers import AutoFeatureExtractor, HubertModel, Wav2Vec2BertModel
from .codec_encoder import CodecEncoder
from .codec_encoder_distill import DistillCodecEncoder
from .codec_decoder_vocos import CodecDecoderVocos
from .module import SemanticEncoder
class NeuCodec(
nn.Module,
PyTorchModelHubMixin,
repo_url="https://github.com/neuphonic/neucodec",
license="apache-2.0",
):
def __init__(self, sample_rate: int, hop_length: int):
super().__init__()
self.sample_rate = sample_rate
self.hop_length = hop_length
# self.semantic_model = Wav2Vec2BertModel.from_pretrained(
# "facebook/w2v-bert-2.0", output_hidden_states=True
# )
# self.feature_extractor = AutoFeatureExtractor.from_pretrained(
# "facebook/w2v-bert-2.0"
# )
self.semantic_model = None
self.feature_extractor = None
self.SemanticEncoder_module = SemanticEncoder(1024, 1024, 1024)
self.CodecEnc = CodecEncoder()
self.generator = CodecDecoderVocos(hop_length=hop_length)
self.fc_prior = nn.Linear(2048, 2048)
self.fc_post_a = nn.Linear(2048, 1024)
@property
def device(self):
return next(self.parameters()).device
@classmethod
def _from_pretrained(
cls,
*,
model_id: str,
model_id2: Optional[str] = None,
revision: Optional[str] = None,
cache_dir: Optional[str] = None,
force_download: bool = False,
proxies: Optional[Dict] = None,
resume_download: bool = False,
local_files_only: bool = False,
token: Optional[str] = None,
map_location: str = "cpu",
strict: bool = True,
**model_kwargs,
):
if "distill-neucodec" in model_id:
ignore_keys = []
else:
ignore_keys = ["fc_post_s", "SemanticDecoder"]
cls.semantic_model = HubertModel.from_pretrained(model_id2, output_hidden_states=True)
cls.feature_extractor = AutoFeatureExtractor.from_pretrained(model_id2)
# download the model weights file
# ckpt_path = hf_hub_download(
# repo_id=model_id,
# filename="pytorch_model.bin",
# revision=revision,
# cache_dir=cache_dir,
# force_download=force_download,
# proxies=proxies,
# resume_download=resume_download,
# local_files_only=local_files_only,
# token=token,
# )
# download meta.yaml to track number of downloads
# _ = hf_hub_download(
# repo_id=model_id,
# filename="meta.yaml",
# revision=revision,
# cache_dir=cache_dir,
# force_download=force_download,
# proxies=proxies,
# resume_download=resume_download,
# local_files_only=local_files_only,
# token=token,
# )
# initialize model
model = cls(24_000, 480)
# load weights from the local model directory (replaces the hf_hub_download call above)
state_dict = torch.load(model_id+"/pytorch_model.bin", map_location)
contains_list = lambda s, l: any(i in s for i in l)
state_dict = {
k:v for k, v in state_dict.items()
if not contains_list(k, ignore_keys)
}
# TODO: we can move to strict loading once we clean up the checkpoints
model.load_state_dict(state_dict, strict=False)
return model
def _prepare_audio(self, audio_or_path: torch.Tensor | Path | str):
# load from file
if isinstance(audio_or_path, (Path, str)):
y, sr = torchaudio.load(audio_or_path)
if sr != 16_000:
y, sr = (T.Resample(sr, 16_000)(y), 16_000)
y = y[None, :] # [1, T] -> [B, 1, T]
# ensure input tensor is of correct shape
elif isinstance(audio_or_path, torch.Tensor):
y = audio_or_path
if len(y.shape) == 3:
y = audio_or_path
else:
raise ValueError(
f"NeuCodec expects tensor audio input to be of shape [B, 1, T] -- received shape: {y.shape}"
)
# pad audio
pad_for_wav = 320 - (y.shape[-1] % 320)
y = torch.nn.functional.pad(y, (0, pad_for_wav))
return y
def encode_code(self, audio_or_path: torch.Tensor | Path | str) -> torch.Tensor:
"""
Args:
audio_or_path: torch.Tensor [B, 1, T] | Path | str, input audio
Returns:
fsq_codes: torch.Tensor [B, 1, F], 50hz FSQ codes
"""
# prepare inputs
y = self._prepare_audio(audio_or_path)
semantic_features = self.feature_extractor(
y.squeeze(0), sampling_rate=16_000, return_tensors="pt"
).input_features.to(self.device)
# acoustic encoding
acoustic_emb = self.CodecEnc(y.to(self.device))
acoustic_emb = acoustic_emb.transpose(1, 2)
# semantic encoding
semantic_output = (
self.semantic_model(semantic_features).hidden_states[16].transpose(1, 2)
)
semantic_encoded = self.SemanticEncoder_module(semantic_output)
# concatenate embeddings
if acoustic_emb.shape[-1] != semantic_encoded.shape[-1]:
min_len = min(acoustic_emb.shape[-1], semantic_encoded.shape[-1])
acoustic_emb = acoustic_emb[:, :, :min_len]
semantic_encoded = semantic_encoded[:, :, :min_len]
concat_emb = torch.cat([semantic_encoded, acoustic_emb], dim=1)
concat_emb = self.fc_prior(concat_emb.transpose(1, 2)).transpose(1, 2)
# quantize
_, fsq_codes, _ = self.generator(concat_emb, vq=True)
return fsq_codes
def decode_code(self, fsq_codes: torch.Tensor) -> torch.Tensor:
"""
Args:
fsq_codes: torch.Tensor [B, 1, F], 50hz FSQ codes
Returns:
recon: torch.Tensor [B, 1, T], reconstructed 24kHz audio
"""
fsq_post_emb = self.generator.quantizer.get_output_from_indices(fsq_codes.transpose(1, 2))
fsq_post_emb = fsq_post_emb.transpose(1, 2)
fsq_post_emb = self.fc_post_a(fsq_post_emb.transpose(1, 2)).transpose(1, 2)
recon = self.generator(fsq_post_emb.transpose(1, 2), vq=False)[0]
return recon
class DistillNeuCodec(NeuCodec):
def __init__(self, sample_rate: int, hop_length: int):
nn.Module.__init__(self)
self.sample_rate = sample_rate
self.hop_length = hop_length
self.SemanticEncoder_module = SemanticEncoder(768, 768, 1024)
self.codec_encoder = DistillCodecEncoder()
self.generator = CodecDecoderVocos(hop_length=hop_length)
self.fc_prior = nn.Linear(
768 # acoustic model
+ 768, # semantic model
2048,
)
self.fc_sq_prior = nn.Linear(512, 768)
self.fc_post_a = nn.Linear(2048, 1024)
def encode_code(self, audio_or_path: torch.Tensor | Path | str) -> torch.Tensor:
"""
Args:
audio_or_path: torch.Tensor [B, 1, T] | Path | str, input audio
Returns:
fsq_codes: torch.Tensor [B, 1, F], 50hz FSQ codes
"""
# prepare inputs
y = self._prepare_audio(audio_or_path)
semantic_features = (
self.feature_extractor(
F.pad(y[0, :].cpu(), (160, 160)),
sampling_rate=16_000,
return_tensors="pt",
)
.input_values.to(self.device)
.squeeze(0)
)
# acoustic encoding
fsq_emb = self.fc_sq_prior(self.codec_encoder(y.to(self.device)))
fsq_emb = fsq_emb.transpose(1, 2)
# semantic encoding
semantic_target = self.semantic_model(
semantic_features
).last_hidden_state.transpose(1, 2)
semantic_target = self.SemanticEncoder_module(semantic_target)
if fsq_emb.shape[-1] != semantic_target.shape[-1]:
min_len = min(fsq_emb.shape[-1], semantic_target.shape[-1])
fsq_emb = fsq_emb[:, :, :min_len]
semantic_target = semantic_target[:, :, :min_len]
concat_emb = torch.cat([semantic_target, fsq_emb], dim=1)
concat_emb = self.fc_prior(concat_emb.transpose(1, 2)).transpose(1, 2)
_, fsq_codes, _ = self.generator(concat_emb, vq=True)
return fsq_codes
class NeuCodecOnnxDecoder(
ModelHubMixin,
repo_url="https://github.com/neuphonic/neucodec",
license="apache-2.0",
):
def __init__(self, onnx_path):
# onnx import
try:
import onnxruntime
except ImportError as e:
raise ImportError("Failed to import `onnxruntime`. Install with the following command: pip install onnxruntime") from e
# load model
so = onnxruntime.SessionOptions()
so.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
self.session = onnxruntime.InferenceSession(
onnx_path,
sess_options=so
)
self.sample_rate = 24_000
@classmethod
def _from_pretrained(
cls,
*,
model_id: str,
revision: Optional[str] = None,
cache_dir: Optional[str] = None,
force_download: bool = False,
proxies: Optional[Dict] = None,
resume_download: bool = False,
local_files_only: bool = False,
token: Optional[str] = None,
map_location: str = "cpu",
strict: bool = True,
**model_kwargs,
):
# download the model weights file
onnx_path = hf_hub_download(
repo_id=model_id,
filename="model.onnx",
revision=revision,
cache_dir=cache_dir,
force_download=force_download,
proxies=proxies,
resume_download=resume_download,
local_files_only=local_files_only,
token=token,
)
# download meta.yaml to track number of downloads
_ = hf_hub_download(
repo_id=model_id,
filename="meta.yaml",
revision=revision,
cache_dir=cache_dir,
force_download=force_download,
proxies=proxies,
resume_download=resume_download,
local_files_only=local_files_only,
token=token,
)
# initialize model
model = cls(onnx_path)
# only support CPU
if map_location != "cpu":
raise ValueError("The onnx decoder currently only supports CPU runtimes.")
return model
def encode_code(self, *args, **kwargs):
raise NotImplementedError(
"The onnx decoder has no functionality to encode codes, as it only contains the compiled decoder graph."
)
def decode_code(self, codes: np.ndarray) -> np.ndarray:
"""
Args:
fsq_codes: np.array [B, 1, F], 50hz FSQ codes
Returns:
recon: np.array [B, 1, T], reconstructed 24kHz audio
"""
# validate inputs
if not isinstance(codes, np.ndarray):
raise ValueError("`Codes` should be an np.array.")
if not len(codes.shape) == 3 or codes.shape[1] != 1:
raise ValueError("`Codes` should be of shape [B, 1, F].")
# run decoder
recon = self.session.run(
None, {"codes": codes}
)[0].astype(np.float32)
return recon
Next, make sure the low-level libraries required for speech processing are installed on the system:
sudo apt update
sudo apt install -y espeak espeak-ng
Then create a Python virtual environment and install the dependencies:
python3 -m venv --system-site-packages neutts
source neutts/bin/activate
Install llama-cpp-python for llama.cpp inference from Python:
sudo apt-get update
sudo apt-get install libgomp1 libomp-dev
pip install llama-cpp-python -i https://pypi.tuna.tsinghua.edu.cn/simple
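To confirm that llama-cpp-python can load the quantized backbone on the board, a minimal check along the following lines can be used (the model path is an assumption and should match wherever you copied the quantized GGUF; test.py below uses the same location):
from llama_cpp import Llama

# load the quantized GGUF backbone; n_ctx matches the value NeuTTSAir uses
llm = Llama(
    model_path="/home/radxa/Documents/llm/neutts/models/neutss-air-BF16.gguf",
    n_ctx=2048,
    verbose=False,
)
out = llm("user: Convert the text to speech:", max_tokens=8)
print(out["choices"][0]["text"])  # the content is irrelevant; loading and generating is the point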
Install the dependencies of the neutts-air project:
cd neutts-air
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
Build and install the modified neucodec:
cd ../neucodec
pip install -e . --no-deps
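Optionally, the patched neucodec can be verified on its own with a short encode/decode round trip. The local model directories below are assumptions that mirror the layout used by test.py further down, and any short WAV clip (for example samples/dave.wav from the neutts-air repo) will do:
import torch
import soundfile as sf
from neucodec import DistillNeuCodec

# local copies of the distill-neucodec and distilhubert repos (assumed directory layout)
codec = DistillNeuCodec.from_pretrained(
    "/home/radxa/Documents/llm/neutts/models/distill-neucodec",
    model_id2="/home/radxa/Documents/llm/neutts/models/distilhubert",
)
codec.eval()

with torch.no_grad():
    codes = codec.encode_code("../neutts-air/samples/dave.wav")  # [1, 1, F] FSQ codes
    recon = codec.decode_code(codes).cpu().numpy()               # [1, 1, T] audio at 24 kHz

sf.write("codec_roundtrip.wav", recon[0, 0], 24000)
print("codes shape:", tuple(codes.shape), "samples:", recon.shape[-1])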
Model Testing
Create the test file
First create an empty test script:
cd ../neutts-air
touch test.py
Copy the following code into test.py and run it:
from neuttsair.neutts import NeuTTSAir
import soundfile as sf
import time
tts = NeuTTSAir(
    backbone_repo="/home/radxa/Documents/llm/neutts/models/neutss-air-BF16.gguf",
    backbone_device="cpu",
    codec_repo="/home/radxa/Documents/llm/neutts/models/distill-neucodec",
    codec_repo2="/home/radxa/Documents/llm/neutts/models/distilhubert",
    codec_device="cpu",
)
input_text = "My name is Dave, and um, I'm from London. I love China!"
ref_text = "samples/dave.txt"
ref_audio_path = "samples/dave.wav"
ref_text = open(ref_text, "r").read().strip()
# start timing
start_time = time.time()
ref_codes = tts.encode_reference(ref_audio_path)
wav = tts.infer(input_text, ref_codes, ref_text)
# stop timing
end_time = time.time()
inference_time = end_time - start_time
# print the elapsed time
print(f"Inference time: {inference_time:.2f} seconds")
sf.write("test.wav", wav, 24000)
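Since the GGUF backbone also supports streaming synthesis (see infer_stream in neutts.py above), a streaming variant of the same test is a small extension: it reuses the tts, ref_codes and ref_text objects from test.py and simply concatenates the yielded chunks.
import numpy as np

# stream the utterance in chunks instead of waiting for the full waveform
chunks = []
for chunk in tts.infer_stream(input_text, ref_codes, ref_text):
    chunks.append(chunk)  # each chunk is a NumPy waveform segment at 24 kHz
    print(f"received {len(chunk)} samples")

sf.write("test_stream.wav", np.concatenate(chunks), 24000)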
CPU Test
First, the CPU-based deployment is tested by running the test.py script. On the Orion6 CPU, the full inference pipeline takes about 16.82 seconds, whereas the same task with the original PyTorch model on an x86 cloud server takes several minutes. Although this is not yet real-time interactive performance, it is already sufficient for typical edge scenarios such as offline announcements.
The generated audio file is available here:
https://pan.quark.cn/s/84242c...
Vulkan Test
Next, the backbone was switched to run on Vulkan. However, because PyTorch's Vulkan support is still limited and the codec still has to run on the CPU, the overall inference time actually increased slightly, to 17.80 seconds. Under the current conditions, a pure CPU deployment therefore remains the better choice.
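For reference, switching the backbone to the GPU path in this codebase only means passing backbone_device="gpu" when constructing NeuTTSAir, which sets n_gpu_layers=-1 and flash_attn=True in _load_backbone; it assumes a llama-cpp-python build with the Vulkan backend enabled (for example, reinstalled with CMAKE_ARGS="-DGGML_VULKAN=on"). A sketch of the changed constructor call:
# same local paths as in test.py; only the backbone device changes
tts_vk = NeuTTSAir(
    backbone_repo="/home/radxa/Documents/llm/neutts/models/neutss-air-BF16.gguf",
    backbone_device="gpu",   # offload all layers via n_gpu_layers=-1
    codec_repo="/home/radxa/Documents/llm/neutts/models/distill-neucodec",
    codec_repo2="/home/radxa/Documents/llm/neutts/models/distilhubert",
    codec_device="cpu",      # the codec still runs on the CPU
)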
Summary
NeuTTS Air demonstrates that high-quality speech synthesis can feasibly be moved to the edge. With sensible model compression, local model loading, and a lightweight architecture, the system achieves fully offline, low-footprint deployment while preserving audio quality. Only a basic deployment test has been carried out on the Orion6 so far, and there is still plenty of room for further optimization.