基于 Redis 配置 Celery

以下撇开 Web 框架,介绍基于 Redis 配置Celery任务的方法。

|

1

2

|

pip install celery[redis]

|

项目结构

|

1

2

3

4

5

6

7

8

9

|

$ tree your_project

your_project

├── init.py

├── main.py

├── celery.py

└── tasks.py

0 directories, 4 files

|

其中,main.py是触发 Task 的业务代码。 当然,文件名可以随意改。celery.pyCelery的 app 定义的位置,tasks.py是 Task 定义的位置,文件名不建议修改。

配置 Celery

celery.py中写入如下代码:

|

1

2

3

4

5

6

7

8

9

10

11

12

13

|

from celery import Celery

from .settings import REDIS_URL

APP = Celery(

    main=package,

    broker=REDIS_URL,

    backend=REDIS_URL,

    include=[f’{package}.tasks’],

)

APP.conf.update(task_track_started=True)

|

其中,REDIS_URL从同一的配置settings.py中引入, 形式大概是redis://localhost:6379/0。 这里既用 Redis 来当broker,又用来当backend。 即,既当消息队列,又当结果反馈的数据库(默认仅保存 1 天)。

include=,需要填一个下游 worker 的包名列表。 这里选择了同一个包的tasks.py文件。

额外设置的task_track_started,是命令 Worker 反馈STARTED状态。 默认情况下,是无法知道任务什么时候开始执行的。

编写任务并调用

tasks.py文件中,添加异步任务的实现。

|

1

2

3

4

5

6

|

from .celery import APP

@APP.task

def do_sth():

    pass

|

在需要发起任务的地方,用.apply_async可以触发异步调用。 即,实际只是向消息队列发送消息,真正的执行操作在远程。

|

1

2

3

4

5

6

7

|

from celery.result import AsyncResult

from .tasks imprt do_sth

result = do_sth.apply_async()

assert isinstance(result, AsyncResult)

|

运行 Worker:

|

1

2

|

celery -A your_project worker

|

运行原理

一次 Task 从触发到完成,序列图如下:

其中,main代表业务代码主进程。 它可能是 Django、Flask 这类 Web 服务,也可能是一个其它类型的进程。worker就是指 Celery 的 Worker。

main发送消息后,会得到一个AsyncResult,其中包含task_id。 仅通过task_id,也可以自己构造一个AsyncResult,查询相关信息。 其中,代表运行过程的,主要是state

worker会持续保持对 Redis(或其它消息队列,如 RabbitMQ)的关注,查询新的消息。 如果获得新消息,将其消费后,开始运行do_sth。 运行完成会把返回值对应的结果,以及一些运行信息,回写到 Redis(或其它 backend,如 Django 数据库等)上。 在系统的任何地方,通过对应的AsyncResult(task_id)就可以查询到结果。

Celery Task 的状态

以下是状态图:

其中,除SUCCESS外,还有失败(FAILURE)、取消(REVOKED)两个结束状态。 而RETRY则是在设置了重试机制后,进入的临时等待状态。

另外,如果保存在 Redis 的结果信息被清理(默认仅保存 1 天),那么任务状态又会变成PENDING。 这在设计上是个巨大的问题,使用时要做对应容错。

常见控制操作

|

1

2

3

4

5

6

7

8

|

result = AsyncResult(task_id)

阻塞等待返回

result.wait()

取消任务

result.revoke()

删除任务记录

result.forget()

|

有时,在业务主进程中需要等待异步运行的结果,这时需要使用wait。 如果要取消一个排队中、或已执行的任务,则可以使用revoke。 即使任务已经执行完成,也可以使用revoke,但不会有任何变化。 如果需要提前删除任务记录,可以使用forget

原文来自:https://note.qidong.name/2020/08/celery-with-redis/ 作者:零壹軒