大きな (ギガバイト規模の) 引数を持つ関数を Dask で送信したいです。これを行うための最良の方法は何ですか? この関数をさまざまな (小さな) パラメータで何度も実行したいです。
例(悪い例)
これは、concurrent.futures インターフェースを使用します。dask.delayed インターフェースも同様に簡単に使用できます。
x = np.random.random(size=100000000) # 800MB array
params = list(range(100)) # 100 small parameters
def f(x, param):
pass
from dask.distributed import Client
c = Client()
futures = [c.submit(f, x, param) for param in params]
しかし、これは予想よりも遅く、メモリ エラーが発生します。
ベストアンサー1
さて、ここで問題なのは、各タスクに numpy 配列が含まれておりx
、これが大きいことです。送信する 100 個のタスクごとに、シリアル化してx
、スケジューラに送信し、ワーカーに送信するなどする必要があります。
代わりに、配列をクラスターに一度送信します。
[future] = c.scatter([x])
今は、クラスター上に存在するfuture
配列を指すトークンです。これで、ローカル クライアント上の numpy 配列ではなく、このリモート future を参照するタスクを送信できます。x
# futures = [c.submit(f, x, param) for param in params] # sends x each time
futures = [c.submit(f, future, param) for param in params] # refers to remote x already on cluster
これにより、はるかに高速になり、Dask はデータの移動をより効率的に制御できるようになりました。
すべての作業者にデータを分散する
最終的に配列xをすべてのワーカーに移動する必要があると予想される場合は、配列をブロードキャストして開始することをお勧めします。
[future] = c.scatter([x], broadcast=True)
Dask Delayedを使用する
Futures は dask.delayed でも問題なく動作します。パフォーマンス上の利点はありませんが、このインターフェースを好む人もいます。
# futures = [c.submit(f, future, param) for param in params]
from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)