构建可插话的实时语音代理

:孟买明天的天气怎么样?
助手:好的,让我查一下孟买明天的天——
:算了,给我今天的就行。
助手:今天孟买气温 32°C,湿度较高,预计傍晚有零星小雨。还有什么需要了解的吗?
:我明天的日程安排是什么?
助手:明天有两件事——上午 10 点的站会,以及下午 4 点的季度评审。

上面这段对话中,第二行的打断处理——助手干净利落地中断正在进行的句子,转而回答今天的情况——正是大多数语音代理做不好的地方。要么它们自顾自地说下去,完全覆盖你的声音;要么你每喘口气它就停下来,显得神经质。干净利落的那一种,才是本文要教你构建的。

整个实现基于 Gemini 的 Live API,不到 250 行 Python 代码,全部写在一个文件里。它接入了两个真实可用的工具,并带有一个基于播放队列刷新的打断机制。语音代理之所以花了整整一年才变得好用,问题不在音频管道的搭建上,而在于修复方案必须内嵌在模型 API 内部,而不是靠客户端代码来 hack。下面就是完整的构建过程。

1、为什么打断处理是最难的部分

如果你用过 2025 年底之前构建的语音代理,你一定熟悉两种失败模式。第一种是自顾自说下去。你意识到助手误解了你的问题,开口纠正它,但它完全不理你,继续往下说。

第二种是神经质失败。开发者试图修复第一个问题,于是在本地实现了语音检测。现在,你每喘口气、咳嗽一声,或者发出"嗯……"的沉吟,助手就立刻切断自己的话,以为你要抢话,然后干等着一个你根本没想好的问题。

这两种失败源于同一个架构缺陷:级联管道(cascade pipeline)

多年来,语音代理的构建方式是将三个独立的模型串联起来。一个语音转文本(STT)模型将用户的音频转录为文字。这段文字被发送给LLM生成回复文本。回复文本再被发送给**文本转语音(TTS)**模型合成音频。

这个架构没有给打断信号留下安全的容身之处。STT 模型在本地运行,知道你什么时候开始说话。但 LLM 在远端,正在生成 token,而 TTS 模型已经在朗读这些 token 了。本地的"用户在说话"检测与远程的"模型正在生成 token"状态之间没有共享状态。如果你在本地切断了音频播放,LLM 仍然认为自己成功输出了完整的回复。对话历史因此失去同步,代理丢失上下文。

解决方案需要一个原生拥有双向音频流的模型。

当你使用原生双向模型时,原始音频字节直接流入网络,网络也直接流回原始音频字节。服务器端运行自己的语音活动检测(VAD)——一个判断当前是否有人说话的分类器。由于服务器同时掌控生成和监听,它可以在听到你打断的精确毫秒级时刻停止自己的输出。随后它会通过 WebSocket 下发一个 interrupted 事件。客户端信任这个信号,丢弃已缓冲但尚未播放的音频,对话便无缝继续。

2、端到端:这个文件做了什么

构建这个系统需要协调四个同时运行且互不阻塞的组件——一个麦克风采集循环、一个扬声器播放循环、一个居中读取一端写入另一端的 Gemini Live 会话,以及一个将模型请求映射到实际 Python 函数的工具注册表。

我们用异步队列来管理这一切。麦克风任务将原始字节块推入输入队列。一个网络任务从队列中取出这些块并发送给 API。另一个网络任务监听 API,将收到的音频块推入输出队列。扬声器任务从输出队列中取出并播放声音。

当用户打断时,API 会发送一个特定的控制消息。网络任务看到这个消息后,设置一个打断标志,扬声器任务立即清空其队列。音频瞬间停止。

核心引擎是 Gemini 3.1 Flash Live 模型。它提供原生双向音频,并且打断协议开箱即用。

SDK 是 google-genai——Google 当前统一的 Python 客户端。如果你看到教程还在用旧的 google-generativeai 包,请忽略;那个包在新的 Live API 构建中已被弃用。

LangChain Core 提供工具支持。我们不用手写原始 JSON schema 传给 Gemini API,而是使用 @tool 装饰器,自动从 Python 函数和文档字符串中提取 schema。

音频 I/O 来自 sounddevice,工具则依赖 Open-Meteo 获取天气数据,以及 icalendar 解析本地的 .ics 文件来查询日程。

3、音频 I/O

语音代理中的音频,本质上就是一串代表声波振幅的无尽整数流。要让 API 理解这些整数,我们必须严格按照模型期望的格式来格式化它们。

Gemini Live 期望 16-bit PCM 音频。它期望传入的麦克风音频以 16 kHz(每秒 16,000 个采样点)单声道采样,并以 24 kHz 单声道输出扬声器音频。这些是严格的协议细节。如果你用标准笔记本麦克风采集了 44.1 kHz 的音频却没有告知 API,模型听到的将是你以极端慢速说话的声音,完全无法转录。

下面是音频采集与播放的架构。

import asyncio
import datetime
import os
from pathlib import Path

import httpx
import numpy as np
import sounddevice as sd
from google import genai
from google.genai import types as genai_types
from icalendar import Calendar
from langchain_core.tools import tool
from langchain_core.utils.function_calling import convert_to_openai_function

MODEL = "gemini-3.1-flash-live-preview"
INPUT_SAMPLE_RATE = 16_000     # Gemini Live expects 16 kHz mono PCM input
OUTPUT_SAMPLE_RATE = 24_000    # Gemini Live emits 24 kHz mono PCM output
CHANNELS = 1
CHUNK_SIZE = 1024              # ~64 ms at 16 kHz — small enough for low-latency VAD
CALENDAR_FILE = Path(__file__).parent / "calendar.ics"

async def audio_input_task(mic_queue: asyncio.Queue) -> None:
    """Capture 16 kHz 16-bit PCM mic audio in chunks and forward to mic_queue."""
    loop = asyncio.get_running_loop()

    def callback(indata, frames, time_info, status):
        if status:
            print(f"[mic status] {status}", flush=True)
        loop.call_soon_threadsafe(mic_queue.put_nowait, bytes(indata))

    with sd.RawInputStream(
        samplerate=INPUT_SAMPLE_RATE,
        blocksize=CHUNK_SIZE,
        channels=CHANNELS,
        dtype="int16",
        callback=callback,
    ):
        while True:
            await asyncio.sleep(0.1)

async def audio_output_task(
    speaker_queue: asyncio.Queue,
    interrupt_event: asyncio.Event,
) -> None:
    """Play 24 kHz PCM chunks from speaker_queue; flush remaining audio on interrupt."""
    stream = sd.RawOutputStream(
        samplerate=OUTPUT_SAMPLE_RATE,
        channels=CHANNELS,
        dtype="int16",
    )
    stream.start()
    try:
        while True:
            chunk = await speaker_queue.get()
            if interrupt_event.is_set():
                while not speaker_queue.empty():
                    try:
                        speaker_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                interrupt_event.clear()
                continue
            stream.write(chunk)
    finally:
        stream.stop()
        stream.close()

麦克风回调中的 loop.call_soon_threadsafe 这一行,是开发者最容易搞砸音频管道的地方。sounddevice 库在操作系统音频驱动管理的独立 C 线程中触发音频回调。你无法在 Python 事件循环线程之外安全地与 asyncio.Queue 交互。安全地将字节块跨线程边界推入队列,可以避免随机的内存损坏和无声的音频丢包。

CHUNK_SIZE 设为 1024 帧。在 16 kHz 下,这正好是 64 毫秒的音频。这是延迟的甜蜜点。如果块太大,服务器端的 VAD 对你的声音反应太慢。如果太小,每秒处理数百个微小网络请求的 Python 开销会耗尽单个 CPU 核心,导致卡顿。

audio_output_task 中的 interrupt_event 检查,是接收循环在稍后章节中接入的钩子。当该事件触发时,循环会在清除标志之前清空队列中所有待播放的音频块。这就是打断的物理机制。模型停止发送新音频,我们删除已经收到但尚未播放的音频。

4、连接 Gemini Live

语音代理的提示工程不同于文本代理。如果你给语音代理一个标准的企业系统提示,它会用冗长、多从句、充满要点的句子说话。你必须明确指示它保持对话感、简洁,并自然地确认操作。

SYSTEM_INSTRUCTIONS = """You are a helpful voice assistant.

Style:
- Conversational, friendly, and short — 2 sentences per response unless the user asks for detail.
- Confirm tool actions briefly. Example: "Looking up the weather…" not "I shall now invoke the get_weather function."

Tools:
- get_weather(latitude, longitude) — current conditions and a daily forecast.
- read_calendar(days_ahead) — events from the user's local calendar.

Behaviour:
- If the user interrupts you mid-sentence, stop immediately and listen.
- If a tool returns an error, say so in plain English and offer an alternative.
"""

def build_live_config(tools_config: list[dict]) -> genai_types.LiveConnectConfig:
    """Build the LiveConnectConfig with audio response, server-side VAD, and tools.

    The automatic_activity_detection block is what enables clean mid-sentence
    interruption. When the model is mid-response and the server detects user
    speech, it sends server_content.interrupted=True down the receive stream.
    """
    return genai_types.LiveConnectConfig(
        response_modalities=["AUDIO"],
        system_instruction=SYSTEM_INSTRUCTIONS,
        tools=tools_config,
        realtime_input_config=genai_types.RealtimeInputConfig(
            automatic_activity_detection=genai_types.AutomaticActivityDetection(
                disabled=False,
            ),
        ),
    )

automatic_activity_detection.disabled=False 这一行是整篇文章的核心。默认情况下,Gemini 服务器对传入的 16 kHz 音频流运行连续的概率分类器。当它检测到人类语音越过特定置信度阈值时,会停止自己的生成。

如果你要为嘈杂的工厂车间构建这个系统,服务器端 VAD 可能会误触发机器噪音,那么你会设置 disabled=True。然后你会在客户端运行一个专门的本地 VAD 模型,并向 API 发送明确的 activityStartactivityEnd 控制消息。对于桌面助手来说,服务器端默认值已经非常准确,免去了本地音频分类的管理。

5、工具 Schema 与执行

我们给代理两个工具:一个获取实时天气数据,一个读取本地日历文件。

我们使用标准的 LangChain @tool 装饰器来定义它们。这里唯一与语音相关的考量是文档字符串。当你把工具传给多模态模型时,模型会阅读文档字符串来理解何时调用它。如果你的文档字符串充满了边缘情况说明和复杂的参数描述,模型在快节奏的语音对话中会犹豫或幻觉参数。保持文档字符串的声明性。

@tool
def get_weather(latitude: float, longitude: float) -> dict:
    """Get current weather and today's forecast for a location.

    Args:
        latitude: Latitude (e.g., 19.0760 for Mumbai).
        longitude: Longitude (e.g., 72.8777 for Mumbai).
    """
    try:
        resp = httpx.get(
            "https://api.open-meteo.com/v1/forecast",
            params={
                "latitude": latitude,
                "longitude": longitude,
                "current": "temperature_2m,relative_humidity_2m,wind_speed_10m",
                "daily": "temperature_2m_max,temperature_2m_min,precipitation_probability_max",
                "timezone": "auto",
                "forecast_days": 2,
            },
            timeout=5.0,
        )
        resp.raise_for_status()
        data = resp.json()
        return {
            "current_temp_c": data["current"]["temperature_2m"],
            "current_humidity_pct": data["current"]["relative_humidity_2m"],
            "current_wind_kmh": data["current"]["wind_speed_10m"],
            "today_max_c": data["daily"]["temperature_2m_max"][0],
            "today_min_c": data["daily"]["temperature_2m_min"][0],
            "today_rain_chance_pct": data["daily"]["precipitation_probability_max"][0],
        }
    except (httpx.HTTPError, KeyError) as exc:
        return {"error": f"Could not fetch weather: {exc}"}

@tool
def read_calendar(days_ahead: int = 7) -> list[dict]:
    """Read upcoming events from the user's local calendar.ics file.

    Args:
        days_ahead: How many days into the future to include. Default 7.
    """
    if not CALENDAR_FILE.exists():
        return [{"error": f"No calendar file at {CALENDAR_FILE}"}]

    now = datetime.datetime.now(datetime.timezone.utc)
    window_end = now + datetime.timedelta(days=days_ahead)

    try:
        cal = Calendar.from_ical(CALENDAR_FILE.read_bytes())
    except Exception as exc:
        return [{"error": f"Could not parse calendar: {exc}"}]

    def _as_aware(value):
        if isinstance(value, datetime.datetime):
            return value if value.tzinfo else value.replace(tzinfo=datetime.timezone.utc)
        if isinstance(value, datetime.date):
            return datetime.datetime.combine(value, datetime.time.min, tzinfo=datetime.timezone.utc)
        return None

    events = []
    for component in cal.walk("VEVENT"):
        start = _as_aware(component.get("dtstart").dt)
        if start is None or not (now <= start <= window_end):
            continue
        end_raw = component.get("dtend")
        end = _as_aware(end_raw.dt) if end_raw else None
        events.append({
            "summary": str(component.get("summary", "Untitled")),
            "start": start.isoformat(),
            "end": end.isoformat() if end else None,
        })
    events.sort(key=lambda e: e["start"])
    return events[:10]

TOOLS = [get_weather, read_calendar]
TOOL_MAP = {t.name: t for t in TOOLS}

def gemini_tool_config(tools) -> list[dict]:
    """Convert LangChain @tool functions to Gemini Live function declarations."""
    return [{"function_declarations": [convert_to_openai_function(t) for t in tools]}]

日历工具处理 .ics 文件的一个恼人现实:它们混合了仅日期值(如全天假期)和带时区的高精度日期时间值。_as_aware 辅助函数将所有内容强制转换为 UTC 日期时间,以便安全地筛选落在请求 days_ahead 窗口内的事件。

两个工具以相同的方式处理网络和解析失败——它们返回一个带有 error 键的字典,而不是抛出异常。模型看到这个错误载荷后,理解工具失败了,并可以通过音频流自然地向用户道歉。

gemini_tool_config 函数是 LangChain 与 Gemini 之间的桥梁。由于行业已基本汇聚到 JSON Schema 作为工具定义标准,LangChain 的 convert_to_openai_function 生成的 schema 形状正是 Gemini Live API 所期望的。

6、发送/接收循环与打断处理

文件中所有其他函数都是为了支撑这个多路复用的网络循环。我们需要两个任务:一个从麦克风队列中拉取音频块并推送给 API,另一个监听 API 并对响应进行多路分解。Gemini Live 的响应可能是三种之一:工具调用、打断事件,或合成音频块。我们必须严格按顺序处理它们。

async def send_audio_task(session, mic_queue: asyncio.Queue) -> None:
    """Forward mic chunks from the queue to the Live session."""
    while True:
        chunk = await mic_queue.get()
        await session.send_realtime_input(
            audio=genai_types.Blob(data=chunk, mime_type="audio/pcm;rate=16000")
        )

async def handle_tool_call(session, tool_call) -> None:
    """Run the requested tools and stream results back to the Live session."""
    responses = []
    for fc in tool_call.function_calls or []:
        tool_fn = TOOL_MAP.get(fc.name)
        if tool_fn is None:
            result = {"error": f"Unknown tool: {fc.name}"}
        else:
            try:
                result = tool_fn.invoke(dict(fc.args))
            except Exception as exc:
                result = {"error": str(exc)}
        responses.append({"id": fc.id, "name": fc.name, "response": result})
    await session.send_tool_response(function_responses=responses)

async def receive_task(
    session,
    speaker_queue: asyncio.Queue,
    interrupt_event: asyncio.Event,
) -> None:
    """Demultiplex the three response shapes from Gemini Live."""
    async for response in session.receive():
        if response.tool_call:
            await handle_tool_call(session, response.tool_call)
            continue

        sc = response.server_content
        if sc is None:
            continue

        if sc.interrupted:
            interrupt_event.set()
            while not speaker_queue.empty():
                try:
                    speaker_queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
            print("[interrupted by user]", flush=True)
            continue

        if sc.model_turn:
            for part in sc.model_turn.parts or []:
                if part.inline_data and part.inline_data.data:
                    await speaker_queue.put(part.inline_data.data)

async def main() -> None:
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise SystemExit("Set GOOGLE_API_KEY (https://aistudio.google.com).")

    client = genai.Client(api_key=api_key)
    config = build_live_config(gemini_tool_config(TOOLS))

    mic_queue: asyncio.Queue = asyncio.Queue(maxsize=200)
    speaker_queue: asyncio.Queue = asyncio.Queue(maxsize=200)
    interrupt_event = asyncio.Event()

    print("Voice agent ready. Speak. (Ctrl+C to quit.)", flush=True)
    try:
        async with client.aio.live.connect(model=MODEL, config=config) as session:
            await asyncio.gather(
                audio_input_task(mic_queue),
                audio_output_task(speaker_queue, interrupt_event),
                send_audio_task(session, mic_queue),
                receive_task(session, speaker_queue, interrupt_event),
            )
    except KeyboardInterrupt:
        print("\nShutting down.", flush=True)
    except Exception as exc:
        print(f"Session error: {exc}", flush=True)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

receive_task 中分支的顺序很重要。我们先检查工具调用,因为它们代表离散的执行块。其次检查打断。如果服务器检测到语音,它会设置 sc.interrupted = True。当我们看到这个标志时,我们设置 interrupt_event 来通知播放任务,并在接收循环中积极排空 speaker_queue。这确保了内存中等待播放的任何音频块都被立即销毁。

main() 函数将整个生命周期串联起来。我们初始化队列,连接会话,并使用 asyncio.gather 并发运行所有四个任务。

7、运行它

要在自己的机器上运行,你需要正确的依赖版本。创建一个 requirements.txt 文件,并固定这些确切版本。

google-genai>=1.0,<2.0
langchain-core>=0.3,<1.0
sounddevice>=0.5,<1.0
numpy>=2.0,<3.0
httpx>=0.27,<1.0
icalendar>=6.0,<7.0

你还需要一个示例日历文件,以便第二个工具能正常工作。将以下内容保存为 calendar.ics,放在与 Python 脚本相同的目录下。

BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Voice Agent Demo//EN
CALSCALE:GREGORIAN
BEGIN:VEVENT
UID:demo-standup-001@voice-agent
SUMMARY:Daily standup
DTSTART:20260516T100000Z
DTEND:20260516T101500Z
END:VEVENT
BEGIN:VEVENT
UID:demo-quarterly-review@voice-agent
SUMMARY:Quarterly review
DTSTART:20260516T160000Z
DTEND:20260516T173000Z
END:VEVENT
END:VCALENDAR

安装依赖,导出你的 API 密钥,然后运行文件。

pip install -r requirements.txt
export GOOGLE_API_KEY=your_key_from_ai_studio
python voice_agent.py

连接成功后,向它询问孟买的天气。在它朗读气温的时候打断它,询问你的日程安排。你会看到 [interrupted by user] 日志行打印出来,音频会瞬间切断,然后代理会转而朗读你的日程。首次连接通常需要两到三秒来建立 WebSocket,但后续的对话轮次在一秒内就能完成。

8、第 31 分钟会出什么错

上面的构建给了你当前能运行的东西。它是一个快速的本地语音代理,非常适合原型设计和内部工具。但当你将这个架构投入生产环境时,会遇到一系列需要专门工程投入才能解决的故障模式。

最刚性的限制是会话上限。纯音频的 Live 会话默认限制为 15 分钟,底层的 WebSocket 连接大约存活 10 分钟,之后服务器会发送 GoAway 消息并关闭连接。修复方案已内置于 API 中——会话恢复令牌。当你收到 SessionResumptionUpdate 消息时,保存其句柄。重新连接时,将其作为 SessionResumptionConfig.handle 传入,对话便可以在连接中断后继续,无需重放转录文本。令牌在最后一次终止后的两小时内有效。

网络抖动是下一个问题。本地脚本假设你的 WiFi 连接完美稳定。它从 API 拉取块并直接推送到扬声器队列。如果你的网络卡顿 100 毫秒,播放队列就会排空,扬声器饥饿,音频出现爆音。生产环境的语音代理需要抖动缓冲区。你故意将播放延迟 50 到 100 毫秒,以积累一小段音频块储备,以轻微增加感知延迟为代价,吸收轻微的网络丢包。

工具调用延迟在语音交互中有严格的限制。在文本聊天中,如果数据库查询耗时三秒,用户会等待。在语音聊天中,三秒的冷场感觉像掉线了。本文构建中的天气和日历工具在 400 毫秒内返回。如果你接入了一个慢速的内部 API,你必须实现填充词生成。你指示模型立即说"让我为您查一下……",在后台执行工具,等数据到达后再流式传输最终答案。

最后,这个构建假设用户坐在笔记本电脑前。如果你想让这个代理接听真实的电话号码,你不能使用 sounddevice。你需要一个电话传输层——剥离本地音频队列,替换为与 Twilio Media Streams 的 WebSocket 连接,或者通过 LiveKit 或 Daily 等 WebRTC 提供商路由音频。工具执行状态和 Gemini Live 配置保持不变,但 I/O 层完全变为网络绑定。

9、结束语

从级联架构到原生双向模型的转变,改变了我们构建语音体验的方式。多年来,让语音代理感觉自然需要复杂的客户端编排、自定义 VAD 调优,以及激进的状态同步来处理打断。这是一个音频工程问题。

现在,这是一个协议问题。通过将听、想、说整合到单个模型会话中,API 接管了时序控制。我们只需要正确连接队列,并尊重控制信号。


原文链接: Build a Real-Time Voice Agent in 30 Minutes (With Interruption Handling)

汇智网翻译整理,转载请标明出处