构建一个视频问答智能体

智能体AI项目:使用Streamlit、FAISS、BM25和OpenAI构建一个AI驱动的YouTube助手,通过可点击的时间戳回答问题。


你正在看一个充满价值的一小时YouTube视频……但你只有5分钟。

那你怎么办?

随机跳过?拖动进度条?放弃?

如果你可以直接问呢:

"总结这个视频" "视频哪里谈到了X?"

然后瞬间跳到有答案的那个确切时刻?

YouTube已经在试验“Ask”这样的功能。让我们构建一个自己的YouTube视频聊天助手,看看幕后到底发生了什么。

使用Streamlit、FAISS、BM25和OpenAI,我们将把任何视频变成一个可搜索、可交互的体验,具有AI驱动的答案和可点击的时间戳。

1、这个项目做什么

YT Chat让你能够:

  • 粘贴YouTube URL
  • 获取并索引视频字幕
  • 对视频提问
  • 接收基于字幕的答案
  • 点击时间戳引用跳转到视频的确切部分

系统支持会话中加载多个视频,并使用混合检索策略来平衡召回率和精确率。

2、架构概览

为了保持系统简单、可扩展且易于理解,每个组件都设计为单一职责。

架构图:“与YouTube视频聊天”端到端架构

从高层来看,应用由以下层组成:

核心组件:

  1. 前端(Streamlit):app.py 处理用户界面:视频输入、聊天交互,以及渲染带可点击时间戳的响应。
  2. 字幕摄取:transcript.py 使用 youtube-transcript-api 获取并标准化YouTube字幕,包含语言回退支持。
  3. 嵌入 + 向量搜索(FAISS):embedder.py 使用OpenAI的 text-embedding-3-small 模型将字幕块转换为嵌入,并存入由FAISS驱动的快速相似度索引。
  4. 关键词搜索(BM25):keyword_index.py 使用BM25构建轻量级的关键词检索系统,用于精确的术语匹配。
  5. 混合检索(RRF):retrieval_fusion.py 使用倒数排名融合(RRF)合并语义与关键词搜索结果,以兼顾召回率和精确率。
  6. LLM层(OpenAI):chat.py 使用OpenAI模型处理查询理解、路由和响应生成。

以下是流程:

流程图:与YouTube视频聊天流程[图片由作者提供]

现在让我们动手吧。

完整的代码仓库可以在这里访问:

3、环境设置

创建并激活虚拟环境。

对于macOS/Linux:

python3 -m venv .venv
source .venv/bin/activate

对于Windows(命令提示符):

python -m venv .venv
.venv\Scripts\activate

requirements.txt内容如下:

streamlit>=1.35.0
openai>=1.30.0
python-dotenv>=1.0.0
youtube-transcript-api>=0.6.2
faiss-cpu>=1.8.0
numpy>=1.26.0
requests>=2.31.0
rank-bm25>=0.2.2
# terminal
pip install -r requirements.txt

创建包含你的OPENAI_API_KEY的.env文件。
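
一个最小的.env示例(密钥为占位符):

OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxx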

我们继续。

4、获取字幕和分块

4.1 提取YouTube视频ID

第一步简单但关键。应用在transcript.py中使用正则匹配器支持多种URL格式:

def extract_video_id(url: str) -> str | None:
    patterns = [
        r"(?:v=|\/)([0-9A-Za-z_-]{11}).*",
        r"(?:youtu\.be\/)([0-9A-Za-z_-]{11})",
        r"(?:embed\/)([0-9A-Za-z_-]{11})",
        r"^([0-9A-Za-z_-]{11})$",
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None

这个函数处理标准YouTube链接、缩短的youtu.be链接、嵌入式链接和原始ID。
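
下面是一个最小的用法示意(URL与视频ID均为示例值):

from transcript import extract_video_id

# 四种格式都应解析出同一个11位视频ID
assert extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ") == "dQw4w9WgXcQ"
assert extract_video_id("https://youtu.be/dQw4w9WgXcQ") == "dQw4w9WgXcQ"
assert extract_video_id("https://www.youtube.com/embed/dQw4w9WgXcQ") == "dQw4w9WgXcQ"
assert extract_video_id("dQw4w9WgXcQ") == "dQw4w9WgXcQ"
assert extract_video_id("not a url") is None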

4.2 获取字幕

字幕摄取在transcript.py中完成,使用的是youtube-transcript-api。

实现先尝试英语,然后回退到任何可用语言:

fetched = api.fetch(video_id, languages=['en'])

如果不存在英语字幕,它会发现可用语言并重试。

输出格式标准化为:

[{"text": …, "start": …, "duration": …}, …]

这为我们提供了原始文本和引用所需的时间戳。

4.3 带时间戳的字幕分块

应用不是嵌入每个句子,而是从字幕构建重叠的块。

这在transcript.py中发生:

def chunk_transcript(transcript: list[dict], chunk_size: int = 300, overlap: int = 50) -> list[dict]:
    chunks = []
    words_buffer = []
    word_timestamps = []

    for entry in transcript:
        words = entry['text'].split()
        start = entry['start']
        duration = entry.get('duration', 2.0)
        for i, word in enumerate(words):
            t = start + (duration * i / max(len(words), 1))
            words_buffer.append(word)
            word_timestamps.append(t)

    step = chunk_size - overlap
    chunk_id = 0
    i = 0
    while i < len(words_buffer):
        end_idx = min(i + chunk_size, len(words_buffer))
        chunk_words = words_buffer[i:end_idx]
        chunk_times = word_timestamps[i:end_idx]

        chunks.append({
            "chunk_id": chunk_id,
            "text": " ".join(chunk_words),
            "start_time": chunk_times[0],
            "end_time": chunk_times[-1],
        })
        chunk_id += 1
        i += step
        if end_idx == len(words_buffer):
            break

    return chunks

这种滑动窗口设计保留了跨块边界的上下文,并保持时间戳与原始视频时间对齐。
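
为了直观看到重叠窗口的行为,下面用缩小的参数跑一个示意(sample数据为虚构):

from transcript import chunk_transcript

sample = [
    {"text": "one two three four five", "start": 0.0, "duration": 5.0},
    {"text": "six seven eight nine ten", "start": 5.0, "duration": 5.0},
]
# chunk_size=4, overlap=1,因此 step=3:三个块分别覆盖词索引 0-3、3-6、6-9
for c in chunk_transcript(sample, chunk_size=4, overlap=1):
    print(c["chunk_id"], repr(c["text"]), c["start_time"], c["end_time"])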

以下是完整的transcript.py

"""
transcript.py - Fetch and parse YouTube transcripts with timestamps
Compatible with youtube-transcript-api v1.x
"""

from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import (
    NoTranscriptFound, TranscriptsDisabled, VideoUnavailable,
    CouldNotRetrieveTranscript,
)
import re

def extract_video_id(url: str) -> str | None:
    """Extract video ID from various YouTube URL formats."""
    patterns = [
        r"(?:v=|\/)([0-9A-Za-z_-]{11}).*",
        r"(?:youtu\.be\/)([0-9A-Za-z_-]{11})",
        r"(?:embed\/)([0-9A-Za-z_-]{11})",
        r"^([0-9A-Za-z_-]{11})$",
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None

def fetch_transcript(video_id: str) -> list[dict]:
    """
    Fetch transcript for a YouTube video.
    Returns list of {text, start, duration} dicts.
    Compatible with youtube-transcript-api v1.x (instance-based API).
    """
    api = YouTubeTranscriptApi()

    # Try English first
    try:
        fetched = api.fetch(video_id, languages=['en'])
        return [{"text": s.text, "start": s.start, "duration": s.duration} for s in fetched]
    except TranscriptsDisabled:
        raise ValueError("Transcripts are disabled for this video.")
    except VideoUnavailable:
        raise ValueError("Video is unavailable or private.")
    except (NoTranscriptFound, CouldNotRetrieveTranscript):
        pass  # Will try other languages below
    except Exception:
        pass  # Will try other languages below

    # Fallback: discover all available languages, use the first one
    try:
        transcript_list = api.list(video_id)
        available = [t.language_code for t in transcript_list]
        if not available:
            raise ValueError("No transcripts available for this video.")
        fetched = api.fetch(video_id, languages=available)
        return [{"text": s.text, "start": s.start, "duration": s.duration} for s in fetched]
    except TranscriptsDisabled:
        raise ValueError("Transcripts are disabled for this video.")
    except VideoUnavailable:
        raise ValueError("Video is unavailable or private.")
    except ValueError:
        raise
    except Exception as e:
        raise ValueError(f"Could not fetch transcript: {str(e)}")

def chunk_transcript(transcript: list[dict], chunk_size: int = 300, overlap: int = 50) -> list[dict]:
    """
    Chunk transcript into overlapping windows, preserving timestamps.
    Each chunk: {text, start_time, end_time, chunk_id}
    """
    chunks = []
    words_buffer = []
    word_timestamps = []

    # Flatten transcript into word-level with timestamps
    for entry in transcript:
        words = entry['text'].split()
        start = entry['start']
        duration = entry.get('duration', 2.0)
        for i, word in enumerate(words):
            t = start + (duration * i / max(len(words), 1))
            words_buffer.append(word)
            word_timestamps.append(t)

    # Slide window
    step = chunk_size - overlap
    chunk_id = 0
    i = 0
    while i < len(words_buffer):
        end_idx = min(i + chunk_size, len(words_buffer))
        chunk_words = words_buffer[i:end_idx]
        chunk_times = word_timestamps[i:end_idx]

        chunk_text = " ".join(chunk_words)
        start_time = chunk_times[0]
        end_time = chunk_times[-1]

        chunks.append({
            "chunk_id": chunk_id,
            "text": chunk_text,
            "start_time": start_time,
            "end_time": end_time,
        })
        chunk_id += 1
        i += step
        if end_idx == len(words_buffer):
            break

    return chunks

def format_timestamp(seconds: float) -> str:
    """Convert seconds to MM:SS or HH:MM:SS string."""
    seconds = int(seconds)
    h = seconds // 3600
    m = (seconds % 3600) // 60
    s = seconds % 60
    if h > 0:
        return f"{h}:{m:02d}:{s:02d}"
    return f"{m}:{s:02d}"

def make_youtube_link(video_id: str, seconds: float) -> str:
    """Create a deep-link YouTube URL at a specific timestamp."""
    t = int(seconds)
    return f"https://www.youtube.com/watch?v={video_id}&t={t}s"

我们还有metadata.py,用于提取视频的缩略图和标题。

"""
metadata.py - Fetch YouTube video metadata (title, thumbnail, duration, channel)
"""

import requests
import json
import re

def fetch_metadata(video_id: str) -> dict:
    """
    Fetch video metadata using YouTube oEmbed API + noembed fallback.
    Returns dict with title, author, thumbnail_url, duration_str.
    """
    # Try YouTube oEmbed (no API key needed)
    oembed_url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json"
    try:
        resp = requests.get(oembed_url, timeout=8)
        if resp.status_code == 200:
            data = resp.json()
            thumbnail = f"https://img.youtube.com/vi/{video_id}/mqdefault.jpg"
            return {
                "title": data.get("title", "Unknown Title"),
                "author": data.get("author_name", "Unknown Channel"),
                "thumbnail_url": thumbnail,
                "video_id": video_id,
                "url": f"https://www.youtube.com/watch?v={video_id}",
            }
    except Exception:
        pass

    # Fallback: minimal info
    return {
        "title": f"Video ({video_id})",
        "author": "Unknown",
        "thumbnail_url": f"https://img.youtube.com/vi/{video_id}/mqdefault.jpg",
        "video_id": video_id,
        "url": f"https://www.youtube.com/watch?v={video_id}",
    }
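
一个简单的用法示意(需要网络访问,视频ID为示例值):

from metadata import fetch_metadata

meta = fetch_metadata("dQw4w9WgXcQ")
print(meta["title"], "-", meta["author"])
print(meta["thumbnail_url"])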

5、嵌入字幕

5.1 嵌入和FAISS索引

语义检索骨干在embedder.py中。

关键点:

  • 使用OpenAI text-embedding-3-small
  • 批量嵌入字幕块
  • 对嵌入进行归一化用于余弦相似度
  • 将它们存储在FAISS的IndexFlatIP索引中

嵌入管道看起来像这样:

embeddings = get_embeddings(texts, client)
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
embeddings = embeddings / (norms + 1e-10)
index = faiss.IndexFlatIP(dim)
index.add(embeddings)

向量搜索实现为:

def search_index(query, index, chunks, client, top_k=5):
    response = client.embeddings.create(model=EMBEDDING_MODEL, input=[query])
    q_emb = np.array([response.data[0].embedding], dtype=np.float32)
    q_emb = q_emb / (np.linalg.norm(q_emb) + 1e-10)
    scores, indices = index.search(q_emb, top_k)

这提供了一种快速找到与问题在语义上最相关的字幕块的方法。
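
归一化之后,内积等价于余弦相似度,这正是这里选用IndexFlatIP的原因。可以用一个小的数值实验验证(向量为随机示例):

import numpy as np

a = np.random.rand(1, 8).astype(np.float32)
b = np.random.rand(1, 8).astype(np.float32)
cosine = (a @ b.T).item() / (np.linalg.norm(a) * np.linalg.norm(b))
a_n = a / np.linalg.norm(a)
b_n = b / np.linalg.norm(b)
inner = (a_n @ b_n.T).item()
assert abs(cosine - inner) < 1e-6  # 归一化向量的内积 == 余弦相似度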

5.2 使用BM25构建关键词索引

语义搜索很强大,但关键词搜索对于精确的短语匹配仍然有帮助。

项目在keyword_index.py中添加了一个本地BM25索引:

self.tokenized_corpus = [self._tokenize(text) for text in self.corpus]
self.bm25 = BM25Okapi(self.tokenized_corpus)

查询搜索就是:

query_tokens = self._tokenize(query)
scores = self.bm25.get_scores(query_tokens)

然后根据BM25排名返回顶级块。

以下是完整的embedder.py

"""
embedder.py - Embed transcript chunks and build a FAISS index
"""

import numpy as np
import faiss
import pickle
import os
from openai import OpenAI

EMBEDDING_MODEL = "text-embedding-3-small"
EMBED_BATCH_SIZE = 64

def get_embeddings(texts: list[str], client: OpenAI) -> np.ndarray:
    """Embed a list of texts using OpenAI embeddings in batches."""
    all_embeddings = []
    for i in range(0, len(texts), EMBED_BATCH_SIZE):
        batch = texts[i:i + EMBED_BATCH_SIZE]
        response = client.embeddings.create(model=EMBEDDING_MODEL, input=batch)
        batch_embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(batch_embeddings)
    return np.array(all_embeddings, dtype=np.float32)

def build_index(chunks: list[dict], client: OpenAI) -> tuple[faiss.Index, list[dict]]:
    """
    Build a FAISS flat inner-product index (cosine similarity after normalization) from transcript chunks.
    Returns (index, chunks) — chunks are stored as metadata alongside index.
    """
    texts = [c["text"] for c in chunks]
    embeddings = get_embeddings(texts, client)

    # Normalize for cosine similarity
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    embeddings = embeddings / (norms + 1e-10)

    dim = embeddings.shape[1]
    index = faiss.IndexFlatIP(dim)  # Inner product = cosine after normalization
    index.add(embeddings)

    return index, chunks

def search_index(
    query: str,
    index: faiss.Index,
    chunks: list[dict],
    client: OpenAI,
    top_k: int = 5,
) -> list[dict]:
    """
    Search the FAISS index for chunks most relevant to query.
    Returns top_k chunks with similarity scores.
    """
    response = client.embeddings.create(model=EMBEDDING_MODEL, input=[query])
    q_emb = np.array([response.data[0].embedding], dtype=np.float32)
    # Normalize
    q_emb = q_emb / (np.linalg.norm(q_emb) + 1e-10)

    scores, indices = index.search(q_emb, top_k)

    results = []
    for score, idx in zip(scores[0], indices[0]):
        if idx < len(chunks):
            chunk = chunks[idx].copy()
            chunk["score"] = float(score)
            results.append(chunk)
    return results

以下是完整的keyword_index.py文件:

"""
keyword_index.py — BM25 keyword indexing and retrieval for transcript chunks.

Provides efficient keyword-based search as complement to semantic vector search.
"""

from rank_bm25 import BM25Okapi
from typing import List, Dict

class KeywordIndex:
    """Build and search a BM25 keyword index from transcript chunks."""

    def __init__(self, chunks: List[Dict]):
        """
        Initialize BM25 index from chunks.

        Args:
            chunks: List of chunk dicts with 'text', 'start_time', 'end_time' keys
        """
        self.chunks = chunks
        self.corpus = [chunk["text"] for chunk in chunks]
        # Tokenize: split on whitespace, lowercase, simple punctuation removal
        self.tokenized_corpus = [self._tokenize(text) for text in self.corpus]
        self.bm25 = BM25Okapi(self.tokenized_corpus)

    @staticmethod
    def _tokenize(text: str) -> List[str]:
        """Simple tokenization: lowercase, split on whitespace."""
        import re
        # Convert to lowercase, split on whitespace, remove punctuation
        tokens = re.findall(r'\w+', text.lower())
        return tokens

    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """
        Search BM25 index for chunks matching query.

        Args:
            query: Search query string
            top_k: Number of top results to return

        Returns:
            List of chunks with 'bm25_score' added; sorted by score descending
        """
        query_tokens = self._tokenize(query)

        # BM25 returns scores for each document in corpus
        scores = self.bm25.get_scores(query_tokens)

        # Sort by score descending, get top_k indices
        top_indices = sorted(
            range(len(scores)),
            key=lambda i: scores[i],
            reverse=True
        )[:top_k]

        results = []
        for idx in top_indices:
            if idx < len(self.chunks):
                chunk = self.chunks[idx].copy()
                chunk["bm25_score"] = float(scores[idx])
                results.append(chunk)

        return results

def build_keyword_index(chunks: List[Dict]) -> KeywordIndex:
    """
    Convenience function to build a BM25 index from chunks.

    Args:
        chunks: List of chunk dicts

    Returns:
        KeywordIndex instance ready for search
    """
    return KeywordIndex(chunks)
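
一个最小的用法示意(chunks数据为虚构):

from keyword_index import build_keyword_index

chunks = [
    {"text": "the president discussed trade policy", "start_time": 10.0, "end_time": 25.0},
    {"text": "a question about artificial intelligence", "start_time": 30.0, "end_time": 45.0},
]
ki = build_keyword_index(chunks)
for hit in ki.search("trade policy", top_k=1):
    print(round(hit["bm25_score"], 3), hit["text"])  # 预期命中第一个块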

6、使用RRF融合进行混合检索

为了结合语义搜索和关键词搜索的优势,应用在retrieval_fusion.py中使用倒数排名融合(RRF)。

工作原理:

  • 从BM25和向量搜索中检索顶级候选结果
  • 使用RRF按排名对每个候选评分
  • 合并重复项并按融合分数排序
  • 返回最终的top-k列表

公式为:

score = sum(1 / (rank + k))

这里k取值为60。

这保留了在任何一种检索技术中表现强劲的结果,同时避免过度依赖单一方法。
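
用两条虚构的候选列表可以直观看到融合效果:同时出现在两路结果中的块得分最高(完整实现见下文retrieval_fusion.py):

from retrieval_fusion import reciprocal_rank_fusion

kw = [{"text": "A", "start_time": 0.0, "end_time": 5.0},
      {"text": "B", "start_time": 10.0, "end_time": 15.0}]
vec = [{"text": "B", "start_time": 10.0, "end_time": 15.0},
       {"text": "C", "start_time": 20.0, "end_time": 25.0}]
for c in reciprocal_rank_fusion(kw, vec, k=60):
    print(c["text"], round(c["rrf_score"], 5))
# B ≈ 1/61 + 1/60 ≈ 0.03306,排在只出现一次的 A(≈0.01667)和 C(≈0.01639)之前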

6.1 查询路由:全局 vs 特定

助手将每个用户问题分类为:

  • global:需要理解整个视频
  • rag:需要特定的、局部的字幕证据

chat.py中,一个轻量级的路由器提示将问题发送给GPT:

response = client.chat.completions.create(
    model=MODEL,
    messages=[
        {"role": "system", "content": ROUTER_PROMPT},
        {"role": "user", "content": user_message},
    ],
)

如果答案是global,应用从完整字幕构建上下文。如果是rag,它从检索到的块构建较小的上下文。
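
一个示意性的调用(假设.env中已配置密钥;LLM输出并非严格确定,注释中的结果仅为预期;classify_query的完整实现见下文chat.py):

from openai import OpenAI
from chat import classify_query

client = OpenAI()
print(classify_query("Summarize this video", client))            # 预期 "global"
print(classify_query("What did he say about tariffs?", client))  # 预期 "rag"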

以下是完整的retrieval_fusion.py

"""
retrieval_fusion.py — Reciprocal Rank Fusion (RRF) for hybrid search.

Merges keyword (BM25) and semantic (vector) results into a single ranked list.
Uses reciprocal rank fusion formula: score = sum(1 / (rank + k))
"""

from typing import List, Dict, Tuple

def reciprocal_rank_fusion(
    keyword_results: List[Dict],
    vector_results: List[Dict],
    k: int = 60,
) -> List[Dict]:
    """
    Merge and rank results from keyword and vector searches using RRF.

    RRF formula for each result:
        score = sum(1 / (rank_keyword + k) + 1 / (rank_vector + k))

    Where rank is 0-indexed position in each result list.
    Results appearing in both lists get scores from both; results in one list only
    contribute their single score.

    Args:
        keyword_results: List of chunks from BM25 search (with 'bm25_score' field)
        vector_results: List of chunks from vector search (with 'score' field for cosine similarity)
        k: RRF parameter; higher k diminishes effect of rank position
           Default 60 is standard; tune based on result quality

    Returns:
        List of unique chunks sorted by fused RRF score (descending)
        Each chunk has 'rrf_score', 'keyword_rank', 'vector_rank' fields added
    """

    # Build ranking maps: chunk_id -> (rank, original_chunk_dict)
    # Using (start_time, end_time) as unique chunk ID
    keyword_ranks: Dict[Tuple, Tuple[int, Dict]] = {}
    vector_ranks: Dict[Tuple, Tuple[int, Dict]] = {}

    for rank, chunk in enumerate(keyword_results):
        chunk_id = (chunk.get("start_time"), chunk.get("end_time"))
        keyword_ranks[chunk_id] = (rank, chunk)

    for rank, chunk in enumerate(vector_results):
        chunk_id = (chunk.get("start_time"), chunk.get("end_time"))
        vector_ranks[chunk_id] = (rank, chunk)

    # Compute RRF scores for all unique chunks
    rrf_scores: Dict[Tuple, float] = {}
    all_chunks: Dict[Tuple, Dict] = {}
    chunk_metadata: Dict[Tuple, Dict] = {}  # Track rank info

    # Process keyword results
    for chunk_id, (rank, chunk) in keyword_ranks.items():
        rrf_scores[chunk_id] = 1.0 / (rank + k)
        all_chunks[chunk_id] = chunk
        chunk_metadata[chunk_id] = {"keyword_rank": rank, "vector_rank": None}

    # Process vector results
    for chunk_id, (rank, chunk) in vector_ranks.items():
        vector_contribution = 1.0 / (rank + k)
        if chunk_id in rrf_scores:
            rrf_scores[chunk_id] += vector_contribution
            chunk_metadata[chunk_id]["vector_rank"] = rank
        else:
            rrf_scores[chunk_id] = vector_contribution
            all_chunks[chunk_id] = chunk
            chunk_metadata[chunk_id] = {"keyword_rank": None, "vector_rank": rank}

    # Sort by RRF score descending
    sorted_chunks = sorted(
        all_chunks.items(),
        key=lambda item: rrf_scores[item[0]],
        reverse=True
    )

    # Build result list with metadata
    results = []
    for chunk_id, chunk in sorted_chunks:
        result_chunk = chunk.copy()
        result_chunk["rrf_score"] = rrf_scores[chunk_id]
        result_chunk["keyword_rank"] = chunk_metadata[chunk_id]["keyword_rank"]
        result_chunk["vector_rank"] = chunk_metadata[chunk_id]["vector_rank"]
        results.append(result_chunk)

    return results

def fuse_and_get_top_k(
    keyword_results: List[Dict],
    vector_results: List[Dict],
    top_k: int = 5,
    rrf_k: int = 60,
) -> List[Dict]:
    """
    Convenience function: fuse results and return top_k.

    Args:
        keyword_results: BM25 search results
        vector_results: Vector search results
        top_k: Number of results to return from fused list
        rrf_k: RRF parameter

    Returns:
        Top k chunks from fused ranking
    """
    fused = reciprocal_rank_fusion(keyword_results, vector_results, k=rrf_k)
    return fused[:top_k]

7、构建提示词

对于global问题,系统提示被设计为:

  • 使用完整字幕
  • 产生结构化答案
  • 在Markdown中内联引用时间戳

对于rag问题,提示更严格:

  • 仅从提供的摘录中回答
  • 内联引用1-3个时间戳
  • 当答案在视频中找不到时明确说明

chat.py中的时间戳要求示例:

[MM:SS](https://www.youtube.com/watch?v={video_id}&t=Xs)

这确保每个链接都一致且可点击。
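
例如,视频第315秒处的引用会被写成如下markdown(video_id为示例值):

[5:15](https://www.youtube.com/watch?v=dQw4w9WgXcQ&t=315s)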

我们使用OpenAI API的gpt-4o-mini。

以下是完整的chat.py

"""
chat.py — LLM-routed chat with inline timestamp citations.

Router:  one fast GPT call → "global" | "rag"
Global:  full transcript passed as context (up to 80k tokens)
RAG:     hybrid retrieval (BM25 keyword + vector semantic) with RRF fusion,
         LLM cites 1-2 timestamps inline in answer
"""

from openai import OpenAI
from embedder import search_index
from keyword_index import build_keyword_index
from retrieval_fusion import fuse_and_get_top_k
from transcript import format_timestamp, make_youtube_link
import faiss
import json

MODEL = "gpt-4o-mini"
MAX_FULL_TRANSCRIPT_WORDS = 60_000

# ── System prompts ─────────────────────────────────────────────────────────────

ROUTER_PROMPT = """You are a query classifier for a YouTube video Q&A assistant.

Classify the user's question as one of two types:

"global"  — The question requires understanding the ENTIRE video.
            Examples: summarize, overview, main topics, key takeaways, 
            chapters, structure, what is this video about, full recap.

"rag"     — The question is about a SPECIFIC fact, moment, person, concept, 
            or timestamp in the video. Examples: when did X happen, 
            what did the speaker say about Y, explain concept Z.

Reply with ONLY a JSON object: {"route": "global"} or {"route": "rag"}
No explanation. No other text."""

GLOBAL_SYSTEM_PROMPT = """You are an intelligent video assistant. You have been given the COMPLETE transcript of a YouTube video with timestamps.

Instructions:
- Answer the user's question using the full transcript comprehensively.
- For summaries: cover ALL major sections, not just the beginning.
- For "main sections/topics": identify distinct topic shifts and list each with its start timestamp.
- Cite timestamps inline using this EXACT markdown format: [MM:SS](https://www.youtube.com/watch?v={video_id}&t=Xs)
  where X is the timestamp in seconds. Always include the 's' suffix after the number. Example: &t=315s not &t=315
- Be well-structured — use numbered lists or clear sections.
- Only cite timestamps that are genuinely relevant to that point.
"""

SPECIFIC_SYSTEM_PROMPT = """You are an intelligent video assistant. You have been given relevant excerpts from a YouTube video transcript.

Instructions:
- Answer the question based ONLY on the provided transcript excerpts.
- Cite 1 to 3 timestamps INLINE in your answer using this EXACT markdown format:
  [MM:SS](https://www.youtube.com/watch?v={video_id}&t=Xs)
  where X is the timestamp in seconds. Always include the 's' suffix. Example: &t=315s not &t=315
- Only cite a timestamp when it directly supports the specific sentence you are writing.
- Do NOT list all timestamps at the end — weave them naturally into the answer.
- If the answer spans multiple parts of the video, show each part as a numbered point with its own inline timestamp.
- If the context does not contain the answer, say: "I couldn't find information about that in this video."
- Never make up information not present in the provided context.
"""

# ── LLM Router ────────────────────────────────────────────────────────────────

def classify_query(user_message: str, client: OpenAI) -> str:
    """
    Ask GPT-4o-mini to classify the query as 'global' or 'rag'.
    Falls back to 'rag' on any error.
    """
    try:
        response = client.chat.completions.create(
            model=MODEL,
            messages=[
                {"role": "system", "content": ROUTER_PROMPT},
                {"role": "user", "content": user_message},
            ],
            temperature=0,
            max_tokens=20,
        )
        raw = response.choices[0].message.content.strip()
        parsed = json.loads(raw)
        route = parsed.get("route", "rag")
        return route if route in ("global", "rag") else "rag"
    except Exception:
        return "rag"  # safe default

# ── Context builders ──────────────────────────────────────────────────────────

def build_full_transcript_context(all_chunks: list[dict]) -> str:
    """Concatenate ALL chunks sorted by time, capped at MAX_FULL_TRANSCRIPT_WORDS."""
    sorted_chunks = sorted(all_chunks, key=lambda c: c["start_time"])
    parts = []
    words_so_far = 0
    for chunk in sorted_chunks:
        chunk_words = len(chunk["text"].split())
        if words_so_far + chunk_words > MAX_FULL_TRANSCRIPT_WORDS:
            parts.append("[... transcript truncated for length ...]")
            break
        ts = format_timestamp(chunk["start_time"])
        parts.append(f"[{ts}] {chunk['text']}")
        words_so_far += chunk_words
    return "\n".join(parts)

def build_rag_context(chunks: list[dict]) -> str:
    """Format retrieved chunks into a timestamped context block."""
    parts = []
    for chunk in chunks:
        ts = format_timestamp(chunk["start_time"])
        end_ts = format_timestamp(chunk["end_time"])
        parts.append(f"[{ts} - {end_ts}]\n{chunk['text']}")
    return "\n\n---\n\n".join(parts)

# ── Source extractor (parses inline links from LLM reply) ────────────────────

def extract_sources_from_reply(reply: str, video_id: str) -> list[dict]:
    """
    Parse timestamp markdown links that the LLM wrote inline.
    Matches patterns like [4:32](https://...&t=272s)
    Returns deduplicated list of {timestamp, seconds, link}.
    """
    import re
    # Match [MM:SS] or [H:MM:SS] followed by a YouTube URL with &t=Xs
    # s suffix is optional — LLM sometimes writes &t=315 not &t=315s
    pattern = r'\[([\d]{1,2}:\d{2}(?::\d{2})?)\]\((https://www\.youtube\.com/watch\?v=[\w-]+&t=(\d+)s?)\)'
    matches = re.findall(pattern, reply)
    seen = set()
    sources = []
    for ts_label, url, seconds_str in matches:
        seconds = int(seconds_str)
        if seconds not in seen:
            seen.add(seconds)
            sources.append({
                "timestamp": ts_label,
                "seconds": float(seconds),
                "link": url,
            })
    return sources

# ── Main chat function ────────────────────────────────────────────────────────

def chat_with_video(
    user_message: str,
    conversation_history: list[dict],
    index: faiss.Index,
    chunks: list[dict],
    video_id: str,
    client: OpenAI,
    top_k: int = 5,
    keyword_index=None,
) -> tuple[str, list[dict]]:
    """
    1. Classify query → global | rag
    2. Build context accordingly
    3. For 'rag': use hybrid retrieval (BM25 + semantic with RRF fusion)
    4. Call GPT-4o-mini with inline-timestamp instructions
    5. Parse timestamps from reply for UI chips

    Args:
        user_message: User's query
        conversation_history: Previous messages in conversation
        index: FAISS vector index
        chunks: All transcript chunks
        video_id: YouTube video ID
        client: OpenAI client
        top_k: Number of results to return after fusion
        keyword_index: KeywordIndex instance for BM25 search (optional)

    Returns:
        Tuple of (reply_text, sources)
    """

    # ── Step 1: Route ──────────────────────────────────────────────────────
    route = classify_query(user_message, client)

    # ── Step 2: Build context ──────────────────────────────────────────────
    if route == "global":
        context = build_full_transcript_context(chunks)
        system_prompt = GLOBAL_SYSTEM_PROMPT.replace("{video_id}", video_id)
        context_label = "FULL VIDEO TRANSCRIPT (with timestamps):"
        max_tokens = 1800
    else:
        # ── Hybrid retrieval: BM25 keyword + semantic vector with RRF fusion ──
        # Retrieve top 10 from each method, then fuse to top_k
        vector_results = search_index(user_message, index, chunks, client, top_k=10)

        if keyword_index is not None:
            keyword_results = keyword_index.search(user_message, top_k=10)
            # Fuse using Reciprocal Rank Fusion
            retrieved = fuse_and_get_top_k(
                keyword_results,
                vector_results,
                top_k=top_k,
                rrf_k=60
            )
        else:
            # Fallback: use vector search only if keyword index not available
            retrieved = vector_results[:top_k]

        context = build_rag_context(retrieved)
        system_prompt = SPECIFIC_SYSTEM_PROMPT.replace("{video_id}", video_id)
        context_label = "RELEVANT TRANSCRIPT EXCERPTS (hybrid keyword + semantic search):"
        max_tokens = 900

    # ── Step 3: Call LLM ───────────────────────────────────────────────────
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "system", "content": f"{context_label}\n\n{context}"},
    ]
    messages.extend(conversation_history)
    messages.append({"role": "user", "content": user_message})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        max_tokens=max_tokens,
    )
    reply = response.choices[0].message.content

    # ── Step 4: Extract inline timestamp links as UI chips ─────────────────
    sources = extract_sources_from_reply(reply, video_id)

    return reply, sources

8、提取来源时间戳

在助手回复后,应用解析内联的markdown时间戳链接,以便渲染可点击的按钮。

这个逻辑在extract_sources_from_reply()中:

pattern = r'\[([\d]{1,2}:\d{2}(?::\d{2})?)\]\((https://www\.youtube\.com/watch\?v=[\w-]+&t=(\d+)s?)\)'

应用将每个被引用的时间戳变成一个UI芯片,点击即可让嵌入式YouTube播放器跳转到对应时刻。
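
一个最小的解析示意(回复文本为虚构):

from chat import extract_sources_from_reply

reply = "The speaker covers trade at [5:15](https://www.youtube.com/watch?v=dQw4w9WgXcQ&t=315s)."
print(extract_sources_from_reply(reply, "dQw4w9WgXcQ"))
# [{'timestamp': '5:15', 'seconds': 315.0, 'link': 'https://www.youtube.com/watch?v=dQw4w9WgXcQ&t=315s'}]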

9、Streamlit UI和会话状态

UI完全在app.py中实现。

关键元素:

  • 顶部栏和URL输入
  • 加载进度条
  • 嵌入式YouTube iframe
  • 视频元数据面板
  • 聊天消息气泡
  • 建议的起始问题
  • 时间戳跳转按钮
  • 清除聊天按钮

应用在 st.session_state 中保持以下状态:

  • 已加载的视频
  • 活跃的视频ID
  • 对话历史
  • 待处理的提问文本
  • 处理状态

这允许应用在一个浏览器会话中维护多个视频和持续的聊天历史。

以下是项目中使用的完整style.css

@import url('https://fonts.googleapis.com/css2?family=Google+Sans:wght@400;500;600&family=Roboto:wght@300;400;500&display=swap');

/* Reset & base */
html, body, [class*="css"] {
    font-family: 'Roboto', sans-serif;
    margin: 0; padding: 0;
}

.stApp {
    background: #0f0f0f;
    color: #f1f1f1;
}

/* Hide streamlit chrome */
#MainMenu, footer, header { visibility: hidden; }
[data-testid="stSidebar"] { display: none; }
.block-container {
    padding: 0 !important;
    max-width: 100% !important;
}

/* ── TOP NAV BAR ── */
.topbar {
    display: flex;
    align-items: center;
    justify-content: space-between;
    background: #0f0f0f;
    border-bottom: 1px solid #272727;
    padding: 10px 20px;
    position: sticky;
    top: 0;
    z-index: 100;
}
.topbar-left {
    display: flex;
    align-items: center;
    gap: 12px;
}
.yt-logo {
    font-size: 1.3rem;
    font-weight: 700;
    color: #fff;
    letter-spacing: -0.5px;
}
.yt-logo span { color: #ff0000; }
.url-input-wrap {
    flex: 1;
    max-width: 600px;
    margin: 0 24px;
}

/* ── MAIN TWO-PANEL LAYOUT ── */
.main-panels {
    display: flex;
    height: calc(100vh - 57px);
    overflow: hidden;
}

/* Left: video panel */
.video-panel {
    flex: 1;
    background: #000;
    display: flex;
    flex-direction: column;
    overflow: hidden;
}
.video-embed-wrap {
    position: relative;
    width: 100%;
    padding-top: 56.25%; /* 16:9 */
    background: #000;
    flex-shrink: 0;
}
.video-embed-wrap iframe {
    position: absolute;
    top: 0; left: 0;
    width: 100%; height: 100%;
    border: none;
}
.video-info {
    padding: 16px 20px;
    border-top: 1px solid #272727;
    background: #0f0f0f;
    flex-shrink: 0;
}
.video-title {
    font-family: 'Roboto', sans-serif;
    font-size: 1.1rem;
    font-weight: 500;
    color: #f1f1f1;
    margin: 0 0 4px 0;
    line-height: 1.4;
}
.video-channel {
    font-size: 0.82rem;
    color: #aaa;
    margin: 0;
}

/* Right: chat panel */
.chat-panel {
    width: 400px;
    min-width: 340px;
    max-width: 420px;
    background: #212121;
    border-left: 1px solid #272727;
    display: flex;
    flex-direction: column;
    overflow: hidden;
    height: 100%;
}

.chat-header {
    padding: 14px 18px 12px;
    border-bottom: 1px solid #333;
    flex-shrink: 0;
    background: #212121;
}
.chat-header-top {
    display: flex;
    align-items: center;
    justify-content: space-between;
    margin-bottom: 2px;
}
.chat-title {
    font-family: 'Roboto', sans-serif;
    font-size: 1rem;
    font-weight: 500;
    color: #f1f1f1;
    margin: 0;
}
.gemini-star {
    font-size: 1.1rem;
    margin-right: 6px;
}
.chat-subtitle {
    font-size: 0.75rem;
    color: #aaa;
    margin-top: 2px;
}

/* Suggested questions */
.suggestions {
    padding: 14px 16px 8px;
    border-bottom: 1px solid #2d2d2d;
    flex-shrink: 0;
}
.suggestions-label {
    font-size: 0.78rem;
    color: #aaa;
    margin-bottom: 8px;
}
.suggestion-chips {
    display: flex;
    flex-direction: column;
    gap: 6px;
}
.suggestion-chip {
    background: transparent;
    border: 1px solid #3d3d3d;
    border-radius: 18px;
    padding: 7px 14px;
    font-size: 0.8rem;
    color: #c8c8c8;
    cursor: pointer;
    text-align: right;
    width: fit-content;
    align-self: flex-end;
    transition: background 0.15s, border-color 0.15s;
    line-height: 1.3;
}
.suggestion-chip:hover {
    background: #2d2d2d;
    border-color: #555;
    color: #f1f1f1;
}

/* Chat messages area */
.chat-messages {
    flex: 1;
    overflow-y: auto;
    padding: 16px;
    display: flex;
    flex-direction: column;
    gap: 14px;
    scrollbar-width: thin;
    scrollbar-color: #3d3d3d #212121;
}
/* Thinking dots */
@keyframes thinking-pulse {
    0%, 80%, 100% { opacity: 0.2; transform: scale(0.8); }
    40% { opacity: 1; transform: scale(1.1); }
}
.thinking-dot {
    display: inline-block;
    width: 7px; height: 7px;
    border-radius: 50%;
    background: #666;
    animation: thinking-pulse 1.2s ease-in-out infinite;
}

/* Timestamp dropdown chip — pure CSS, no JS */
.ts-dropdown {
    position: relative;
    display: inline-block;
    vertical-align: middle;
    margin: 0 2px;
}
.ts-chip {
    display: inline-flex;
    align-items: center;
    gap: 3px;
    background: linear-gradient(180deg, #1c2a3a 0%, #142233 100%);
    border: 1px solid #2a3f5a;
    border-radius: 5px;
    padding: 1px 8px;
    font-size: 0.78rem;
    font-family: 'Roboto Mono', monospace;
    color: #8ab4f8;
    cursor: pointer;
    user-select: none;
    white-space: nowrap;
    transition: background 0.12s;
    box-shadow: 0 1px 4px rgba(0,0,0,0.35);
}
.ts-chip:hover { background: #253549; color: #b0ccff; }
.ts-menu {
    display: none;
    position: absolute;
    bottom: 100%;
    margin-bottom: 6px;
    left: 0;
    background: #141a24;
    border: 1px solid #2b3a52;
    border-radius: 8px;
    min-width: 210px;
    z-index: 9999;
    overflow: hidden;
    box-shadow: 0 10px 24px rgba(0,0,0,0.55);
}
.ts-dropdown:hover .ts-menu,
.ts-dropdown:focus-within .ts-menu {
    display: block;
}
.ts-menu::after {
    content: "";
    position: absolute;
    bottom: -6px;
    left: 0;
    width: 100%;
    height: 6px;
}
.ts-option {
    display: block;
    padding: 9px 14px;
    font-size: 0.82rem;
    color: #cfd8ea;
    text-decoration: none;
    cursor: pointer;
    transition: background 0.12s;
    white-space: nowrap;
}
.ts-option:hover { background: #223248; color: #fff; }
.ts-option + .ts-option { border-top: 1px solid #2d3a50; }
.chat-messages::-webkit-scrollbar { width: 4px; }
.chat-messages::-webkit-scrollbar-track { background: #212121; }
.chat-messages::-webkit-scrollbar-thumb { background: #3d3d3d; border-radius: 2px; }

/* Message bubbles */
.msg-user {
    align-self: flex-end;
    background: #2d2d2d;
    border-radius: 18px 18px 4px 18px;
    padding: 10px 14px;
    max-width: 85%;
    font-size: 0.87rem;
    color: #f1f1f1;
    line-height: 1.5;
    word-wrap: break-word;
}
.msg-ai-wrap {
    align-self: flex-start;
    max-width: 95%;
    display: flex;
    flex-direction: column;
    gap: 6px;
}
.msg-ai-label {
    font-size: 0.72rem;
    color: #888;
    display: flex;
    align-items: center;
    gap: 4px;
    margin-bottom: 2px;
}
.msg-ai {
    background: transparent;
    font-size: 0.87rem;
    color: #e0e0e0;
    line-height: 1.6;
    word-wrap: break-word;
}
.msg-ai a {
    color: #8ab4f8;
    text-decoration: none;
}
.msg-ai a:hover { text-decoration: underline; }

/* Timestamp source chips */
.source-chips {
    display: flex;
    flex-wrap: wrap;
    gap: 5px;
    margin-top: 4px;
}
.inline-ts {
    display: inline-flex;
    align-items: center;
    gap: 3px;
    background: #1e2a3a;
    border: 1px solid #2a3f5a;
    border-radius: 5px;
    padding: 1px 7px;
    font-size: 0.78rem;
    font-family: 'Roboto Mono', monospace;
    color: #8ab4f8;
    cursor: pointer;
    transition: background 0.12s, transform 0.1s;
    user-select: none;
    white-space: nowrap;
    vertical-align: middle;
    margin: 0 2px;
}
.inline-ts:hover { background: #253549; color: #b0ccff; transform: translateY(-1px); }
.inline-ts:active { transform: translateY(0); background: #2a3f5a; }

/* Chat input area */
.chat-input-area {
    padding: 10px 14px 8px;
    border-top: 1px solid #2d2d2d;
    background: #212121;
    flex-shrink: 0;
}
.chat-disclaimer {
    text-align: center;
    font-size: 0.67rem;
    color: #666;
    padding: 4px 0 0;
}

/* Streamlit input overrides */
.stTextInput > div > div > input {
    background: #2d2d2d !important;
    border: 1px solid #3d3d3d !important;
    border-radius: 22px !important;
    color: #f1f1f1 !important;
    font-size: 0.87rem !important;
    padding: 10px 18px !important;
    font-family: 'Roboto', sans-serif !important;
}
.stTextInput > div > div > input:focus {
    border-color: #555 !important;
    box-shadow: none !important;
    outline: none !important;
}
.stTextInput > div > div > input::placeholder { color: #888 !important; }

.stButton > button {
    background: transparent;
    border: none;
    color: #8ab4f8;
    font-size: 0.85rem;
    font-weight: 500;
    padding: 6px 12px;
    border-radius: 4px;
    cursor: pointer;
    font-family: 'Roboto', sans-serif;
    transition: background 0.15s;
}
.stButton > button:hover { background: #2d2d2d; color: #c0d4ff; }

/* Empty / loading states */
.empty-state {
    flex: 1;
    display: flex;
    flex-direction: column;
    align-items: center;
    justify-content: center;
    text-align: center;
    padding: 32px 24px;
    color: #888;
}
.empty-icon { font-size: 2rem; margin-bottom: 10px; }
.empty-text { font-size: 0.85rem; line-height: 1.6; }

/* Top URL bar inputs */
div[data-testid="stHorizontalBlock"] .stTextInput > div > div > input {
    background: #121212 !important;
    border: 1px solid #303030 !important;
    border-radius: 22px !important;
    color: #f1f1f1 !important;
    font-size: 0.88rem !important;
    padding: 9px 16px !important;
}

/* Spinner */
.stSpinner > div { border-top-color: #aaa !important; }

/* Chips (suggestion buttons) styled via st.button with key trick */
div[data-suggestion="true"] .stButton > button {
    background: transparent !important;
    border: 1px solid #3d3d3d !important;
    border-radius: 18px !important;
    color: #c8c8c8 !important;
    font-size: 0.8rem !important;
    padding: 7px 14px !important;
    width: 100% !important;
    text-align: right !important;
    justify-content: flex-end !important;
}

以下是完整的app.py

"""
app.py - YouTube AI Chat — Streamlit frontend
Two-panel layout: embedded video left, chat right
"""

import streamlit as st
import streamlit.components.v1 as components
from openai import OpenAI
import time
import os
from dotenv import load_dotenv

load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

from transcript import extract_video_id, fetch_transcript, chunk_transcript, format_timestamp, make_youtube_link
from metadata import fetch_metadata
from embedder import build_index
from keyword_index import build_keyword_index
from chat import chat_with_video

# ── Page config ────────────────────────────────────────────────────────────────
st.set_page_config(
    page_title="YT Chat",
    page_icon="🎬",
    layout="wide",
    initial_sidebar_state="collapsed",
)

# ── Custom CSS ─────────────────────────────────────────────────────────────────

def load_local_css(filename: str):
    css_path = os.path.join(os.path.dirname(__file__), filename)
    if os.path.exists(css_path):
        with open(css_path, "r", encoding="utf-8") as f:
            st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
    else:
        st.warning(f"Missing CSS file: {css_path}")

load_local_css("style.css")

# ── Query param handler (jump-to-timestamp) is set up below ───────────────────

# ── Session state ──────────────────────────────────────────────────────────────
def init_state():
    defaults = {
        "videos": {},
        "active_video_id": None,
        "conversations": {},
        "client": None,
        "pending_input": "",
        "awaiting_answer": "",  # question waiting for LLM response
        "processing_answer": False,
    }
    for k, v in defaults.items():
        if k not in st.session_state:
            st.session_state[k] = v

init_state()

# Handle jump-to-timestamp from HTML button click
if "jump_to" in st.query_params:
    try:
        jump_seconds = st.query_params.get("jump_to")
        if jump_seconds:
            st.session_state.jump_to_seconds = int(jump_seconds)
            # Clear the param to avoid re-triggering
            params = dict(st.query_params)
            del params["jump_to"]
            st.query_params.clear()
            for k, v in params.items():
                st.query_params[k] = v
    except Exception as e:
        print(f"Error handling jump_to param: {e}")

def get_client():
    return st.session_state.client

def active_video():
    vid = st.session_state.active_video_id
    if vid and vid in st.session_state.videos:
        return st.session_state.videos[vid]
    return None

def active_conversation():
    vid = st.session_state.active_video_id
    if vid and vid not in st.session_state.conversations:
        st.session_state.conversations[vid] = []
    if vid:
        return st.session_state.conversations[vid]
    return []

# ── TOP NAV ────────────────────────────────────────────────────────────────────
st.markdown("""
<div class="topbar">
  <div style="font-size:1.25rem;font-weight:700;color:#fff;letter-spacing:-0.3px;">
    <span style="color:#ff0000;">▶</span> YT Chat
  </div>
</div>
""", unsafe_allow_html=True)

# Controls row below topbar
ctrl_col1, ctrl_col2, ctrl_col3 = st.columns([4, 1, 1])
with ctrl_col1:
    yt_url = st.text_input(
        "url", placeholder="Paste YouTube URL...",
        label_visibility="collapsed", key="url_input"
    )
with ctrl_col2:
    load_btn = st.button("Load Video", use_container_width=True)
with ctrl_col3:
    # Show loaded videos selector if multiple
    if len(st.session_state.videos) > 1:
        video_options = {v["meta"]["title"][:28] + "…": k
                        for k, v in st.session_state.videos.items()}
        selected_label = st.selectbox(
            "Switch", list(video_options.keys()),
            label_visibility="collapsed"
        )
        st.session_state.active_video_id = video_options[selected_label]
    elif len(st.session_state.videos) == 1:
        st.markdown(
            f'<div style="font-size:0.75rem;color:#888;padding:8px 0;">1 video loaded</div>',
            unsafe_allow_html=True
        )

# Handle load
if load_btn:
    url_val = yt_url.strip()

    if not OPENAI_API_KEY:
        st.error("OPENAI_API_KEY not found in .env file.")
        st.stop()
    elif not url_val:
        st.error("Paste a YouTube URL.")
    else:
        if not st.session_state.client:
            st.session_state.client = OpenAI(api_key=OPENAI_API_KEY)

        video_id = extract_video_id(url_val)
        if not video_id:
            st.error("Couldn't parse a video ID from that URL.")
        elif video_id in st.session_state.videos:
            st.session_state.active_video_id = video_id
            st.success("Already loaded — switched to it.")
            st.rerun()
        else:
            prog = st.progress(0, text="Fetching metadata...")
            try:
                meta = fetch_metadata(video_id)
                prog.progress(15, text="Fetching transcript...")
                raw = fetch_transcript(video_id)
                prog.progress(40, text="Chunking transcript...")
                chunks = chunk_transcript(raw)
                prog.progress(60, text=f"Embedding {len(chunks)} chunks...")
                index, chunks = build_index(chunks, get_client())
                prog.progress(80, text="Building keyword index...")
                keyword_index = build_keyword_index(chunks)
                prog.progress(95, text="Almost done...")
                st.session_state.videos[video_id] = {
                    "meta": meta, "chunks": chunks,
                    "index": index, "keyword_index": keyword_index,
                    "chunk_count": len(chunks),
                }
                st.session_state.active_video_id = video_id
                st.session_state.conversations[video_id] = []
                prog.progress(100, text="Ready!")
                time.sleep(0.3)
                prog.empty()
                st.rerun()
            except ValueError as e:
                prog.empty()
                st.error(str(e))
            except Exception as e:
                prog.empty()
                st.error(f"Error: {e}")

st.markdown("<div style='height:1px;background:#272727;margin:0;'></div>", unsafe_allow_html=True)

# ── MAIN TWO-PANEL LAYOUT ──────────────────────────────────────────────────────
video = active_video()

if not video:
    # Empty state
    st.markdown("""
    <div style="display:flex;align-items:center;justify-content:center;
                height:calc(100vh - 120px);flex-direction:column;
                text-align:center;color:#555;gap:12px;">
        <div style="font-size:3rem;">▶</div>
        <div style="font-size:1rem;color:#888;font-weight:500;">Paste a YouTube URL above to get started</div>
        <div style="font-size:0.82rem;color:#555;max-width:380px;line-height:1.6;">
            Chat with any video — answers grounded in the transcript with clickable timestamps
        </div>
    </div>
    """, unsafe_allow_html=True)
else:
    meta = video["meta"]
    chunks = video["chunks"]
    index = video["index"]
    video_id = meta["video_id"]
    conversation = active_conversation()

    # ── Two columns: video | chat ──────────────────────────────────────────────
    left_col, right_col = st.columns([1.15, 0.85], gap="small")

    # ── LEFT: Video embed + info ───────────────────────────────────────────────
    with left_col:
        origin = "http://localhost:8501"

        # Create placeholder for video panel to allow re-rendering on jump
        video_placeholder = st.empty()

        # Check if we need to update start time
        start_time = 0
        if hasattr(st.session_state, 'jump_to_seconds') and st.session_state.jump_to_seconds:
            start_time = st.session_state.jump_to_seconds
            st.session_state.jump_to_seconds = None  # Reset for next jump

        with video_placeholder.container():
            st.markdown(f"""
            <div class="video-panel">
                <div class="video-embed-wrap">
                    <iframe
                        id="yt-player"
                        name="yt-player"
                        src="https://www.youtube.com/embed/{video_id}?rel=0&modestbranding=1&enablejsapi=1&autoplay=1&start={start_time}&origin={origin}"
                        allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
                        allowfullscreen>
                    </iframe>
                </div>
                <div class="video-info">
                    <div class="video-title">{meta['title']}</div>
                    <div class="video-channel">{meta['author']}</div>
                    <div class="meta-row" style="display:flex;align-items:center;gap:12px;">
                        <span class="badge">{video['chunk_count']} chunks indexed</span>
                        <a class="yt-link" href="{meta['url']}" target="_blank">↗ Open on YouTube</a>
                    </div>
                </div>
            </div>
            """, unsafe_allow_html=True)

    # ── RIGHT: Chat panel ──────────────────────────────────────────────────────
    with right_col:
        # ... (full app.py UI code continues - see repo for complete file)

注意: 由于篇幅限制,上面app.py的完整代码已截断。完整代码可在GitHub仓库中找到。

10、运行应用

前置条件:

  • Python 3.10+
  • OpenAI API密钥

安装依赖:

cd medium/chat-with-video
pip install -r requirements.txt

运行应用:

streamlit run app.py

然后在浏览器中打开http://localhost:8501

在我们的实验中,我们选择了https://www.youtube.com/watch?v=9QXCkMTbrSk,这是法国总统埃马纽埃尔·马克龙的播客,由Raj Shamani主持。这是一个40分钟的视频。

应用截图:Streamlit中的YT Chat界面

让我们问第一个问题。我们首先在全局类别中提问。

What are the main topics discussed?
全局问题结果

我们看到,答案按时间戳划分出了各个主题。

让我们问下一个问题,它会被路由到RAG路径。

What is mentioned about the relation of America with France?
RAG问题结果

我们看到清晰的时间戳答案。

就是这样。我们现在有了一个完整的端到端应用,可以与视频聊天。

11、后续改进

如果你想扩展这个项目,下一个合理的改进是:

  • 添加离线字幕缓存
  • 添加多视频支持
  • 两个或多个视频的比较
  • 多语言支持

12、结束语

这个项目展示了如何通过结合字幕、混合检索和LLM,将视频转变为可交互、可搜索的知识系统。通过使用时间戳来锚定答案,并平衡语义和关键词搜索,它同时实现了准确性和可用性。

最终,它突显了一个强大的转变——从被动地观看内容到主动地查询和理解内容。


原文链接: Stop Watching YouTube Videos. Start Chatting With Them

汇智网翻译整理,转载请标明出处