从会议笔记构建知识图谱

会议笔记是组织智能的宝库。它们捕获了决策、行动项目、参与者信息,以及人员与任务之间的关系。然而,大多数组织将它们视为静态文档——只能通过基本文本搜索进行搜索。

从会议笔记构建知识图谱
AI编程/Vibe Coding 遇到问题需要帮助的,联系微信 ezpoda,免费咨询。

会议笔记是组织智能的宝库。它们捕获了决策、行动项目、参与者信息,以及人员与任务之间的关系。然而,大多数组织将它们视为静态文档——只能通过基本文本搜索进行搜索。

想象一下,能够像查询数据库一样查询您的会议:

  • "谁参加了主题为'预算规划'的会议?"
  • "Sarah 在所有会议中被分配了哪些任务?"
  • "显示第四季度涉及工程团队的所有决策。"

这正是知识图谱的闪光之处。通过从非结构化的会议笔记中提取结构化信息并构建图表示,您可以解锁强大的基于关系的查询,这在传统文档存储中是不可能实现的。

在这篇文章中,我们将构建一个实用的 CocoIndex 流水线,它可以:

  1. 从 Google Drive 读取 Markdown 会议笔记
  2. 使用 LLM 提取结构化实体(会议、参与者、任务)
  3. 将所有内容作为知识图谱持久化到 Neo4j
  4. 仅在源文档更改时自动更新

完整源代码可在 GitHub 上获得。

1、架构概述

流水线遵循清晰的数据流,在每个阶段都内置了增量处理:

Google Drive(文档 - 带有更改跟踪)
  → 识别已更改的文档
  → 拆分为会议
  → 使用 LLM 提取结构化数据(仅针对已更改的文档)
  → 收集节点和关系
  → 导出到 Neo4j(带有 upsert 逻辑)

先决条件

  • 安装 Neo4j 并在本地启动 默认本地浏览器:http://localhost:7474本示例中使用的默认凭据:用户名 neo4j,密码 cocoindex
  • 配置您的 OpenAI API 密钥
  • 准备 Google Drive: 创建 Google Cloud 服务账户并下载其 JSON 凭据 与服务账户电子邮件共享源文件夹 收集您要摄取的根文件夹 ID
  • 有关详细信息,请参阅 Google Drive 设置

环境

设置以下环境变量:

export OPENAI_API_KEY=sk-...
export GOOGLE_SERVICE_ACCOUNT_CREDENTIAL=/absolute/path/to/service_account.json
export GOOGLE_DRIVE_ROOT_FOLDER_IDS=folderId1,folderId2

注意:

  • GOOGLE_DRIVE_ROOT_FOLDER_IDS 接受以逗号分隔的文件夹 ID 列表
  • 流程轮询最近的更改并定期刷新

让我们分解每个组件:

2、流定义

2.1 添加源和收集器

@cocoindex.flow_def(name="MeetingNotesGraph")
def meeting_notes_graph_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    """
    Define an example flow that extracts triples from files and builds knowledge graph.
    """
    credential_path = os.environ["GOOGLE_SERVICE_ACCOUNT_CREDENTIAL"]
    root_folder_ids = os.environ["GOOGLE_DRIVE_ROOT_FOLDER_IDS"].split(",")
    data_scope["documents"] = flow_builder.add_source(
            cocoindex.sources.GoogleDrive(
                service_account_credential_path=credential_path,
                root_folder_ids=root_folder_ids,
                recent_changes_poll_interval=datetime.timedelta(seconds=10),
            ),
            refresh_interval=datetime.timedelta(minutes=1),
        )

流水线首先使用服务账户连接到 Google Drive。CocoIndex 内置的源连接器处理身份验证并提供增量更改检测recent_changes_poll_interval 参数意味着源每 10 秒检查一次新文件或修改的文件,而 refresh_interval 决定整个流程何时重新运行(每分钟)。

这是 CocoIndex 的超能力之一:具有自动更改跟踪的增量处理。框架不会在每次运行时重新处理所有文档,而是:

  1. 从 Google Drive 列出文件及其最后修改时间
  2. 仅识别自上次成功运行以来添加或修改的文件
  3. 完全跳过未更改的文件
  4. 仅向下游传递已更改的文档

结果如何?在每天有 1% 流失率的企业中,只有 1% 的文档会触发下游处理。未更改的文件永远不会访问您的 LLM API,永远不会生成 Neo4j 查询,也永远不会消耗计算资源。

添加收集器:

meeting_nodes = data_scope.add_collector()
attended_rels = data_scope.add_collector()
decided_tasks_rels = data_scope.add_collector()
assigned_rels = data_scope.add_collector()

然后,流水线将数据收集到不同实体类型和关系的专用收集中:

  • 会议节点 — 存储会议本身及其日期和笔记
  • 出席关系 — 捕获谁参加了会议以及他们是否是组织者
  • 任务决策关系 — 将会议链接到决策(已决定的任务)
  • 任务分配关系 — 将特定任务分配给人员

2.2 处理每个文档

提取会议

with data_scope["documents"].row() as document:
    document["meetings"] = document["content"].transform(
        cocoindex.functions.SplitBySeparators(
            separators_regex=[r"\n\n##?\ "], keep_separator="RIGHT"
        )
    )

会议文档通常在一个文件中包含多个会议。此步骤在前面有空行的 Markdown 标题(## 或 #)处拆分文档,将每个部分视为单独的会议。keep_separator="RIGHT" 意味着分隔符(标题)与右段一起保留,保留了上下文。

定义会议模式

@dataclass
class Person:
    name: str
@dataclass
class Task:
    description: str
    assigned_to: list[Person]
@dataclass
class Meeting:
    time: datetime.date
    note: str
    organizer: Person
    participants: list[Person]
    tasks: list[Task]

这为 LLM 提供了关于要提取什么信息及其模式的直接指导。这比要求 LLM 生成自由形式输出要可靠得多,后者无法提供构建知识图谱的结构化信息。

2.3 提取和收集关系

with document["meetings"].row() as meeting:
    parsed = meeting["parsed"] = meeting["text"].transform(
        cocoindex.functions.ExtractByLlm(
            llm_spec=cocoindex.LlmSpec(
                api_type=cocoindex.LlmApiType.OPENAI, model="gpt-5"
            ),
            output_type=Meeting,
        )
    )

重要的是,这一步也受益于增量处理。由于 ExtractByLlm 是一个繁重的步骤,我们保持输出在缓存中,只要输入(输入数据文本、模型、输出类型定义)没有变化,我们就重用缓存的输出而无需重新运行 LLM。

2.4 收集关系

meeting_key = {"note_file": document["filename"], "time": parsed["time"]}
meeting_nodes.collect(**meeting_key, note=parsed["note"])
attended_rels.collect(
    id=cocoindex.GeneratedField.UUID,
    **meeting_key,
    person=parsed["organizer"]["name"],
    is_organizer=True,
)
with parsed["participants"].row() as participant:
    attended_rels.collect(
        id=cocoindex.GeneratedField.UUID,
        **meeting_key,
        person=participant["name"],
    )
with parsed["tasks"].row() as task:
    decided_tasks_rels.collect(
        id=cocoindex.GeneratedField.UUID,
        **meeting_key,
        description=task["description"],
    )
    with task["assigned_to"].row() as assigned_to:
        assigned_rels.collect(
            id=cocoindex.GeneratedField.UUID,
            **meeting_key,
            task=task["description"],
            person=assigned_to["name"],
        )

CocoIndex 中的收集器就像内存缓冲区:您为不同类别(会议节点、出席、任务、分配)声明收集器,然后在处理每个文档时"收集"相关条目。

此块从解析的会议笔记中收集节点和关系,使用 CocoIndex 在 Neo4j 中构建知识图谱:

  • Person → Meeting (ATTENDED) 将参与者(包括组织者)链接到他们参加的会议
  • Meeting → Task (DECIDED) 将会议链接到已决定的任务或决定
  • Person → Task (ASSIGNED_TO) 将任务链接回负责它们的人员

3、映射到图数据库

我们将创建一个具有以下节点和关系的属性图:要了解更多关于属性图的信息,请参阅 CocoIndex 的属性图目标文档。

3.1 映射会议节点

会议笔记

meeting_nodes.export(
    "meeting_nodes",
    cocoindex.targets.Neo4j(
        connection=conn_spec, mapping=cocoindex.targets.Nodes(label="Meeting")
    ),
    primary_key_fields=["note_file", "time"],
)

3.2 声明 Person 和 Task 节点

flow_builder.declare(
    cocoindex.targets.Neo4jDeclaration(
        connection=conn_spec,
        nodes_label="Person",
        primary_key_fields=["name"],
    )
)
flow_builder.declare(
    cocoindex.targets.Neo4jDeclaration(
        connection=conn_spec,
        nodes_label="Task",
        primary_key_fields=["description"],
    )
)

3.3 映射 ATTENDED 关系

ATTENDED 关系

attended_rels.export(
    "attended_rels",
    cocoindex.targets.Neo4j(
        connection=conn_spec,
        mapping=cocoindex.targets.Relationships(
            rel_type="ATTENDED",
            source=cocoindex.targets.NodeFromFields(
                label="Person",
                fields=[
                    cocoindex.targets.TargetFieldMapping(
                        source="person", target="name"
                    )
                ],
            ),
            target=cocoindex.targets.NodeFromFields(
                label="Meeting",
                fields=[
                    cocoindex.targets.TargetFieldMapping("note_file"),
                    cocoindex.targets.TargetFieldMapping("time"),
                ],
            ),
        ),
    ),
    primary_key_fields=["id"],
)
  • 此调用确保 ATTENDED 关系 —— 即"Person → Meeting"(组织者或参与者 → 会议)—— 被显式编码为 Neo4j 图中的边
  • 它通过 ATTENDED 关系将 Person 节点与 Meeting 节点链接,启用诸如"Alice 参加了哪些会议?"或"谁参加了会议 X?"之类的查询
  • 通过正确且一致地映射 PersonMeeting 节点(使用唯一键),它确保了一个干净的图,没有重复的人员或会议
  • 由于关系获得唯一 ID 并使用一致的键导出,因此图在增量更新期间保持稳定:重新运行不会复制边或节点

3.4 映射 DECIDED 关系

DECIDED 关系

decided_tasks_rels.export(
    "decided_tasks_rels",
    cocoindex.targets.Neo4j(
        connection=conn_spec,
        mapping=cocoindex.targets.Relationships(
            rel_type="DECIDED",
            source=cocoindex.targets.NodeFromFields(
                label="Meeting",
                fields=[
                    cocoindex.targets.TargetFieldMapping("note_file"),
                    cocoindex.targets.TargetFieldMapping("time"),
                ],
            ),
            target=cocoindex.targets.NodeFromFields(
                label="Task",
                fields=[
                    cocoindex.targets.TargetFieldMapping("description"),
                ],
            ),
        ),
    ),
    primary_key_fields=["id"],
)
  • 此调用确保 DECIDED 关系 —— 即"Meeting → Task" —— 被显式编码为 Neo4j 图中的边
  • 它通过 DECIDED 关系将 Meeting 节点与 Task 节点链接,启用诸如以下的查询:
  • "会议 X 中决定了哪些任务?"
  • "任务 Y 源自哪个会议?"
  • 通过一致地映射 MeetingTask 节点(对会议使用 note_file + time,对任务使用 description),它防止了图中重复的任务或会议节点
  • 由于关系具有唯一 ID 并使用一致的键导出,因此图在增量更新期间保持稳定:重新运行流水线不会创建重复的边或节点

3.5 映射 ASSIGNED_TO 关系

ASSIGNED_TO 关系

assigned_rels.export(
    "assigned_rels",
    cocoindex.targets.Neo4j(
        connection=conn_spec,
        mapping=cocoindex.targets.Relationships(
            rel_type="ASSIGNED_TO",
            source=cocoindex.targets.NodeFromFields(
                label="Person",
                fields=[
                    cocoindex.targets.TargetFieldMapping(
                        source="person", target="name"
                    ),
                ],
            ),
            target=cocoindex.targets.NodeFromFields(
                label="Task",
                fields=[
                    cocoindex.targets.TargetFieldMapping(
                        source="task", target="description"
                    ),
                ],
            ),
        ),
    ),
    primary_key_fields=["id"],
)

4、生成的图

运行此流水线后,您的 Neo4j 数据库包含一个丰富的、可查询的图:

节点:

  • Meeting - 代表具有日期和笔记等属性的个人会议
  • Person - 代表参与会议的个人
  • Task - 代表会议中决定的可操作项目

关系:

  • ATTENDED - 将人员与其参加的会议连接起来
  • DECIDED - 将会议与已决定的任务连接起来
  • ASSIGNED_TO - 将人员与其负责的任务连接起来

重要的是,在导出到知识图谱的最后一步中,CocoIndex 也进行增量处理。CocoIndex 只对有更改的节点或关系更改知识图谱,对于未更改的内容,它是一个无操作。这避免了对目标数据库的不必要搅动,并最小化目标写入操作的成本。

5、运行

构建/更新图

安装依赖项:

pip install -e .

更新索引(运行一次流程以构建/更新图):

cocoindex update main

浏览知识图谱

http://localhost:7474 打开 Neo4j 浏览器。

示例 Cypher 查询:

// 所有关系
MATCH p=()-->() RETURN p
// 谁参加了哪些会议(包括组织者)
MATCH (p:Person)-[:ATTENDED]->(m:Meeting)
RETURN p, m
// 会议中决定的任务
MATCH (m:Meeting)-[:DECIDED]->(t:Task)
RETURN m, t
// 任务分配
MATCH (p:Person)-[:ASSIGNED_TO]->(t:Task)
RETURN p, t

6、真实世界的企业应用

此模式远不止于会议笔记:

  • 研究论文分析 — 从组织存储库中提取论文,在数千个文档中构建概念和引用的知识图谱,并跟踪对引用和概念的更新
  • 客户支持票据 — 提取问题、解决方案以及票据和客户之间的关系;在处理频繁编辑和状态更新的同时,识别数千个票据中的模式
  • 电子邮件线程摘要 — 在数百万封电子邮件中构建沟通模式和决策结果的图;处理团队转发、编辑和引用先前讨论的现实情况
  • 合规文档 — 从政策文档中提取监管要求;通过图结构跟踪对策略的更改和级联影响;维护文档版本的审计跟踪
  • 竞争情报 — 从公共文档和新闻文章中提取数据;在处理不断更新的同时,构建竞争对手关系、产品和市场定位的知识图谱

原文链接: Building a Self-Updating Knowledge Graph From Meeting Notes With LLM Extraction and Neo4j

汇智网翻译整理,转载请标明出处