用Celery扩展AI智能体的规模

为长时间运行的智能体AI系统设计可靠的执行管道。

用Celery扩展AI智能体的规模
微信 ezpoda免费咨询:AI编程 | AI模型微调| AI私有化部署
AI模型价格对比 | AI工具导航 | ONNX模型库 | Tripo 3D | Meshy AI | ElevenLabs | KlingAI | ArtSpace | Phot.AI | InVideo

现代AI应用不再是简单的请求-响应系统。

一旦你超越演示,AI智能体开始:

  • 多步推理
  • 调用外部工具和API
  • 等待网络或I/O
  • 部分失败时重试
  • 运行时间远超过HTTP超时

到那时,同步执行就会崩溃。

本文解释如何使用Celery、RabbitMQ和Redis在Python中为AI智能体构建可扩展、容错的执行层

1、AI智能体系统中的执行问题

基本的AI应用可能看起来像这样:

用户 → LLM → 响应

但真实的智能体系统很快演变成更复杂的东西:

用户
↓
API
↓
任务队列
↓
AI智能体
├─ LLM推理
├─ 工具执行
├─ 外部API
├─ 重试和回退
└─ 结果合成
↓
结果存储

这引入了若干挑战:

  • AI智能体可以运行数秒或数分钟
  • 工具调用不可预测地失败
  • 负载以突发形式到达,不是均匀分布
  • API服务器不应被执行阻塞

为了可靠地处理,执行必须与请求处理解耦

2、为什么AI智能体使用Celery?

Celery是一个分布式任务队列,专为这类问题设计。

对于AI系统,它提供:

  • 异步执行长时间运行的智能体工作流
  • 重试语义用于不稳定的外部依赖
  • 水平可扩展性通过工作进程
  • 背压处理在高负载下
  • 故障隔离从API层

与其将AI智能体视为函数调用,Celery允许你将它们视为作业——这是在规模上更好的抽象。

常见的生产设置使用:

  • RabbitMQ作为消息代理
  • Redis作为结果后端

RabbitMQ(代理)

  • 可靠的消息传递
  • 很好地处理突发工作负载
  • 将生产者与消费者解耦

Redis(结果后端)

  • 快速存储任务状态和输出
  • 轻松过期已完成任务
  • 高效轮询作业结果

这种分离允许每个组件独立扩展。

3、示例用例:后台AI智能体执行

3.1 场景

用户触发一个AI智能体,它:

  • 分析输入
  • 调用工具或API
  • 综合最终响应

这个过程不应阻塞API请求。

相反,API将作业入队

  • 工作线程执行智能体
  • 结果被存储并异步检索

3.2 项目结构

ai_agent_system/
├── celery_app.py
├── tasks.py
├── agent.py
├── app.py
└── requirements.txt

3.3 Celery配置

celery_app.py

from celery import Celery

celery = Celery(
    "ai_agent_tasks",
    broker="amqp://guest:guest@localhost:5672//",
    backend="redis://localhost:6379/0"
)

celery.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
)

此配置简单、明确,且适合生产。

3.4 AI智能体逻辑

agent.py

import time

def run_ai_agent(prompt: str) -> dict:
    # 第1步:推理
    time.sleep(2)
    analysis = f"分析输入:{prompt}"
    
    # 第2步:工具执行
    time.sleep(2)
    tool_output = "外部数据已获取"
    
    # 第3步:最终合成
    time.sleep(2)
    final_response = f"{prompt}的最终输出"
    
    return {
        "analysis": analysis,
        "tool_output": tool_output,
        "final_response": final_response
    }

在真实系统中,此函数可能:

  • 调用LLM API
  • 使用智能体框架
  • 执行工作流或管道

3.5 Celery任务定义

tasks.py

from celery_app import celery
from agent import run_ai_agent

@celery.task(bind=True, max_retries=3)
def execute_ai_agent(self, prompt: str):
    try:
        return run_ai_agent(prompt)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)

重试在AI系统中至关重要——失败是预期的,不是例外。

3.6 从API触发执行

app.py

from tasks import execute_ai_agent

def handle_user_request(prompt: str):
    task = execute_ai_agent.delay(prompt)
    return {
        "task_id": task.id,
        "status": "智能体执行已启动"
    }

这种模式允许API保持响应,无论智能体运行时间如何。

3.7 运行工作线程

celery -A celery_app.celery worker --loglevel=info

随着智能体负载增加,工作线程可以独立扩展。

3.8 获取任务结果

from celery.result import AsyncResult
from celery_app import celery

result = AsyncResult(task_id, app=celery)
if result.ready():
    print(result.result)

这是以下内容的常见模式:

  • 作业状态跟踪
  • 异步结果检索
  • 工作流协调

4、生产考虑

使用Celery处理AI智能体时:

  • 为不同工作负载使用单独的队列
  • 强制执行任务时间限制
  • 在Redis中过期结果
  • 设计幂等的任务
  • 监控工作线程健康和延迟

AI系统中的大多数故障发生在编排中,而不是模型推理中。

5、结束语

AI智能体本质上是异步的。

它们推理、等待、重试,并与不可靠的系统交互。将它们视为简单函数调用会导致脆弱的架构。

Celery、RabbitMQ和Redis提供了稳定的执行层,允许AI智能体在真实世界条件下可靠运行。

随着AI系统变得更加自主和长时间运行,这种基础设施变得基础——不是可选的。


原文链接: Scaling AI Agents with Celery, RabbitMQ & Redis in Python

汇智网翻译整理,转载请标明出处