When we first introduced celery to our existing django application, we were at less than 1/100th of the traffic we have now. Life was simple: call some_task.delay(), it returns immediately and the task executes asynchronously.
As we scaled more than 100x over the next 2 years, life became complicated.
Little about Celery
It is a terrible tasting vegetable. Seriously. Don’t ever have it.
Celery is a background task processing system. I like to think about celery as a reliable and distributed way to implement producer-consumer behaviour:
In our case, our django app (i.e. API server) was the producer. There were many celery workers, across multiple VMs that consumed these tasks. In between, the tasks were stored as “messages” in the RabbitMQ message queue.
The celery framework abstracts away a lot of these underlying nitty-gritties. As a developer, you can focus on the business logic:
- invoke the task
- implement the task i.e. write the code that will run when the task is invoked.
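To make this concrete, here is a minimal sketch (the app setup, task name, and arguments are illustrative, not our actual code):

```python
from celery import Celery

# a minimal celery app; the broker URL here is an assumption for the sketch
app = Celery("myproject", broker="amqp://localhost")

@app.task
def send_welcome_email(user_id):
    # the business logic runs on a worker process, not in the API process
    print(f"sending welcome email to user {user_id}")

# producer side (e.g. inside a Django view): returns immediately,
# a worker picks the message up from the broker and runs it
send_welcome_email.delay(user_id=42)
```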
More details lie ahead, along with why you need to know them.
Are tasks function calls?
some_task.apply_async() felt like an async function call: invoke it, and the function executes asynchronously.
But this is not the case. A lot of things go on under the hood to achieve this illusion. Some pitfalls we came across:
Only primitives are allowed as task arguments
Once the task is invoked, celery serializes (usually to JSON) the full function name (app.tasks.some_task), the function args and kwargs, and some additional info (queue name, execution options, etc.). This serialized text is then stored in the message broker. Later (typically within a few milliseconds), the message is picked up by a consumer and executed.
Now, in order for this serialization to work, the entire message must be serializable!
Python objects cannot be serialized; typically only primitives (ints, strings, etc.) will work. Many times I have made the mistake of passing a User object instead of a user_id. This happens because when you write the code and test the function call locally, everything works. It also easily gets overlooked in PR reviews, especially with bad naming. Even worse, it raises an unhandled exception when the task is invoked, which can crash your API call.
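A rough sketch of the pattern we converged on (task and model names are illustrative):

```python
from celery import shared_task
from django.contrib.auth.models import User

@shared_task
def send_welcome_email(user_id):
    # re-fetch the object inside the task; only the primitive id
    # travels through the broker as JSON
    user = User.objects.get(id=user_id)
    print(f"welcoming {user.email}")

# Bad: a model instance is not JSON-serializable, so this blows up
# when celery tries to serialize the message
#   send_welcome_email.delay(user)

# Good: pass the primary key instead
#   send_welcome_email.delay(user.id)
```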
Updating a function will break old tasks
Once, I started getting a lot of celery.exceptions.NotRegistered. Celery was telling me that the task I wanted to execute was not there in the source code. Huh? But it was there! Errors occurred for a few minutes, then stopped. A few weeks later, it happened again. This time the errors lasted for about half a day!
Turned out, in both cases, I had updated the function name (and thereby the task name). Nothing inherently wrong with that, except there were messages in the broker with the old task name.
When a consumer running the new source code receives an old task, boom💥!
The consumer doesn't know what to do, and errors out. Very counter-intuitive! I update function names all the time. Even for immediately executing tasks, some errors will occur because of the non-zero time it takes to deploy to production. Moreover, this will not be caught in a staging environment.
Similar issues happened when I updated the function args in a non-backward-compatible way. After some heartache, we adopted a rule of thumb within the company:
Make task changes backward compatible, like APIs.
Ideally, don’t update tasks. But if you have to, make sure:
- Do not rename.
- New args have default values
- Removed args remain as no-op args
If you can’t do the above, create a new task like some_task_v2, similar to versioning APIs.
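For example, a backward-compatible change might look like this sketch (names are hypothetical):

```python
from celery import shared_task

# Old signature: notify_user(user_id, template_id)
# New requirement: notifications can go out on different channels.
@shared_task
def notify_user(user_id, template_id=None, channel="sms"):
    # - the task name is unchanged, so old messages still resolve
    # - channel is new and has a default, so old messages still deserialize
    # - template_id stays in place (kept as a no-op if it is ever retired)
    ...
```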
Django-celery-results is a time bomb
I’m sure this was made with the best of intentions, but it serves little practical purpose in production. Worse, it does a lot of harm.
Storing task results in a centralized location (like SQL) defeats the fundamental tenet of celery: a distributed task queue.
Within a few months of use, task results were taking up 90% of our SQL storage, and the django admin page wouldn’t open (because the SQL statements would time out).
Standard logging is a much better solution for any of your debugging needs.
In general, there is no need to store task results in any backend (even though celery provides this feature). By default, celery will log task results.
Pro tip: Log task params, in addition to task name
The code to achieve it is wicked simple. Django example: in settings.py, add the following:
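(A sketch of one way to do it, using celery's task_prerun signal; the handler just needs to live in a module that is imported at startup, and settings.py qualifies.)

```python
import logging

from celery.signals import task_prerun

logger = logging.getLogger("celery.task")

# log the task name along with its args and kwargs just before every execution
@task_prerun.connect
def log_task_params(sender=None, task_id=None, args=None, kwargs=None, **extra):
    logger.info(
        "Running %s[%s] args=%r kwargs=%r",
        sender.name if sender else "unknown-task", task_id, args, kwargs,
    )
```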
You unit test your code, right? If not, I highly recommend Test Driven Development.
I couldn’t find much online, so I came up with a simple framework to unit test celery tasks. We need to test 2 parts:
1. Task invocation
Here, we test that the task is triggered appropriately. For this, we need to mock the actual celery invocation.
@patch is very handy here (ref). If we patch a method (either in a class or a module-level function), we get an extra function arg with the mocked object. Then we can assert on the call args:
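A minimal sketch, assuming a signup view that fires a send_welcome_email task (paths and payload are illustrative):

```python
from unittest.mock import ANY, patch

from django.test import TestCase

class SignupViewTest(TestCase):
    # patch the task where it is *used*, so the .delay call hits the mock
    @patch("app.views.send_welcome_email.delay")
    def test_signup_triggers_welcome_email(self, mock_delay):
        self.client.post("/signup/", {"email": "a@b.com"})
        # assert the task was invoked exactly once, with the expected args
        mock_delay.assert_called_once_with(user_id=ANY)
```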
2. Task execution
Now we need to test that the task runs as per the arguments provided. This one is more straightforward: we call the task function directly and assert its side effects.
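Continuing the same hypothetical example: if the task's side effect is an outgoing email, Django's in-memory test mailbox is enough to assert on:

```python
from django.contrib.auth.models import User
from django.core import mail
from django.test import TestCase

from app.tasks import send_welcome_email  # illustrative import path

class SendWelcomeEmailTest(TestCase):
    def test_sends_one_email(self):
        user = User.objects.create(username="alice", email="alice@example.com")
        # call the task function directly: no broker, no worker, fully synchronous
        send_welcome_email(user_id=user.id)
        # assert on the side effect via Django's test email outbox
        self.assertEqual(len(mail.outbox), 1)
```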
Ignore celery tasks in the test suite
After some heartache with the test suite failing sporadically, locally or in CI, I figured we should ignore celery task creation in the unit test suite. One way to do it is to point celery at a fake broker in the test environment:
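A sketch of what the test settings can look like (setting names assume the usual CELERY_* namespace in a Django project):

```python
# settings/test.py (illustrative module path)

# kombu ships an in-memory transport, so any message published during
# tests never leaves the test process
CELERY_BROKER_URL = "memory://"

# alternative, if you *do* want task bodies to run inline during tests:
# CELERY_TASK_ALWAYS_EAGER = True
```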
Scaling up 🚀
So far, I talked about issues that come with having more code, not necessarily more traffic. Time to scale up! Scale challenges are always nice problems to have.
Not all tasks are created equal. Some tasks (that sync data to ES, or send OTPs) are much higher in priority than others. Putting these on a separate queue makes sure that dedicated consumers can consume these time-sensitive tasks.
Often these tasks would get delayed because of a larger backlog of lower priority tasks.
Often certain tasks would get stuck while executing. We always wrote perfect code, but the downstream dependencies would take too long to respond. Okay, I'm kidding: we too pushed bad code. In any case, 1 “bad” task would affect the other “good” tasks. This couldn’t scale.
Solution: Use different Queues
Celery has the feature of having multiple queues, and different consumers can consume from one or more of them. By default, a consumer only consumes from the celery (default) queue.
Initially it isn’t obvious why anyone would bother to have multiple queues. But once we started facing prioritisation and isolation issues, the answer became painfully obvious.
Celery provides many ways to implement task queues. My favourite is to add them to settings.py: this acts as a single source of truth for all the different queues:
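A sketch of what that settings.py block can look like (queue and task names are illustrative):

```python
# settings.py

# everything not routed explicitly falls back to this queue
CELERY_TASK_DEFAULT_QUEUE = "celery"

# single source of truth: route time-sensitive tasks to dedicated queues
CELERY_TASK_ROUTES = {
    "app.tasks.send_otp": {"queue": "high_priority"},
    "app.tasks.sync_to_es": {"queue": "es_sync"},
}
```

Workers then subscribe to specific queues with the -Q flag (e.g. celery -A myproject worker -Q high_priority).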
On the consumer side, make sure there are workers that consume from all the mentioned queues.
In some cases, we had to purge queues: some bad tasks had piled up in the message broker. Purging would be impossible in a single large queue.
Bonus: Rate limiting
In some instances, we were getting throttled by the downstream dependencies: we were making too many requests too quickly. We could limit this by having a smaller number of workers. For example: say we have a hard limit of 1,000 RPS to a certain API. We know 1 API call (or task execution) takes about 200ms (p50 latency), so each worker can make roughly 5 calls per second. To stay at a max of 500 RPS (keeping some buffer), we set the number of workers x such that x / 0.2 = 500, i.e. x = 100.
Naturally, these workers had to be on a specific queue.
Redis Broker does not scale
Celery recommends RabbitMQ as the broker, but GCP provides managed redis as a service, so we went ahead with redis. After a few months, we self-hosted RabbitMQ and moved away from redis.
There is a long-standing issue with celery + redis: https://github.com/celery/celery/issues/5935. Effectively, tasks with long delays (greater than visibility_timeout) tend to get picked up by multiple workers and executed multiple times.
To work around this issue, we made the affected tasks idempotent (a sketch follows the migration steps below). But this often involved using a central datastore (usually SQL), again violating the fundamental tenet of being a distributed system. After some beating around the bush, we bit the bullet and migrated to RabbitMQ. The migration itself was lengthy but uneventful:
- Create a new RabbitMQ cluster
- Deploy a new set of consumers that consume from RabbitMQ
- Deploy the new producer code that writes to RabbitMQ
- After 1–2 days, terminate the old redis consumers (to make sure no pending tasks remain).
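And for the idempotency workaround mentioned above, a rough sketch of the pattern (model and field names are hypothetical):

```python
from celery import shared_task
from django.db import transaction

from app.models import Notification  # hypothetical model with a `sent` flag

@shared_task
def send_notification(notification_id):
    # idempotency guard: if redis re-delivers the task after
    # visibility_timeout, the second execution becomes a no-op
    with transaction.atomic():
        notification = (
            Notification.objects.select_for_update().get(id=notification_id)
        )
        if notification.sent:
            return
        # ... actually call the downstream provider here ...
        notification.sent = True
        notification.save(update_fields=["sent"])
```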
As a business requirement, we wanted certain tasks to be executed only during office hours (8am to 6pm). So, whenever a task was created outside office hours, we added a countdown to make the task run at 8am the next day. This didn’t scale!
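For illustration, the old approach looked roughly like this (a simplified sketch; the real version also had to skip holidays):

```python
from datetime import datetime, time, timedelta

from app.tasks import some_task  # illustrative import path

def seconds_until_office_hours(now=None):
    # delay until 8am if we are outside the 8am-6pm window
    now = now or datetime.now()
    if time(8) <= now.time() <= time(18):
        return 0
    if now.time() < time(8):
        next_8am = datetime.combine(now.date(), time(8))
    else:
        next_8am = datetime.combine(now.date() + timedelta(days=1), time(8))
    return int((next_8am - now).total_seconds())

# producer side: schedule the task for later via countdown
some_task.apply_async(countdown=seconds_until_office_hours())
```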
Turns out, celery has a very un-optimised implementation for scheduled tasks. It picks a set of tasks from the message broker and iterates over them to check whether it is time for each to execute. We found that a queue of tasks scheduled about 36 hrs in the future took 100–1000 times longer to drain completely than a queue with the same tasks set to execute instantly 🤯.
Solution: have workers that execute only during office hours. Ingeniously simple! It improved throughput by 100x.
This even simplified our producer code: no need to calculate a countdown, nor account for upcoming holidays.
Quite a few lessons over 2 years of usage! All said and done, I love working with celery.
It starts off simple and has enough features to scale for many different workloads!
I feel we will continue to use it in the years to come.
Please reach out to me with any comments or suggestions on this article, or about the exciting positions we have open at apna.co. We are solving more challenging problems like these at my company. DMs open on twitter.