上篇我们介绍了 Celery 的环境搭建以及基础入门,这篇主要分享如何在 Python+Flask 项目中使用。
1、新建 flask 项目,目录结构如下
Common 目录下存放 model 层做数据库关系映射以及公共方法
Config 目录下存放项目配置以及 celery 配置
Controllers 目录下存放业务控制方法以及注册路由
Tasks 目录下存放异步任务方法
具体代码如下:
Celery_settings.py
# celery配置
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区
CELERY_ENABLE_UTC = False # 禁用UTC,配合CELERY_TIMEZONE使用
BROKER_URL = "amqp://yyyyy:xxxxxxxxxx@192.168.a.bb:5672/" # broker地址
CELERY_RESULT_BACKEND = "yyyyy://:xxxxxxxxxx@192.168.3.53:6379/0" # result地址
CELERY_ROUTES = {
'run_api_job_delay': {'queue':'job1'},
'run_ui_job_delay': {'queue':'job2'},
}# 不同任务队列配置
Settings.py
#公用配置
DEBUG = True
SQLALCHEMY_ECHO = False
DB_HOST="192.168.a.bb"
DB_USER="root"
DB_PASSWORD="xxxxxxxxxx"
SQLALCHEMY_DATABASE_URI="mysql+pymysql://"+DB_USER+":"+DB_PASSWORD+"@"+DB_HOST+":3306/rpa"
SQLALCHEMY_TRACK_MODIFICATIONS = True
SECRET_KEY = "xxxxxxxxx"
CORS_ALLOW_CREDENTIALS = True
CORS_ORIGIN_ALLOW_ALL = True
CSRF_ENABLED = True
Run_job.py
from flask importBlueprint
from flask import jsonify
from flask_restful import reqparse
from tasks.tasks import run_job_delay
runJob_page = Blueprint("runJob_page", __name__)
# 执行/调试场景测试
@runJob_page.route('/run_job', methods=['POST'])
#指定路由
def run_job():
parser = reqparse.RequestParser()
parser.add_argument('job_id',type=int)
args = parser.parse_args()
job_id = args.get('job_id')
_save_run(job_id)
return jsonify({'msg':"ok", "remark": "任务开始执行"})
def _save_run(job_id):
run_job_delay.delay(job_id)
Tasks.py
from application importcelery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
#日志输出
@celery.task(name='run_api_job_delay')
def run_api_job_delay(job_id):
print('执行异步任务')
Application.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_cors import *
from celery import Celery
from config import celery_setting
from flask_httpauth import HTTPBasicAuth
app = Flask(__name__)
#实例化应用对象
celery = Celery(app.name)
# 创建celery实例
celery.config_from_object(celery_setting)
#读取celery配置
CORS(app, supports_credentials=True)
app.config.from_pyfile("config\\settings.py")
db = SQLAlchemy(app)
auth = HTTPBasicAuth()
manager.py
from application importapp, manager
from flask_script import Command
from www import *
from gevent import pywsgi
# create_table
@Command
def create_all():
from application import db
db.create_all()
manager.add_command("create_all", create_all)
if __name__ == "__main__":
# 测试
app.run(host='0.0.0.0', debug=True,threaded=True, port=8888)
# 生产
# server =pywsgi.WSGIServer(('192.168.a.bb', 5000), app)
# server.serve_forever()
2、创建 worker 项目,配置项和 server 项目相同
Controllers/runJob.py:
celery =Celery('worker', broker=settings.BROKER_URL, backend=settings.RESULT_BACKEND)
#实例化对象
@celery.task(name='run_job_delay')
def run_job_delay(job_id):
Run_job(job_Id)
3、启动 server 项目
python manager.py
4、启动 worker 项目
Q 参数可以指定监听队列
celery worker -A worker -l info -P eventlet -Q job1
5、工作流简述
请求 run_job 接口,通过 url 映射到对应 view 函数;view 函数执行业务处理后推送异步方法到指定队列;worker 监听指定队列中消息并消费,将结果保存;
如果平台是综合多种类型的自动化任务并且需要指定 worker 消费的话,流转应该是下图这样。
例如 worker1 部署接口自动化执行服务,worker2 部署 UI 自动化执行服务。
最后
整体来讲 Celery 使用上手难度 ★★☆☆☆,容易出问题的地方一般在启动时:worker 以及 -A 后边路径,下篇分享如何使用 Celery 实现动态定时任务的配置。
相关阅读: