背景

在去年编写自动化测试平台的时候,因为存在发送邮件、异步执行自动化任务、执行定时任务、模块解耦等需求。需要使用 MQ,我选择的是 RabbitMQ。
但是另一个问题就是缺少一个管理任务的系统,因为开发语言是 Python,我选择了 Celery。

Celery 名词解释

名词 解释
Task 任务,如定时任务和异步任务
Broker 消息队列,可以选择 Redis 或者 mq 更推荐 mq
Worker 消费者,监听 mq 收到消息后执行
Backend 存储任务执行结果,一般可选 redis
Beat 定时任务配置以及执行模块

架构如下图

安装

1、 linux 下使用 Docker 安装 Redis 和 RabbitMQ

docker run -d --name myredis -p 6379:6379 redis --requirepass "123456aBaB"
docker run -d --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e 
RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

2、 Python 安装 Celery

pip install celery==4.4.7

3、编写 tasks.py

from celery import Celery
#broker是mq的地址,backend是redis的
celery = Celery('tasks', broker="amqp://admin:admin@192.168.3.53:5672/",
                backend="redis://:redis123456aB@192.168.3.53:6379/0")
#name为指定任务的名字
@celery.task(name='run_job_delay')
def run_job_delay(a, b):
    print('执行异步任务')
#使用delay发送异步任务
run_job_delay.delay(1, 2)

4、 编写 worker 代码

from celery import Celery
#broker是mq的地址,backend是redis的
celery = Celery('tasks', broker="amqp://admin:admin@192.168.3.53:5672/",
                backend="redis://:redis123456aB@192.168.3.53:6379/0")
@celery.task(name='run_job_delay')
def run_job_delay(a,b):
    print(a+b)

5、 启动 worker,在 worker 目录所在的 cmd 命令行下执行命令,我没写错是的在命令行下

celery worker -A worker -l info -P eventlet

worker 是 worker.py 的模块名字,-l 是日志级别 -P eventlet 是 windows 下启动报错所以加这个参数,需要自己手动安装一下 eventlet。
6、运行 task.py,发送任务到 MQ,此时可以打开 mq 的控制台看到一条消息插入了队列。
7、 worker 可以看到日志输出
收到了任务 ID 为"1bbf4e58-70ec-457c-9762-4ff0157863fd"
任务名称为"run_job_delay"的任务,worker 执行任务输出了结果 3。

[2022-01-02 17:12:01,435: INFO/MainProcess] celery@WIN-M3P18OTBSS1 ready.
[2022-01-02 17:12:01,443: INFO/MainProcess] pidbox: Connected to amqp://admin:**@192.168.3.53:5672//.
[2022-01-02 17:14:52,613: INFO/MainProcess] Received task: run_job_delay[1bbf4e58-70ec-457c-9762-4ff0157863fd]
[2022-01-02 17:14:52,614: WARNING/MainProcess] 3
[2022-01-02 17:14:52,616: INFO/MainProcess] Task run_job_delay[1bbf4e58-70ec-457c-9762-4ff0157863fd] succeeded in 0.0s: None

8、 实际项目中使用时我会在场景执行的方法中传递场景 ID 等参数到 worker,然后由 worker 进行异步执行。

最后

这篇完全属于 hello world 级别,有兴趣但是无任何经验的同学可以跟着试试,在接下来的几篇分享中会介绍:如何结合 flask 在项目中使用、如何指定队列、使用 beat 动态配置定时任务等实际案例。


↙↙↙阅读原文可查看相关链接,并与作者交流