How we scaled Celery for our Django app

The scenario

Process 30–50 million tasks per day; max throughput of 2k TPS (tasks per second). Tasks should not be duplicated (executed exactly once). Some tasks need to scheduled for later.

call some_task.delay(), it returns immediately and executes asynchronously

As we scaled more than 100x, over the next 2 years, life become complicated.

Need to process all of this (source: https://unsplash.com/photos/jOqJbvo1P9g)

Little about Celery

It is a terrible tasting vegetable. Seriously. Don’t ever have it.

  1. implement the task i.e. write the code that will run when the task is invoked.

Are tasks function calls?

Initially, some_task.delay() or some_task.apply_async() felt like async function calls- invoke it, and the function executed asynchronously.

Only primitives are allowed as task arguments

Once the task is invoked, celery serializes (usually to JSON) the full function name (app.tasks.some_task), the function args and kwargs, and some additional info (queue name, execution, etc.). This serialzed text is then stored in the message broker. Later (typically in a few milliseconds), this message is picked up by the consumer and executed.

Updating a function will break old tasks

Once, I started getting a lot of celery.exceptions.NotRegistered. Celery was telling me, the task that I want to execute, was not there in the source code. Huh? But it was there! Errors occurred for a few mins, and then stopped. Few weeks later, it happened again. This time errors lasted for like half a day!

Make tasks changes backward compatible, like APIs.

Ideally, don’t update tasks. But if you have to, make sure:

  1. New args have default values
  2. Removed args remain as no-op args

Django-celery-results is a time bomb

This library:

Standard logging is a much better solution for any of your debugging needs.

In general, no need to store task results in any backend (even though celery provides this feature). By default the celery will log task results.

Pro tip: Log task params, in addition to task name

The code to achieve it is wicked simple. Django example: in settings.py add the following

Unit Testing

You unit test your code right? If not, I highly recommend Test Driven Development.

1. Task invocation

Here, we test that the task is triggered appropriately. For this, we need to mock the actual celery invocation. @patch is very handy here (ref). If we patch a method (either in the class or function), we get a new function arg with the mocked object. Then we can assert the function call args:

2. Task execution

Now we need to test that the task runs as per the arguments provided. This is one more straightforward, we call the task method directly and assert its side effects.

Ignore celery tasks in the test suite

After some heartache of the test suite failing sporadically- locally or in the CI, I figured we should ignore celery task creation in the unit test suite. One way to do it is to create a fake celery broker in the test environment:

Scaling up 🚀

So far, I talked about issues that come with having more code, not necessarily more traffic. Time to scale up! Scale challenges are always nice problems to have.

Source: https://unsplash.com/photos/TV2gg2kZD1o

Prioritisation

Not all tasks are created equal. Some tasks (that sync data to ES, or send OTPs) are much higher in priority than others. Putting these on a separate queue makes sure that dedicated consumers can consume these time-sensitive tasks.

Isolation

Often certain tasks would get stuck while executing. We always wrote perfect code but the downstream dependencies would take too long to respond. Okay I’m kidding- we too pushed bad code. In any case, 1 “bad” task would affect the other “good” tasks. This couldn’t scale.

Solution: Use different Queues

Celery has the feature of having multiple queues. Different consumers consume from one or more queues. By default, a consumer only consumers from the celery (default) queue.

Some of the queues at apna.co

Bonus: Rate limiting

In some instances, we were getting throttled by the downsteam dependencies — we were making too many requests too quickly. We could limit this by having a smaller number of workers. For example: say we have a hard limit of 1,000 RPS to a certain API. We know 1 API call (or task execution) takes about 200ms (p50 latency). So, to achieve a max RPS of 500 (to keep some buffer), we set the max workers to x

   x / 0.2 = 500
=> x = 100

Redis Broker does not scale

Celery recommends RabbitMQ as the broker but GCP provides a managed redis as a service so we went ahead with redis. After a few months we self-hosted RabbitMQ and moved away from redis.

Celery was duplicating
  1. Deploy a new set of consumers that consume from RabbitMQ
  2. Deploy the new producer code that writes to RabbitMQ
  3. After 1–2 days, terminate the old redis consumers (to make sure no pending tasks remain).

Delayed Tasks

As a business requirement, we wanted certain tasks to be executed only in office hours (8am to 6pm). So, whenever a task was being created outside of office hours, we calculated a countdown that was added to make the task run at 8am next day. This didn’t scale!

It starts off simple and has enough features to scale for many different workloads!

I feel we will continue to use it in the years to come.

Please reach to me for any comments, suggestions on this article or for the exciting positions we have open at apna.co. We are solving more challenges problems like these at my company. DMs open on twitter

Engineering Manager at apna.co | Startup enthusiast, Public Speaker, Tech Blogger | 1 startup was acqui-hired.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store