feat:提交

main
old-tom 3 months ago
commit 5076963f2c

228
.gitignore vendored

@ -0,0 +1,228 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be added to the global gitignore or merged into this project gitignore. For a PyCharm
# project, it is recommended to include the following files:
#*.iml
#*.iws
#*.ipr
#out/
# Rider
.idea/
*.sln.iml
# Visual Studio Code
.vscode/
# macOS
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
# Project specific
# 模型文件(太大,不适合版本控制)
models/*.onnx
models/*.tar.bz2
models/*.zip
# 日志文件
logs/
*.log
# 输出文件
output/
# 临时文件
temp/
tmp/
# 音频文件
*.wav
*.mp3
*.flac
*.m4a
# 配置文件(可能包含敏感信息)
config.local.py
settings.local.py
.env.local
# UV 相关
.uv/
uv.lock

@ -0,0 +1 @@
3.12.10

@ -0,0 +1,42 @@
# 角色
你是一名精通 Python 开发的高级工程师,拥有 10 年的开发经验,擅长使用 Python 开发各种应用程序。你的任务是帮助用户设计和开发易用且易于维护的应用。请始终遵循最佳实践,并坚持干净代码和健壮架构的原则。
# 目标
你的目标是以用户容易理解的方式帮助他们完成应用的设计和开发工作,确保应用功能完善、性能优异、用户体验良好、可扩展性强,并且易于维护和扩展。
# 要求
在理解用户需求、设计UI、编写代码、解决问题和项目迭代优化时你应该始终遵循以下原则
## 一、项目初始化
+ 在项目开始时首先仔细阅读项目目录下的README.md文件包括项目目标、功能架构、技术栈和开发计划。确保对项目的整体架构和实现方法有清晰的认知。
+ 如果还没有README.md请主动创建一个用于后续记录该应用的功能模块、页面结构、数据模型、接口设计等信息。
## 二、需求理解
+ 充分理解用户需求,分析需求是否存在缺漏、冲突,确保需求的完整性和准确性,并与用户讨论完善需求。
+ 选择最合理的解决方案来满足用户需求,避免过度设计。
## 三、UI和样式设计
+ 使用现代UI框架进行样式设计。
+ 在不同的平台上实现一致的设计和响应式模式
## 四、代码编写
+ 技术选型通常已经在README.md中说明如果没有的话请根据项目需求选择合适的技术栈。你需要仔细阅读相关技术栈的API文档不能猜测API的使用方法而应该根据文档进行使用。
+ 代码结构通常已经在README.md中说明, 如果没有的话请根据项目需求选择合适的代码结构。强调代码清晰、可读性、模块化、可维护性遵循最佳实践如DRY原则、KISS原则、YAGNI原则、最小权限原则、响应式、函数式等
+ 代码安全性:始终考虑代码安全性,避免引入漏洞。
+ 性能优化:优化代码性能,减少资源占用,提升加载运行速度,确保项目高效运行。
+ 测试与文档:编写单元测试,确保代码质量,并提供清晰的中文注释和文档,方便后续阅读和维护。
## 五、问题解决
+ 全面阅读相关代码和文档,理解项目的整体架构和实现方法、工作原理。
+ 根据用户的反馈分析问题原因,提出解决方案,确保问题得到有效解决。
+ 确保每次代码变更不会破坏现有功能,尽可能保持最小的改动。
## 六、项目迭代优化
+ 与用户保持密切沟通,根据反馈调整功能和设计,确保应用符合用户需求。
+ 在不确定需求时,主动询问用户以澄清需求和技术细节。
+ 每次迭代必须更新说明文件包括功能说明和优化建议如果说明文件不存在请在docs目录下主动创建一个。
## 七、方法论
+ 系统思维:以分析严谨的方式解决问题。可将需求拆解为更小、更易于理解、管理的部分,并在实施前仔细思考每一步。
+ 思维树:评估多种可能的解决方案及后果。使用结构化的方法探索不同的实现路径,并选择最优的解决方案。
+ 迭代改进:在最终确定代码前,考虑改进、边缘情况和优化。通过潜在增强的迭代,确保最终解决方案是健壮的。

@ -0,0 +1,54 @@
本项目目标是实现实时语音识别功能,支持中英文。
实现思路如下:
1. 语音输入:使用麦克风采集音频数据。
2. 语音预处理:对音频数据进行预处理,如降噪、增益控制等。
3. 语音识别:使用语音识别模型将预处理后的音频数据转换为文本。
# 基础环境
+ 操作系统linux
+ 语言python 3.12
+ 虚拟环境及包管理工具uv
+ 语音识别框架新一代kaldi
+ 语音识别模型zipformer
+ 部署框架sherpa-onnx
# 目录层级说明
```text
.
├── .venv
│ ├── bin
│ ├── lib
│ └── pyvenv.cfg
├── .python-version
├── docs
├── src
│ └── main.py
├── config
├── tests
├── static
├── README.md
├── hello.py
├── pyproject.toml
└── uv.lock
```
+ docs 存放项目文档
+ src 存放项目源码
+ config 存放项目配置文件
+ tests 存放单元测试代码
+ static 目录下存放静态文件如图片、css、js等
+ README.md 项目说明文件
+ pyproject.toml 项目配置文件
+ uv.lock 虚拟环境依赖文件
除main.py外其他文件均需要放置在src子目录下要求子目录名称清晰易读不能过长。
# 包管理命令说明
1. 添加包 uv add <package>
2. 移除包 uv remove <package>
3. 所有包管理命令均需要在项目根目录下执行并且强制使用uv命令不能使用pip命令。
# 脚本运行命令说明
1. 运行项目 uv run main.py
2. 运行单元测试 uv run -m pytest tests
*禁止修改本文件*

@ -0,0 +1,153 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
实时语音转文字系统主程序
基于sherpa-onnx的实时语音识别应用
"""
import sys
import argparse
import logging
from pathlib import Path
# 添加src目录到Python路径
sys.path.insert(0, str(Path(__file__).parent / "src"))
from src import RealTimeVTT, ModelDownloader, ModelConfig
def setup_logging(level: str = "INFO"):
    """Configure the root logger with a timestamped line format.

    Args:
        level: Name of a ``logging`` level constant. It is upper-cased before
            the lookup, so ``"debug"`` and ``"DEBUG"`` are equivalent.

    Raises:
        AttributeError: If ``level`` does not name an attribute of ``logging``.
    """
    numeric_level = getattr(logging, level.upper())
    line_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=numeric_level, format=line_format)
def check_models() -> bool:
    """Report whether every required model file is present.

    When files are missing, the missing paths and the download command are
    printed to standard output.

    Returns:
        True when nothing is missing, False otherwise.
    """
    missing_files = ModelConfig().validate_model_files()
    if not missing_files:
        return True

    report_lines = ["错误: 缺少模型文件", "缺少的文件:"]
    report_lines += [f" - {file_path}" for file_path in missing_files]
    report_lines += ["\n请运行以下命令下载模型:", " python main.py --download-model"]
    for line in report_lines:
        print(line)
    return False
def download_model_interactive():
    """Run the interactive model download prompt using the default model configuration."""
    ModelDownloader(ModelConfig()).interactive_download()
def main():
    """Command-line entry point of the real-time speech-to-text application.

    Parses the command line, configures logging and then either downloads a
    model, lists the audio input devices, or starts interactive recognition.

    Returns:
        Process exit status: 0 on success or when interrupted with Ctrl+C,
        1 when a step fails.
    """
    parser = argparse.ArgumentParser(
        description="实时语音转文字系统",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例用法:
python main.py # 启动实时语音识别
python main.py --download-model # 下载语音识别模型
python main.py --list-devices # 列出音频设备
python main.py --log-level DEBUG # 启用调试日志
"""
    )
    parser.add_argument(
        "--download-model",
        action="store_true",
        help="下载语音识别模型"
    )
    parser.add_argument(
        "--list-devices",
        action="store_true",
        help="列出可用的音频设备"
    )
    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default="INFO",
        help="设置日志级别 (默认: INFO)"
    )
    parser.add_argument(
        "--no-save",
        action="store_true",
        help="不保存识别结果到文件"
    )
    parser.add_argument(
        "--no-partial",
        action="store_true",
        help="不显示部分识别结果"
    )
    args = parser.parse_args()

    setup_logging(args.log_level)

    try:
        if args.download_model:
            download_model_interactive()
            return 0

        app = RealTimeVTT()

        # Command-line switches override the defaults held by the app config.
        if args.no_save:
            app.app_config.save_to_file = False
        if args.no_partial:
            app.app_config.show_partial_results = False

        if args.list_devices:
            try:
                if not app.audio_processor.initialize():
                    print("错误: 无法初始化音频设备")
                    return 1
                devices = app.list_audio_devices()
                print("可用的音频设备:")
                for device in devices:
                    print(f" [{device['index']}] {device['name']}")
                    print(f" 通道数: {device['channels']}")
                    print(f" 采样率: {device['sample_rate']} Hz")
            finally:
                # Release the audio backend on every exit path of this branch,
                # including a failed initialisation or an exception while
                # enumerating devices.
                app.cleanup()
            return 0

        if not check_models():
            return 1

        if not app.initialize():
            print("错误: 应用初始化失败")
            # The audio processor may already be initialised when the
            # recognizer fails, so release whatever was acquired.
            app.cleanup()
            return 1

        # run_interactive() performs its own cleanup when it finishes.
        app.run_interactive()
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"错误: {e}")
        if args.log_level == "DEBUG":
            import traceback
            traceback.print_exc()
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())

File diff suppressed because it is too large Load Diff

@ -0,0 +1,16 @@
[project]
name = "realtimevtt"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12.10"
dependencies = [
"numpy>=2.2.6",
"pyaudio>=0.2.14",
"sherpa-onnx>=1.12.0",
]
[[tool.uv.index]]
name = "private-pypi"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true

@ -0,0 +1,28 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
实时语音转文字系统
基于sherpa-onnx的实时语音识别应用
"""
__version__ = "1.0.0"
__author__ = "RealTimeVTT Team"
__description__ = "基于sherpa-onnx的实时语音转文字系统"
from .config import ModelConfig, AudioConfig, AppConfig
from .audio_processor import AudioProcessor
from .speech_recognizer import SpeechRecognizer, RecognitionResult, RecognitionSession
from .realtime_vtt import RealTimeVTT
from .model_downloader import ModelDownloader
__all__ = [
"ModelConfig",
"AudioConfig",
"AppConfig",
"AudioProcessor",
"SpeechRecognizer",
"RecognitionResult",
"RecognitionSession",
"RealTimeVTT",
"ModelDownloader"
]

@ -0,0 +1,175 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
音频处理模块
负责从麦克风采集音频数据并进行预处理
"""
import pyaudio
import numpy as np
import threading
import queue
import logging
from typing import Optional, Callable
from .config import AudioConfig
class AudioProcessor:
    """Capture microphone audio through PyAudio and expose it as NumPy chunks.

    Captured buffers are converted to ``int16`` arrays by the stream callback
    and handed over through an internal queue that ``get_audio_data`` reads.
    Two optional preprocessing helpers (noise gate and gain control) operate
    on those arrays.
    """

    # Magnitude of the most negative int16 sample; used to express levels as a
    # fraction of full scale.
    _INT16_FULL_SCALE = 32768.0

    def __init__(self, config: AudioConfig):
        """Store the audio configuration; no device is opened here."""
        self.config = config
        self.audio = None  # PyAudio instance, created by initialize()
        self.stream = None
        self.is_recording = False
        self.audio_queue = queue.Queue()
        self.record_thread = None
        self.logger = logging.getLogger(__name__)

    def initialize(self) -> bool:
        """Create the PyAudio instance and log the available devices.

        Also sets ``config.format`` to ``pyaudio.paInt16``.

        Returns:
            True on success, False when any PyAudio call raises.
        """
        try:
            self.audio = pyaudio.PyAudio()
            self.config.format = pyaudio.paInt16

            device_count = self.audio.get_device_count()
            self.logger.info(f"检测到 {device_count} 个音频设备")

            default_input_device = self.audio.get_default_input_device_info()
            self.logger.info(f"默认输入设备: {default_input_device['name']}")
            return True
        except Exception as e:
            self.logger.error(f"音频设备初始化失败: {e}")
            return False

    def start_recording(self, callback: Optional[Callable] = None) -> bool:
        """Open the input stream in callback mode and start it.

        Args:
            callback: Optional PyAudio stream callback. When omitted, the
                internal callback that fills the audio queue is used.

        Returns:
            True when the stream was started, False when recording is already
            in progress or the stream could not be opened or started.
        """
        if self.is_recording:
            self.logger.warning("录音已在进行中")
            return False

        stream = None
        try:
            stream = self.audio.open(
                format=self.config.format,
                channels=self.config.channels,
                rate=self.config.sample_rate,
                input=True,
                frames_per_buffer=self.config.chunk_size,
                stream_callback=self._audio_callback if callback is None else callback
            )
            stream.start_stream()
        except Exception as e:
            self.logger.error(f"启动录音失败: {e}")
            # Do not keep a half-opened stream around: close it so a later
            # start_recording() call begins from a clean state.
            if stream is not None:
                try:
                    stream.close()
                except Exception as close_error:
                    self.logger.debug(f"关闭音频流失败: {close_error}")
            return False

        # Only publish the recording state once the stream is really running.
        self.stream = stream
        self.is_recording = True
        self.logger.info("开始录音")
        return True

    def stop_recording(self):
        """Stop and close the input stream; a no-op when not recording."""
        if not self.is_recording:
            return

        self.is_recording = False
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None

        if self.record_thread and self.record_thread.is_alive():
            self.record_thread.join(timeout=1.0)
        self.logger.info("停止录音")

    def get_audio_data(self, timeout: float = 0.1) -> Optional[np.ndarray]:
        """Return the next queued audio chunk, or None if none arrives in time.

        Args:
            timeout: Maximum number of seconds to wait for a chunk.
        """
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def _audio_callback(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: queue the buffer as an int16 array."""
        if status:
            self.logger.warning(f"音频流状态: {status}")

        audio_data = np.frombuffer(in_data, dtype=np.int16)

        try:
            self.audio_queue.put_nowait(audio_data)
        except queue.Full:
            # Only reachable if the queue is given a maximum size: drop the
            # oldest chunk to make room for the newest one.
            try:
                self.audio_queue.get_nowait()
                self.audio_queue.put_nowait(audio_data)
            except queue.Empty:
                pass
        return (None, pyaudio.paContinue)

    def _record_loop(self):
        """Deprecated blocking record loop; kept only for compatibility."""
        pass

    def cleanup(self):
        """Stop recording and terminate the PyAudio instance."""
        self.stop_recording()
        if self.audio:
            self.audio.terminate()
            self.audio = None
        self.logger.info("音频处理器已清理")

    def list_audio_devices(self):
        """Return a list of dicts describing every device with input channels.

        Each dict has the keys ``index``, ``name``, ``channels`` and
        ``sample_rate``. An empty list is returned when PyAudio has not been
        initialised.
        """
        if not self.audio:
            self.logger.error("音频设备未初始化")
            return []

        devices = []
        for i in range(self.audio.get_device_count()):
            device_info = self.audio.get_device_info_by_index(i)
            if device_info['maxInputChannels'] > 0:  # input devices only
                devices.append({
                    'index': i,
                    'name': device_info['name'],
                    'channels': device_info['maxInputChannels'],
                    'sample_rate': device_info['defaultSampleRate']
                })
        return devices

    def apply_noise_reduction(self, audio_data: np.ndarray) -> np.ndarray:
        """Apply a simple noise gate.

        Samples whose magnitude is below 10% of the chunk's peak magnitude are
        set to zero.

        Args:
            audio_data: Audio samples. The input is never modified, so
                read-only arrays (such as those built by ``np.frombuffer``)
                are accepted.

        Returns:
            A new array of the same dtype with the quiet samples zeroed.
        """
        gated = audio_data.copy()
        if gated.size == 0:
            return gated

        # Work on a float copy so that abs(-32768) does not overflow int16.
        magnitude = np.abs(audio_data.astype(np.float64))
        threshold = magnitude.max() * 0.1
        gated[magnitude < threshold] = 0
        return gated

    def apply_gain_control(self, audio_data: np.ndarray, target_level: float = 0.5) -> np.ndarray:
        """Scale int16 audio so that its RMS level approaches ``target_level``.

        Args:
            audio_data: Audio samples, expected to be on the int16 scale.
            target_level: Desired RMS level as a fraction of int16 full scale.

        Returns:
            The scaled samples as an ``int16`` array, or the input unchanged
            when it is empty or completely silent.
        """
        if audio_data.size == 0:
            return audio_data

        samples = audio_data.astype(np.float32)
        # Normalise the RMS to the 0..1 range so that it is comparable with
        # target_level.
        current_level = float(np.sqrt(np.mean(samples ** 2))) / self._INT16_FULL_SCALE
        if current_level > 0:
            # Keep the gain within a sane range.
            gain = float(np.clip(target_level / current_level, 0.1, 10.0))
            # Saturate instead of wrapping around when casting back to int16.
            scaled = np.clip(samples * gain, -32768.0, 32767.0)
            audio_data = scaled.astype(np.int16)
        return audio_data

@ -0,0 +1,85 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
配置文件
管理sherpa-onnx模型路径和语音识别参数
"""
import os
from pathlib import Path
# 项目根目录
PROJECT_ROOT = Path(__file__).parent.parent
# 模型配置
class ModelConfig:
    """Paths of the model files and the recognition parameters."""

    def __init__(self):
        # Model files live under <project root>/models and must be downloaded
        # by the user.
        model_dir = PROJECT_ROOT / "models"
        self.model_dir = model_dir

        # File names of the default (Chinese/English bilingual) model.
        self.tokens = str(model_dir / "tokens.txt")
        self.encoder = str(model_dir / "encoder-epoch-99-avg-1.onnx")
        self.decoder = str(model_dir / "decoder-epoch-99-avg-1.onnx")
        self.joiner = str(model_dir / "joiner-epoch-99-avg-1.onnx")

        # Recognition parameters.
        self.sample_rate = 16000
        self.feature_dim = 80
        self.num_threads = 1

        # Endpoint detection parameters.
        self.enable_endpoint = True
        self.enable_endpoint_detection = True
        self.rule1_min_trailing_silence = 2.4
        self.rule2_min_trailing_silence = 1.2
        self.rule3_min_utterance_length = 300

        # Decoding settings.
        self.decoding_method = "greedy_search"
        self.max_active_paths = 4
        self.provider = "cpu"

    def validate_model_files(self):
        """Return the configured model file paths that do not exist on disk.

        The result keeps the order tokens, encoder, decoder, joiner and is
        empty when every file is present.
        """
        candidates = (self.tokens, self.encoder, self.decoder, self.joiner)
        return [path for path in candidates if not os.path.exists(path)]
# 音频配置
class AudioConfig:
    """Audio capture settings used when opening the PyAudio stream."""

    def __init__(self):
        rate_hz = 16000

        # PyAudio stream parameters.
        self.sample_rate = rate_hz  # sampling rate in Hz
        self.chunk_size = 1024      # frames per buffer
        self.channels = 1           # mono
        self.format = None          # set to pyaudio.paInt16 at runtime

        # Number of samples in 100 ms of audio.
        self.samples_per_read = int(0.1 * rate_hz)
# 应用配置
class AppConfig:
    """Application-level settings: display, logging and output."""

    def __init__(self):
        # Output: where the transcription is written and whether to write it.
        self.output_file = PROJECT_ROOT / "output" / "transcription.txt"
        self.save_to_file = True

        # Logging: level name and log file location.
        self.log_level = "INFO"
        self.log_file = PROJECT_ROOT / "logs" / "app.log"

        # Display: partial results and timestamps are both shown by default.
        self.show_partial_results = True
        self.show_timestamps = True

@ -0,0 +1,311 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
模型下载工具
帮助用户下载sherpa-onnx模型文件
"""
import os
import urllib.request
import tarfile
import zipfile
import logging
from pathlib import Path
from typing import Dict, List
from .config import ModelConfig
class ModelDownloader:
"""模型下载器类"""
# 预定义的模型配置
MODELS = {
"zh-en-bilingual": {
"name": "中英双语模型 (推荐)",
"url": "https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/sherpa-onnx-streaming-zipformer-bilingual-zh-en-2023-02-20.tar.bz2",
"size": "约 65MB",
"description": "支持中文和英文的实时语音识别模型",
"files": {
"tokens": "tokens.txt",
"encoder": "encoder-epoch-99-avg-1.onnx",
"decoder": "decoder-epoch-99-avg-1.onnx",
"joiner": "joiner-epoch-99-avg-1.onnx"
}
},
"zh-only": {
"name": "中文模型",
"url": "https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/sherpa-onnx-streaming-zipformer-zh-14M-2023-02-23.tar.bz2",
"size": "约 45MB",
"description": "专门针对中文优化的语音识别模型",
"files": {
"tokens": "tokens.txt",
"encoder": "encoder-epoch-99-avg-1.onnx",
"decoder": "decoder-epoch-99-avg-1.onnx",
"joiner": "joiner-epoch-99-avg-1.onnx"
}
},
"en-only": {
"name": "英文模型",
"url": "https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/sherpa-onnx-streaming-zipformer-en-2023-06-26.tar.bz2",
"size": "约 65MB",
"description": "专门针对英文优化的语音识别模型",
"files": {
"tokens": "tokens.txt",
"encoder": "encoder-epoch-99-avg-1-chunk-16-left-128.onnx",
"decoder": "decoder-epoch-99-avg-1-chunk-16-left-128.onnx",
"joiner": "joiner-epoch-99-avg-1-chunk-16-left-128.onnx"
}
}
}
def __init__(self, config: ModelConfig):
self.config = config
self.logger = logging.getLogger(__name__)
# 创建模型目录
self.config.model_dir.mkdir(exist_ok=True)
def list_available_models(self) -> Dict:
"""列出可用的模型"""
return self.MODELS
def check_model_exists(self, model_key: str = "zh-en-bilingual") -> bool:
"""检查模型是否已存在"""
if model_key not in self.MODELS:
return False
model_info = self.MODELS[model_key]
for file_key, filename in model_info["files"].items():
file_path = self.config.model_dir / filename
if not file_path.exists():
return False
return True
def download_model(self, model_key: str = "zh-en-bilingual", force: bool = False) -> bool:
"""下载指定模型"""
if model_key not in self.MODELS:
self.logger.error(f"未知的模型: {model_key}")
return False
model_info = self.MODELS[model_key]
# 检查模型是否已存在
if not force and self.check_model_exists(model_key):
self.logger.info(f"模型 {model_info['name']} 已存在")
return True
self.logger.info(f"开始下载模型: {model_info['name']}")
self.logger.info(f"大小: {model_info['size']}")
self.logger.info(f"描述: {model_info['description']}")
try:
# 下载文件
url = model_info["url"]
filename = url.split("/")[-1]
download_path = self.config.model_dir / filename
self.logger.info(f"正在下载: {url}")
self._download_file_with_progress(url, download_path)
# 解压文件
self.logger.info("正在解压文件...")
extract_dir = self._extract_archive(download_path)
# 移动文件到正确位置
self._organize_model_files(extract_dir, model_info["files"])
# 清理下载的压缩文件
download_path.unlink()
# 清理解压目录
if extract_dir.exists():
import shutil
shutil.rmtree(extract_dir)
self.logger.info(f"模型 {model_info['name']} 下载完成")
return True
except Exception as e:
self.logger.error(f"下载模型失败: {e}")
return False
def _download_file_with_progress(self, url: str, filepath: Path):
"""带进度显示的文件下载"""
def progress_hook(block_num, block_size, total_size):
downloaded = block_num * block_size
if total_size > 0:
percent = min(100, (downloaded * 100) // total_size)
print(f"\r下载进度: {percent}% ({downloaded // 1024 // 1024}MB / {total_size // 1024 // 1024}MB)", end="")
else:
print(f"\r已下载: {downloaded // 1024 // 1024}MB", end="")
urllib.request.urlretrieve(url, filepath, progress_hook)
print() # 换行
def _extract_archive(self, archive_path: Path) -> Path:
"""解压压缩文件"""
extract_dir = archive_path.parent / archive_path.stem
if archive_path.suffix == '.zip':
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
elif archive_path.suffix in ['.tar', '.bz2'] or '.tar.' in archive_path.name:
with tarfile.open(archive_path, 'r:*') as tar_ref:
tar_ref.extractall(extract_dir)
else:
raise ValueError(f"不支持的压缩格式: {archive_path.suffix}")
return extract_dir
def _organize_model_files(self, extract_dir: Path, file_mapping: Dict[str, str]):
"""整理模型文件到正确位置"""
# 查找解压后的实际目录
actual_dirs = [d for d in extract_dir.iterdir() if d.is_dir()]
if actual_dirs:
source_dir = actual_dirs[0] # 通常模型文件在第一个子目录中
else:
source_dir = extract_dir
# 移动文件
for file_key, target_filename in file_mapping.items():
# 查找源文件
source_files = list(source_dir.glob(f"*{target_filename}"))
if not source_files:
# 尝试查找类似的文件
if file_key == "encoder":
source_files = list(source_dir.glob("*encoder*.onnx"))
elif file_key == "decoder":
source_files = list(source_dir.glob("*decoder*.onnx"))
elif file_key == "joiner":
source_files = list(source_dir.glob("*joiner*.onnx"))
elif file_key == "tokens":
source_files = list(source_dir.glob("tokens.txt"))
if source_files:
source_file = source_files[0]
target_file = self.config.model_dir / target_filename
# 如果目标文件已存在,先删除
if target_file.exists():
target_file.unlink()
# 移动文件
source_file.rename(target_file)
self.logger.info(f"已安装: {target_filename}")
else:
self.logger.warning(f"未找到文件: {target_filename}")
def remove_model(self, model_key: str = "zh-en-bilingual") -> bool:
"""删除指定模型"""
if model_key not in self.MODELS:
self.logger.error(f"未知的模型: {model_key}")
return False
model_info = self.MODELS[model_key]
try:
for file_key, filename in model_info["files"].items():
file_path = self.config.model_dir / filename
if file_path.exists():
file_path.unlink()
self.logger.info(f"已删除: {filename}")
self.logger.info(f"模型 {model_info['name']} 已删除")
return True
except Exception as e:
self.logger.error(f"删除模型失败: {e}")
return False
def get_model_status(self) -> Dict[str, bool]:
"""获取所有模型的状态"""
status = {}
for model_key in self.MODELS:
status[model_key] = self.check_model_exists(model_key)
return status
def interactive_download(self):
"""交互式下载模型"""
print("=" * 60)
print("模型下载工具")
print("=" * 60)
# 显示可用模型
print("\n可用模型:")
for i, (key, info) in enumerate(self.MODELS.items(), 1):
status = "已安装" if self.check_model_exists(key) else "未安装"
print(f" {i}. {info['name']} ({info['size']}) - {status}")
print(f" {info['description']}")
# 用户选择
try:
choice = input("\n请选择要下载的模型 (1-3, 默认1): ").strip()
if not choice:
choice = "1"
choice_idx = int(choice) - 1
model_keys = list(self.MODELS.keys())
if 0 <= choice_idx < len(model_keys):
model_key = model_keys[choice_idx]
# 检查是否已存在
if self.check_model_exists(model_key):
overwrite = input("模型已存在,是否重新下载?(y/N): ").strip().lower()
if overwrite != 'y':
print("取消下载")
return
# 开始下载
print(f"\n开始下载模型...")
if self.download_model(model_key, force=True):
print("\n下载完成!")
else:
print("\n下载失败!")
else:
print("无效的选择")
except (ValueError, KeyboardInterrupt):
print("\n取消下载")
except Exception as e:
print(f"\n下载过程中出错: {e}")
def main():
    """Command-line entry point of the model download tool.

    Returns:
        Process exit status: 1 when a requested download fails, 0 otherwise.
    """
    import argparse

    parser = argparse.ArgumentParser(description="sherpa-onnx 模型下载工具")
    parser.add_argument("--model", choices=list(ModelDownloader.MODELS.keys()),
                        default="zh-en-bilingual", help="要下载的模型")
    parser.add_argument("--force", action="store_true", help="强制重新下载")
    parser.add_argument("--list", action="store_true", help="列出可用模型")
    parser.add_argument("--status", action="store_true", help="显示模型状态")
    parser.add_argument("--interactive", action="store_true", help="交互式下载")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

    downloader = ModelDownloader(ModelConfig())

    if args.list:
        print("可用模型:")
        for key, info in downloader.list_available_models().items():
            print(f" {key}: {info['name']} ({info['size']})")
            print(f" {info['description']}")
    elif args.status:
        print("模型状态:")
        for key, installed in downloader.get_model_status().items():
            info = downloader.MODELS[key]
            status_text = "已安装" if installed else "未安装"
            print(f" {key}: {info['name']} - {status_text}")
    elif args.interactive:
        downloader.interactive_download()
    elif not downloader.download_model(args.model, args.force):
        # Surface a failed download through the exit status so that scripts
        # calling this tool can detect it.
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

@ -0,0 +1,319 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
实时语音转文字主应用
整合音频处理和语音识别功能
"""
import time
import threading
import logging
import signal
import sys
from pathlib import Path
from typing import Optional, Callable
from .config import ModelConfig, AudioConfig, AppConfig
from .audio_processor import AudioProcessor
from .speech_recognizer import SpeechRecognizer, RecognitionSession
class RealTimeVTT:
    """Real-time speech-to-text application.

    Connects an ``AudioProcessor`` to a ``SpeechRecognizer`` and stores the
    finalized text in a ``RecognitionSession``. Audio is consumed on a
    background thread started by ``start()``.
    """

    def __init__(self):
        """Create the configuration objects and components and install signal handlers."""
        # Configuration
        self.model_config = ModelConfig()
        self.audio_config = AudioConfig()
        self.app_config = AppConfig()

        # Components
        self.audio_processor = AudioProcessor(self.audio_config)
        self.speech_recognizer = SpeechRecognizer(self.model_config)

        # Session holding the recognized results
        self.session = RecognitionSession()

        # Run state
        self.is_running = False
        self.processing_thread = None

        # Optional external callbacks
        self.result_callback = None
        self.partial_result_callback = None

        self._setup_logging()
        self.logger = logging.getLogger(__name__)

        # Stop cleanly on Ctrl+C and on termination requests.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _setup_logging(self):
        """Create the log directory and configure file plus stdout logging.

        NOTE: ``logging.basicConfig`` does nothing when the root logger
        already has handlers, so this only takes effect if logging has not
        been configured earlier in the process.
        """
        log_dir = self.app_config.log_file.parent
        log_dir.mkdir(exist_ok=True)

        logging.basicConfig(
            level=getattr(logging, self.app_config.log_level),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.app_config.log_file, encoding='utf-8'),
                logging.StreamHandler(sys.stdout)
            ]
        )

    def _signal_handler(self, signum, frame):
        """Stop recognition and exit the process with status 0."""
        self.logger.info(f"接收到信号 {signum},正在停止应用...")
        self.stop()
        sys.exit(0)

    def initialize(self) -> bool:
        """Initialise the audio processor and the recognizer and wire callbacks.

        Returns:
            True when both components initialised, False otherwise.
        """
        self.logger.info("初始化实时语音转文字应用")

        if not self.audio_processor.initialize():
            self.logger.error("音频处理器初始化失败")
            return False

        if not self.speech_recognizer.initialize():
            self.logger.error("语音识别器初始化失败")
            return False

        self.speech_recognizer.set_result_callback(self._on_recognition_result)
        self.speech_recognizer.set_partial_result_callback(self._on_partial_result)

        if self.app_config.save_to_file:
            output_dir = self.app_config.output_file.parent
            output_dir.mkdir(exist_ok=True)

        self.logger.info("应用初始化完成")
        return True

    def start(self) -> bool:
        """Create the recognition stream, start recording and the worker thread.

        Returns:
            True when recognition was started, False when it is already
            running or a step failed.
        """
        if self.is_running:
            self.logger.warning("应用已在运行中")
            return False

        self.logger.info("开始实时语音识别")

        if not self.speech_recognizer.create_stream():
            self.logger.error("创建识别流失败")
            return False

        if not self.audio_processor.start_recording():
            self.logger.error("开始录音失败")
            return False

        self.session.start()

        self.is_running = True
        self.processing_thread = threading.Thread(target=self._processing_loop)
        self.processing_thread.daemon = True
        self.processing_thread.start()

        self.logger.info("实时语音识别已启动")
        return True

    def stop(self):
        """Stop recognition, flush the final result and save the session.

        Does nothing when recognition is not running.
        """
        if not self.is_running:
            return

        self.logger.info("停止实时语音识别")

        # Ask the worker loop to exit and give it a moment to do so.
        self.is_running = False
        if self.processing_thread and self.processing_thread.is_alive():
            self.processing_thread.join(timeout=2.0)

        # Flush the recognizer before the audio stream is closed.
        try:
            final_result = self.speech_recognizer.finalize_stream()
            # finalize_stream() already delivers the text through the result
            # callback registered on the recognizer. Dispatch it here only
            # when no callback is registered, so the final result is not
            # recorded twice.
            callback_registered = getattr(self.speech_recognizer, "result_callback", None) is not None
            if final_result and not callback_registered:
                self._on_recognition_result(final_result)
        except Exception as e:
            self.logger.warning(f"获取最终识别结果失败: {e}")

        self.audio_processor.stop_recording()
        self.session.stop()

        if self.app_config.save_to_file:
            self._save_results_to_file()

        self.logger.info("实时语音识别已停止")

    def _processing_loop(self):
        """Worker loop: feed queued audio chunks to the recognizer.

        Results are delivered through the recognizer callbacks, so the return
        value of ``process_audio`` is not used here.
        """
        self.logger.info("开始音频处理循环")

        while self.is_running:
            try:
                audio_data = self.audio_processor.get_audio_data(timeout=0.1)
                if audio_data is None:
                    continue

                # NOTE: the noise-reduction and gain-control helpers of the
                # audio processor are currently not applied.
                self.speech_recognizer.process_audio(audio_data)
            except Exception as e:
                self.logger.error(f"音频处理循环错误: {e}")
                break

        self.logger.info("音频处理循环结束")

    def _on_recognition_result(self, text: str):
        """Record, print and forward a finalized recognition result.

        Blank text is ignored.
        """
        if not text.strip():
            return

        result = self.session.add_result(text, is_final=True)

        if self.app_config.show_timestamps:
            print(f"\n[{time.strftime('%H:%M:%S')}] {text}")
        else:
            print(f"\n{text}")

        if self.result_callback:
            self.result_callback(result)

        self.logger.info(f"识别结果: {text}")

    def _on_partial_result(self, text: str):
        """Print a partial result on the current line and forward it.

        Ignored when the text is blank or partial results are disabled.
        """
        if not text.strip() or not self.app_config.show_partial_results:
            return

        # The carriage return overwrites the previously printed partial text.
        print(f"\r正在识别: {text}", end="", flush=True)

        if self.partial_result_callback:
            self.partial_result_callback(text)

    def _save_results_to_file(self):
        """Write the session's final results to the configured output file.

        The file is overwritten. Failures are logged, not raised.
        """
        try:
            final_results = [r for r in self.session.results if r.is_final]
            started = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.session.start_time))
            with open(self.app_config.output_file, 'w', encoding='utf-8') as f:
                f.write("# 语音识别结果\n")
                f.write(f"# 开始时间: {started}\n")
                f.write(f"# 持续时间: {self.session.get_duration():.2f}\n")
                f.write(f"# 识别结果数量: {len(final_results)}\n\n")

                for result in final_results:
                    f.write(f"{result}\n")

                f.write("\n# 完整文本\n")
                f.write(self.session.get_full_text())

            self.logger.info(f"结果已保存到: {self.app_config.output_file}")
        except Exception as e:
            self.logger.error(f"保存结果到文件失败: {e}")

    def set_result_callback(self, callback: Callable):
        """Register a callback that receives each finalized result object."""
        self.result_callback = callback

    def set_partial_result_callback(self, callback: Callable):
        """Register a callback that receives each partial result text."""
        self.partial_result_callback = callback

    def get_session_info(self) -> dict:
        """Return state, timing, final-result count and full text of the session."""
        return {
            "is_active": self.session.is_active,
            "start_time": self.session.start_time,
            "duration": self.session.get_duration(),
            "result_count": len([r for r in self.session.results if r.is_final]),
            "full_text": self.session.get_full_text()
        }

    def get_model_info(self) -> dict:
        """Return the model information reported by the recognizer."""
        return self.speech_recognizer.get_model_info()

    def list_audio_devices(self) -> list:
        """Return the input devices reported by the audio processor."""
        return self.audio_processor.list_audio_devices()

    def cleanup(self):
        """Stop recognition and release audio and recognizer resources."""
        self.stop()
        self.audio_processor.cleanup()
        self.speech_recognizer.cleanup()
        self.logger.info("应用资源已清理")

    def run_interactive(self):
        """Run recognition in the foreground until it stops, then print statistics."""
        print("=" * 60)
        print("实时语音转文字系统")
        print("=" * 60)

        print("\n模型信息:")
        for key, value in self.get_model_info().items():
            print(f" {key}: {value}")

        print("\n音频设备:")
        for device in self.list_audio_devices():
            print(f" [{device['index']}] {device['name']} ({device['channels']} 通道)")

        print("\n按 Ctrl+C 停止识别")
        print("开始说话...\n")

        try:
            if not self.start():
                print("启动失败")
                return

            # Idle until recognition is stopped.
            while self.is_running:
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\n\n用户中断")
        finally:
            self.cleanup()

        session_info = self.get_session_info()
        # One blank line followed by a single separator line.
        print("\n" + "=" * 60)
        print("会话统计:")
        print(f" 持续时间: {session_info['duration']:.2f}")
        print(f" 识别结果数量: {session_info['result_count']}")
        if self.app_config.save_to_file:
            print(f" 结果已保存到: {self.app_config.output_file}")
        print("=" * 60)

@ -0,0 +1,236 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
语音识别模块
使用sherpa-onnx进行实时语音识别
"""
import sherpa_onnx
import numpy as np
import logging
import time
from typing import Optional, List, Callable
from .config import ModelConfig
class SpeechRecognizer:
    """Streaming speech recognizer built on a sherpa-onnx transducer model.

    Partial and finalized texts are reported through optional callbacks.
    """

    def __init__(self, config: ModelConfig):
        """Store the model configuration; the recognizer is built by initialize()."""
        self.config = config
        self.recognizer = None
        self.stream = None
        self.display = None
        self.logger = logging.getLogger(__name__)

        # Callbacks for finalized and partial results.
        self.result_callback = None
        self.partial_result_callback = None

    def initialize(self) -> bool:
        """Validate the model files and build the online recognizer.

        Returns:
            True on success, False when files are missing or creation raises.
        """
        try:
            missing_files = self.config.validate_model_files()
            if missing_files:
                self.logger.error(f"缺少模型文件: {missing_files}")
                self.logger.error("请下载模型文件到 models/ 目录")
                return False

            self.recognizer = sherpa_onnx.OnlineRecognizer.from_transducer(
                tokens=self.config.tokens,
                encoder=self.config.encoder,
                decoder=self.config.decoder,
                joiner=self.config.joiner,
                num_threads=self.config.num_threads,
                sample_rate=self.config.sample_rate,
                feature_dim=self.config.feature_dim,
                decoding_method=self.config.decoding_method,
                max_active_paths=self.config.max_active_paths,
                enable_endpoint_detection=self.config.enable_endpoint,
                rule1_min_trailing_silence=self.config.rule1_min_trailing_silence,
                rule2_min_trailing_silence=self.config.rule2_min_trailing_silence,
                rule3_min_utterance_length=self.config.rule3_min_utterance_length,
                provider=self.config.provider
            )

            self.logger.info("语音识别器初始化成功")
            return True
        except Exception as e:
            self.logger.error(f"语音识别器初始化失败: {e}")
            return False

    def create_stream(self):
        """Create and return a new recognition stream, or None if uninitialised."""
        if not self.recognizer:
            self.logger.error("识别器未初始化")
            return None

        self.stream = self.recognizer.create_stream()
        return self.stream

    def process_audio(self, audio_data: np.ndarray) -> Optional[str]:
        """Feed one chunk of int16 audio to the recognizer.

        The partial-result callback is invoked with the current text when it
        is non-blank. When the recognizer reports an endpoint, the result
        callback is invoked with the finalized text and the stream is reset.

        Args:
            audio_data: Samples on the int16 scale; they are converted to
                float32 in the range [-1, 1).

        Returns:
            The finalized text when an endpoint with non-blank text was
            reached, otherwise None (also on errors, which are logged).
        """
        if not self.stream or not self.recognizer:
            return None

        try:
            audio_float = audio_data.astype(np.float32) / 32768.0
            self.stream.accept_waveform(
                sample_rate=self.config.sample_rate,
                waveform=audio_float
            )

            # Decode everything that is ready.
            while self.recognizer.is_ready(self.stream):
                self.recognizer.decode_stream(self.stream)

            partial = self.recognizer.get_result(self.stream)
            if partial and partial.strip() and self.partial_result_callback:
                self.partial_result_callback(partial)

            if not self.recognizer.is_endpoint(self.stream):
                return None

            final_result = self.recognizer.get_result(self.stream)
            has_text = bool(final_result and final_result.strip())
            if has_text and self.result_callback:
                self.result_callback(final_result)

            # Reset after every endpoint, with or without text, so that
            # recognition continues with a fresh segment.
            self.recognizer.reset(self.stream)
            return final_result if has_text else None
        except Exception as e:
            self.logger.error(f"音频处理错误: {e}")
            return None

    def finalize_stream(self) -> Optional[str]:
        """Signal end of input and return the last result.

        The result callback is invoked with the text when it is non-blank.

        Returns:
            The final text, or None when there is no stream or recognizer,
            no text, or an error occurred (errors are logged).
        """
        if not self.stream or not self.recognizer:
            return None

        try:
            self.stream.input_finished()

            # Marking the input as finished can make further frames ready;
            # decode them so the tail of the utterance is not lost.
            while self.recognizer.is_ready(self.stream):
                self.recognizer.decode_stream(self.stream)

            result = self.recognizer.get_result(self.stream)
            if result and result.strip():
                if self.result_callback:
                    self.result_callback(result)
                return result
            return None
        except Exception as e:
            self.logger.error(f"结束识别流错误: {e}")
            return None

    def set_result_callback(self, callback: Callable[[str], None]):
        """Register the callback invoked with each finalized text."""
        self.result_callback = callback

    def set_partial_result_callback(self, callback: Callable[[str], None]):
        """Register the callback invoked with each partial text."""
        self.partial_result_callback = callback

    def reset_stream(self):
        """Reset the current stream if both stream and recognizer exist."""
        if self.stream and self.recognizer:
            self.recognizer.reset(self.stream)

    def get_model_info(self) -> dict:
        """Return the main recognition settings, or {} when uninitialised."""
        if not self.recognizer:
            return {}

        return {
            "sample_rate": self.config.sample_rate,
            "feature_dim": self.config.feature_dim,
            "num_threads": self.config.num_threads,
            "provider": self.config.provider,
            "decoding_method": self.config.decoding_method,
            # Report the flag that initialize() actually passes to the
            # recognizer.
            "endpoint_detection": self.config.enable_endpoint
        }

    def cleanup(self):
        """Drop the references to the stream and the recognizer."""
        if self.stream:
            self.stream = None
        if self.recognizer:
            self.recognizer = None
        self.logger.info("语音识别器已清理")
class RecognitionResult:
    """A single piece of recognized text together with its metadata."""

    def __init__(self, text: str, timestamp: float, is_final: bool = True):
        self.text, self.timestamp, self.is_final = text, timestamp, is_final
        # Fixed value: sherpa-onnx does not provide a confidence score yet.
        self.confidence = 1.0

    def __str__(self):
        """Format as ``[HH:MM:SS] text`` using the local time of the timestamp."""
        clock = time.strftime('%H:%M:%S', time.localtime(self.timestamp))
        return f"[{clock}] {self.text}"

    def to_dict(self):
        """Return the result's fields as a plain dictionary."""
        field_names = ("text", "timestamp", "is_final", "confidence")
        return {name: getattr(self, name) for name in field_names}
class RecognitionSession:
    """Collects the recognition results of one session and tracks its timing."""

    def __init__(self):
        self.results: List[RecognitionResult] = []
        self.start_time = time.time()
        # Set by stop(); None while the session has not been stopped.
        self.end_time: Optional[float] = None
        self.is_active = False

    def add_result(self, text: str, is_final: bool = True):
        """Append a result stamped with the current time and return it."""
        result = RecognitionResult(
            text=text,
            timestamp=time.time(),
            is_final=is_final
        )
        self.results.append(result)
        return result

    def get_full_text(self) -> str:
        """Return the texts of all final results joined by single spaces."""
        return " ".join(r.text for r in self.results if r.is_final)

    def get_duration(self) -> float:
        """Return the session length in seconds.

        While the session has not been stopped, this is the time elapsed
        since it started. After ``stop()`` the value is frozen at the moment
        the session ended.
        """
        end = time.time() if self.end_time is None else self.end_time
        return end - self.start_time

    def start(self):
        """Mark the session active and restart its clock."""
        self.is_active = True
        self.start_time = time.time()
        self.end_time = None

    def stop(self):
        """Mark the session inactive and freeze its duration."""
        self.is_active = False
        # Keep the first stop time if stop() is called more than once.
        if self.end_time is None:
            self.end_time = time.time()

    def clear(self):
        """Discard all results and restart the clock."""
        self.results.clear()
        self.start_time = time.time()
        self.end_time = None
Loading…
Cancel
Save