Usage

Tasks

The library adds two implementations of the Celery task class and extends them with several features:

DjangoTask

A class that extends celery.Task and adds methods that simplify using Celery with the Django framework.

Let's create a task with this base class:

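from django.contrib.auth.models import User
from django_celery_extensions.task import DjangoTask

# celery_app is your project's Celery application instance; notify() stands
# for your own notification helper.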
@celery_app.task(
    base=DjangoTask,
    bind=True)
def notify_user(self, user_pk):
    user = User.objects.get(pk=user_pk)
    was_notified = notify(user)
    return was_notified

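Because Django code often runs inside atomic transactions, it is recommended to call Celery tasks via the on_commit function, so that a task is sent only after the transaction has been committed and the worker can see the committed data. DjangoTask simplifies this with several methods: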

notify_user.apply_async_on_commit(args=(user.pk,))  # like apply_async, but called via on_commit
notify_user.delay_on_commit(user.pk)  # like delay, but called via on_commit
# Calls apply_async and waits up to `timeout` seconds for the task result.
# If the result is not available within that time, TimeoutError is raised.
notify_user.apply_async_and_get_result(args=(user.pk,), timeout=None, propagate=True)
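The reason for deferring the call can be illustrated without Django or Celery. The sketch below is a hypothetical stand-in (FakeTransaction and delay_on_commit here are illustrative names, not the library's or Django's code): callbacks registered with on_commit run only when the transaction commits and are discarded on rollback, so a task never starts for data that was rolled back.

```python
from typing import Callable, List


class FakeTransaction:
    """Hypothetical stand-in for a database transaction with on_commit hooks."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[], None]] = []

    def on_commit(self, callback: Callable[[], None]) -> None:
        # Defer the callback instead of running it immediately.
        self._callbacks.append(callback)

    def commit(self) -> None:
        # Run the deferred callbacks only once the data is committed.
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback()

    def rollback(self) -> None:
        # Drop deferred callbacks: the task must never see rolled-back data.
        self._callbacks.clear()


sent: List[int] = []  # records which "tasks" were actually sent


def delay_on_commit(transaction: FakeTransaction, user_pk: int) -> None:
    # Hypothetical analogue of notify_user.delay_on_commit(user_pk).
    transaction.on_commit(lambda: sent.append(user_pk))
```

Nothing is appended to sent until commit() is called, and a rolled-back transaction sends nothing.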

Default retry delays

If you want to use Celery autoretry with a different delay for each attempt, you can use default_retry_delays:

@celery_app.task(
    base=DjangoTask,
    bind=True,
    autoretry_for=(NotifyException,),
    default_retry_delays=(60, 120, 180))
def notify_user(self, user_pk):
    user = User.objects.get(pk=user_pk)
    was_notified = notify(user)
    return was_notified

The task will be retried up to three times: the first retry after 60 seconds, the second after 120 seconds and the third after 180 seconds.
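The mapping from attempt number to delay can be sketched as a lookup in the default_retry_delays tuple. This is an illustrative helper (retry_countdown is a hypothetical name, not part of the library); it assumes the retry counter follows Celery's self.request.retries convention (0 before the first retry) and that no further retry happens once the tuple is exhausted.

```python
from typing import Optional, Sequence


def retry_countdown(default_retry_delays: Sequence[int], retries: int) -> Optional[int]:
    """Return the delay in seconds before the next retry, or None when exhausted.

    `retries` is 0 before the first retry, 1 before the second, and so on.
    """
    if retries < len(default_retry_delays):
        return default_retry_delays[retries]
    # Assumption: once every configured delay is used, the task is not retried.
    return None
```

With default_retry_delays=(60, 120, 180), the values for retries 0, 1 and 2 are 60, 120 and 180 seconds; in plain Celery such a value would be passed as self.retry(countdown=...).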

Unique

Sometimes a task with the same input must run only once at a time. For this purpose, use the unique configuration:

@celery_app.task(
    base=DjangoTask,
    bind=True,
    unique=True,
    stale_time_limit=60)
def notify_user(self, user_pk):
    user = User.objects.get(pk=user_pk)
    was_notified = notify(user)
    return was_notified

Now, if you apply the task twice at the same time with the same arguments, it will run only once. The stale_time_limit attribute defines the maximum number of seconds for which the task lock is held.

For unique tasks, you can use the is_processing method to check whether the task is currently running:

notify_user.is_processing(args=(user_pk,))
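The lock behind unique tasks can be sketched with an "add only if absent" cache operation plus an expiry. The class below is a hypothetical in-memory stand-in, not the library's implementation; it mimics the semantics of Django's cache.add (which stores a key only if it is missing and reports whether it did). The expiry plays the role of stale_time_limit, so a lock left behind by a crashed worker eventually disappears, and has_key approximates what is_processing reports.

```python
import time
from typing import Callable, Dict


class InMemoryLockCache:
    """Hypothetical in-memory cache with add-if-absent semantics and expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._expires_at: Dict[str, float] = {}

    def _purge(self, key: str) -> None:
        # Drop the key if its lock has become stale.
        expires_at = self._expires_at.get(key)
        if expires_at is not None and expires_at <= self._clock():
            del self._expires_at[key]

    def add(self, key: str, timeout: float) -> bool:
        """Store `key` only if it is absent; return True when the lock was taken."""
        self._purge(key)
        if key in self._expires_at:
            return False
        self._expires_at[key] = self._clock() + timeout
        return True

    def delete(self, key: str) -> None:
        # Release the lock when the task finishes.
        self._expires_at.pop(key, None)

    def has_key(self, key: str) -> bool:
        self._purge(key)
        return key in self._expires_at
```

The first add() for a key succeeds, a concurrent duplicate is rejected, and after the timeout the key can be taken again.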

For every unique task, a key is generated and stored in the cache. The key is produced by a unique key generator. The function default_unique_key_generator is used by default; you can replace it with the unique_key_generator property:

def custom_unique_key_generator(task, prefix, task_args, task_kwargs):
    # Illustration only: this returns the same key for every invocation,
    # whatever the arguments.
    return 'custom_unique_key'

@celery_app.task(
    base=DjangoTask,
    bind=True,
    unique=True,
    unique_key_generator=custom_unique_key_generator)
def notify_user(self, user_pk):
    user = User.objects.get(pk=user_pk)
    was_notified = notify(user)
    return was_notified

The default unique key generator builds the key from the Celery task name and the task input arguments.
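A key built from the task name and its input arguments could look like the sketch below. It is a hypothetical generator, not the library's default_unique_key_generator: it takes the task name as a string (the library's generators receive the task object) and hashes a deterministic JSON serialization so that keyword-argument order does not change the key.

```python
import hashlib
import json
from typing import Any, Dict, Sequence


def example_unique_key_generator(task_name: str, prefix: str,
                                 task_args: Sequence[Any],
                                 task_kwargs: Dict[str, Any]) -> str:
    # sort_keys makes the key independent of kwargs order; default=str keeps
    # non-JSON values (e.g. dates) serializable.
    payload = json.dumps([list(task_args), task_kwargs], sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{prefix}:{task_name}:{digest}"
```

Hashing keeps the cache key short and free of characters some cache backends reject, while equal inputs still map to the same lock.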

Django commands to celery tasks

Django management commands can be converted automatically into Celery tasks, which is useful, for example, when you want to use Celery beat instead of cron. Use the DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DJANGO_COMMANDS setting to define which commands should be converted into tasks:

DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DJANGO_COMMANDS = {
    'clearsessions': {'unique': True},
}

Commands are defined in a dictionary where the key is the command name and the value is a dictionary of Celery task options. The generated tasks use DjangoTask as their base class and therefore support all of its features.
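The shape of this setting can be illustrated by turning each entry into a callable plus its task options. This is a hypothetical sketch (build_command_tasks is not a library function); in a real Django project the run_command argument would be something like django.core.management.call_command, and the options would be passed to the Celery task decorator.

```python
import functools
from typing import Any, Callable, Dict, Tuple

# Same shape as DJANGO_CELERY_EXTENSIONS_AUTO_GENERATE_TASKS_DJANGO_COMMANDS.
COMMANDS: Dict[str, Dict[str, Any]] = {
    'clearsessions': {'unique': True},
}


def build_command_tasks(
    commands: Dict[str, Dict[str, Any]],
    run_command: Callable[[str], Any],
) -> Dict[str, Tuple[Callable[[], Any], Dict[str, Any]]]:
    """Map each command name to (callable, task options). Hypothetical helper."""
    tasks = {}
    for command_name, task_options in commands.items():
        # partial binds the name now, avoiding the late-binding closure pitfall.
        tasks[command_name] = (
            functools.partial(run_command, command_name),
            dict(task_options),
        )
    return tasks
```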

If you want to call a command task manually, use get_django_command_task to obtain the task:

from django_celery_extensions.task import get_django_command_task

get_django_command_task('clearsessions').delay_on_commit()

Ignore

A Celery task can be limited to run only once within a specific time window. For this purpose, use ignore_task_after_success and ignore_task_timedelta:

from datetime import timedelta

@celery_app.task(
    base=DjangoTask,
    bind=True,
    ignore_task_after_success=True,
    ignore_task_timedelta=timedelta(hours=5))
def notify_user(self, user_pk):
    user = User.objects.get(pk=user_pk)
    was_notified = notify(user)
    return was_notified

Now the notify_user task will be ignored for 5 hours after its last successful completion:

notify_user.delay(5).state  # result will be SUCCESS
notify_user.delay(5).state  # result will be IGNORED
# wait 5 hours
notify_user.delay(5).state  # result will be SUCCESS

If the task ends in a failure state, it is not ignored and can be run again.
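The ignore rule described above can be sketched as a single time comparison. should_ignore is a hypothetical helper, not the library's code; it assumes only successful runs record a timestamp (so failures never cause later calls to be ignored) and treats the window as ending exactly when ignore_task_timedelta has elapsed.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_ignore(last_success_at: Optional[datetime], now: datetime,
                  ignore_task_timedelta: timedelta) -> bool:
    if last_success_at is None:
        # No recorded success (never run, or only failed): run the task.
        return False
    # Ignore while the last success is still inside the window.
    return now - last_success_at < ignore_task_timedelta
```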

A task can also be marked as ignored manually with the set_ignore_task method:

from datetime import timedelta

@celery_app.task(
    base=DjangoTask,
    bind=True,
    ignore_task_timedelta=timedelta(hours=5))
def notify_user(self, user_pk):
    user = User.objects.get(pk=user_pk)
    was_notified = notify(user)
    if was_notified:
        # The ignore timedelta can also be passed explicitly:
        # self.set_ignore_task(ignore_task_timedelta=timedelta(hours=2))
        self.set_ignore_task()
    return was_notified

Now the task will be ignored only if the user was successfully notified.

Signals

You can override the new DjangoTask event methods to simplify your Celery task logic. The invocation_id is a unique UUID value generated for each task invocation:

class CustomTask(DjangoTask):

    def on_invocation_apply(self, invocation_id, args, kwargs, options, result):
        """
        Called when the task is applied by the requester.
        """

    def on_invocation_trigger(self, invocation_id, args, kwargs, task_id, options, result):
        """
        Task has been triggered and placed in the queue.
        """

    def on_invocation_unique(self, invocation_id, args, kwargs, task_id, options, result):
        """
        Task has been triggered but the same task is already active.
        """

    def on_invocation_ignored(self, invocation_id, args, kwargs, task_id, options, result):
        """
        Task has been triggered but the task has set ignore_task_timedelta
        and task was successfully completed in this timeout.
        """

    def on_invocation_timeout(self, invocation_id, args, kwargs, task_id, ex, options, result):
        """
        Task has been joined to another unique async result.
        """

    def on_task_start(self, task_id, args, kwargs):
        """
        Task has been started by a worker.
        """

    def on_task_retry(self, task_id, args, kwargs, exc, eta):
        """
        Task failed but will be retried.
        """

    def on_task_failure(self, task_id, args, kwargs, exc, einfo):
        """
        Task failed and will not be retried.
        """

    def on_task_success(self, task_id, args, kwargs, retval):
        """
        Task was successful.
        """

@celery_app.task(base=CustomTask, bind=True)
def custom_task(self, user_pk):
    pass
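The override pattern can be illustrated with a tiny runner that calls start, success and failure hooks with the same signatures as the on_task_* methods above. HookedTask and AuditTask are hypothetical classes written for this sketch; they do not reproduce how DjangoTask or Celery dispatch these events (retries are omitted, and einfo is None here, whereas Celery passes an ExceptionInfo object).

```python
import uuid
from typing import Any, Callable, List, Optional, Tuple


class HookedTask:
    """Hypothetical mini task runner that calls lifecycle hooks in order."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func

    # Default hooks do nothing; subclasses override only what they need.
    def on_task_start(self, task_id: str, args: Tuple[Any, ...], kwargs: dict) -> None:
        pass

    def on_task_success(self, task_id: str, args: Tuple[Any, ...], kwargs: dict,
                        retval: Any) -> None:
        pass

    def on_task_failure(self, task_id: str, args: Tuple[Any, ...], kwargs: dict,
                        exc: Exception, einfo: Optional[Any]) -> None:
        pass

    def run(self, *args: Any, **kwargs: Any) -> Any:
        task_id = str(uuid.uuid4())
        self.on_task_start(task_id, args, kwargs)
        try:
            retval = self.func(*args, **kwargs)
        except Exception as exc:
            self.on_task_failure(task_id, args, kwargs, exc, None)
            raise
        self.on_task_success(task_id, args, kwargs, retval)
        return retval


class AuditTask(HookedTask):
    """Example subclass that records which hooks fired."""

    def __init__(self, func: Callable[..., Any]) -> None:
        super().__init__(func)
        self.events: List[str] = []

    def on_task_start(self, task_id, args, kwargs):
        self.events.append("start")

    def on_task_success(self, task_id, args, kwargs, retval):
        self.events.append(f"success:{retval}")

    def on_task_failure(self, task_id, args, kwargs, exc, einfo):
        self.events.append(f"failure:{type(exc).__name__}")
```

Keeping side effects such as auditing or metrics in hooks leaves the task body focused on its own work.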

Beater

The Celery documentation warns against running more than one beat instance. Sometimes, however, it is necessary to run several instances (for example in cloud deployments). You can use django_celery_extensions.beat.LockedPersistentScheduler to ensure that only one beat instance is active at a time. Just run celery beat with this scheduler:

celery -A proj beat --scheduler django_celery_extensions.beat.LockedPersistentScheduler

The scheduler works only when redis_cache.RedisCache is configured as the cache backend in Django settings.
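The idea of a locked scheduler can be sketched as a lease: each beat instance tries to take or renew a shared lock on every tick, and only the holder dispatches due tasks. This is a hypothetical in-process model (SharedLockStore, BeatInstance and the lock name are invented for illustration), not LockedPersistentScheduler itself. With a real Redis store the check-and-set must be atomic, for example SET with the NX and PX options to take the lock and a script to renew it only when the stored owner matches.

```python
import time
import uuid
from typing import Callable, Dict, Tuple


class SharedLockStore:
    """Hypothetical stand-in for a shared store such as Redis (in-process here)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._locks: Dict[str, Tuple[str, float]] = {}

    def acquire_or_renew(self, name: str, owner: str, ttl: float) -> bool:
        """Take the lock if it is free or expired, or extend it if `owner` holds it."""
        now = self._clock()
        current = self._locks.get(name)
        if current is None or current[1] <= now or current[0] == owner:
            self._locks[name] = (owner, now + ttl)
            return True
        return False


class BeatInstance:
    """One of several beat processes; only the lock holder dispatches tasks."""

    LOCK_NAME = "beat-scheduler-lock"  # hypothetical key name

    def __init__(self, store: SharedLockStore, ttl: float) -> None:
        self.store = store
        self.ttl = ttl
        self.owner = str(uuid.uuid4())

    def tick(self) -> bool:
        # True when this instance may send due tasks on this tick.
        return self.store.acquire_or_renew(self.LOCK_NAME, self.owner, self.ttl)
```

If the active instance stops renewing (for example, its container is killed), another instance takes over once the lease expires.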