Dask 分散で大きな引数を持つタスクを効率的に送信するにはどうすればよいでしょうか? 質問する

Dask 分散で大きな引数を持つタスクを効率的に送信するにはどうすればよいでしょうか? 質問する

大きな (ギガバイト規模の) 引数を持つ関数を Dask で送信したいです。これを行うための最良の方法は何ですか? この関数をさまざまな (小さな) パラメータで何度も実行したいです。

例(悪い例)

これは、concurrent.futures インターフェースを使用します。dask.delayed インターフェースも同様に簡単に使用できます。

x = np.random.random(size=100000000)  # 800MB array
params = list(range(100))             # 100 small parameters

def f(x, param):
    pass

from dask.distributed import Client
c = Client()

futures = [c.submit(f, x, param) for param in params]

しかし、これは予想よりも遅く、メモリ エラーが発生します。

ベストアンサー1

さて、ここで問題なのは、各タスクに numpy 配列が含まれておりx、これが大きいことです。送信する 100 個のタスクごとに、シリアル化してx、スケジューラに送信し、ワーカーに送信するなどする必要があります。

代わりに、配列をクラスターに一度送信します。

[future] = c.scatter([x])

今は、クラスター上に存在するfuture配列を指すトークンです。これで、ローカル クライアント上の numpy 配列ではなく、このリモート future を参照するタスクを送信できます。x

# futures = [c.submit(f, x, param) for param in params]  # sends x each time
futures = [c.submit(f, future, param) for param in params]  # refers to remote x already on cluster

これにより、はるかに高速になり、Dask はデータの移動をより効率的に制御できるようになりました。

すべての作業者にデータを分散する

最終的に配列xをすべてのワーカーに移動する必要があると予想される場合は、配列をブロードキャストして開始することをお勧めします。

[future] = c.scatter([x], broadcast=True)

Dask Delayedを使用する

Futures は dask.delayed でも問題なく動作します。パフォーマンス上の利点はありませんが、このインターフェースを好む人もいます。

# futures = [c.submit(f, future, param) for param in params]

from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)

おすすめ記事