Celery and RabbitMQ

July 7, 2015   

Celery

Celery is an asynchronous task queue (job queue) based on distributed message passing.

Installation:

pip install celery

Setup:

vi apps/apps/__init__.py

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

vi apps/apps/celery.py

from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'apps.settings')

app = Celery('apps',
         # broker='amqp://%s:%s@localhost:5672/%s' % (celery_username, celery_password, celery_vhost),
         broker='amqp://guest:guest@localhost:5672//',
         )

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# Optional configuration, see the application user guide.
app.conf.update(
    # CELERY_RESULT_BACKEND = 'cache+memcached://127.0.0.1:11211/',
    CELERY_RESULT_BACKEND='amqp://',
    CELERY_TASK_SERIALIZER='json',
)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

if __name__ == '__main__':
    app.start()

Companion programs:

  • celerybeat for scheduling periodic tasks (a smarter version of cron)
  • celeryev / celerymon for monitoring your worker cluster
  • celerycam for snapshotting cluster state

apply_async vs delay

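delay is a shortcut for apply_async: it passes its arguments straight to the task, so it cannot take execution options such as countdown, eta, or queue. Use apply_async when those options are needed.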

retry vs acks_late

retry handles exceptions raised at run time: the task catches the error and asks to be queued again. acks_late handles a worker that crashes in the middle of a task: the message is acknowledged only after the task finishes, so the broker returns it to the queue. Take care with acks_late, because if the worker shuts down mid-task your code may run more than once; such tasks should be idempotent.

Abstract classes

An abstract base Task class is used to register actions that run when a task fails, succeeds, or returns.

from celery import Task

class DebugTask(Task):
    abstract = True

    def after_return(self, *args, **kwargs):
        print('Task returned: {0!r}'.format(self.request))


@app.task(base=DebugTask)
def add(x, y):
    return x + y

Running celery:

celery -A apps worker -E -Q queue_name --loglevel=info

RabbitMQ

RabbitMQ is open source message broker software that implements the Advanced Message Queuing Protocol (AMQP).

Installation:

# Ubuntu
sudo apt-get install rabbitmq-server
# Mac
brew install rabbitmq

Test Login (this calls the management API, so the management plugin from the next step must be enabled first):

curl -i -u guest:guest http://localhost:15672/api/whoami
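
The same check can be scripted from Python with the standard library. This is only a sketch: the function names are made up here, and whoami() needs a running broker with the management plugin listening on port 15672.

```python
import base64
import json
import urllib.request


def whoami_request(user='guest', password='guest',
                   base_url='http://localhost:15672'):
    """Build (but do not send) an authenticated GET for /api/whoami."""
    token = base64.b64encode(('%s:%s' % (user, password)).encode()).decode()
    req = urllib.request.Request(base_url + '/api/whoami')
    req.add_header('Authorization', 'Basic ' + token)
    return req


def whoami(**kwargs):
    """Send the request and decode the JSON body of the response."""
    with urllib.request.urlopen(whoami_request(**kwargs), timeout=5) as resp:
        return json.loads(resp.read().decode())
```

Calling whoami() against a local broker should return the JSON body that the curl command prints; wrong credentials raise urllib.error.HTTPError.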

Enable RabbitMQ Management:

sudo rabbitmq-plugins enable rabbitmq_management
sudo service rabbitmq-server restart

Checking the status:

sudo rabbitmqctl status

Setting up RabbitMQ

sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl list_users
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl list_vhosts
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
sudo rabbitmqctl list_user_permissions myuser

Be sure to use this syntax when connecting to a custom vhost (the path after the port is the vhost name, not a queue name):

amqp://user:pass@localhost:31278/myvhost

No slash at the end.

If using the guest user on the default vhost (/), the URL ends with a double slash:

amqp://guest:guest@localhost:31278//
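
Both URL shapes can be produced by one small helper. This is a sketch only: broker_url is a made-up name, and the default port 5672 is the standard AMQP port rather than the one used in the URLs above.

```python
from urllib.parse import quote


def broker_url(user, password, host='localhost', port=5672, vhost='/'):
    """Assemble an AMQP broker URL; the path after the port is the vhost."""
    # Percent-encode credentials so characters like '@' or ':' stay unambiguous.
    creds = '%s:%s' % (quote(user, safe=''), quote(password, safe=''))
    # The vhost is appended as-is: the default vhost '/' yields a trailing '//'.
    return 'amqp://%s@%s:%d/%s' % (creds, host, port, vhost)


print(broker_url('guest', 'guest'))
print(broker_url('myuser', 'mypassword', vhost='myvhost'))
```

The result can be passed as the broker argument of Celery(...), in the same way as the commented-out line in celery.py.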
