Celery と asyncio を組み合わせるには? 質問する

Celery と asyncio を組み合わせるには? 質問する

Celery タスクを のように見せるラッパーを作成するにはどうすればよいでしょうかasyncio.Task? または、Celery を と統合するより良い方法はありますかasyncio?

Celeryの作者@asksol、こう言った。:

Celery を非同期 I/O フレームワーク上の分散レイヤーとして使用することは非常に一般的です (ヒント: CPU バインドされたタスクをプリフォーク ワーカーにルーティングすると、イベント ループがブロックされなくなります)。

しかし、フレームワーク専用のコード例は見つかりませんでしたasyncio

ベストアンサー1

編集: 2021年1月12日 以前の回答(一番下にあります)は古くなっていたので、asyncioとCeleryを併用する方法をまだ探している人を満足させる可能性のある解決策の組み合わせを追加しました。

まずはユースケースを簡単に分解してみましょう(より詳細な分析はこちら:asyncio とコルーチン vs タスクキュー):

  • タスクが I/O バウンドである場合は、コルーチンと asyncio を使用する方がよい傾向があります。
  • タスクが CPU に依存する場合は、Celery または他の同様のタスク管理システムを使用する方がよいでしょう。

したがって、Python の「1 つのことを行い、それをうまく行う」という文脈では、asyncio と celery を混在させないようにするのは理にかなっています。

しかし、メソッドを非同期的にも非同期タスクとしても実行できるようにしたい場合はどうなるでしょうか? その場合、考慮すべきいくつかのオプションがあります。

  • 私が見つけることができた最良の例は次のとおりです。https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/(そして私はそれが@Franey の返答):

    1. 非同期メソッドを定義します。

    2. 使用asgirefsync.async_to_syncモジュールを使用して、非同期メソッドをラップし、Celery タスク内で同期的に実行します。

      # tasks.py
      import asyncio
      from asgiref.sync import async_to_sync
      from celery import Celery
      
      app = Celery('async_test', broker='a_broker_url_goes_here')
      
      async def return_hello():
          await asyncio.sleep(1)
          return 'hello'
      
      
      @app.task(name="sync_task")
      def sync_task():
          async_to_sync(return_hello)()
      
  • 私が出会ったユースケースは高速APIアプリケーションは前の例の逆になります。

    1. 集中的な CPU バウンド プロセスが非同期エンドポイントを占有しています。

    2. 解決策は、非同期 CPU バウンド プロセスを Celery タスクにリファクタリングし、Celery キューから実行するためのタスク インスタンスを渡すことです。

    3. そのケースを視覚化するための最小限の例:

      import asyncio
      import uvicorn
      
      from celery import Celery
      from fastapi import FastAPI
      
      app = FastAPI(title='Example')
      worker = Celery('worker', broker='a_broker_url_goes_here')
      
      @worker.task(name='cpu_boun')
      def cpu_bound_task():
          # Does stuff but let's simplify it
          print([n for n in range(1000)])
      
      @app.get('/calculate')
      async def calculate():
          cpu_bound_task.delay()
      
      if __name__ == "__main__":
          uvicorn.run('main:app', host='0.0.0.0', port=8000)
      
  • もう一つの解決策はジュアンラそして@ダニウス回答では提案されていますが、同期実行と非同期実行を混在させるとパフォーマンスが低下する傾向があることを念頭に置く必要があります。そのため、これらの回答を本番環境で使用するかどうかを決定する前に監視する必要があります。

最後に、既製のソリューションがいくつかありますが、私はそれらを自分で使用したことがないのでお勧めできませんが、ここでリストします。

  • セロリプール非同期IOこれは、Celery 5.0 では解決できなかったことを正確に解決しているようですが、少し実験的なものであることに注意してください (本日 2021 年 1 月 12 日のバージョン 0.2.0)
  • タスク「Asyncio コルーチンを配布する Celery のようなタスク マネージャー」であると主張していますが、少し古いようです (最新のコミットは約 2 年前)

まあ、それはあまり古くなっていませんね。Celery バージョン 5.0 では asyncio 互換性が実装されていないため、これがいつ実装されるか、実装されるかどうかはわかりません... 応答のレガシー理由 (当時の回答だったため) とコメントの継続のために、これをここに残します。

これは、公式サイトに記載されているように、Celery バージョン 5.0 から可能になります。

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. Celery の次のメジャー バージョンでは Python 3.5 のみがサポートされ、新しい asyncio ライブラリを活用する予定です。
  2. Python 2 のサポートを廃止することで、大量の互換性コードを削除できるようになり、Python 3.5 に移行することで、古いバージョンでは代替手段がなかった型付け、async/await、asyncio などのコンセプトを活用できるようになります。

上記は前のリンクから引用したものです。

だから一番いいのは待つことだバージョン 5.0配布されます!

それまでの間、楽しいコーディングを:)

おすすめ記事