队列选择:Celery/Dramatiq/RQ

使用 Celery、Dramatiq 和 RQ 构建可靠 Python 任务队列的七个关键选择——涵盖消息代理、重试、幂等性、超时、并发、调度和运维。

队列选择:Celery/Dramatiq/RQ
AI模型价格对比 | AI工具导航 | ONNX模型库 | Vibe Coding教程 | Tripo 3D | Meshy AI | ElevenLabs | KlingAI | ArtSpace | Phot.AI | InVideo

你有了一个 Web 应用、一个数据库,还有一堆不断增长的"稍后再处理"的任务:邮件、缩略图、支付、ETL。后台任务听起来很简单——直到一次部署上线、工作进程重启,队列开始失控。

说实话:可靠性就是你在前期做出的几个看似无聊的选择的总和。以下是七个最重要的选择,附带 CeleryDramatiqRQ 的具体实践模式。

1、根据你能容忍的故障来选择消息代理

你要决定的是: 消息存储在哪里,以及它们如何在崩溃中存活。

  • Redis: 快速、简单、广泛使用。大多数应用的绝佳默认选择。支持持久化,但要注意全内存压力和淘汰策略。
  • RabbitMQ: 持久化队列、细粒度路由(主题、优先级),以及你可以理解的消息确认机制。运维开销更大,控制力也更强。
  • Postgres/SQS/其他: 用延迟换取持久性或云端简洁性。

Celery

# settings.py
broker_url = "redis://localhost:6379/0"          # or amqp:// for RabbitMQ
result_backend = "redis://localhost:6379/1"
task_acks_late = True                             # ack after work (safer)
worker_prefetch_multiplier = 1                    # fair dispatch under spikes

Dramatiq

import dramatiq
from dramatiq.brokers.redis import RedisBroker
broker = RedisBroker(url="redis://localhost:6379/0")
dramatiq.set_broker(broker)

RQ

from redis import Redis
from rq import Queue
redis = Redis(host="localhost", port=6379, db=0)
q = Queue("default", connection=redis)            # messages stored in Redis lists

经验法则: 从 Redis 开始。如果你需要严格的路由、死信队列或长期持久化保证,就切换到 RabbitMQ(Celery 在这方面表现优异)。

2、重试:至少一次投递需要幂等的任务处理

任务一定会被执行两次。断电、工作进程被杀、超时、网络抖动——每个技术栈都会遇到这些问题。接受至少一次投递,并让任务具有幂等性

Celery

from celery import shared_task

@shared_task(
    autoretry_for=(Exception,),
    retry_backoff=2,        # exponential
    retry_jitter=True,
    max_retries=5
)
def charge(user_id, order_id):
    # use a business idempotency key, not a random UUID
    key = f"charge:{order_id}"
    if already_processed(key):
        return "skip"
    return actually_charge(user_id, order_id)

Dramatiq

import dramatiq

@dramatiq.actor(max_retries=5, min_backoff=2000)  # ms
def send_email(message_id):
    if seen(message_id): return
    deliver()

RQ

from rq import Retry
q.enqueue(process_video, video_id, retry=Retry(max=5, interval=[2,4,8,16]))

不会过时的幂等性模式:使用业务键(order_id)+ 一个简短的 Redis SETNX 锁,或者一个数据库 outbox 表来标记完成状态,而不是无法重新计算的随机令牌。

3、消息确认、可见性和超时(避免任务消失)

两个极端都会破坏系统:确认得太早(崩溃时丢失工作)和确认得太晚(任务重复投递风暴)。

  • 工作完成后确认: 在 Celery 中,task_acks_late=True 意味着消息仅在任务返回后才被确认。配合 worker_prefetch_multiplier=1 使用,避免单个工作进程囤积任务。
  • 时间限制: 果断地终止卡住的代码。

Celery

task_time_limit = 120          # hard kill (sec)
task_soft_time_limit = 90      # gives a chance to clean up

Dramatiq 使用中间件驱动的超时机制;在任务调用外部服务时应用它们。

RQ

q.enqueue(run_report, job_timeout=120)  # visibility window

思维模型: 每个任务都应该有 (1) 一个时间预算,和 (2) 一个清晰的确认语义方案。

4、并发模型:CPU 密集型 vs I/O 密集型

选择与任务类型匹配的工作进程模式。

  • CPU 密集型: 多个进程;避免 GIL 竞争。
  • I/O 密集型: 线程或异步可以提高并行度。
  • 混合型: 拆分队列(如 cpuio),让每个队列有独立调优的工作进程池。

Celery

# CPU
celery -A app worker -Q cpu -l info --concurrency=4 --pool=prefork
# I/O
celery -A app worker -Q io  -l info --concurrency=100 --pool=gevent

Dramatiq

dramatiq app --processes 4 --threads 8  # processes × threads

RQ 的工作进程是基于进程的;每个队列运行多个工作进程来扩展。

防护措施: 不要把所有东西都放在一个大队列里。按延迟预算资源特征来拆分。

5、负载、序列化和边界

当任务携带整个数据库记录时,队列就会变得不可靠。发送标识符,而不是大块数据。在工作进程中用最新数据重新获取。

  • 序列化: 默认使用 JSON 或 msgpack;避免使用原始 pickle 以确保跨版本安全。
  • 最大大小: 保持负载小巧(考虑 < 64 KB)。将大型制品存储在对象存储中,只传递键。

Celery

task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]

DramatiqRQ 默认对 JSON 友好;保持类型的简单性。

6、调度和限流,避免意外

周期性工作需要自己的可靠性方案。

  • Celery Beat: 类 cron 的调度器;如果需要高可用,在领导者选举后面运行两个 Beat 实例。
  • Dramatiq: 使用调度器插件(通常是 crontab 中间件)。
  • RQ Scheduler: 简单有效,适合直接的任务调度。

Celery

from celery.schedules import crontab

beat_schedule = {
    "nightly-reports": {
        "task": "reports.generate",
        "schedule": crontab(minute=0, hour=1),
        "options": {"queue": "cpu"}
    }
}

限流(保护依赖):Celery 有针对每个任务的 rate_limit="10/m";Dramatiq/RQ 可以通过中间件或应用层限流来执行限制。在调用第三方 API 时务必应用限流。

7、可观测性、反压和对运维友好的实践

你看不见的东西就无法修复。

  • 仪表盘:Celery → Flower(队列、工作进程、重试)。 RQ → RQ Dashboard(任务、失败)。 Dramatiq → 社区仪表盘可用;结构化日志 + 指标是你的基线。
  • 需要跟踪的指标: 入队数、开始数、成功数、失败数、重试次数、每个队列中最旧任务的年龄、工作进程心跳。
  • 反压: 当最旧任务的年龄增长时,减轻负载:返回 429 状态码、降低生产者速度或扩展工作进程。
  • 死信队列: 对于有害消息,路由到 DLQ;在增长时告警。

结构化日志示例

log = {"event": "task_done", "task": "generate_pdf", "ms": 842, "retries": 1, "ok": True}
print(json.dumps(log))

一个小习惯,大回报:在每行日志中包含 queuetask 和一个稳定的业务键

8、案例简析:"卡住的缩略图生成器"

症状: 上传流量激增使队列爆炸。工作进程看起来很忙,但缩略图要么迟到,要么重复生成。

基于七个选择指导的修复:

  1. 拆分队列,按任务类型(io:thumbnailscpu:filters)。
  2. Celery 延迟确认配合 prefetch=1 阻止了囤积;超时机制终止了卡住的图像解码器。
  3. 幂等键 = 原始文件的 SHA 值;工作进程在上传前在 Redis 中写入"完成"标志。
  4. 对外部存储 API 限流到 20 次/分钟,以避免级联限流。
  5. Flower 告警在"最旧任务年龄 > 60 秒"时触发自动扩容。

结果: 首张缩略图的 p95 时间从 7.8 秒降至 1.6 秒;重复任务消失。

9、快速参考:何时选择什么

  • Celery: 功能丰富(路由、限流、chords/chains、Beat)。当你想要一个功能齐全的任务系统且能承受一些复杂性时最佳。搭配 RabbitMQ 可获得出色的持久性。
  • Dramatiq: 精简、高性能、现代的人体工程学设计;如果你偏好极简主义和显式中间件,这是绝佳的默认选择。
  • RQ: 最简单的心智模型;适合已经在使用 Redis 的中小型应用,你更看重可读性而非高级编排能力。

10、你会反复使用的入门代码片段

Celery 应用(最小化,合理的默认配置)

from celery import Celery

celery = Celery(__name__, broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")
celery.conf.update(
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    task_time_limit=120,
    task_soft_time_limit=90,
    task_serializer="json",
    accept_content=["json"]
)

Dramatiq actor

import dramatiq
from dramatiq.brokers.redis import RedisBroker

dramatiq.set_broker(RedisBroker())

@dramatiq.actor(max_retries=5, min_backoff=1000)
def resize_image(file_key: str):
    if already_done(file_key): return
    # fetch from storage and process

RQ enqueue

from rq import Queue, Retry
q.enqueue(send_email, user_id, retry=Retry(max=5), job_timeout=60)

11、结束语

可靠的队列不是魔法。它们是七个深思熟虑的选择的结果:消息代理、重试、确认/超时、并发、负载、调度和可观测性。每个选择做一次,记录在你的代码仓库中,你的后台任务就会在你发布功能的同时安静地完成它们的工作。


原文链接: Reliable Python Queues: 7 Celery/Dramatiq/RQ Choices

汇智网翻译整理,转载请标明出处