django_celery_beat

django_celery_beat 使用

引入django-celery-beat包

# settings.py: add django_celery_beat to the project's installed apps.
INSTALLED_APPS = [
    # ... other installed apps ...
    'django_celery_beat',
    # ... other installed apps ...
]

定义celery app

from __future__ import absolute_import
import os
from celery import Celery

# Point Django at the project's settings module, unless the environment
# already defines DJANGO_SETTINGS_MODULE (setdefault never overwrites).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')
# Create the Celery application object, named 'my_celery'.
app = Celery('my_celery')
# Load Celery configuration from the Django settings object under the
# 'CELERY' namespace.
# NOTE(review): presumably only settings prefixed with CELERY_ are read - confirm against the Celery docs.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Let Celery look up task modules on its own instead of listing them by hand.
app.autodiscover_tasks()

定义配置文件:

# Celery-related settings (all names carry the CELERY_ prefix).
# Result backend and broker both use the local Redis server at
# 127.0.0.1:6379, with different URL paths (/1 for results, /2 for the broker).
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# Accepted content types and the task serializer are both restricted to JSON.
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# Use the DatabaseScheduler class shipped in django_celery_beat.schedulers
# as the beat scheduler.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# Alternative result backend (uncomment to use the database):
# CELERY_RESULT_BACKEND = 'django-db'
INSTALLED_APPS = [
    # ... other installed apps ...
    'django_celery_results',
    # ... other installed apps ...
]
# To store results in the database, add django_celery_results to
# INSTALLED_APPS as shown above.

定义任务:

from logging import info
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from celery import shared_task
import json


def my_task(string):
    """Register a periodic task entry that fires every 10 seconds.

    Fetches (or creates) the 10-second ``IntervalSchedule`` row, then creates
    a ``PeriodicTask`` named ``'add device <string>'`` that points at the task
    path ``'celery_app.task.test_func'`` and passes ``string`` as its single
    JSON-encoded positional argument.

    Args:
        string: Value used both in the periodic task's name and as the
            argument handed to the task.
    """
    interval_lookup = {'every': 10, 'period': IntervalSchedule.SECONDS}
    # get_or_create returns (object, created); the flag is not needed here.
    ten_seconds, _ = IntervalSchedule.objects.get_or_create(**interval_lookup)

    task_name = 'add device %s' % string
    encoded_args = json.dumps([string])
    PeriodicTask.objects.create(
        name=task_name,
        task='celery_app.task.test_func',
        interval=ten_seconds,
        args=encoded_args,
    )

@shared_task
def test_func(string):
    """Log ``string`` at INFO level using the module-level ``logging.info``.

    Declared with ``shared_task``; ``my_task`` schedules it by the task path
    ``'celery_app.task.test_func'``.
    """
    info(string)

相关Model:

class IntervalSchedule(models.Model):
    """Schedule executing on a regular interval.

    Example: execute every 2 days
    every=2, period=DAYS
    """
    # Period units exposed as class attributes (e.g. ``IntervalSchedule.SECONDS``).
    # The right-hand names are resolved from the enclosing module, which is
    # not shown in this excerpt.
    DAYS = DAYS
    HOURS = HOURS
    MINUTES = MINUTES
    SECONDS = SECONDS
    MICROSECONDS = MICROSECONDS
    # Remainder of the model is elided in this excerpt.
    ...

class CrontabSchedule(models.Model):
    """Timezone Aware Crontab-like schedule.

    Example:  Run every hour at 0 minutes for days of month 10-15
    minute="0", hour="*", day_of_week="*",
    day_of_month="10-15", month_of_year="*"
    """
    # Crontab fields are stored as character fields, alongside a timezone
    # field; the field arguments are elided (``...``) in this excerpt.
    minute = models.CharField(...)
    hour = models.CharField(...)
    day_of_week = models.CharField(...)
    day_of_month = models.CharField(...)
    month_of_year = models.CharField(...)
    timezone = timezone_field.TimeZoneField(...)

class ClockedSchedule(models.Model):
    """Clocked schedule: run the task at the date/time held in ``clocked_time``."""
    # Date/time at which the task should run (see help_text).
    clocked_time = models.DateTimeField(
        verbose_name=_('Clock Time'),
        help_text=_('Run the task at clocked time'),
    )
    # Boolean flag; field arguments are elided (``...``) in this excerpt.
    enabled = models.BooleanField(...)

启动 worker(-B 表示同时启动内嵌的 beat 调度器,-c 5 表示并发数为 5):

python -m celery -A celery_app worker -l info -B -c 5