私は読むクラスターモードの概要Spark スタンドアロン クラスター内のさまざまなプロセスと並列処理についてはまだ理解できません。
ワーカーは JVM プロセスですか? を実行するbin\start-slave.sh
と、実際には JVM であるワーカーが生成されたことがわかりました。
上記のリンクにあるように、エグゼキュータは、タスクを実行するワーカー ノード上のアプリケーション用に起動されるプロセスです。エグゼキュータも JVM です。
私の質問は次のとおりです:
エグゼキュータはアプリケーションごとに存在します。では、ワーカーの役割は何でしょうか? エグゼキュータと連携して結果をドライバーに返しますか? それともドライバーがエグゼキュータと直接通信しますか? もしそうなら、ワーカーの目的は何でしょうか?
アプリケーションの実行者の数を制御するにはどうすればよいですか?
タスクをエグゼキュータ内で並列実行することはできますか? できる場合、エグゼキュータのスレッド数をどのように構成しますか?
ワーカー、エグゼキュータ、エグゼキュータ コア (--total-executor-cores) の関係は何ですか?
ノードあたりのワーカー数が増えるとはどういう意味ですか?
更新しました
よりよく理解するために例を見てみましょう。
例 1: 5 つのワーカー ノード (各ノードに 8 つのコアがある) を持つスタンドアロン クラスターで、デフォルト設定でアプリケーションを起動します。
例 2例 1 と同じクラスター構成ですが、次の設定でアプリケーションを実行します --executor-cores 10 --total-executor-cores 10。
例 3例 1 と同じクラスター構成ですが、次の設定でアプリケーションを実行します --executor-cores 10 --total-executor-cores 50。
例 4例 1 と同じクラスター構成ですが、次の設定でアプリケーションを実行します --executor-cores 50 --total-executor-cores 50。
例 5例 1 と同じクラスター構成ですが、次の設定でアプリケーションを実行します --executor-cores 50 --total-executor-cores 10。
これらの各例で、エグゼキューターの数はいくつですか? エグゼキューターあたりのスレッド数はいくつですか? コアの数はいくつですか? アプリケーションあたりのエグゼキューターの数はどのように決定されますか? 常にワーカーの数と同じですか?
ベストアンサー1
Spark はマスター/スレーブ アーキテクチャを使用します。図からわかるように、Spark には 1 つの中央コーディネーター (ドライバー) があり、これが多数の分散ワーカー (エグゼキューター) と通信します。ドライバーと各エグゼキューターは、独自の Java プロセスで実行されます。
運転者
ドライバーは、メイン メソッドが実行されるプロセスです。最初にユーザー プログラムをタスクに変換し、その後、エグゼキューターでタスクをスケジュールします。
執行者
エグゼキュータは、特定の Spark ジョブ内の個々のタスクの実行を担当するワーカー ノードのプロセスです。Spark アプリケーションの開始時に起動され、通常はアプリケーションの存続期間中実行されます。タスクの実行が完了すると、その結果がドライバーに送信されます。また、ブロック マネージャーを通じてユーザー プログラムによってキャッシュされる RDD 用のメモリ内ストレージも提供します。
アプリケーション実行フロー
これを念頭に置いて、spark-submit を使用してアプリケーションをクラスターに送信すると、内部では次のことが起こります。
- スタンドアロン アプリケーションが起動し、インスタンスをインスタンス化します
SparkContext
(このときのみ、アプリケーションをドライバーと呼ぶことができます)。 - ドライバー プログラムは、エグゼキューターを起動するためにクラスター マネージャーにリソースを要求します。
- クラスター マネージャーはエグゼキューターを起動します。
- ドライバー プロセスはユーザー アプリケーションを通じて実行されます。RDD 上のアクションと変換に応じて、タスクがエグゼキューターに送信されます。
- 実行者はタスクを実行し、結果を保存します。
- いずれかのワーカーがクラッシュした場合、そのタスクは別のエグゼキュータに送信され、再度処理されます。書籍「Learning Spark: Lightning-Fast Big Data Analysis」では、Spark とフォールト トレランスについて次のように説明しています。
Spark は、失敗したタスクや遅いタスクを再実行することで、失敗したマシンや遅いマシンに自動的に対処します。たとえば、map() 操作のパーティションを実行しているノードがクラッシュした場合、Spark はそれを別のノードで再実行します。また、ノードがクラッシュしなくても、他のノードよりもはるかに遅いだけの場合、Spark は別のノードでタスクの「投機的」コピーを事前に起動し、それが完了したらその結果を取得します。
- ドライバーからの SparkContext.stop() を使用するか、メイン メソッドが終了/クラッシュすると、すべてのエグゼキュータが終了し、クラスター リソースはクラスター マネージャーによって解放されます。
あなたの質問
エグゼキュータが起動されると、エグゼキュータはドライバーに自身を登録し、それ以降は直接通信します。ワーカーは、リソースの可用性をクラスター マネージャーに通知する役割を担います。
YARN クラスターでは、--num-executors でこれを行うことができます。スタンドアロン クラスターでは、spark.executor.cores を操作してワーカーに複数のエグゼキューターを保持できるコアがない限り、ワーカーごとに 1 つのエグゼキューターが取得されます。(@JacekLaskowski が指摘したように、--num-executors は YARN では使用されなくなりました。https://github.com/apache/spark/commit/16b6d18613e150c7038c613992d80a7828413e66)
--executor-cores でエグゼキュータごとのコア数を割り当てることができます。
--total-executor-cores はアプリケーションあたりのエグゼキュータコアの最大数です
ショーン・オーウェンが言ったように糸: 「マシンごとに複数のワーカーを実行する十分な理由はありません」。たとえば、1 台のマシンに複数の JVM が配置されることになります。
アップデート
このシナリオをテストすることはできませんでしたが、ドキュメントによると次のとおりです。
例 1: Spark は、スケジューラによって提供されるコアとエグゼキュータを貪欲に取得します。そのため、最終的には、それぞれ 8 個のコアを持つ 5 個のエグゼキュータが取得されます。
例 2 ~ 5: Spark は、単一のワーカーで要求された数のコアを割り当てることができないため、エグゼキューターは起動されません。