次のコード フラグメントは、ディレクトリ リストを取得し、各ファイルで抽出メソッドを呼び出し、結果の薬物オブジェクトを xml にシリアル化するメソッドの一部です。
try(Stream<Path> paths = Files.list(infoDir)) {
paths
.parallel()
.map(this::extract)
.forEachOrdered(drug -> {
try {
marshaller.write(drug);
} catch (JAXBException ex) {
ex.printStackTrace();
}
});
}
以下は、まったく同じことを実行するまったく同じコードですが、単純な.list()
呼び出しを使用してディレクトリ リストを取得し、.parallelStream()
結果のリストを呼び出します。
Arrays.asList(infoDir.toFile().list())
.parallelStream()
.map(f -> infoDir.resolve(f))
.map(this::extract)
.forEachOrdered(drug -> {
try {
marshaller.write(drug);
} catch (JAXBException ex) {
ex.printStackTrace();
}
});
私のマシンはクアッドコア MacBook Pro、Java v 1.8.0_60 (ビルド 1.8.0_60-b27) です。
約 7000 個のファイルを処理しています。3 回の実行の平均は次のとおりです。
最初のバージョン: あり.parallel()
: 20秒。なし.parallel()
: 41秒
2 番目のバージョン: あり.parallelStream()
: 12 秒。 あり.stream()
: 41 秒。
extract
ストリームから読み取ってすべての負荷の高い作業を実行するメソッドと、最終的な書き込みを実行する呼び出しが変更されていないことを考えると、並列モードでの 8 秒は大きな違いのように思えますwrite
。
ベストアンサー1
問題は、Stream API の現在の実装と、IteratorSpliterator
サイズが不明なソースの の現在の実装が、そのようなソースを並列タスクに不適切に分割することです。 1024 個を超えるファイルがあったのは幸運でしたが、そうでなければ並列化のメリットはまったく得られません。現在の Stream API 実装では、estimateSize()
から返される値が考慮されますSpliterator
。IteratorSpliterator
サイズが不明な はLong.MAX_VALUE
分割前に を返し、そのサフィックスも常に を返しますLong.MAX_VALUE
。その分割戦略は次のとおりです。
MAX_BATCH
現在のバッチ サイズを定義します。現在の式では、1024 要素から開始し、サイズ (33554432 要素) に達するまで算術的に増加します (2048、3072、4096、5120 など) 。- バッチ サイズに達するか、入力がなくなるまで、入力要素 (この場合はパス) を配列に消費します。
ArraySpliterator
作成された配列をプレフィックスとして反復処理し、配列自体をサフィックスとして返します。
7000 個のファイルがあるとします。Stream API は推定サイズを要求し、IteratorSpliterator
を返しますLong.MAX_VALUE
。Stream API は に分割を要求し、配列のIteratorSpliterator
基になる から 1024 個の要素を収集して(推定サイズは 1024)と (推定サイズは依然として)に分割します。 は1024 よりはるかに大きいため、Stream API は小さい部分の分割を試みることなく、大きい部分の分割を続行することを決定します。したがって、全体的な分割ツリーは次のようになります。DirectoryStream
ArraySpliterator
Long.MAX_VALUE
Long.MAX_VALUE
IteratorSpliterator (est. MAX_VALUE elements)
| |
ArraySpliterator (est. 1024 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 2048 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 3072 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 856 elements) IteratorSpliterator (est. MAX_VALUE elements)
|
(split returns null: refuses to split anymore)
その後、実行する並列タスクが 5 つあります。実際には、1024、2048、3072、856、0 の要素が含まれています。最後のチャンクには要素が 0 個ありますが、それでも推定Long.MAX_VALUE
要素数があると報告されるため、Stream API もそれを送信することに注意してくださいForkJoinPool
。問題は、Stream API が、推定サイズがはるかに小さいため、最初の 4 つのタスクをさらに分割しても無駄であると判断することです。そのため、入力が非常に不均等に分割され、最大 4 つの CPU コアが使用されます (実際にはもっと多くのコアがある場合でも)。要素ごとの処理にどの要素に対してもほぼ同じ時間がかかる場合、プロセス全体は最大の部分 (3072 要素) が完了するまで待機することになります。したがって、最大で 7000/3072 = 2.28 倍の高速化が期待できます。したがって、順次処理に 41 秒かかる場合、並列ストリームには約 41/2.28 = 18 秒かかります (実際の数値に近い値です)。
回避策はまったく問題ありません。 を使用すると、Files.list().parallel()
すべての入力Path
要素がメモリ(ArraySpliterator
オブジェクト内)に保存されることに注意してください。したがって、手動で にダンプしても、メモリを無駄にすることはありません。(現在は によって作成されます)List
のような配列ベースのリスト実装は、問題なく均等に分割できるため、さらに高速化されます。ArrayList
Collectors.toList()
なぜこのようなケースは最適化されないのでしょうか? もちろん不可能な問題ではありません (実装はかなり難しいかもしれませんが)。 JDK 開発者にとって優先度の高い問題ではないようです。 このトピックについてはメーリング リストでいくつかの議論がありました。 Paul Sandoz のメッセージを読むことができます。ここここで彼は私の最適化の取り組みについてコメントしています。