Celery and RabbitMQ
Celery
It is an asynchronous task queue/job queue based on distributed message passing.
Installation:
pip install celery
Setup:
vi apps/apps/__init__.py
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
vi apps/apps/celery.py
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'apps.settings')
app = Celery('apps',
# broker='amqp://%s:%s@localhost:5672/%s' % (celery_username, celery_password, celery_vhost),
broker='amqp://guest:guest@localhost:5672//',
)
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# Optional configuration, see the application user guide.
app.conf.update(
# CELERY_RESULT_BACKEND = 'cache+memcached://127.0.0.1:11211/',
CELERY_RESULT_BACKEND='amqp://',
CELERY_TASK_SERIALIZER='json',
)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
if __name__ == '__main__':
app.start()
Third Party modules:
- celerybeat for scheduling periodic tasks (a smarter version of cron)
- celeryev / celerymon for monitoring your worker cluster
- celerycam for snapshotting cluster state
delay is a shortcut to send a task message, but does not support execution options.
Retry catch ruc time exception errors whereas acks_late catch if the worker in the middle of the task crash and try to return it back to queue but take care because your code maybe will run more than one with acks_late if in the middle of the tasks the worker shut off.
It is used to register failure or success or complete actions on the task.
class DebugTask(Task):
abstract = True
def after_return(self, *args, **kwargs):
print('Task returned: {0!r}'.format(self.request))
@app.task(base=DebugTask)
def add(x, y):
return x + y
Running celery:
celery -A apps worker -E -Q queue_name --loglevel=info
RabbitMQ
It is open source message broker software that implements the Advanced Message Queuing Protocol (AMQP).
Installation:
# Ubuntu
sudo apt-get install rabbitmq-server
# Mac
brew install rabbitmq
Test Login:
curl -i -u guest:guest http://localhost:15672/api/whoami
Enable RabbitMQ Management:
sudo rabbitmq-plugins enable rabbitmq_management
sudo service rabbitmq-server restart
Checking the status:
sudo rabbitmqctl status
Setting up RabbitMQ
sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl list_users
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl list_vhosts
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
sudo rabbitmqctl list_user_permissions myvhost
Be sure to use this syntax when creating vhost:
amqp://user:pass@localhost:31278/myqueue
No slash at the end
If using guest user:
amqp://guest:guest@localhost:31278//
References: