Waterman · Jan 5 · Liaoning

[“星睿O6” AI PC Development Kit Review] Deploying and running the on-device TTS NeuTTS-Air

This article describes in detail how to deploy NeuTTS-Air locally on the Orion6 development board, covering model quantization, dependency setup, code adaptation, and inference testing.

About NeuTTS-Air

NeuTTS-Air is a lightweight on-device text-to-speech (TTS) system released by Neuphonic in 2025. It pairs a language model with Neuphonic's own neural codec in a hybrid architecture, and can perform high-fidelity voice cloning from as little as 3 seconds of reference audio. The system is built on the lightweight Qwen2.5-0.5B language model and is distributed in GGUF format, so it can run fully offline on resource-constrained devices (phones, Raspberry Pi, Orion6, and the like) while balancing real-time performance, privacy, and audio quality.

Model conversion

Neuphonic already publishes a BF16-precision GGUF model of NeuTTS-Air that works directly with the llama.cpp inference framework. To fit the memory and compute budget of edge devices, we quantize it further to Q4_0 format.
First, download the model from ModelScope or Hugging Face using git lfs:

# Make sure git-lfs is installed (https://git-lfs.com)
git lfs install
git clone https://huggingface.co/neuphonic/neutts-air
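Before quantizing, it helps to estimate what Q4_0 buys us. A rough back-of-the-envelope calculation, assuming ~0.5 B parameters, 16 bits per weight for BF16 and about 4.5 bits per weight for Q4_0 (4-bit values plus a per-block FP16 scale); these are illustrative figures, not exact on-disk sizes:

```python
# Rough model-size estimate for BF16 vs Q4_0 (illustrative assumptions;
# real files also contain metadata, embeddings, and unquantized tensors).
params = 0.5e9                   # Qwen2.5-0.5B backbone, ~0.5 billion weights
bf16_bytes = params * 2          # 16 bits per weight
q4_0_bytes = params * 4.5 / 8    # Q4_0: 4-bit weights + per-block scale

print(f"BF16 : {bf16_bytes / 1e9:.2f} GB")   # BF16 : 1.00 GB
print(f"Q4_0 : {q4_0_bytes / 1e9:.2f} GB")   # Q4_0 : 0.28 GB
print(f"ratio: {bf16_bytes / q4_0_bytes:.1f}x smaller")
```

A roughly 3.6x reduction is what makes the model comfortable on an edge board's RAM.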

Install the build tools

sudo apt install cmake gcc g++ libcurl4-openssl-dev

Next, clone and build llama.cpp

git clone https://github.com/ggml-org/llama.cpp.git
cd llama.cpp
cmake -B build
cmake --build build --config Release

Then quantize the model with the llama-quantize tool:

cd build/bin
./llama-quantize /public/home/scntb3z6kw/SothisAI/model/ExternalSource/neutts-air/main/neutts-air/neutss-air-BF16.gguf ../../../../models/neutss-air-BF16.gguf Q4_0
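Quantization can silently fail on a mistyped path or a full disk, so a quick sanity check of the output file is worthwhile. A GGUF file begins with the 4-byte magic `GGUF` followed by a little-endian uint32 format version. The helper below is a hypothetical check of our own, not part of llama.cpp:

```python
import struct

def check_gguf(path: str) -> bool:
    """Return True if `path` starts with a valid GGUF header."""
    with open(path, "rb") as f:
        magic = f.read(4)
        if magic != b"GGUF":
            return False
        # a little-endian uint32 format version follows the magic
        (version,) = struct.unpack("<I", f.read(4))
        return version >= 1

# e.g. check_gguf("models/neutss-air-BF16.gguf")
```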


Model deployment

First, clone the relevant repositories:

git clone https://github.com/neuphonic/neutts-air.git
git clone https://github.com/neuphonic/neucodec.git

The official implementation downloads models from Hugging Face automatically. To run fully offline, we modify the code as follows so that it accepts local model paths.
neutts.py under neutts-air:

from typing import Generator
from pathlib import Path
import librosa
import numpy as np
import torch
import re
import perth
from neucodec import NeuCodec, DistillNeuCodec
from phonemizer.backend import EspeakBackend
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from threading import Thread


def _linear_overlap_add(frames: list[np.ndarray], stride: int) -> np.ndarray:
    # original impl --> https://github.com/facebookresearch/encodec/blob/main/encodec/utils.py
    assert len(frames)
    dtype = frames[0].dtype
    shape = frames[0].shape[:-1]

    total_size = 0
    for i, frame in enumerate(frames):
        frame_end = stride * i + frame.shape[-1]
        total_size = max(total_size, frame_end)

    sum_weight = np.zeros(total_size, dtype=dtype)
    out = np.zeros((*shape, total_size), dtype=dtype)

    offset: int = 0
    for frame in frames:
        frame_length = frame.shape[-1]
        t = np.linspace(0, 1, frame_length + 2, dtype=dtype)[1:-1]
        weight = 0.5 - np.abs(t - 0.5)

        out[..., offset : offset + frame_length] += weight * frame
        sum_weight[offset : offset + frame_length] += weight
        offset += stride
    assert sum_weight.min() > 0
    return out / sum_weight


class NeuTTSAir:

    def __init__(
        self,
        backbone_repo="neuphonic/neutts-air",
        backbone_device="cpu",
        codec_repo="neuphonic/neucodec",
        codec_repo2="ntu-spml/distilhubert",
        codec_device="cpu",
    ):

        # Consts
        self.sample_rate = 24_000
        self.max_context = 2048
        self.hop_length = 480
        self.streaming_overlap_frames = 1
        self.streaming_frames_per_chunk = 25
        self.streaming_lookforward = 5
        self.streaming_lookback = 50
        self.streaming_stride_samples = self.streaming_frames_per_chunk * self.hop_length

        # ggml & onnx flags
        self._is_quantized_model = False
        self._is_onnx_codec = False

        # HF tokenizer
        self.tokenizer = None

        # Load phonemizer + models
        print("Loading phonemizer...")
        self.phonemizer = EspeakBackend(
            language="en-us", preserve_punctuation=True, with_stress=True
        )

        self._load_backbone(backbone_repo, backbone_device)

        self._load_codec(codec_repo, codec_repo2, codec_device)

        # Load watermarker
        self.watermarker = perth.PerthImplicitWatermarker()

    def _load_backbone(self, backbone_repo, backbone_device):
        print(f"Loading backbone from: {backbone_repo} on {backbone_device} ...")

        # GGUF loading
        if backbone_repo.endswith("gguf"):

            try:
                from llama_cpp import Llama
            except ImportError as e:
                raise ImportError(
                    "Failed to import `llama_cpp`. "
                    "Please install it with:\n"
                    "    pip install llama-cpp-python"
                ) from e

            # self.backbone = Llama.from_pretrained(
            #     repo_id=backbone_repo,
            #     filename="*.gguf",
            #     verbose=False,
            #     n_gpu_layers=-1 if backbone_device == "gpu" else 0,
            #     n_ctx=self.max_context,
            #     mlock=True,
            #     flash_attn=True if backbone_device == "gpu" else False,
            # )
            self.backbone = Llama(
                model_path=backbone_repo,          # pass a local model path directly
                verbose=False,
                n_gpu_layers=-1 if backbone_device == "gpu" else 0,
                n_ctx=self.max_context,
                mlock=True,
                flash_attn=True if backbone_device == "gpu" else False,
            )
            
            self._is_quantized_model = True

        else:
            self.tokenizer = AutoTokenizer.from_pretrained(backbone_repo)
            self.backbone = AutoModelForCausalLM.from_pretrained(backbone_repo).to(
                torch.device(backbone_device)
            )

    def _load_codec(self, codec_repo, codec_repo2, codec_device):

        print(f"Loading codec from: {codec_repo} on {codec_device} ...")
        if "distill-neucodec" in codec_repo:
            self.codec = DistillNeuCodec.from_pretrained(codec_repo, model_id2=codec_repo2)
            self.codec.eval().to(codec_device)
        elif "neucodec-onnx-decoder" in codec_repo:
            if codec_device != "cpu":
                raise ValueError("Onnx decoder only currently runs on CPU.")

            try:
                from neucodec import NeuCodecOnnxDecoder
            except ImportError as e:
                raise ImportError(
                    "Failed to import the onnx decoder."
                    " Ensure you have onnxruntime installed as well as neucodec >= 0.0.4."
                ) from e

            self.codec = NeuCodecOnnxDecoder.from_pretrained(codec_repo)
            self._is_onnx_codec = True
        elif "neucodec" in codec_repo:
            self.codec = NeuCodec.from_pretrained(codec_repo, model_id2=codec_repo2)
            self.codec.eval().to(codec_device)
        else:
            raise ValueError(
                "Invalid codec repo! Must be one of:"
                " 'neuphonic/neucodec', 'neuphonic/distill-neucodec',"
                " 'neuphonic/neucodec-onnx-decoder'."
            )

    def infer(self, text: str, ref_codes: np.ndarray | torch.Tensor, ref_text: str) -> np.ndarray:
        """
        Perform inference to generate speech from text using the TTS model and reference audio.

        Args:
            text (str): Input text to be converted to speech.
            ref_codes (np.ndarray | torch.tensor): Encoded reference.
            ref_text (str): Reference text for reference audio. Defaults to None.
        Returns:
            np.ndarray: Generated speech waveform.
        """

        # Generate tokens
        if self._is_quantized_model:
            output_str = self._infer_ggml(ref_codes, ref_text, text)
        else:
            prompt_ids = self._apply_chat_template(ref_codes, ref_text, text)
            output_str = self._infer_torch(prompt_ids)

        # Decode
        wav = self._decode(output_str)
        watermarked_wav = self.watermarker.apply_watermark(wav, sample_rate=24_000)

        return watermarked_wav
    
    def infer_stream(self, text: str, ref_codes: np.ndarray | torch.Tensor, ref_text: str) -> Generator[np.ndarray, None, None]:
        """
        Perform streaming inference to generate speech from text using the TTS model and reference audio.

        Args:
            text (str): Input text to be converted to speech.
            ref_codes (np.ndarray | torch.tensor): Encoded reference.
            ref_text (str): Reference text for reference audio. Defaults to None.
        Yields:
            np.ndarray: Generated speech waveform.
        """ 

        if self._is_quantized_model:
            return self._infer_stream_ggml(ref_codes, ref_text, text)

        else:
            raise NotImplementedError("Streaming is not implemented for the torch backend!")

    def encode_reference(self, ref_audio_path: str | Path):
        wav, _ = librosa.load(ref_audio_path, sr=16000, mono=True)
        wav_tensor = torch.from_numpy(wav).float().unsqueeze(0).unsqueeze(0)  # [1, 1, T]
        with torch.no_grad():
            ref_codes = self.codec.encode_code(audio_or_path=wav_tensor).squeeze(0).squeeze(0)
        return ref_codes

    def _decode(self, codes: str):

        # Extract speech token IDs using regex
        speech_ids = [int(num) for num in re.findall(r"<\|speech_(\d+)\|>", codes)]

        if len(speech_ids) > 0:

            # Onnx decode
            if self._is_onnx_codec:
                codes = np.array(speech_ids, dtype=np.int32)[np.newaxis, np.newaxis, :]
                recon = self.codec.decode_code(codes)

            # Torch decode
            else:
                with torch.no_grad():
                    codes = torch.tensor(speech_ids, dtype=torch.long)[None, None, :].to(
                        self.codec.device
                    )
                    recon = self.codec.decode_code(codes).cpu().numpy()

            return recon[0, 0, :]
        else:
            raise ValueError("No valid speech tokens found in the output.")

    def _to_phones(self, text: str) -> str:
        phones = self.phonemizer.phonemize([text])
        phones = phones[0].split()
        phones = " ".join(phones)
        return phones

    def _apply_chat_template(
        self, ref_codes: list[int], ref_text: str, input_text: str
    ) -> list[int]:

        input_text = self._to_phones(ref_text) + " " + self._to_phones(input_text)
        speech_replace = self.tokenizer.convert_tokens_to_ids("<|SPEECH_REPLACE|>")
        speech_gen_start = self.tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_START|>")
        text_replace = self.tokenizer.convert_tokens_to_ids("<|TEXT_REPLACE|>")
        text_prompt_start = self.tokenizer.convert_tokens_to_ids("<|TEXT_PROMPT_START|>")
        text_prompt_end = self.tokenizer.convert_tokens_to_ids("<|TEXT_PROMPT_END|>")

        input_ids = self.tokenizer.encode(input_text, add_special_tokens=False)
        chat = """user: Convert the text to speech:<|TEXT_REPLACE|>\nassistant:<|SPEECH_REPLACE|>"""
        ids = self.tokenizer.encode(chat)

        text_replace_idx = ids.index(text_replace)
        ids = (
            ids[:text_replace_idx]
            + [text_prompt_start]
            + input_ids
            + [text_prompt_end]
            + ids[text_replace_idx + 1 :]  # noqa
        )

        speech_replace_idx = ids.index(speech_replace)
        codes_str = "".join([f"<|speech_{i}|>" for i in ref_codes])
        codes = self.tokenizer.encode(codes_str, add_special_tokens=False)
        ids = ids[:speech_replace_idx] + [speech_gen_start] + list(codes)

        return ids

    def _infer_torch(self, prompt_ids: list[int]) -> str:
        prompt_tensor = torch.tensor(prompt_ids).unsqueeze(0).to(self.backbone.device)
        speech_end_id = self.tokenizer.convert_tokens_to_ids("<|SPEECH_GENERATION_END|>")
        with torch.no_grad():
            output_tokens = self.backbone.generate(
                prompt_tensor,
                max_length=self.max_context,
                eos_token_id=speech_end_id,
                do_sample=True,
                temperature=1.0,
                top_k=50,
                use_cache=True,
                min_new_tokens=50,
            )
        input_length = prompt_tensor.shape[-1]
        output_str = self.tokenizer.decode(
            output_tokens[0, input_length:].cpu().numpy().tolist(), add_special_tokens=False
        )
        return output_str
    
    def _infer_ggml(self, ref_codes: list[int], ref_text: str, input_text: str) -> str:
        ref_text = self._to_phones(ref_text)
        input_text = self._to_phones(input_text)

        codes_str = "".join([f"<|speech_{idx}|>" for idx in ref_codes])
        prompt = (
            f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text} {input_text}"
            f"<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}"
        )
        output = self.backbone(
            prompt,
            max_tokens=self.max_context,
            temperature=1.0,
            top_k=50,
            stop=["<|SPEECH_GENERATION_END|>"],
        )
        output_str = output["choices"][0]["text"]
        return output_str

    def _infer_stream_ggml(self, ref_codes: torch.Tensor, ref_text: str, input_text: str) -> Generator[np.ndarray, None, None]:
        ref_text = self._to_phones(ref_text)
        input_text = self._to_phones(input_text)

        codes_str = "".join([f"<|speech_{idx}|>" for idx in ref_codes])
        prompt = (
            f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text} {input_text}"
            f"<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}"
        )

        audio_cache: list[np.ndarray] = []
        token_cache: list[str] = [f"<|speech_{idx}|>" for idx in ref_codes]
        n_decoded_samples: int = 0
        n_decoded_tokens: int = len(ref_codes)

        for item in self.backbone(
            prompt,
            max_tokens=self.max_context,
            temperature=1.0,
            top_k=50,
            stop=["<|SPEECH_GENERATION_END|>"],
            stream=True
        ):
            output_str = item["choices"][0]["text"]
            token_cache.append(output_str)

            if len(token_cache[n_decoded_tokens:]) >= self.streaming_frames_per_chunk + self.streaming_lookforward:

                # decode chunk
                tokens_start = max(
                    n_decoded_tokens
                    - self.streaming_lookback
                    - self.streaming_overlap_frames,
                    0
                )
                tokens_end = (
                    n_decoded_tokens
                    + self.streaming_frames_per_chunk
                    + self.streaming_lookforward
                    + self.streaming_overlap_frames
                )
                sample_start = (
                    n_decoded_tokens - tokens_start
                ) * self.hop_length
                sample_end = (
                    sample_start
                    + (self.streaming_frames_per_chunk + 2 * self.streaming_overlap_frames) * self.hop_length
                )
                curr_codes = token_cache[tokens_start:tokens_end]
                recon = self._decode("".join(curr_codes))
                recon = self.watermarker.apply_watermark(recon, sample_rate=24_000)
                recon = recon[sample_start:sample_end]
                audio_cache.append(recon)

                # postprocess
                processed_recon = _linear_overlap_add(
                    audio_cache, stride=self.streaming_stride_samples
                )
                new_samples_end = len(audio_cache) * self.streaming_stride_samples
                processed_recon = processed_recon[
                    n_decoded_samples:new_samples_end
                ]
                n_decoded_samples = new_samples_end
                n_decoded_tokens += self.streaming_frames_per_chunk
                yield processed_recon

        # final decoding handled separately due to the non-constant chunk size
        remaining_tokens = len(token_cache) - n_decoded_tokens
        if len(token_cache) > n_decoded_tokens:
            tokens_start = max(
                len(token_cache)
                - (self.streaming_lookback + self.streaming_overlap_frames + remaining_tokens), 
                0
            )
            sample_start = (
                len(token_cache) 
                - tokens_start 
                - remaining_tokens 
                - self.streaming_overlap_frames
            ) * self.hop_length
            curr_codes = token_cache[tokens_start:]
            recon = self._decode("".join(curr_codes))
            recon = self.watermarker.apply_watermark(recon, sample_rate=24_000)
            recon = recon[sample_start:]
            audio_cache.append(recon)

            processed_recon = _linear_overlap_add(audio_cache, stride=self.streaming_stride_samples)
            processed_recon = processed_recon[n_decoded_samples:]
            yield processed_recon
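The GGUF path in `_infer_ggml` bypasses the HF tokenizer's chat template and assembles the prompt string by hand. To make the format concrete, the sketch below rebuilds that prompt with made-up phoneme strings and code IDs (the token layout matches `_infer_ggml` above; the values are illustrative):

```python
# Illustrative reconstruction of the prompt _infer_ggml builds.
# The phoneme strings and code IDs below are hypothetical examples.
ref_text = "həloʊ"             # phonemized reference transcript
input_text = "wɜːld"           # phonemized text to synthesize
ref_codes = [101, 202, 303]    # codec tokens from encode_reference

codes_str = "".join(f"<|speech_{idx}|>" for idx in ref_codes)
prompt = (
    f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text} {input_text}"
    f"<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}"
)
print(prompt)
```

The backbone then continues generating `<|speech_N|>` tokens until it emits `<|SPEECH_GENERATION_END|>`, and `_decode` turns those tokens back into audio.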

model.py under neucodec

from typing import Optional, Dict
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from torchaudio import transforms as T
from huggingface_hub import PyTorchModelHubMixin, ModelHubMixin, hf_hub_download
from transformers import AutoFeatureExtractor, HubertModel, Wav2Vec2BertModel

from .codec_encoder import CodecEncoder
from .codec_encoder_distill import DistillCodecEncoder
from .codec_decoder_vocos import CodecDecoderVocos
from .module import SemanticEncoder


class NeuCodec(
    nn.Module,
    PyTorchModelHubMixin,
    repo_url="https://github.com/neuphonic/neucodec",
    license="apache-2.0",
):

    def __init__(self, sample_rate: int, hop_length: int):
        super().__init__()
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        # self.semantic_model = Wav2Vec2BertModel.from_pretrained(
        #     "facebook/w2v-bert-2.0", output_hidden_states=True
        # )
        # self.feature_extractor = AutoFeatureExtractor.from_pretrained(
        #     "facebook/w2v-bert-2.0"
        # )
        self.semantic_model = None
        self.feature_extractor = None
        self.SemanticEncoder_module = SemanticEncoder(1024, 1024, 1024)
        self.CodecEnc = CodecEncoder()
        self.generator = CodecDecoderVocos(hop_length=hop_length)
        self.fc_prior = nn.Linear(2048, 2048)
        self.fc_post_a = nn.Linear(2048, 1024)

    @property
    def device(self):
        return next(self.parameters()).device

    @classmethod
    def _from_pretrained(
        cls,
        *,
        model_id: str,
        model_id2: Optional[str] = None,
        revision: Optional[str] = None,
        cache_dir: Optional[str] = None,
        force_download: bool = False,
        proxies: Optional[Dict] = None,
        resume_download: bool = False,
        local_files_only: bool = False,
        token: Optional[str] = None,
        map_location: str = "cpu",
        strict: bool = True,
        **model_kwargs,
    ):
        
        if "distill-neucodec" in model_id:
            ignore_keys = []
        else:
            ignore_keys = ["fc_post_s", "SemanticDecoder"]

        cls.semantic_model = HubertModel.from_pretrained(model_id2, output_hidden_states=True)
        cls.feature_extractor = AutoFeatureExtractor.from_pretrained(model_id2)

        # download the model weights file
        # ckpt_path = hf_hub_download(
        #     repo_id=model_id,
        #     filename="pytorch_model.bin",
        #     revision=revision,
        #     cache_dir=cache_dir,
        #     force_download=force_download,
        #     proxies=proxies,
        #     resume_download=resume_download,
        #     local_files_only=local_files_only,
        #     token=token,
        # )

        # download meta.yaml to track number of downloads
        # _ = hf_hub_download(
        #     repo_id=model_id,
        #     filename="meta.yaml",
        #     revision=revision,
        #     cache_dir=cache_dir,
        #     force_download=force_download,
        #     proxies=proxies,
        #     resume_download=resume_download,
        #     local_files_only=local_files_only,
        #     token=token,
        # )

        # initialize model
        model = cls(24_000, 480)

        # load weights
        state_dict = torch.load(model_id+"/pytorch_model.bin", map_location)
        contains_list = lambda s, l: any(i in s for i in l)
        state_dict = {
            k:v for k, v in state_dict.items() 
            if not contains_list(k, ignore_keys)
        }

        # TODO: we can move to strict loading once we clean up the checkpoints
        model.load_state_dict(state_dict, strict=False)

        return model
    
    def _prepare_audio(self, audio_or_path: torch.Tensor | Path | str):
        
        # load from file
        if isinstance(audio_or_path, (Path, str)):
            y, sr = torchaudio.load(audio_or_path)
            if sr != 16_000:
                y, sr = (T.Resample(sr, 16_000)(y), 16_000)
            y = y[None, :]  # [1, T] -> [B, 1, T]

        # ensure input tensor is of correct shape
        elif isinstance(audio_or_path, torch.Tensor):
            y = audio_or_path
            if len(y.shape) == 3:
                y = audio_or_path
            else:
                raise ValueError(
                    f"NeuCodec expects tensor audio input to be of shape [B, 1, T] -- received shape: {y.shape}"
                )

        # pad audio
        pad_for_wav = 320 - (y.shape[-1] % 320)
        y = torch.nn.functional.pad(y, (0, pad_for_wav))
        
        return y
        
    def encode_code(self, audio_or_path: torch.Tensor | Path | str) -> torch.Tensor:
        """
        Args:
            audio_or_path: torch.Tensor [B, 1, T] | Path | str, input audio

        Returns:
            fsq_codes: torch.Tensor [B, 1, F], 50hz FSQ codes
        """
         
        # prepare inputs
        y = self._prepare_audio(audio_or_path)
        semantic_features = self.feature_extractor(
            y.squeeze(0), sampling_rate=16_000, return_tensors="pt"
        ).input_features.to(self.device)

        # acoustic encoding
        acoustic_emb = self.CodecEnc(y.to(self.device))
        acoustic_emb = acoustic_emb.transpose(1, 2)

        # semantic encoding
        semantic_output = (
            self.semantic_model(semantic_features).hidden_states[16].transpose(1, 2)
        )
        semantic_encoded = self.SemanticEncoder_module(semantic_output)

        # concatenate embeddings
        if acoustic_emb.shape[-1] != semantic_encoded.shape[-1]:
            min_len = min(acoustic_emb.shape[-1], semantic_encoded.shape[-1])
            acoustic_emb = acoustic_emb[:, :, :min_len]
            semantic_encoded = semantic_encoded[:, :, :min_len]        
        concat_emb = torch.cat([semantic_encoded, acoustic_emb], dim=1)
        concat_emb = self.fc_prior(concat_emb.transpose(1, 2)).transpose(1, 2)

        # quantize
        _, fsq_codes, _ = self.generator(concat_emb, vq=True)
        return fsq_codes

    def decode_code(self, fsq_codes: torch.Tensor) -> torch.Tensor:
        """
        Args:
            fsq_codes: torch.Tensor [B, 1, F], 50hz FSQ codes

        Returns:
            recon: torch.Tensor [B, 1, T], reconstructed 24kHz audio
        """

        fsq_post_emb = self.generator.quantizer.get_output_from_indices(fsq_codes.transpose(1, 2))
        fsq_post_emb = fsq_post_emb.transpose(1, 2)
        fsq_post_emb = self.fc_post_a(fsq_post_emb.transpose(1, 2)).transpose(1, 2) 
        recon = self.generator(fsq_post_emb.transpose(1, 2), vq=False)[0]
        return recon
    

class DistillNeuCodec(NeuCodec):
    def __init__(self, sample_rate: int, hop_length: int):
        nn.Module.__init__(self)
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        self.SemanticEncoder_module = SemanticEncoder(768, 768, 1024)
        self.codec_encoder = DistillCodecEncoder()
        self.generator = CodecDecoderVocos(hop_length=hop_length)
        self.fc_prior = nn.Linear(
            768  # acoustic model
            + 768,  # semantic model
            2048,
        )
        self.fc_sq_prior = nn.Linear(512, 768)
        self.fc_post_a = nn.Linear(2048, 1024)
        
    def encode_code(self, audio_or_path:  torch.Tensor | Path | str) -> torch.Tensor:
        """
        Args:
            audio_or_path: torch.Tensor [B, 1, T] | Path | str, input audio

        Returns:
            fsq_codes: torch.Tensor [B, 1, F], 50hz FSQ codes
        """
         
        # prepare inputs
        y = self._prepare_audio(audio_or_path)
        semantic_features = (
            self.feature_extractor(
                F.pad(y[0, :].cpu(), (160, 160)),
                sampling_rate=16_000,
                return_tensors="pt",
            )
            .input_values.to(self.device)
            .squeeze(0)
        )

        # acoustic encoding
        fsq_emb = self.fc_sq_prior(self.codec_encoder(y.to(self.device)))
        fsq_emb = fsq_emb.transpose(1, 2)

        # semantic encoding
        semantic_target = self.semantic_model(
            semantic_features
        ).last_hidden_state.transpose(1, 2)
        semantic_target = self.SemanticEncoder_module(semantic_target)

        if fsq_emb.shape[-1] != semantic_target.shape[-1]:
            min_len = min(fsq_emb.shape[-1], semantic_target.shape[-1])
            fsq_emb = fsq_emb[:, :, :min_len]
            semantic_target = semantic_target[:, :, :min_len]

        concat_emb = torch.cat([semantic_target, fsq_emb], dim=1)
        concat_emb = self.fc_prior(concat_emb.transpose(1, 2)).transpose(1, 2)
        _, fsq_codes, _ = self.generator(concat_emb, vq=True)
        return fsq_codes


class NeuCodecOnnxDecoder(
    ModelHubMixin,
    repo_url="https://github.com/neuphonic/neucodec",
    license="apache-2.0",
):
    
    def __init__(self, onnx_path):
        
        # onnx import
        try: 
            import onnxruntime
        except ImportError as e:
            raise ImportError("Failed to import `onnxruntime`. Install with the following command: pip install onnxruntime") from e
        
        # load model
        so = onnxruntime.SessionOptions()
        so.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
        self.session = onnxruntime.InferenceSession(
            onnx_path,
            sess_options=so
        )
        self.sample_rate = 24_000

    @classmethod
    def _from_pretrained(
        cls,
        *,
        model_id: str,
        revision: Optional[str] = None,
        cache_dir: Optional[str] = None,
        force_download: bool = False,
        proxies: Optional[Dict] = None,
        resume_download: bool = False,
        local_files_only: bool = False,
        token: Optional[str] = None,
        map_location: str = "cpu",
        strict: bool = True,
        **model_kwargs,
    ):
        
        # download the model weights file
        onnx_path = hf_hub_download(
            repo_id=model_id,
            filename="model.onnx",
            revision=revision,
            cache_dir=cache_dir,
            force_download=force_download,
            proxies=proxies,
            resume_download=resume_download,
            local_files_only=local_files_only,
            token=token,
        )

        # download meta.yaml to track number of downloads
        _ = hf_hub_download(
            repo_id=model_id,
            filename="meta.yaml",
            revision=revision,
            cache_dir=cache_dir,
            force_download=force_download,
            proxies=proxies,
            resume_download=resume_download,
            local_files_only=local_files_only,
            token=token,
        )

        # initialize model
        model = cls(onnx_path)

        # only support CPU
        if map_location != "cpu":
            raise ValueError("The onnx decoder currently only supports CPU runtimes.")

        return model
    
    def encode_code(self, *args, **kwargs):
        raise NotImplementedError(
            "The onnx decoder has no functionality to encode codes, as it only contains the compiled decoder graph."
        )
    
    def decode_code(self, codes: np.ndarray) -> np.ndarray:
        """
        Args:
            codes: np.ndarray [B, 1, F], 50hz FSQ codes

        Returns:
            recon: np.array [B, 1, T], reconstructed 24kHz audio
        """

        # validate inputs
        if not isinstance(codes, np.ndarray):
            raise ValueError("`Codes` should be an np.array.")
        if not len(codes.shape) == 3 or codes.shape[1] != 1:
            raise ValueError("`Codes` should be of shape [B, 1, F].")

        # run decoder
        recon = self.session.run(
            None, {"codes": codes}
        )[0].astype(np.float32)
        
        return recon
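Both decoders expect codes shaped `[B, 1, F]` at 50 Hz and return 24 kHz audio, so every code frame corresponds to `hop_length = 480` output samples. The bookkeeping can be sketched with pure arithmetic, no model weights needed:

```python
import numpy as np

SAMPLE_RATE = 24_000                    # decoder output rate
CODE_RATE = 50                          # FSQ code frames per second
HOP_LENGTH = SAMPLE_RATE // CODE_RATE   # 480 samples per code frame

# A dummy batch of 100 code frames, shaped [B, 1, F] as decode_code expects.
codes = np.zeros((1, 1, 100), dtype=np.int32)
expected_samples = codes.shape[-1] * HOP_LENGTH

print(f"{codes.shape[-1]} codes -> {expected_samples} samples "
      f"({expected_samples / SAMPLE_RATE:.1f} s of audio)")
```

This is the same relationship the streaming path relies on when it converts token offsets into sample offsets via `self.hop_length`.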

Next, make sure the system-level libraries required for speech processing are installed

sudo apt update
sudo apt install -y espeak espeak-ng

Then create a Python virtual environment and install the dependencies

python3 -m venv --system-site-packages neutts
source neutts/bin/activate

Install llama-cpp-python for llama.cpp inference from Python

sudo apt-get update
sudo apt-get install libgomp1 libomp-dev
pip install llama-cpp-python -i https://pypi.tuna.tsinghua.edu.cn/simple

Install the neutts-air project dependencies

cd neutts-air
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

Build and install the modified neucodec

cd ../neucodec
pip install -e . --no-deps


Model testing

Create a test file

Create the test script:

cd ../neutts-air
touch test.py

Copy the following code into test.py and run it

from neuttsair.neutts import NeuTTSAir
import soundfile as sf
import time

tts = NeuTTSAir(
    backbone_repo="/home/radxa/Documents/llm/neutts/models/neutss-air-BF16.gguf",
    backbone_device="cpu",
    codec_repo="/home/radxa/Documents/llm/neutts/models/distill-neucodec",
    codec_repo2="/home/radxa/Documents/llm/neutts/models/distilhubert",
    codec_device="cpu",
)

input_text = "My name is Dave, and um, I'm from London. I love China!"

ref_text = "samples/dave.txt"
ref_audio_path = "samples/dave.wav"

ref_text = open(ref_text, "r").read().strip()

# start timing
start_time = time.time()

ref_codes = tts.encode_reference(ref_audio_path)

wav = tts.infer(input_text, ref_codes, ref_text)

# stop timing
end_time = time.time()
inference_time = end_time - start_time
# print the elapsed time
print(f"Inference time: {inference_time:.2f} seconds")

sf.write("test.wav", wav, 24000)

CPU test

First, the CPU-only deployment test. Running the test.py script produces the following result:
6.png
On the Orion6 CPU, the full inference pipeline takes about 16.82 seconds, whereas the same task with the original PyTorch model on an x86 cloud server took several minutes. While not yet real-time, this level of performance is already sufficient for typical edge use cases such as offline voice announcements.
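A useful way to read the 16.82-second figure is as a real-time factor (RTF): inference time divided by the duration of the generated audio, where RTF < 1 means faster than real time. The article does not state the clip length, so the example below uses a hypothetical 5-second (120,000-sample) output:

```python
def real_time_factor(inference_s: float, audio_samples: int, sr: int = 24_000) -> float:
    """RTF = compute time / audio duration; values below 1 are faster than real time."""
    return inference_s / (audio_samples / sr)

# Hypothetical example: 16.82 s of compute for a 5 s (120,000-sample) clip at 24 kHz.
rtf = real_time_factor(16.82, 120_000)
print(f"RTF = {rtf:.2f}")
```

Tracking RTF rather than raw seconds makes runs with different output lengths comparable.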
The generated audio file is available here:
https://pan.quark.cn/s/84242c...

Vulkan test

Next, the backbone was redeployed on the Vulkan backend. However, because PyTorch's Vulkan support is currently limited and the codec still has to run on the CPU, the result was:
7.png
The overall inference time actually increased slightly, to 17.80 seconds, so for now a pure CPU deployment remains the better choice.

Summary

NeuTTS-Air demonstrates that high-quality speech synthesis can feasibly move to the edge. With sensible model compression, local model loading, and a lightweight architecture, the system achieves fully offline, low-footprint deployment without sacrificing audio quality. This was only a basic deployment test on the Orion6; there is plenty of room for further optimization.
