Celery best practices

I've been programming with celery for the last three years, and Deni Bertović's article about Celery best practices has truly been invaluable to me. Over time, I've also come up with my own set of best practices, and I guess this blog is as good a place as any to write them down.

Write short tasks

I think a task should be as concise as possible, so that you can understand what it does and how it handles corner cases as quickly as possible. I personally try to follow these rules:

  • wrap the main task logic in an object method or a function
  • make this method/function raise identified exceptions for identified corner cases, and decide how each of them should be handled
  • implement a retry mechanism only where appropriate

Let's illustrate these rules with a simple example: sending an email using a third-party API (e.g. Mailgun, Mailjet, etc). Anyone who has spent enough time using third-party infrastructure and systems knows never to rely on them completely: the network can fail, the service can be unavailable, etc. We thus need to handle the foreseeable error cases and have a fallback strategy for unexpected errors.

Let's say that we have a function api_send_mail that performs the actual API call and raises a myproject.exceptions.InvalidUserInput exception in case of an HTTP client error. This exception constitutes our set of foreseeable exceptions that we need to plan for. Any other exception (connection error, HTTP server error, etc) will be sent to a crash-report backend like Sentry, and trigger a retry.
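
For illustration, here is a minimal sketch of what api_send_mail could look like, assuming a Mailgun-style HTTP API (the URL, credentials and payload are purely illustrative):

import requests

from myproject.exceptions import InvalidUserInput

MAILGUN_API_URL = "https://api.mailgun.net/v3/example.com/messages"  # illustrative

def api_send_mail(recipients, sender_email, subject, body):
    """Send a plaintext email through a third-party HTTP API."""
    response = requests.post(
        MAILGUN_API_URL,
        auth=("api", "my-api-key"),  # illustrative credentials
        data={"from": sender_email, "to": recipients, "subject": subject, "text": body},
    )
    if 400 <= response.status_code < 500:
        # HTTP client error: the input was invalid, retrying won't help
        raise InvalidUserInput(response.text)
    # Server errors (5xx) bubble up as requests.exceptions.HTTPError
    response.raise_for_status()
    return response.json()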

My task implementation would look something like this:

from myproject.tasks import app  # app is your celery application
from myproject.exceptions import InvalidUserInput

from utils.mail import api_send_mail

# sentrycli is assumed to be a configured Sentry client (e.g. a raven.Client instance)

@app.task(bind=True, max_retries=3)
def send_mail(self, recipients, sender_email, subject, body):
    """Send a plaintext email with argument subject, sender and body to a list of recipients."""
    try:
        data = api_send_mail(recipients, sender_email, subject, body)
    except InvalidUserInput:
        # No need to retry as the user provided an invalid input
        raise
    except Exception as exc:
        # Any other exception. Log the exception to sentry and retry in 10s.
        sentrycli.captureException()
        self.retry(countdown=10, exc=exc)
    return data

What the task actually does is abstracted one layer down, and almost all the rest of the task body is handling errors. I feel that it's easier to grasp the bigger picture, and that the task is easier to maintain this way.

Retry gracefully

Setting a fixed countdown for retries may not be what you want. I tend to prefer a backoff delay that increases with the number of retries: the more a task has failed, the longer we wait until the next attempt. I think this has a couple of interesting consequences:

  • we don't hammer the external service in case of an outage,
  • it gives the service more time to recover,
  • and thus it increases our overall chance of success

A simple (but nonetheless effective) implementation could look something like this:

def backoff(attempts):
    """Return a backoff delay, in seconds, given a number of attempts.

    The delay increases very rapidly with the number of attempts:
    1, 2, 4, 8, 16, 32, ...

    """
    return 2 ** attempts

@app.task(bind=True, max_retries=3)
def send_mail(self, recipients, sender_email, subject, body):
    """Send a plaintext email with argument subject, sender and body to a list of recipients."""
    try:
        data = api_send_mail(recipients, sender_email, subject, body)
    except InvalidUserInput:
        raise
    except Exception as exc:
        sentrycli.captureException()
        self.retry(countdown=backoff(self.request.retries), exc=exc)
    return data
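
One refinement worth considering (it is not in the snippet above) is adding random jitter to the delay, so that a burst of tasks failing at the same moment doesn't retry in lockstep and hit the recovering service all at once. A minimal sketch:

import random

def backoff_with_jitter(attempts, cap=600):
    """Return a random delay between 0 and min(cap, 2 ** attempts) seconds."""
    return random.uniform(0, min(cap, 2 ** attempts))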

Fail fast and don't block forever

One thing to remember is to always specify a timeout on I/O operations, or at least on the celery task itself. If you don't, all your tasks could end up blocked indefinitely, which would prevent any additional task from starting. In the context of the send_mail task, an API call should not take more than 5 seconds, so I could do something like this:

@app.task(
    bind=True,
    max_retries=3,
    soft_time_limit=5,  # time limit is in seconds
)
def send_mail(self, recipients, sender_email, subject, body):
    ...

If the task takes more than 5 seconds to complete, the celery.exceptions.SoftTimeLimitExceeded exception is raised inside the task; it is then caught by our except Exception clause, logged to Sentry, and retried.
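
Ideally, a timeout should also be set on the I/O operation itself. If api_send_mail uses requests, as in the sketch above, that is a single extra keyword argument:

response = requests.post(
    MAILGUN_API_URL,
    auth=("api", "my-api-key"),  # illustrative, as before
    data=payload,  # the same payload dict as before
    timeout=5,  # give up if the server takes more than 5 seconds to answer
)

requests then raises a requests.exceptions.Timeout, which our except Exception clause catches and turns into a retry.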

I also tend to set the CELERYD_TASK_SOFT_TIME_LIMIT configuration option to a default value of 300 seconds (5 minutes). It acts as a failsafe if I forget to set an appropriate soft_time_limit on a task.
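
For the record, setting that default could look like this (note that Celery 4 renamed this setting to task_soft_time_limit, but the mechanism is identical):

# celeryconfig.py
CELERYD_TASK_SOFT_TIME_LIMIT = 300  # failsafe: no task may run for more than 5 minutes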

Share common behavior among tasks

All that is pretty dandy, but I don't want to re-implement the exception catching for every task. I'd rather specify a base behavior shared by all my tasks. It turns out you can, using an abstract task class.

from myproject.tasks import app

class BaseTask(app.Task):
    """Abstract base class for all tasks in my app."""

    abstract = True

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Log the exceptions to sentry at retry."""
        sentrycli.captureException(exc)
        super(BaseTask, self).on_retry(exc, task_id, args, kwargs, einfo)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the exceptions to sentry."""
        sentrycli.captureException(exc)
        super(BaseTask, self).on_failure(exc, task_id, args, kwargs, einfo)


@app.task(
    bind=True,
    max_retries=3,
    soft_time_limit=5,
    base=BaseTask)
def send_mail(self, recipients, sender_email, subject, body):
    """Send a plaintext email with argument subject, sender and body to a list of recipients."""
    try:
        data = api_send_mail(recipients, sender_email, subject, body)
    except InvalidUserInput:
        raise
    except Exception as exc:
        self.retry(countdown=backoff(self.request.retries), exc=exc)
    return data

You can see that the send_mail task implementation only deals with email sending and expected error handling. Everything else is handled by the abstract base class. If the common behavior is more complex, this trick can drastically reduce the size of each task body and the amount of duplicated code in your tasks.

Note: this example is only here to demonstrate how to share behavior between tasks. To properly integrate Sentry with Celery, have a look at the Sentry documentation.

Tip: have a look at the list of available task handlers (after_return, on_failure, on_retry, on_success) to get an idea of what behavior can be shared between tasks.

Write large tasks as classes

So far, I've only implemented tasks as functions. However, it's also possible to define tasks as classes.

I think one of the scenarios where class tasks really shine is when you'd like to split a large task function into several well-defined and testable methods. If you look at the Celery source, you can see that the task decorator generates a task class and injects the decorated function as the class's run method. Defining a class task thus amounts to defining a class inheriting from app.Task with a run method.

class handle_event(BaseTask):   # BaseTask inherits from app.Task

    def validate_input(self, event):
        ...

    def get_or_create_model(self, event):
        ...

    def call_hooks(self, event):
        ...

    def persist_model(self, model):
        ...

    def stream_event(self, event):
        ...

    def run(self, event):
        if not self.validate_input(event):
            raise InvalidInput(event)
        try:
            model = self.get_or_create_model(event)
            self.call_hooks(event)
            self.persist_model(model)
        except Exception as exc:
            self.retry(countdown=backoff(self.request.retries), exc=exc)
        else:
            self.stream_event(event)

By doing this, the task logic is clear and easy to follow (the run method stays concise even if the method bodies are large), and each of these methods can then be unit-tested independently.
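
For instance, a hypothetical test for validate_input can instantiate the task class and call the method directly, without any broker or worker involved (assuming validate_input returns a boolean and rejects empty events):

from myproject.tasks import handle_event

def test_validate_input_rejects_empty_event():
    task = handle_event()
    assert not task.validate_input({})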

Another advantage of class tasks is that you can use multiple inheritance to specialize a task with several abstract base classes. For example, I could use the celery_once QueueOnce abstract class to introduce a locking mechanism, while still using BaseTask for Sentry logging. This way, each abstract task class acts as a mixin, adding some behavior to the task.
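
Here is a sketch of what that could look like (the task and its argument are illustrative, and celery_once additionally needs its locking backend configured on the app):

from celery_once import QueueOnce

class process_feed(QueueOnce, BaseTask):
    """Process a feed, ensuring only one task runs at a time for a given set of arguments."""

    once = {'graceful': True}  # silently discard duplicates instead of raising

    def run(self, feed_url):
        ...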

Unit-test your tasks

Unit testing a project involving celery has always been a pickle for me. I tried deploying a broker and a test celery worker in the CI environment, but it felt like killing a fly with a bazooka. The answer turns out to be quite simple (thanks to Nicolas Le Manchet for figuring this one out): when the CELERY_ALWAYS_EAGER option is activated, all tasks called with their apply_async or delay methods run locally and synchronously, without requiring any broker or celery worker. Easy as pie.
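
In practice, a test could look something like this (a sketch: the import path is illustrative, and api_send_mail should be stubbed out so that no real email is sent):

from myproject.tasks import app, send_mail

def test_send_mail_runs_eagerly():
    app.conf.CELERY_ALWAYS_EAGER = True
    # Re-raise task exceptions instead of storing them in the result
    app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
    result = send_mail.delay(['bob@example.com'], 'alice@example.com', 'Hello', 'Hi Bob!')
    assert result.successful()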
