KZKY memo

自分用メモ.

Celery 基本

前提

OS: ubuntu14.04

インストール

sudo apt-get install rabbitmq-server python-celery python-celery-doc     

基本

  • celeryはmessage passing framework
  • message brokerを必要とする

Broker

選べる

  • RabbitMQ
  • Redis
  • SQLAlchemy, Django Database (not recommended)
  • Amazon SQS, MongoDB and IronMQ (experimental)

Task Stateのトラッキング

http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-result-backends

result backendとして選べる

built-in

  • SQLAlchemy/Django Database ORM
  • Memcachedd
  • Redis
  • RabbitMQ
  • MongoDB

自作も可能

Getting Started

  • sample_worker.py
from celery import Celery
import time
import celeryconfig

#app = Celery("tasks", backend="rpc", broker="amqp://guest@192.168.10.5")
app = Celery("tasks")
app.config_from_object(celeryconfig)

@app.task
def add(x, y):
    time.sleep(5)
    return x + y
  • sample_task_caller.py
from sample_worker import add

* just call
add.delay(4, 4)

* ready
async_result = add.delay(4, 4)
print async_result.ready()

* get
print async_result.get(timeout=10)
  • celeryconfig.py
BROKER_URL = 'amqp://guest@192.168.10.5'
CELERY_RESULT_BACKEND = 'rpc://'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Tokyo'
CELERY_ENABLE_UTC = True
  • run worker
celery worker -A sample_worker -n sample_worker --loglevel=info # 同じノードで複数workersの場合名前必要

Routing

Queueを分けることに相当する.

基本

Exchnage(s)をつくる
Queue(s)をつくる
それらをbind

を参考にすると理解が進むはず.

用語

celeryはkombuに依存しているはずなので,これを参考にした

  • exchange
  • exchange_type
    • direct
      • routing_keyに完全一致
    • fanout
      • routing_keyがなくても,常にマッチ
    • topic
      • routing_keyにpartial match
      • * (=any word match), # =(zero or more words match)が使える
      • dotsで区切れるので,domain nameのような感じでマッチする
      • *.stock.#ならば,"usd.stock" and "eur.stock.db" but not "stock.nasdaq"
      • (taskが指定するrouting_keyには,* or #を使わないと思われる)
celeryconfig.py
  • Queues
from kombu import Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
  • Routers
CELERY_ROUTES = {
        'feeds.tasks.import_feed': { # dirX.dirY...moduleX.task_name
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}

key一覧はこれを参照
http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
Routerは自作も可能

Hard coded

あまりおすすめできない

from feeds.tasks import import_feed
import_feed.apply_async(args=['http://cnn.com/rss'],
                        queue='feed_tasks',
                        routing_key='feed.import')
wake up workers on serverX, Y, and Z
user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h
user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h

Broadcst

from kombu.common import Broadcast

CELERY_QUEUES = (Broadcast('broadcast_tasks'), )

CELERY_ROUTES = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}}

このQueueのconsumers全てに送られる.

Designing Workflow

The Primitives

message passing frameworkが備えている一般的なメソッドはある
group, chain, chord, map, starmap, chunks
workflow sample: (taskA, taskB, taskC) -> taskD -> taskE

Default Behaviour

  • default #workers = 4