Celery ドキュメントDjango内でCeleryをテストすることについて言及しかし、Django を使用していない場合に Celery タスクをテストする方法は説明されていません。どうすればいいのでしょうか?
ベストアンサー1
任意の unittest ライブラリを使用してタスクを同期的にテストできます。私は通常、Celery タスクを扱うときに 2 つの異なるテスト セッションを実行します。最初のセッション (以下で提案しているように) は完全に同期しており、アルゴリズムが期待どおりに動作することを確認するためのセッションです。2 番目のセッションはシステム全体 (ブローカーを含む) を使用して、シリアル化の問題やその他の配布、通信の問題が発生していないことを確認します。
それで:
from celery import Celery
celery = Celery()
@celery.task
def add(x, y):
return x + y
そしてあなたのテスト:
from nose.tools import eq_
def test_add_task():
rst = add.apply(args=(4, 4)).get()
eq_(rst, 8)