背景

因为有些任务需要定时执行,且希望做成通过页面动态可配。基础的 celery 的定时任务配置都是通过配置文件或者代码编写后在启动时加载一次,而修改配置后需要重启服务才可以生效。

希望达到的效果是通过 WEB 配置后不重启立即生效,经过调研(google 搜索)python 这方面确实不是很多。最后决定使用 django-celery-beat 插件使用,整体工作流如下。
1、通过 WEB 操作触发接口请求发送数据到服务端
2、服务端对数据进行处理并保存至数据库
3、Beat 检测通过循环时间检测,通过对比当前时间以及任务时间判断如果任务到了需要执行的时间则推送到 MQ 队列中
4、Worker 监听到 MQ 队列中的 task,执行。

步骤:

1、pip 安装 django-celery-beat

pip install django-celery-beat

2、初始化 django 项目(安装 django 不详细描述了)

django-admin startproject beat

3、注册插件,在 settings.py 文件 INSTALLED_APPS 中添加 django_celery_beat

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_celery_beat'
]

4、settings.py 文件中修改 MYSQL 配置

DATABASES = {
    'default': {
        'ENGINE':'django.db.backends.mysql',  # 数据库引擎
        'NAME': 'celery-schedule',  # 数据库名
        'USER': 'root',  # 账号
        'PASSWORD': '123456',  # 密码
        'HOST': '192.168.8.103',  # HOST
        'POST': 3306,  # 端口
    }
}

5、settings.py 中新增配置使生效

CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers:DatabaseScheduler'

6、新增 celeryconfig.py 配置文件,内容见下

import platform, os
#时区
CELERY_TIMEZONE='Asia/Shanghai'
broker_url = "amqp://admin:xxxxxx@xxx.xxx.xxx.xxxx:5672/"
result_backend = "redis://:xxxxxxxxx@xxx.xxx.x.xxx:6379/0"

beat_max_loop_interval = 10
worker_max_tasks_per_child = 10
# 指定任务接受的序列化类型
accept_content = ['json']
# 指定任务序列化方式
task_serializer = 'json'
# 指定结果序列化的方式
result_serializer = 'json'
# 任务过期时间,celery任务执行结果的超时时间
CELERY_TASK_RESULT_EXPIRES = 60 * 30
if platform.system() == 'Windows':
    # Celery在Windows环境下运行需要设置这个变量,否则调用任务会报错
   os.environ['FORKED_BY_MULTIPROCESSING'] = '1'

7、新增 celery.py 文件

from __future__import absolute_import, unicode_literals
from celery import Celery
import os
from celery_beat import settings

# 为celery指定DJANGO_SETTINGS_MODULE环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_beat.settings')
# 创建celery的实例app,要在指定DJANGO_SETTINGS_MODULE环境变量之后
app = Celery('tasks')
app.config_from_object('celery_beat.celeryconfig',
                      namespace='CELERY')  # 命名空间namespace='CELERY'定义所有与celery相关的配置的键名要以'CELERY_'为前缀
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # 自动加载任务

8、初始化数据库

python manage.py migrate

数据库表分析:

django_celery_beat_periodictask 表:具体某个待执行的任务表,需要与其他表关联
关键字段解析如下

字段名 含义
Name 任务名称
Task Celery 任务名称如:run_api_job_delay
Args 参数 如:[31,1,2]
Queue 队列名称
Enabled 是否启用 1 是 0 否
Last_run_at 最后一次运行时间
Total_run_count 总运行次数
Crontab_id django_celery_beat_crontabschedule 表外键,时间配置关联
Interval_id django_celery_beat_intervalschedule 表外键,时间配置关联
Expire_seconds 有效时长

django_celery_beat_crontabschedule 表:配置执行时间规则,同 linux 下 crontab 定时任务

比如:
第一条代表每天的 4 点执行
第二条代码 1 月 15 日 1 点 2 分执行

启动 BEAT 命令

celery -A celery_beat beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

上面这些步骤都完成后,需要进行重启。
经过试验发现第三张表 django_celery_beat_periodictasks 表中有一个字段名称是 last_update 最后更新时间,通过 server 变更任务表或者时间配置表后手动更新下这个 last_update 字段。BEAT 服务就会打印如下一段日志

[2022-01-14 11:29:42,476: INFO/MainProcess]DatabaseScheduler: Schedule changed.

另外:

django_celery_beat_intervalschedule 表是根据日出日落设置
django_celery_beat_solarschedule 表是根据经纬度设置

如果不想直接查看 mq 监控任务的,Celery 也可以使用 flower 监控
安装:

pip3 install flower

启动

celery flower --port=5555 --broker=amqp://admin:aaaaa//xxx.xxx.xx.xx:5672

浏览器访问 ip:5555 端口即可打开监控页面

相关阅读:


↙↙↙阅读原文可查看相关链接,并与作者交流