基于 Redis 配置 Celery
以下撇开 Web 框架,介绍基于 Redis 配置Celery任务的方法。
|
1
2
|
pip install celery[redis]
|
项目结构
|
1
2
3
4
5
6
7
8
9
|
$ tree your_project
your_project
├── init.py
├── main.py
├── celery.py
└── tasks.py
0 directories, 4 files
|
其中,main.py
是触发 Task 的业务代码。 当然,文件名可以随意改。celery.py
是Celery的 app 定义的位置,tasks.py
是 Task 定义的位置,文件名不建议修改。
配置 Celery
在celery.py
中写入如下代码:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from celery import Celery
from .settings import REDIS_URL
APP = Celery(
main=package,
broker=REDIS_URL,
backend=REDIS_URL,
include=[f’{package}.tasks’],
)
APP.conf.update(task_track_started=True)
|
其中,REDIS_URL
从同一的配置settings.py
中引入, 形式大概是redis://localhost:6379/0
。 这里既用 Redis 来当broker
,又用来当backend
。 即,既当消息队列,又当结果反馈的数据库(默认仅保存 1 天)。
在include=
,需要填一个下游 worker 的包名列表。 这里选择了同一个包的tasks.py
文件。
额外设置的task_track_started
,是命令 Worker 反馈STARTED
状态。 默认情况下,是无法知道任务什么时候开始执行的。
编写任务并调用
在tasks.py
文件中,添加异步任务的实现。
|
1
2
3
4
5
6
|
from .celery import APP
@APP.task
def do_sth():
pass
|
在需要发起任务的地方,用.apply_async
可以触发异步调用。 即,实际只是向消息队列发送消息,真正的执行操作在远程。
|
1
2
3
4
5
6
7
|
from celery.result import AsyncResult
from .tasks imprt do_sth
result = do_sth.apply_async()
assert isinstance(result, AsyncResult)
|
运行 Worker:
|
1
2
|
celery -A your_project worker
|
运行原理
一次 Task 从触发到完成,序列图如下:
其中,main
代表业务代码主进程。 它可能是 Django、Flask 这类 Web 服务,也可能是一个其它类型的进程。worker
就是指 Celery 的 Worker。
main
发送消息后,会得到一个AsyncResult
,其中包含task_id
。 仅通过task_id
,也可以自己构造一个AsyncResult
,查询相关信息。 其中,代表运行过程的,主要是state
。
worker
会持续保持对 Redis(或其它消息队列,如 RabbitMQ)的关注,查询新的消息。 如果获得新消息,将其消费后,开始运行do_sth
。 运行完成会把返回值对应的结果,以及一些运行信息,回写到 Redis(或其它 backend,如 Django 数据库等)上。 在系统的任何地方,通过对应的AsyncResult(task_id)
就可以查询到结果。
Celery Task 的状态
以下是状态图:
其中,除SUCCESS
外,还有失败(FAILURE
)、取消(REVOKED
)两个结束状态。 而RETRY
则是在设置了重试机制后,进入的临时等待状态。
另外,如果保存在 Redis 的结果信息被清理(默认仅保存 1 天),那么任务状态又会变成PENDING
。 这在设计上是个巨大的问题,使用时要做对应容错。
常见控制操作
|
1
2
3
4
5
6
7
8
|
result = AsyncResult(task_id)
阻塞等待返回
result.wait()
取消任务
result.revoke()
删除任务记录
result.forget()
|
有时,在业务主进程中需要等待异步运行的结果,这时需要使用wait
。 如果要取消一个排队中、或已执行的任务,则可以使用revoke
。 即使任务已经执行完成,也可以使用revoke
,但不会有任何变化。 如果需要提前删除任务记录,可以使用forget
。
原文来自:https://note.qidong.name/2020/08/celery-with-redis/ 作者:零壹軒