队列选择:Celery/Dramatiq/RQ
使用 Celery、Dramatiq 和 RQ 构建可靠 Python 任务队列的七个关键选择——涵盖消息代理、重试、幂等性、超时、并发、调度和运维。
AI模型价格对比 | AI工具导航 | ONNX模型库 | Vibe Coding教程 | Tripo 3D | Meshy AI | ElevenLabs | KlingAI | ArtSpace | Phot.AI | InVideo
你有了一个 Web 应用、一个数据库,还有一堆不断增长的"稍后再处理"的任务:邮件、缩略图、支付、ETL。后台任务听起来很简单——直到一次部署上线、工作进程重启,队列开始失控。
说实话:可靠性就是你在前期做出的几个看似无聊的选择的总和。以下是七个最重要的选择,附带 Celery、Dramatiq 和 RQ 的具体实践模式。
1、根据你能容忍的故障来选择消息代理
你要决定的是: 消息存储在哪里,以及它们如何在崩溃中存活。
- Redis: 快速、简单、广泛使用。大多数应用的绝佳默认选择。支持持久化,但要注意全内存压力和淘汰策略。
- RabbitMQ: 持久化队列、细粒度路由(主题、优先级),以及你可以理解的消息确认机制。运维开销更大,控制力也更强。
- Postgres/SQS/其他: 用延迟换取持久性或云端简洁性。
Celery
# settings.py
broker_url = "redis://localhost:6379/0" # or amqp:// for RabbitMQ
result_backend = "redis://localhost:6379/1"
task_acks_late = True # ack after work (safer)
worker_prefetch_multiplier = 1 # fair dispatch under spikes
Dramatiq
import dramatiq
from dramatiq.brokers.redis import RedisBroker
broker = RedisBroker(url="redis://localhost:6379/0")
dramatiq.set_broker(broker)
RQ
from redis import Redis
from rq import Queue
redis = Redis(host="localhost", port=6379, db=0)
q = Queue("default", connection=redis) # messages stored in Redis lists
经验法则: 从 Redis 开始。如果你需要严格的路由、死信队列或长期持久化保证,就切换到 RabbitMQ(Celery 在这方面表现优异)。
2、重试:至少一次投递需要幂等的任务处理
任务一定会被执行两次。断电、工作进程被杀、超时、网络抖动——每个技术栈都会遇到这些问题。接受至少一次投递,并让任务具有幂等性。
Celery
from celery import shared_task
@shared_task(
autoretry_for=(Exception,),
retry_backoff=2, # exponential
retry_jitter=True,
max_retries=5
)
def charge(user_id, order_id):
# use a business idempotency key, not a random UUID
key = f"charge:{order_id}"
if already_processed(key):
return "skip"
return actually_charge(user_id, order_id)
Dramatiq
import dramatiq
@dramatiq.actor(max_retries=5, min_backoff=2000) # ms
def send_email(message_id):
if seen(message_id): return
deliver()
RQ
from rq import Retry
q.enqueue(process_video, video_id, retry=Retry(max=5, interval=[2,4,8,16]))
不会过时的幂等性模式:使用业务键(order_id)+ 一个简短的 Redis SETNX 锁,或者一个数据库 outbox 表来标记完成状态,而不是无法重新计算的随机令牌。
3、消息确认、可见性和超时(避免任务消失)
两个极端都会破坏系统:确认得太早(崩溃时丢失工作)和确认得太晚(任务重复投递风暴)。
- 工作完成后确认: 在 Celery 中,
task_acks_late=True意味着消息仅在任务返回后才被确认。配合worker_prefetch_multiplier=1使用,避免单个工作进程囤积任务。 - 时间限制: 果断地终止卡住的代码。
Celery
task_time_limit = 120 # hard kill (sec)
task_soft_time_limit = 90 # gives a chance to clean up
Dramatiq 使用中间件驱动的超时机制;在任务调用外部服务时应用它们。
RQ
q.enqueue(run_report, job_timeout=120) # visibility window
思维模型: 每个任务都应该有 (1) 一个时间预算,和 (2) 一个清晰的确认语义方案。
4、并发模型:CPU 密集型 vs I/O 密集型
选择与任务类型匹配的工作进程模式。
- CPU 密集型: 多个进程;避免 GIL 竞争。
- I/O 密集型: 线程或异步可以提高并行度。
- 混合型: 拆分队列(如
cpu、io),让每个队列有独立调优的工作进程池。
Celery
# CPU
celery -A app worker -Q cpu -l info --concurrency=4 --pool=prefork
# I/O
celery -A app worker -Q io -l info --concurrency=100 --pool=gevent
Dramatiq
dramatiq app --processes 4 --threads 8 # processes × threads
RQ 的工作进程是基于进程的;每个队列运行多个工作进程来扩展。
防护措施: 不要把所有东西都放在一个大队列里。按延迟预算和资源特征来拆分。
5、负载、序列化和边界
当任务携带整个数据库记录时,队列就会变得不可靠。发送标识符,而不是大块数据。在工作进程中用最新数据重新获取。
- 序列化: 默认使用 JSON 或 msgpack;避免使用原始 pickle 以确保跨版本安全。
- 最大大小: 保持负载小巧(考虑 < 64 KB)。将大型制品存储在对象存储中,只传递键。
Celery
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
Dramatiq 和 RQ 默认对 JSON 友好;保持类型的简单性。
6、调度和限流,避免意外
周期性工作需要自己的可靠性方案。
- Celery Beat: 类 cron 的调度器;如果需要高可用,在领导者选举后面运行两个 Beat 实例。
- Dramatiq: 使用调度器插件(通常是 crontab 中间件)。
- RQ Scheduler: 简单有效,适合直接的任务调度。
Celery
from celery.schedules import crontab
beat_schedule = {
"nightly-reports": {
"task": "reports.generate",
"schedule": crontab(minute=0, hour=1),
"options": {"queue": "cpu"}
}
}
限流(保护依赖):Celery 有针对每个任务的 rate_limit="10/m";Dramatiq/RQ 可以通过中间件或应用层限流来执行限制。在调用第三方 API 时务必应用限流。
7、可观测性、反压和对运维友好的实践
你看不见的东西就无法修复。
- 仪表盘:Celery → Flower(队列、工作进程、重试)。 RQ → RQ Dashboard(任务、失败)。 Dramatiq → 社区仪表盘可用;结构化日志 + 指标是你的基线。
- 需要跟踪的指标: 入队数、开始数、成功数、失败数、重试次数、每个队列中最旧任务的年龄、工作进程心跳。
- 反压: 当最旧任务的年龄增长时,减轻负载:返回 429 状态码、降低生产者速度或扩展工作进程。
- 死信队列: 对于有害消息,路由到 DLQ;在增长时告警。
结构化日志示例
log = {"event": "task_done", "task": "generate_pdf", "ms": 842, "retries": 1, "ok": True}
print(json.dumps(log))
一个小习惯,大回报:在每行日志中包含 queue、task 和一个稳定的业务键。
8、案例简析:"卡住的缩略图生成器"
症状: 上传流量激增使队列爆炸。工作进程看起来很忙,但缩略图要么迟到,要么重复生成。
基于七个选择指导的修复:
- 拆分队列,按任务类型(
io:thumbnails、cpu:filters)。 - Celery 延迟确认配合
prefetch=1阻止了囤积;超时机制终止了卡住的图像解码器。 - 幂等键 = 原始文件的 SHA 值;工作进程在上传前在 Redis 中写入"完成"标志。
- 对外部存储 API 限流到 20 次/分钟,以避免级联限流。
- Flower 告警在"最旧任务年龄 > 60 秒"时触发自动扩容。
结果: 首张缩略图的 p95 时间从 7.8 秒降至 1.6 秒;重复任务消失。
9、快速参考:何时选择什么
- Celery: 功能丰富(路由、限流、chords/chains、Beat)。当你想要一个功能齐全的任务系统且能承受一些复杂性时最佳。搭配 RabbitMQ 可获得出色的持久性。
- Dramatiq: 精简、高性能、现代的人体工程学设计;如果你偏好极简主义和显式中间件,这是绝佳的默认选择。
- RQ: 最简单的心智模型;适合已经在使用 Redis 的中小型应用,你更看重可读性而非高级编排能力。
10、你会反复使用的入门代码片段
Celery 应用(最小化,合理的默认配置)
from celery import Celery
celery = Celery(__name__, broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")
celery.conf.update(
task_acks_late=True,
worker_prefetch_multiplier=1,
task_time_limit=120,
task_soft_time_limit=90,
task_serializer="json",
accept_content=["json"]
)
Dramatiq actor
import dramatiq
from dramatiq.brokers.redis import RedisBroker
dramatiq.set_broker(RedisBroker())
@dramatiq.actor(max_retries=5, min_backoff=1000)
def resize_image(file_key: str):
if already_done(file_key): return
# fetch from storage and process
RQ enqueue
from rq import Queue, Retry
q.enqueue(send_email, user_id, retry=Retry(max=5), job_timeout=60)
11、结束语
可靠的队列不是魔法。它们是七个深思熟虑的选择的结果:消息代理、重试、确认/超时、并发、负载、调度和可观测性。每个选择做一次,记录在你的代码仓库中,你的后台任务就会在你发布功能的同时安静地完成它们的工作。
原文链接: Reliable Python Queues: 7 Celery/Dramatiq/RQ Choices
汇智网翻译整理,转载请标明出处