Celery + SQS on Django on Elastic Beanstalk

Pre-Preface

I write these articles mostly for myself, so that if in three years I need to do a similar setup I can cut down the time spent crawling the internet for answers. As a result, the information they contain is bound to be incomplete and probably not very useful to outsiders.

However, since I don't want to leave anyone frustrated: if you wish to know more about some background aspect I left unmentioned, please write a comment and I will update the content so as to provide a more complete help article for everyone.

Preface

I will begin by saying that I used to have a Redis instance on AWS ElastiCache, but in the end I opted for AWS SQS as it is much, much cheaper to run. Two Redis instances (one for production and one for testing) cost me around $140/month, while SQS includes 1,000,000 free requests/month, making it basically free for the usage I make of it now.

I had already tried SQS before and found it relatively easy to set up, both in development (locally, with Docker) and in production (on Elastic Beanstalk), and yet at that point I didn't realize what I was walking into.

Anyway, let’s get started.

Installation

Celery (5.1.0 at the time of writing) ships with an extra designed to work with SQS. To install it in your Elastic Beanstalk environment, all you have to do is add celery[sqs] to your requirements.txt.
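In requirements.txt that could look like the line below (the exact version pin is just an example, not a recommendation):

celery[sqs]==5.1.0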

Then go into your project folder and create a file called celery.py. Some of SQS's magic is already going to manifest at this stage:

import os

from celery import Celery
from celery.schedules import crontab
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", os.environ["DJANGO_SETTINGS_FILE"])

app = Celery("app")

CELERY_CONFIG = {
    "task_serializer": "json",
    "accept_content": ["json"],
    "result_serializer": "json",
    "result_backend": None,
    "worker_enable_remote_control": False,
}

broker_url = "sqs://"
DOCKER_BROKER_URL = "sqs://user:password@sqs:9324/"

if os.environ.get("RDS_ALT_DB_NAME"):  # checking if we are on AWS Elastic Beanstalk or in the local env
    CELERY_CONFIG.update(  # settings for prod and test environments
        **{
            "broker_url": broker_url,
            "broker_transport": "sqs",
            "broker_transport_options": {
                "region": "eu-central-1",
                "visibility_timeout": 3600,
                "polling_interval": 60,
            },
        }
    )
else:
    CELERY_CONFIG.update(  # settings for dev (docker) environment
        **{
            "broker_url": DOCKER_BROKER_URL,
            "broker_transport": "sqs",
            "broker_transport_options": {
                "region": "eu-central-1",
                "visibility_timeout": 3600,
                "polling_interval": 60,
            },
        }
    )

app.conf.update(**CELERY_CONFIG)
app.autodiscover_tasks()

app.conf.beat_schedule = {
    "import_coaches": {
        "task": "gsheets.tasks.import_coaches",
        "schedule": crontab(minute=0, hour="*/12"),
        "options": {
            "queue": os.environ.get("MAIN_QUEUE"),
        },
    }
}

Let’s unpack some of the critical points of this config file. I really struggled with this because there is so little documentation available.

Those of you who have used Celery with Redis, RabbitMQ, or other brokers will be surprised to see an essentially empty broker URL and no result backend mentioned anywhere.

Result Backend

SQS works exclusively as a broker, not as a result backend. If you need a result backend in your project, you can pair SQS with RabbitMQ, Redis, or SQLAlchemy.
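As a minimal sketch (assuming you already run a reachable Redis instance; the hostname below is a placeholder), you would keep SQS as the broker and point the result backend at Redis:

CELERY_CONFIG.update(
    {
        "broker_url": "sqs://",  # SQS stays the broker
        "result_backend": "redis://my-redis-host:6379/0",  # placeholder Redis URL for storing results
    }
)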

Broker Url

SQS's broker URL can be just sqs:// because in my environment I already have the variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. If you do have these variables, SQS (but really, Kombu) will find them and use them to gain access to your SQS queues.

AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must belong to an AWS IAM user with the correct permissions to read and write messages on SQS queues, and in fact they need permission to create new queues as well.

Alternatively, you can compose the broker_url yourself in this way:

broker_url = f"sqs://{os.environ.get('SQS_ACCESS_KEY_ID')}:{os.environ.get('SQS_SECRET_ACCESS_KEY')}@"

In this example I used different credentials that I created specifically for SQS. By using dedicated IAM users for specific tasks you increase a project's robustness against malicious attacks.
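One caveat when composing the URL yourself: AWS secret keys can contain characters such as / or + that corrupt the URL, so they should be URL-encoded first. A small sketch using Kombu's helper, with the same custom variable names as above:

import os

from kombu.utils.url import safequote

# URL-encode the credentials so characters like "/" or "+" don't break the broker URL.
aws_access_key = safequote(os.environ.get("SQS_ACCESS_KEY_ID", ""))
aws_secret_key = safequote(os.environ.get("SQS_SECRET_ACCESS_KEY", ""))

broker_url = f"sqs://{aws_access_key}:{aws_secret_key}@"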

Queue

At this point we haven’t specified which queue or queues we want to use. I wasted a lot of time trying to use a configuration called “predefined_queues” which didn’t seem to have any bearing whatsoever on which queue was being used to read or write tasks. The solution I found is a little bit more hands-on but at least it works 100% of the time.
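For reference, this is roughly the shape of the predefined_queues option I was experimenting with; the queue name, account id, and URL below are placeholders. It is a real transport option, it just never solved the routing problem for me:

CELERY_CONFIG.update(
    {
        "broker_transport_options": {
            "region": "eu-central-1",
            # Hypothetical entry: maps a queue name to its full SQS URL.
            "predefined_queues": {
                "my-main-queue": {
                    "url": "https://sqs.eu-central-1.amazonaws.com/123456789012/my-main-queue",
                },
            },
        },
    }
)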

My approach involves specifying which queue to use every time I call a Celery task, and booting up the Celery worker with instructions to listen to a specific queue (or list of queues). This kind of micromanagement was necessary in my case because SQS seemed to be extremely liberal in how it allowed various IAM roles access to various queues.

During testing I had numerous issues with tasks inserted by one environment and executed in a different one, despite paying great attention to setting the right permissions for the right roles. In the end I decided to be extremely strict with queue usage and adopt a whitelist approach, clearly specifying which queue I wanted to use at every step of the way.

# .platform/hooks/postdeploy/

# These two lines fetch some env vars from the Elastic Beanstalk environment
CELERY_WORKER_ENV=$(/opt/elasticbeanstalk/bin/get-config environment -k CELERY_WORKER_ENV)
MAIN_QUEUE=$(/opt/elasticbeanstalk/bin/get-config environment -k MAIN_QUEUE)
if [ "$CELERY_WORKER_ENV" = "true" ];
then
# Create the celery systemd service file
echo "[Unit]
Name=Celery
Description=Celery service for My App
After=network.target
StartLimitInterval=0
[Service]
Type=simple
Restart=always
RestartSec=1
User=root
WorkingDirectory=/var/app/current
ExecStart=/var/app/venv/staging-LQM1lest/bin/celery -A <project_name> worker --loglevel=INFO -Q ${MAIN_QUEUE} --concurrency=6 --logfile=/var/log/celery.log
EnvironmentFile=/opt/elasticbeanstalk/deployment/env
[Install]
WantedBy=multi-user.target
" | tee /etc/systemd/system/celery.service
# Start celery service
systemctl start celery.service
# Enable celery service to load on system start
systemctl enable celery.service
else
echo "CELERY_WORKER_ENV = false"
fi

I use the same repository for several environments, so any configuration has to be dynamic. In this case I use two environment variables: one to determine whether this is a worker environment and, if it is, another to complete the worker command with the correct queue name.

When starting Celery you can use the -Q flag to specify an exclusive queue, or a list of queues, to look for tasks in. The worker will not go looking for tasks elsewhere. I never had a problem with Redis in this sense, but with SQS this helped me solve a particularly annoying problem.

The second piece of the puzzle is specifying which queue to load a task on whenever you call it. So instead of calling tasks with .delay(), I call them with .apply_async() and an explicit queue. For example:
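import os

# Using one of the project's tasks purely for illustration.
from gsheets.tasks import import_coaches

# Explicitly route the task to the queue this environment is allowed to use.
import_coaches.apply_async(queue=os.environ.get("MAIN_QUEUE"))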

Not hard-coding the queue name will certainly spare you a lot of misery in the future.

The same can be done for tasks that go in the Celery beat schedule, simply by adding an "options" entry with the queue, as in the example below:

app.conf.beat_schedule = {
    "import_coaches": {
        "task": "gsheets.tasks.import_coaches",
        "schedule": crontab(minute=0, hour="*/12"),
        "options": {
            "queue": os.environ.get("MAIN_QUEUE"),
        },
    }
}
