CocoIndex:AI原生数据管道
我在编码竞争情报管道时如坐针毡,制作了这个表情包——用Claude Code帮助原型化LLM提取,不断瞥眼使用量仪表盘,祈祷不会因为意外重新处理而遇到意想不到的"额外使用"飙升。
我们都经历过:构建AI驱动的管道感觉就像赌博,一个疏忽就会变成螺旋式上升的令牌成本。
但事实不必如此。
想象你是杰森·伯恩——敏锐、机智,从网络上分散的来源中拼凑关键情报。你的任务很明确:通过追踪竞争对手的每一个产品发布、合作伙伴关系和战略行动,保持领先一步。你需要准确、最新的洞察,而且要快。CocoIndex 就是将这种能力交到你手中的工具:一个为AI工作负载构建的开源ETL框架。它处理非结构化数据,无缝集成LLM转换,而且——至关重要的是——将增量处理、持久重试和智能指纹识别作为默认设置,让你的"令牌余额"保持安全。
本文深入探讨CocoIndex的架构(包括最新更新),然后演示那个激发表情包灵感的管道:一个使用Tavily搜索、GPT-4o-mini提取和PostgreSQL存储的竞争情报监控器——所有代码均来自开源仓库。
快速说明: 本教程优先展示CocoIndex的工作原理,而非呈现生产环境基准测试。请将其视为蓝图——你实际的令牌节省、处理时间和情报质量将取决于你的具体竞争对手、刷新频率和数据量。框架为你提供工具;指标取决于你如何部署。
1、为什么需要AI原生数据管道?
传统ETL工具是为结构化数据和静态模式设计的。AI应用处理的是非结构化文本、图像和音频;它们需要LLM推理、嵌入和其他计算密集型转换。CocoIndex通过提供一个超高性能框架来填补这一差距,该框架支持:
- 增量处理:CocoIndex跟踪已处理的行,仅重新计算发生变化的部分,并在可能时重用缓存结果。2025年引入的新引擎优化使用指纹识别来检测未更改的数据并跳过昂贵的读取。
- 超越SQL的自定义逻辑:你可以插入LLM调用、嵌入、OCR或任何Python函数作为转换。CocoIndex标准化了接口,使切换组件就像重新排列积木一样简单。
- 数据血缘和可观察性:管道的每一步都被记录。CocoInsight提供了一个交互式界面来可视化每个输出行如何与来源和转换相关联。
- 多种执行模式:定义一次流程,然后作为批处理作业、监视源的实时更新或采样快速预览(使用CocoInsight)运行。
2、2025年有什么新变化?
2025年10月的更新日志引入了使CocoIndex更加强大和高效的功能:
- 持久执行自动仅重试失败的行,在不丢失进度的情况下捕获失败。它改进了对短期认证密钥的处理,使重试保持稳定。
- 避免不必要的重新处理:精细化的变更检测逻辑减少了误报,确保流程仅在源或流程定义实际更改时才重新处理数据。
- 重新导出和重置选项让你可以显式控制重建目标或以干净设置开始。
- 快速指纹折叠使用轻量级内容哈希来跳过读取未更改的源行,节省API调用和计算。
- 改进的GPU隔离和错误容忍度确保GPU工作负载在子进程中运行,瞬态API错误被记录而不是中止实时更新。
- 增强的查询处理器和CocoInsight集成使你更容易为索引数据公开自定义查询并以可视化方式探索它们。
这些改进强化了CocoIndex的目标:提供一个为AI设计的、生产就绪的、自修复的ETL引擎。
3、用例:构建竞争情报监控器
我们的用例从Tavily AI搜索收集关于指定竞争对手(例如OpenAI、Anthropic、Google AI)的新闻文章,使用LLM提取结构化事件,并将文章和事件存储在PostgreSQL中。流程以增量方式运行,因此每次新运行只处理之前未被索引的文章。结果是一个可查询的竞争情报数据库和即用的分析功能。
3.1 架构概览
管道由三个阶段组成:摄取、提取和索引:
- 摄取:自定义连接器调用Tavily AI的搜索API,获取包含与融资、合作伙伴关系、产品发布、收购和高管招聘相关关键词的文章。
- 提取:每篇文章的内容通过
ExtractByLlm进行转换,使用GPT-4o-mini提取CompetitiveEvent对象列表,例如产品发布或关键招聘。 - 索引:CocoIndex将原始文章收集到一个表中,将结构化事件收集到另一个表中,然后将它们导出到PostgreSQL。
3.2 定义数据模型
首先,我们定义一个数据类来表示提取的事件:
@dataclasses.dataclass
class CompetitiveEvent:
"""A competitive intelligence event extracted from text"""
event_type: str # "product_launch", "partnership", "funding", etc.
competitor: str # Primary company involved
description: str # Summary of the event
significance: str # "high", "medium", "low"
related_companies: list[str] # Other companies mentioned
这个类型化模型确保我们的LLM提取返回可以轻松存储和查询的结构化数据。
3.3 创建自定义源连接器
要从Tavily AI获取文章,我们使用CocoIndex的@source_connector装饰器实现自定义源。连接器构建一个搜索查询,将竞争对手名称与我们的事件关键词结合,并为每篇文章生成一个轻量级行:
class TavilySearchSource(SourceSpec):
competitor: str
days_back: int = 7
max_results: int = 10
@source_connector(
spec_cls=TavilySearchSource,
key_type=_ArticleKey,
value_type=_Article,
)
class TavilySearchConnector:
def __init__(self, spec: TavilySearchSource, api_key: str):
self._spec = spec
self._api_key = api_key
async def list(self) -> AsyncIterator[PartialSourceRow[_ArticleKey, _Article]]:
search_query = (
f"{self._spec.competitor} AND "
f"(funding OR partnership OR product launch OR acquisition OR executive hire)"
)
client = TavilyClient(api_key=self._api_key)
response = client.search(
query=search_query,
search_depth="advanced",
max_results=self._spec.max_results,
include_raw_content=True,
)
for result in response.get("results", []):
url = result["url"]
pub_date = result.get("published_date")
ordinal = int(datetime.fromisoformat(pub_date).timestamp()) if pub_date else NO_ORDINAL
yield PartialSourceRow(key=_ArticleKey(url=url), data=PartialSourceRowData(ordinal=ordinal))
这个连接器展示了CocoIndex如何让你轻松引入自己的API:你定义一个规格,实现list以任意顺序发出键,并可选地实现get_value来获取完整内容。
3.4 编写流程定义
管道本身定义为flow_def。我们遍历环境变量中的竞争对手列表,为每个竞争对手添加Tavily搜索源,并指定源应以可配置的间隔刷新。在流程内部,我们使用LLM转换文章内容,并收集原始文章和提取的事件:
@cocoindex.flow_def(name="CompetitiveIntelligence")
def competitive_intelligence_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
competitors = os.getenv("COMPETITORS", "OpenAI,Anthropic").split(",")
refresh_interval = int(os.getenv("REFRESH_INTERVAL_SECONDS", "3600"))
search_days_back = int(os.getenv("SEARCH_DAYS_BACK", "7"))
# Add Tavily search source for each competitor
for competitor in competitors:
data_scope[f"articles_{competitor.strip()}"] = flow_builder.add_source(
TavilySearchSource(
competitor=competitor.strip(),
days_back=search_days_back,
max_results=10,
),
refresh_interval=timedelta(seconds=refresh_interval),
)
articles_index = data_scope.add_collector()
events_index = data_scope.add_collector()
for competitor in competitors:
with data_scope[f"articles_{competitor.strip()}"] .row() as article:
# LLM extraction
article["events"] = article["content"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="openai/gpt-4o-mini",
address="https://openrouter.ai/api/v1",
),
output_type=list[CompetitiveEvent],
instruction=(
"Extract competitive intelligence events from this article. "
"Focus on: product launches, partnerships, funding rounds, key hires, "
"acquisitions, and other strategic moves. Return an empty list if no events found."
),
)
)
# Collect raw article
articles_index.collect(
id=article["url"],
title=article["title"],
content=article["content"],
url=article["url"],
source=article["source"],
published_at=article["published_at"],
score=article["score"],
)
# Collect each extracted event
with article["events"].row() as event:
events_index.collect(
article_id=article["url"],
event_type=event["event_type"],
competitor=event["competitor"],
description=event["description"],
significance=event["significance"],
related_companies=event["related_companies"],
)
# Export to PostgreSQL
articles_index.export(
"intel_articles",
cocoindex.targets.Postgres(),
primary_key_fields=["id"],
)
events_index.export(
"intel_events",
cocoindex.targets.Postgres(),
primary_key_fields=["article_id", "event_type", "competitor"],
)
这个流程展示了CocoIndex的声明式方法如何将复杂的ETL转化为简洁、可读的Python代码。refresh_interval确保增量更新:只有新文章才会触发LLM提取和索引。
3.5 查询数据
CocoIndex让你可以公开在底层运行SQL但返回结构化Python对象的查询处理器。在我们的项目中,定义了两个处理器:
@competitive_intelligence_flow.query_handler()
def search_by_competitor(competitor: str, event_type: str | None = None, limit: int = 20) -> cocoindex.QueryOutput:
"""Find recent competitive intelligence about a specific competitor."""
with connection_pool().connection() as conn:
with conn.cursor() as cur:
sql = """
SELECT
e.competitor, e.event_type, e.description,
e.significance, e.related_companies,
a.title, a.url, a.source, a.published_at
FROM intel_events e
JOIN intel_articles a ON e.article_id = a.id
WHERE LOWER(e.competitor) LIKE LOWER(%s)
"""
params = [f"%{competitor}%"]
if event_type:
sql += " AND e.event_type = %s"
params.append(event_type)
sql += " ORDER BY a.published_at DESC LIMIT %s"
params.append(limit)
cur.execute(sql, params)
results = [ { ... } for row in cur.fetchall() ]
return cocoindex.QueryOutput(results=results)
@competitive_intelligence_flow.query_handler()
def get_trending_competitors(days: int = 7) -> cocoindex.QueryOutput:
"""Rank competitors by recent news volume and weighted significance."""
with connection_pool().connection() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT
e.competitor,
COUNT(*) AS total_events,
SUM(CASE WHEN e.significance = 'high' THEN 3
WHEN e.significance = 'medium' THEN 2 ELSE 1 END) AS weighted_score,
COUNT(DISTINCT e.event_type) AS event_types,
array_agg(DISTINCT e.event_type) AS events
FROM intel_events e
JOIN intel_articles a ON e.article_id = a.id
WHERE a.published_at >= NOW() - INTERVAL '%s days'
GROUP BY e.competitor
ORDER BY weighted_score DESC
""",
(days,),
)
results = [ { ... } for row in cur.fetchall() ]
return cocoindex.QueryOutput(results=results)
这些处理器展示了CocoIndex如何将SQL和Python结合在一起。weighted_score强调高影响事件(3分)、中等影响事件(2分)和低影响事件(1分),据此对公司进行排名。
4、设置和运行管道
前提条件:
- 一个PostgreSQL实例——本地或云端。
- Python 3.11+并安装
cocoindex包(pip install -e .)。 - Tavily和OpenRouter的API密钥。
4.1 配置
将.env.example复制到.env并设置你的DATABASE_URL、COCOINDEX_DATABASE_URL、OPENAI_API_KEY、TAVILY_API_KEY以及竞争对手列表。你还可以配置SEARCH_DAYS_BACK和REFRESH_INTERVAL_SECONDS。
4.2 运行管道
- 交互模式:
python3 run_interactive.py会提示你输入竞争对手、事件类型、时间范围和运行模式。它会相应地更新.env并运行一次性或持续同步。 - 直接模式:运行
cocoindex update main -f执行初始同步。要持续运行,使用cocoindex update -L main.py。 - 管道运行后,使用
python3 generate_report.py生成报告,查看汇总统计和详细情报。
4.3 使用结果
使用上面展示的查询处理器从Python脚本或交互式控制台调用。例如:
from main import competitive_intelligence_flow
# Recent activity by Anthropic
results = competitive_intelligence_flow.search_by_competitor("Anthropic")
for r in results.results:
print(r)
# Trending companies in the last 7 days
trending = competitive_intelligence_flow.get_trending_competitors(days=7)
print(trending.results)
你还可以基于高显著性事件构建仪表盘或触发警报。CocoIndex的增量引擎确保每次运行只处理新文章,因此你不会为重新处理相同内容付费。
4.4 CocoIndex功能展示
竞争情报监控器展示了CocoIndex的几项关键功能如何在一个实际项目中协同工作。以下是功能及其在用例中的展示方式总结:
- 增量处理:
refresh_interval和指纹识别确保仅处理新文章,显著减少令牌消耗。 - 自定义源连接器:
TavilySearchConnector展示了将外部API集成为CocoIndex源是多么容易。 - LLM驱动的转换:
ExtractByLlm从非结构化文章文本中提取结构化CompetitiveEvent对象。 - 双收集器导出:原始文章和提取的事件分别存储在不同的PostgreSQL表中。
- 查询处理器:
search_by_competitor和get_trending_competitors将SQL查询封装为类型化的Python函数。
4.5 使用CocoInsight可视化数据血缘
虽然我们的管道图展示了高层级流程,CocoIndex内置的CocoInsight工具让你可以检查各个步骤并追踪数据血缘。CocoInsight允许你在每个阶段预览数据、检查LLM提取结果,以及在不编写额外代码的情况下尝试不同的索引策略。要对你的流程启动CocoInsight,运行:
cocoinsight preview main
这将打开一个交互式界面,你可以在其中探索源、转换和目标,提高透明度和可调试性。
4.6 超越竞争情报
CocoIndex可以为广泛的AI原生管道提供动力:使用混合嵌入模型索引PDF元素、从会议记录中提取元数据、从电子邮件构建知识图谱等等。2025年更新日志强调了None处理改进以实现更健壮的转换,以及GPU隔离以安全运行计算密集型工作负载。通过重新导出和重置选项进行完整重建,你可以快速迭代索引策略并从故障中恢复。
5、结束语
CocoIndex代表了一种向真正AI原生数据基础设施的转变——一种拥抱非结构化输入、无缝LLM和嵌入转换,并将增量处理作为默认设置的转变。
我们构建的竞争情报监控器在实践中展示了这一点:来自Tavily的自定义摄取、使用GPT-4o-mini的结构化事件提取、PostgreSQL中的双重索引,以及直观的查询处理器——所有这些都是以简洁、可读的代码实现的,以增量方式运行以避免重复成本。
随着最近的更新——持久行级重试、跳过未更改数据的快速指纹识别、精细化的变更检测,以及通过CocoInsight实现的更好可观察性——CocoIndex致力于成为生产AI管道更加可靠、自修复的基础。
在Claude(或任何LLM提供商)中没有"旋转赢取更多令牌"的按钮——令牌成本是真实的,而且增长很快。但有了像CocoIndex这样的工具,你确实可以从每个令牌中获得更多价值:只处理新内容、智能缓存结果,以及在不重新运行整个流程的情况下重试失败。
所以下次你在原型化LLM管道并紧张地盯着使用量仪表盘时……记住:庄家不一定要赢。CocoIndex把胜算放在你这边。
原文链接: Exploring CocoIndex: A Deep Dive into AI‑Native Data Pipelines and Competitive Intelligence
汇智网翻译整理,转载请标明出处