Celery Basics
Prerequisites
OS: Ubuntu 14.04
Installation
sudo apt-get install rabbitmq-server python-celery python-celery-doc
Basics
- Celery is a message passing framework
- it requires a message broker
Broker
Several brokers are available to choose from.
Tracking Task State
http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-result-backends
The following can be chosen as the result backend.
Built-in:
- SQLAlchemy/Django Database ORM
- Memcached
- Redis
- RabbitMQ
- MongoDB
Custom result backends can also be written.
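For example, a minimal sketch of selecting Redis as the result backend in celeryconfig.py (the host, port, and database number are assumptions for illustration):

```python
# Example only: point the result backend at a local Redis instance
# (assumes a redis-server reachable at localhost:6379, database 0)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
```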
Getting Started
- sample_worker.py
  from celery import Celery
  import time
  import celeryconfig

  #app = Celery("tasks", backend="rpc", broker="amqp://guest@192.168.10.5")
  app = Celery("tasks")
  app.config_from_object(celeryconfig)

  @app.task
  def add(x, y):
      time.sleep(5)
      return x + y
- sample_task_caller.py
  from sample_worker import add

  # just call
  add.delay(4, 4)

  # ready
  async_result = add.delay(4, 4)
  print(async_result.ready())

  # get
  print(async_result.get(timeout=10))
- celeryconfig.py
  BROKER_URL = 'amqp://guest@192.168.10.5'
  CELERY_RESULT_BACKEND = 'rpc://'
  CELERY_TASK_SERIALIZER = 'json'
  CELERY_RESULT_SERIALIZER = 'json'
  CELERY_ACCEPT_CONTENT = ['json']
  CELERY_TIMEZONE = 'Asia/Tokyo'
  CELERY_ENABLE_UTC = True
- run worker
  celery worker -A sample_worker -n sample_worker --loglevel=info
  # the -n name is required when running multiple workers on the same node
Routing
- https://denibertovic.com/posts/celery-best-practices/
- http://docs.celeryproject.org/en/latest/userguide/routing.html#id2
Routing amounts to splitting tasks across separate queues.
Terminology
Celery depends on kombu, so kombu's terminology is used as the reference:
- exchange
- exchange_type
celeryconfig.py
- Queues
  from kombu import Queue

  CELERY_DEFAULT_QUEUE = 'default'
  CELERY_QUEUES = (
      Queue('default', routing_key='task.#'),
      Queue('feed_tasks', routing_key='feed.#'),
  )
  CELERY_DEFAULT_EXCHANGE = 'tasks'
  CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
  CELERY_DEFAULT_ROUTING_KEY = 'task.default'
- Routers
  CELERY_ROUTES = {
      'feeds.tasks.import_feed': {  # dirX.dirY...moduleX.task_name
          'queue': 'feed_tasks',
          'routing_key': 'feed.import',
      },
  }
See the following for the full list of supported keys:
http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
Custom routers can also be written.
Hard-coded routing
Not really recommended:
  from feeds.tasks import import_feed

  import_feed.apply_async(args=['http://cnn.com/rss'],
                          queue='feed_tasks',
                          routing_key='feed.import')
Wake up workers on servers X, Y, and Z:
  user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h
  user@x:/$ celery -A proj worker -Q default --hostname=x@%h
  user@y:/$ celery -A proj worker -Q default --hostname=y@%h
Broadcast
  from kombu.common import Broadcast

  CELERY_QUEUES = (Broadcast('broadcast_tasks'),)
  CELERY_ROUTES = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}}
Messages sent to this queue are delivered to all of its consumers.
Designing Workflows
The Primitives
Celery provides the generic primitives found in message passing frameworks:
group, chain, chord, map, starmap, chunks
workflow sample: (taskA, taskB, taskC) -> taskD -> taskE
Default Behaviour
- default number of worker processes: 4 here (Celery defaults to the number of CPU cores)
Best Practice
URLs
- http://www.celeryproject.org/
- http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html
- http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps
- http://docs.celeryproject.org/en/latest/userguide/index.html#guide
- http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq
- http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-result-backends
- http://docs.celeryproject.org/en/latest/configuration.html#configuration
- https://denibertovic.com/posts/celery-best-practices/
- http://abhishek-tiwari.com/post/amqp-rabbitmq-and-celery-a-visual-guide-for-dummies