Docling + RAG = 轻松解析文档

作为研发工作的一部分,我需要尽可能准确地从 PDF 文件中提取文本数据。过去,我曾使用 PyMuPDF 或 OCR 引擎 Tesseract 从 PDF 文件中提取文本。

Docling + RAG = 轻松解析文档

如果您一直关注开源LLM/VLM,您就会知道它已经成为一个竞争非常激烈的领域。每隔几个月,就会出现一个新的模型,声称它突破了旧的限制,而其中一些确实做到了。

就在两天前,我安静地考完试,进入学习状态后,深夜还在上网浏览网页。 DeepSeek一如既往地在人工智能界引起了轰动。

DeepSeek发布了其最新的智能体模型“DeepSeek-V3.2”及其高性能版本。

这些模型显著提升了推理能力,融合了高效稀疏注意力机制和大规模强化学习等技术创新。

DeepSeek-V3.2可以与GPT-5一较高下,而Speciale结合了长期思维和定理证明能力,性能堪比Gemini-3.0-Pro。一位读者评论道:“这个模型不应该叫V3.2,应该叫V4。”

尤其值得一提的是,Speciale版本在2025年国际数学奥林匹克(IMO)、国际人工智能组织(IOI)和国际计算机科学与技术竞赛(ICPC)世界锦标赛上取得了金牌级别的成绩,在ICPC世界锦标赛中位列前两名,在IOI中位列前十,达到了“金牌级”水平。

作为研发工作的一部分,我需要尽可能准确地从 PDF 文件中提取文本数据。过去,我曾使用 PyMuPDF 或 OCR 引擎 Tesseract 从 PDF 文件中提取文本。

这些都是功能强大的工具,多年来已被广泛应用于众多项目中。然而,这次我遇到了一个问题,可能是由于我处理的 PDF 文件本身的问题。

Docling 是由 IBM 研究院开发的开源库,它能有效解决这些问题。Docling 是一款强大的工具,可以将 PDF 和 Word 等文档结构化并转换为 Markdown 等格式。

那么,让我给你演示一下实时聊天机器人,让你明白我的意思。

我会上传一份 Ocean AI 的 PDF 文件,并向聊天机器人提问:

“Ocean AI 是什么?它与 OpenAI 有何不同?”

观察聊天机器人如何生成输出,你会发现它首先会进行相关性检查,以确定问题是否与你上传的文档相关。如果不相关,它会立即拒绝该问题,而不是生成一个莫名其妙的答案。

对于相关的问题,它会将文档解析为 Markdown 或 JSON 等结构化格式。然后,它会使用 BM25 关键词搜索和向量嵌入进行混合检索,以找到最相关的部分,即使这些部分跨越多个文档。

研究代理会使用检索到的内容生成答案,然后验证代理会将答案与原始文档进行交叉验证,以确认事实准确性并找出不实的说法或矛盾之处。

如果验证失败,系统会自动启动一个自纠错循环,使用调整后的参数重新运行检索和研究,直到答案通过所有检查为止。答案完全验证后,系统会将其返回。如果在任何时候发现问题与上传的内容无关,系统都会明确告知您,而不是凭空捏造。

1、DeepSeek-V3.2 的独特之处是什么?

大多数强大的 AI 模型都面临着一个共同的问题:随着文件长度的增加,模型执行速度会显著下降,而成本则会急剧上升。这是因为传统模型会尝试将每个词与其他所有词进行比较,以理解上下文。

DeepSeek-V3.2 通过引入一种名为 DeepSeek Sparse Attention (DSA) 的新方法来解决这个问题。您可以将其想象成一位研究人员在图书馆进行研究:

  • 传统方法(密集注意力):研究人员逐页阅读书架上的每一本书,只为回答一个问题。虽然这种方法很全面,但速度极慢,而且需要耗费大量精力。

新方法(DeepSeek-V3.2):研究人员使用数字索引(Lightning Indexer)快速找到关键页面,然后只阅读这些页面。这种方法同样准确,但速度却快得多。

2、Docling 的独特之处是什么?

Docling之所以能从现有工具中脱颖而出,最大的原因在于其设计理念基于与生成式人工智能(尤其是检索增强生成,RAG)的协作。

现代人工智能应用需要的不仅仅是提取文本。为了让人工智能深入理解文档内容并生成准确的答案,它需要了解文档的含义,包括:

  • 这句话是论文的“摘要”还是“结论”?
  • 这串数字不仅仅是文本,而是一个“表格”,那么每个单元格代表什么?
  • 这张图片的“标题”是什么?

PyMuPDF和Tesseract将文本提取为“字符串”,而Docling则利用视觉语言模型(VLM)的强大功能来分析这些结构和关系,并将其输出为包含丰富信息的“DoclingDocument”对象。

这些结构化数据是显著提升RAG检索和答案生成质量的关键。

3、开始编码

现在让我们一步一步地探索并揭开如何使用 RagAnything 和 Multimodal RAG 的答案。我们将安装支持该模型的库。为此,我们将执行 pip install requirements 命令。

pip install requirements

下一步是常规步骤:我们将导入相关的库,其重要性将在后续步骤中逐渐显现。

  • DocumentConverter:一个高级 Python 类,用于将文档转换为结构化的 DoclingDocument 格式。
  • EnsembleRetriever:一个集成检索器,它使用加权倒数排序融合来聚合和排序多个检索器的结果。

4、DocLing

我创建了一个 VerificationAgent 类,用于根据源文档对 AI 生成的答案进行事实核查。在 __init__ 函数中,我实例化了一个温度为零的 deepseek-v3.2 模型,用于确定性输出,并构建了一个提示模板,该模板要求 LLM 从四个特定方面验证答案:声明是否直接支持、哪些内容不支持、哪些内容相互矛盾以及它是否相关,并强制使用结构化响应格式以确保解析的一致性。

check() 函数中,我接收答案字符串和一个 Document 对象列表,提取并连接所有文档文本到一个上下文字符串中,然后创建一个 LangChain 管道(提示 → LLM → 字符串解析器),并使用答案和上下文调用该管道以获取验证报告。

我记录报告和上下文以进行调试,重新抛出任何发生的错误,并返回一个包含验证报告文本和我使用的上下文字符串的字典——这样做的目的是通过检查 RAG 系统生成的答案是否确实得到源文档的支持来捕获幻觉。

import os
import hashlib
import pickle
from datetime import datetime, timedelta
from pathlib import Path
from typing import List
from docling.document_converter import DocumentConverter
from langchain_text_splitters import MarkdownHeaderTextSplitter
from config import constants
from config.settings import settings
from utils.logging import logger

class DocumentProcessor:
    def __init__(self):
        self.headers = [("#", "Header 1"), ("##", "Header 2")]
        self.cache_dir = Path(settings.CACHE_DIR)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        
    def validate_files(self, files: List) -> None:
        """Validate the total size of the uploaded files."""
        total_size = sum(os.path.getsize(f.name) for f in files)
        if total_size > constants.MAX_TOTAL_SIZE:
            raise ValueError(f"Total size exceeds {constants.MAX_TOTAL_SIZE//1024//1024}MB limit")

    def process(self, files: List) -> List:
        """Process files with caching for subsequent queries"""
        self.validate_files(files)
        all_chunks = []
        seen_hashes = set()
        
        for file in files:
            try:
                # Generate content-based hash for caching
                with open(file.name, "rb") as f:
                    file_hash = self._generate_hash(f.read())
                
                cache_path = self.cache_dir / f"{file_hash}.pkl"
                
                if self._is_cache_valid(cache_path):
                    logger.info(f"Loading from cache: {file.name}")
                    chunks = self._load_from_cache(cache_path)
                else:
                    logger.info(f"Processing and caching: {file.name}")
                    chunks = self._process_file(file)
                    self._save_to_cache(chunks, cache_path)
                
                # Deduplicate chunks across files
                for chunk in chunks:
                    chunk_hash = self._generate_hash(chunk.page_content.encode())
                    if chunk_hash not in seen_hashes:
                        all_chunks.append(chunk)
                        seen_hashes.add(chunk_hash)
                        
            except Exception as e:
                logger.error(f"Failed to process {file.name}: {str(e)}")
                continue
                
        logger.info(f"Total unique chunks: {len(all_chunks)}")
        return all_chunks

    def _process_file(self, file) -> List:
        """Original processing logic with Docling"""
        if not file.name.endswith(('.pdf', '.docx', '.txt', '.md')):
            logger.warning(f"Skipping unsupported file type: {file.name}")
            return []

        converter = DocumentConverter()
        markdown = converter.convert(file.name).document.export_to_markdown()
        splitter = MarkdownHeaderTextSplitter(self.headers)
        return splitter.split_text(markdown)

    def _generate_hash(self, content: bytes) -> str:
        return hashlib.sha256(content).hexdigest()

    def _save_to_cache(self, chunks: List, cache_path: Path):
        with open(cache_path, "wb") as f:
            pickle.dump({
                "timestamp": datetime.now().timestamp(),
                "chunks": chunks
            }, f)

    def _load_from_cache(self, cache_path: Path) -> List:
        with open(cache_path, "rb") as f:
            data = pickle.load(f)
        return data["chunks"]

    def _is_cache_valid(self, cache_path: Path) -> bool:
        if not cache_path.exists():
            return False
            
        cache_age = datetime.now() - datetime.fromtimestamp(cache_path.stat().st_mtime)
        return cache_age < timedelta(days=settings.CACHE_EXPIRE_DAYS)

5、RelevanceChecker

我创建了一个 RelevanceChecker 类,它通过将检索到的文档分类为三个类别来确定它们是否可以回答用户的问题。

__init__ 方法中,我使用 API 密钥初始化了一个 deepseek-v3.2 模型,并创建了一个提示模板,指示语言学习模型 (LLM) 将段落分类为“CAN_ANSWER”(完全回答)、“PARTIAL”(提及主题但不完整)或“NO_MATCH”(完全未讨论该主题),并强调任何主题提及都应为“PARTIAL”,而不是“NO_MATCH”。我通过管道将提示 → LLM → 字符串解析器连接起来,构建了一个 LangChain 链。

check() 方法中,我接收一个问题、一个检索器对象和一个参数 k(默认为 3),用于指定要分析的前 k 个文档的数量。我使用问题调用检索器以获取相关的文本块,如果没有返回任何内容,则立即返回“NO_MATCH”。

为了便于查看,我打印调试信息,显示文档计数以及前 k 个文本块的 200 字符预览。我将前 k 个文档的文本合并成一个字符串,字符串之间用双换行符分隔,然后使用问题和合并后的内容调用 LLM 链,并获取一个分类字符串。

我通过将响应转换为大写并检查是否符合有效选项来验证响应是否为三个有效标签之一,如果 LLM 返回了意外结果,则强制返回“NO_MATCH”。

最后,我返回经过验证的分类结果,从而清楚地了解检索器是否找到了可用的文档,或者是否需要回退到其他方法,例如网络搜索。

# agents/relevance_checker.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_deepseek import ChatDeepSeek
from config.settings import settings

class RelevanceChecker:
    def __init__(self):
        # self.llm = ChatOpenAI(api_key=settings.OPENAI_API_KEY, model="gpt-4o")
        self.llm = ChatDeepSeek(api_key=settings.DEEPSEEK_API_KEY, model="deepseek-chat")


        self.prompt = ChatPromptTemplate.from_template(
            """
            You are given a user question and some passages from uploaded documents.
            
            Classify how well these passages address the user's question. 
            Choose exactly one of the following responses (respond ONLY with that label):
            
            1) "CAN_ANSWER": The passages contain enough explicit info to fully answer the question.
            2) "PARTIAL": The passages mention or discuss the question's topic (e.g., relevant years, facility names)
            but do not provide all the data or details needed for a complete answer.
            3) "NO_MATCH": The passages do not discuss or mention the question's topic at all.
            
            Important: If the passages mention or reference the topic or timeframe of the question in ANY way,
            even if incomplete, you should respond "PARTIAL", not "NO_MATCH".
            
            Question: {question}
            Passages: {document_content}
            
            Respond ONLY with "CAN_ANSWER", "PARTIAL", or "NO_MATCH".
            """
        )

        self.chain = self.prompt | self.llm | StrOutputParser()

    def check(self, question: str, retriever, k=3) -> str:
        """
        1. Retrieve the top-k document chunks from the global retriever.
        2. Combine them into a single text string.
        3. Pass that text + question to the LLM chain for classification.
        
        Returns: "CAN_ANSWER" or "PARTIAL" or "NO_MATCH".
        """

        print(f"[DEBUG] RelevanceChecker.check called with question='{question}' and k={k}")
        
        # Retrieve doc chunks from the retriever
        top_docs = retriever.invoke(question)[:k]  # Only use top k docs
        if not top_docs:
            print("[DEBUG] No documents returned from retriever.invoke(). Classifying as NO_MATCH.")
            return "NO_MATCH"

        print(f"[DEBUG] Retriever returned {len(top_docs)} docs.")

        # Show a quick snippet of each chunk for debugging
        for i, doc in enumerate(top_docs):
            snippet = doc.page_content[:200].replace("\n", "\\n")
            print(f"[DEBUG] Chunk #{i+1} preview (first 200 chars): {snippet}...")

        # Combine the top k chunk texts into one string
        document_content = "\n\n".join(doc.page_content for doc in top_docs)
        print(f"[DEBUG] Combined text length for top {k} chunks: {len(document_content)} chars.")

        # Call the LLM
        response = self.chain.invoke({
            "question": question, 
            "document_content": document_content
        }).strip()
        
        print(f"[DEBUG] LLM raw classification response: '{response}'")

        # Convert to uppercase, check if it's one of our valid labels
        classification = response.upper()
        valid_labels = {"CAN_ANSWER", "PARTIAL", "NO_MATCH"}
        if classification not in valid_labels:
            print("[DEBUG] LLM did not respond with a valid label. Forcing 'NO_MATCH'.")
            classification = "NO_MATCH"
        else:
            print(f"[DEBUG] Classification recognized as '{classification}'.")

        return classification

6、ResearchAgent

我创建了一个名为 ResearchAgent 的类,它使用检索到的文档作为上下文来生成问题的答案。

我创建了一个提示模板,要求 LLM 根据提供的上下文回答问题,做到准确且基于事实,并指示 LLM 在上下文不足时明确表示“我无法根据提供的文档回答此问题”。

在 generate() 方法中,我接收一个问题字符串和一个 Document 对象列表,然后提取所有文档文本,并使用双换行符作为分隔符将其连接成一个上下文字符串。

我使用问题和上下文调用该链,将其替换到模板中,向 DeepSeek 发送请求,并将生成的答案作为字符串返回。我将此过程包装在 try-except 语句中,以便记录答案和完整的上下文进行调试,并重新引发任何发生的异常。

最后,我返回一个包含草稿答案和所用上下文的字典,从而同时获得生成的响应和上下文。可追溯用于创建它的源材料。

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import Dict, List
from langchain_core.documents import Document
from langchain_deepseek import ChatDeepSeek
from config.settings import settings
import logging

logger = logging.getLogger(__name__)

class ResearchAgent:
    def __init__(self):
        """Initialize the research agent with the OpenAI model."""
        # self.llm = ChatOpenAI(
        #     model="gpt-4-turbo",
        #     temperature=0.3,
        #     api_key=settings.OPENAI_API_KEY  # Pass the API key here
        # )
        self.llm = ChatDeepSeek(
            model="deepseek-chat",
            temperature=0.3,
            api_key=settings.DEEPSEEK_API_KEY  # Pass the API key here
        )
        self.prompt = ChatPromptTemplate.from_template(
            """Answer the following question based on the provided context. Be precise and factual.
            
            Question: {question}
            
            Context:
            {context}
            
            If the context is insufficient, respond with: "I cannot answer this question based on the provided documents."
            """
        )
        
    def generate(self, question: str, documents: List[Document]) -> Dict:
        """Generate an initial answer using the provided documents."""
        context = "\n\n".join([doc.page_content for doc in documents])
        
        chain = self.prompt | self.llm | StrOutputParser()
        try:
            answer = chain.invoke({
                "question": question,
                "context": context
            })
            logger.info(f"Generated answer: {answer}")
            logger.info(f"Context used: {context}")
        except Exception as e:
            logger.error(f"Error generating answer: {e}")
            raise
        
        return {
            "draft_answer": answer,
            "context_used": context
        }

7、验证代理

我创建了一个 VerificationAgent 类,用于对 AI 生成的答案进行事实核查,以识别虚假信息。在 __init__ 方法中,我初始化了一个温度为 0(完全确定性)的 deepseek-v3.2 模型,创建了一个提示模板,指示 LLM 使用结构化的响应格式验证四个方面(直接事实支持、未经证实的说法、矛盾和相关性),然后构建了一个 LangChain 链。

check() 函数中,我接收一个响应字符串和一个 Document 对象列表,将所有文档文本用双换行符连接成一个上下文字符串,然后使用响应字符串和上下文字符串调用链式调用以获取验证报告,并在 try-except 块中记录报告和上下文字符串以进行调试,最后返回一个包含验证报告和上下文字符串的字典,用于溯源。

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import Dict, List
from langchain_core.documents import Document
from langchain_deepseek import ChatDeepSeek
from config.settings import settings
import logging

logger = logging.getLogger(__name__)

class VerificationAgent:
    def __init__(self):
        # self.llm = ChatOpenAI(
        #     model="gpt-4-turbo",
        #     temperature=0,
        #     api_key=settings.OPENAI_API_KEY  # Pass the API key here
        # )
        self.llm = ChatDeepSeek(
            model="deepseek-chat",
            temperature=0,
            api_key=settings.DEEPSEEK_API_KEY  # Pass the API key here
        )
        self.prompt = ChatPromptTemplate.from_template(
            """Verify the following answer against the provided context. Check for:
            1. Direct factual support (YES/NO)
            2. Unsupported claims (list)
            3. Contradictions (list)
            4. Relevance to the question (YES/NO)
            
            Respond in this format:
            Supported: YES/NO
            Unsupported Claims: [items]
            Contradictions: [items]
            Relevant: YES/NO
            
            Answer: {answer}
            Context: {context}
            """
        )
        
    def check(self, answer: str, documents: List[Document]) -> Dict:
        """Verify the answer against the provided documents."""
        context = "\n\n".join([doc.page_content for doc in documents])
        
        chain = self.prompt | self.llm | StrOutputParser()
        try:
            verification = chain.invoke({
                "answer": answer,
                "context": context
            })
            logger.info(f"Verification report: {verification}")
            logger.info(f"Context used: {context}")
        except Exception as e:
            logger.error(f"Error verifying answer: {e}")
            raise
        
        return {
            "verification_report": verification,
            "context_used": context
        }

8、结束语

DeepSeek V3.2 的优势不在于规模,而在于更智能的思维。凭借其稀疏注意力机制、更低的成本、更强的长上下文感知能力以及卓越的工具使用推理能力,它证明了开源模型如何在无需巨额硬件预算的情况下保持竞争力。

虽然它可能无法在所有基准测试中都名列前茅,但它显著改善了用户与人工智能的交互方式。而这正是它在竞争激烈的市场中脱颖而出的原因。


原文链接:DeepSeek-V3.2 + DocLing + Agentic RAG: Parse Any Document with Ease

汇智网翻译整理,转载请标明出处