背景

上篇我们介绍了 Celery 的环境搭建以及基础入门,这篇主要分享如何在 Python+Flask 项目中使用。

步骤

1、新建 flask 项目,目录结构如下

Common 目录下存放 model 层做数据库关系映射以及公共方法
Config 目录下存放项目配置以及 celery 配置
Controllers 目录下存放业务控制方法以及注册路由
Tasks 目录下存放异步任务方法
具体代码如下:

Celery_settings.py

# celery配置
CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区
CELERY_ENABLE_UTC = False  # 禁用UTC,配合CELERY_TIMEZONE使用
BROKER_URL = "amqp://yyyyy:xxxxxxxxxx@192.168.a.bb:5672/"  # broker地址
CELERY_RESULT_BACKEND = "yyyyy://:xxxxxxxxxx@192.168.3.53:6379/0"  # result地址
CELERY_ROUTES = {
    'run_api_job_delay': {'queue':'job1'},
    'run_ui_job_delay': {'queue':'job2'},
}# 不同任务队列配置

Settings.py

#公用配置
DEBUG = True
SQLALCHEMY_ECHO = False
DB_HOST="192.168.a.bb"
DB_USER="root"
DB_PASSWORD="xxxxxxxxxx"
SQLALCHEMY_DATABASE_URI="mysql+pymysql://"+DB_USER+":"+DB_PASSWORD+"@"+DB_HOST+":3306/rpa"
SQLALCHEMY_TRACK_MODIFICATIONS = True
SECRET_KEY = "xxxxxxxxx"
CORS_ALLOW_CREDENTIALS = True
CORS_ORIGIN_ALLOW_ALL = True
CSRF_ENABLED = True

Run_job.py

from flask importBlueprint
from flask import jsonify
from flask_restful import reqparse
from tasks.tasks import run_job_delay

runJob_page = Blueprint("runJob_page", __name__)

# 执行/调试场景测试
@runJob_page.route('/run_job', methods=['POST'])
#指定路由
def run_job():
    parser = reqparse.RequestParser()
    parser.add_argument('job_id',type=int)
    args = parser.parse_args()
    job_id = args.get('job_id')
    _save_run(job_id)
    return jsonify({'msg':"ok", "remark": "任务开始执行"})

def _save_run(job_id):
    run_job_delay.delay(job_id)

Tasks.py

from application importcelery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
#日志输出
@celery.task(name='run_api_job_delay')
def run_api_job_delay(job_id):
    print('执行异步任务')

Application.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_cors import *
from celery import Celery
from config import celery_setting
from flask_httpauth import HTTPBasicAuth

app = Flask(__name__)
#实例化应用对象
celery = Celery(app.name)
# 创建celery实例
celery.config_from_object(celery_setting)
#读取celery配置
CORS(app, supports_credentials=True)
app.config.from_pyfile("config\\settings.py")
db = SQLAlchemy(app)
auth = HTTPBasicAuth()

manager.py

from application importapp, manager
from flask_script import Command
from www import *
from gevent import pywsgi

# create_table
@Command
def create_all():
    from application import db
    db.create_all()

manager.add_command("create_all", create_all)

if __name__ == "__main__":
    # 测试
    app.run(host='0.0.0.0', debug=True,threaded=True, port=8888)
    # 生产
    # server =pywsgi.WSGIServer(('192.168.a.bb', 5000), app)
    # server.serve_forever()

2、创建 worker 项目,配置项和 server 项目相同
Controllers/runJob.py:

celery =Celery('worker', broker=settings.BROKER_URL, backend=settings.RESULT_BACKEND)
#实例化对象

@celery.task(name='run_job_delay')
def run_job_delay(job_id):
   Run_job(job_Id)

3、启动 server 项目

python manager.py

4、启动 worker 项目
Q 参数可以指定监听队列

celery worker -A worker -l info -P eventlet -Q job1

5、工作流简述
请求 run_job 接口,通过 url 映射到对应 view 函数;view 函数执行业务处理后推送异步方法到指定队列;worker 监听指定队列中消息并消费,将结果保存;
如果平台是综合多种类型的自动化任务并且需要指定 worker 消费的话,流转应该是下图这样。
例如 worker1 部署接口自动化执行服务,worker2 部署 UI 自动化执行服务。

最后
整体来讲 Celery 使用上手难度 ★★☆☆☆,容易出问题的地方一般在启动时:worker 以及 -A 后边路径,下篇分享如何使用 Celery 实现动态定时任务的配置。

相关阅读:


↙↙↙阅读原文可查看相关链接,并与作者交流