用Celery扩展AI智能体的规模
现代AI应用不再是简单的请求-响应系统。
一旦你超越演示,AI智能体开始:
- 多步推理
- 调用外部工具和API
- 等待网络或I/O
- 部分失败时重试
- 运行时间远超过HTTP超时
到那时,同步执行就会崩溃。
本文解释如何使用Celery、RabbitMQ和Redis在Python中为AI智能体构建可扩展、容错的执行层。
1、AI智能体系统中的执行问题
基本的AI应用可能看起来像这样:
用户 → LLM → 响应
但真实的智能体系统很快演变成更复杂的东西:
用户
↓
API
↓
任务队列
↓
AI智能体
├─ LLM推理
├─ 工具执行
├─ 外部API
├─ 重试和回退
└─ 结果合成
↓
结果存储
这引入了若干挑战:
- AI智能体可以运行数秒或数分钟
- 工具调用不可预测地失败
- 负载以突发形式到达,不是均匀分布
- API服务器不应被执行阻塞
为了可靠地处理,执行必须与请求处理解耦。
2、为什么AI智能体使用Celery?
Celery是一个分布式任务队列,专为这类问题设计。
对于AI系统,它提供:
- 异步执行长时间运行的智能体工作流
- 重试语义用于不稳定的外部依赖
- 水平可扩展性通过工作进程
- 背压处理在高负载下
- 故障隔离从API层
与其将AI智能体视为函数调用,Celery允许你将它们视为作业——这是在规模上更好的抽象。
常见的生产设置使用:
- RabbitMQ作为消息代理
- Redis作为结果后端
RabbitMQ(代理)
- 可靠的消息传递
- 很好地处理突发工作负载
- 将生产者与消费者解耦
Redis(结果后端)
- 快速存储任务状态和输出
- 轻松过期已完成任务
- 高效轮询作业结果
这种分离允许每个组件独立扩展。
3、示例用例:后台AI智能体执行
3.1 场景
用户触发一个AI智能体,它:
- 分析输入
- 调用工具或API
- 综合最终响应
这个过程不应阻塞API请求。
相反,API将作业入队
- 工作线程执行智能体
- 结果被存储并异步检索
3.2 项目结构
ai_agent_system/
├── celery_app.py
├── tasks.py
├── agent.py
├── app.py
└── requirements.txt
3.3 Celery配置
celery_app.py
from celery import Celery
celery = Celery(
"ai_agent_tasks",
broker="amqp://guest:guest@localhost:5672//",
backend="redis://localhost:6379/0"
)
celery.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
)
此配置简单、明确,且适合生产。
3.4 AI智能体逻辑
agent.py
import time
def run_ai_agent(prompt: str) -> dict:
# 第1步:推理
time.sleep(2)
analysis = f"分析输入:{prompt}"
# 第2步:工具执行
time.sleep(2)
tool_output = "外部数据已获取"
# 第3步:最终合成
time.sleep(2)
final_response = f"{prompt}的最终输出"
return {
"analysis": analysis,
"tool_output": tool_output,
"final_response": final_response
}
在真实系统中,此函数可能:
- 调用LLM API
- 使用智能体框架
- 执行工作流或管道
3.5 Celery任务定义
tasks.py
from celery_app import celery
from agent import run_ai_agent
@celery.task(bind=True, max_retries=3)
def execute_ai_agent(self, prompt: str):
try:
return run_ai_agent(prompt)
except Exception as exc:
raise self.retry(exc=exc, countdown=5)
重试在AI系统中至关重要——失败是预期的,不是例外。
3.6 从API触发执行
app.py
from tasks import execute_ai_agent
def handle_user_request(prompt: str):
task = execute_ai_agent.delay(prompt)
return {
"task_id": task.id,
"status": "智能体执行已启动"
}
这种模式允许API保持响应,无论智能体运行时间如何。
3.7 运行工作线程
celery -A celery_app.celery worker --loglevel=info
随着智能体负载增加,工作线程可以独立扩展。
3.8 获取任务结果
from celery.result import AsyncResult
from celery_app import celery
result = AsyncResult(task_id, app=celery)
if result.ready():
print(result.result)
这是以下内容的常见模式:
- 作业状态跟踪
- 异步结果检索
- 工作流协调
4、生产考虑
使用Celery处理AI智能体时:
- 为不同工作负载使用单独的队列
- 强制执行任务时间限制
- 在Redis中过期结果
- 设计幂等的任务
- 监控工作线程健康和延迟
AI系统中的大多数故障发生在编排中,而不是模型推理中。
5、结束语
AI智能体本质上是异步的。
它们推理、等待、重试,并与不可靠的系统交互。将它们视为简单函数调用会导致脆弱的架构。
Celery、RabbitMQ和Redis提供了稳定的执行层,允许AI智能体在真实世界条件下可靠运行。
随着AI系统变得更加自主和长时间运行,这种基础设施变得基础——不是可选的。
原文链接: Scaling AI Agents with Celery, RabbitMQ & Redis in Python
汇智网翻译整理,转载请标明出处