Django celery Peridoic Tasks
Preface
在最近自己做Django + Celery的项目的时候,有了定时任务的需求。查阅资料后发现Celery是有定时任务功能的,并且有个应用 dajngo-celery-beat
可以很好的满足Django和Celery定时任务的需求。
Celery
先简单介绍一下 Celery
。Celery
是一个基于Python开发的分布式异步消息队列,通过celery可以轻松的实现异步任务的处理。celery需要borker来进消息的发送和传递,需要backend将任务的执行结果存储起来。常用的Borker为RabbitMQ和Redis,当然也可以作为backend。
.. code:: python
from celery import Celery
app = Celery(‘hello’, broker=‘amqp://[email protected]//‘, backend=‘amqp://[email protected]//‘)
@app.task def hello(): return ‘hello world’
这里仅仅做简单的介绍,具体的仅参考 官方文档 <http://docs.celeryproject.org/en/master/index.html#>
__ 。
配置 Django + Celery
基础配置
pip install Django
pip install celery
pip install redis
注意:Celery仅仅支持Django1.8及以上版本,Django1.8以前的请使用Celery3.0。这里我使用的是Django1.11.2
Celery4.2.1 创建项目并建立 demo_celery
的APP后目录如下:
demo_celery
├── admin.py
├── apps.py
├── __init__.py
├── migrations
│ └── __init__.py
├── models.py
├── tests.py
└── views.py
demo
├── __init__.py
├── settings.py
├── urls.py
└── wsgi.py
manage.py
templates
venv
现在,我们有了一个名为 demo
的项目和名为 demo_celery_beat
的app(记得将该app添加到settings.py中)。接下来将celery配置进去。
在demo文件夹下创建 celery.py
文件,写入以下内容:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')
app = Celery('demo')
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
注意将项目名改为你自己的
然后在demo文件夹下的 __init__.py
中添加以下代码:
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app', ]
然后在 settings.py
中添加如下配置(这里我的borker和backend都用了redis,你可以根据自己的情况调整):
CELERY_BROKER_URL = 'redis://:[email protected]:6379/0'
CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/0'
CELERY_RESULT_SERIALIZER = 'json'
之后在 demo_celery
文件夹下创建 tasks.py
:
from celery import shared_task
import time
@shared_task
def test():
print "hello"
time.sleep(2)
return "done"
然后在views.py中调用该任务:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.shortcuts import render
from django.http import HttpResponse
from .tasks import test
# Create your views here.
def index(request):
test.delay()
return HttpResponse("OK")
之后就是打通路由,能使用URL访问到该view就OK了。
一切配置完成后,我们先测试一下。 先后执行:
1、 python manage.py runserver
2、 celery -A demo worker -l info
访问 http://127.0.0.1:8000/demo
配置django-celery-beat
上一步实现了Djnago+Celery异步任务的实现,现在终于到了我们想要说的定时任务了。
pip install django-celery-beat
在 settings.py
中添加如下代码:
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'demo_celery',
"django_celery_beat" # 添加django_celery_beat app
]
....
TIME_ZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False # 禁止使用UTC时间
CELERY_TIMEZONE = "Asia/Shanghai" # 设置时区
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERYD_MAX_TASKS_PER_CHILD = 3 # 每个worker最大执行数,长时间执行造成内存泄露
其实到这一步,我们已经可以通过Django自带的后台来创建定时任务了。
python manage.py migrate
python manage.py createsuperuser
python manage.py runserver
celery -A demo worker -l info
celery -A demo beat -l info -S django
启动服务,登录后台 点击 Periodic tasks
创建定时任务:
http://127.0.0.1:8000/admin/django_celery_beat/periodictask/add/
可以看到,已经可以定时执行了。但是到现在还远没有结束,我们需要在自己的代码里去实现,虽然Django的后台挺好用的。通过阅读文档和查看源码,发现django-celery-beat 主要由以下五个Moudle组成
PeriodicTask 这个moudle定义了一个单独的任务,必须和
IntervalSchedule
、CrontabSchedule
、SolarSchedule
等计划相关联。PeriodicTasks 这个moudle仅仅用作索引来跟踪计划的改变,从而告知服务器重新加载计划。
IntervalSchedule 定义一个任务的执行频率,每隔(every) DAYS, HOURS, MINUTES, SECONDS, MICROSECONDS 去执行一次任务。
CrontabSchedule 和Linux中的计划任务相同,可以指定特定的时间去执行任务。
SolarSchedule 如果你想任务依据日出、日落、黄昏、黎明时间来执行,你可以使用 solar 调度器类型。
Example
一个每隔10秒运行一次的任务
from django_celery_beat.models import PeriodicTasks, PeriodicTask, IntervalSchedule
# Create your views here.
def index(request):
# 创建Interval对象
schedule, created = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.SECONDS)
# 创建任务
PeriodicTask.objects.create(interval=schedule, name="demo2", task="demo_celery.tasks.test")
return HttpResponse("OK")
这是一个非常基本的一个任务,还可以加很多参数,如下一个例子
需要传入参数, 创建任务后,2秒钟运行,每隔10秒执行一次,一次性任务
任务修改为:
from celery import shared_task
import time
@shared_task
def test(name):
print "hello" + name
time.sleep(2)
return "done"
view中的代码:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.shortcuts import render
from django.http import HttpResponse
from .tasks import test
from django_celery_beat.models import PeriodicTasks, PeriodicTask, IntervalSchedule
from datetime import datetime, timedelta
# Create your views here.
def index(request):
# 创建Interval对象
schedule, created = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.SECONDS)
# 创建任务
PeriodicTask.objects.create(interval=schedule, name="demo3", task="demo_celery.tasks.test", one_off=True, args=[],
kwargs='{"name":"world"}', start_time=(datetime.now() + timedelta(seconds=2)))
return HttpResponse("OK")
创建一个cron任务 每周每天每小时的第1分钟执行一次
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.shortcuts import render
from django.http import HttpResponse
from .tasks import test
from django_celery_beat.models import PeriodicTasks, PeriodicTask, CrontabSchedule
from datetime import datetime, timedelta
# Create your views here.
def index(request):
# 创建cron对象
crontab, created = CrontabSchedule.objects.get_or_create(minute=1, timezone="Asia/Shanghai")
# 创建任务
PeriodicTask.objects.create(crontab=crontab, name="demo5", task="demo_celery.tasks.test", one_off=False, args=[],
kwargs='{"name":"world"}', start_time=datetime.now())
return HttpResponse("OK")
Note
官方文档里给出的警告,大概意思就是,如果你Djnago项目更改了时区,你的定时任务还会使用的是以前的时区,你可以使用如下方法改变。
但是我,按照官方给出的代码老是报错,看了源码才知道,该方法需要传入一个对象,并且还要 no_change
这个选项, PeriodicTask
里刚好有这个,于是就都明白了。改为如下代码:
PeriodicTask.objects.all().update(last_run_at=None)
periodic = PeriodicTask.objects.all()
for p in periodic:
PeriodicTasks.changed(p)
到此为止,django-celery-beat 的简单介绍就这样了,如果你想看更多的内容建议阅读以下官方文档。
Reference
[1] Periodic Tasks — Celery 4.3.0rc2 documentation