単一のジョブで Spark を使用して、キーに応じて複数の出力に書き込む方法を教えてください。
関連している:キーによる複数の出力への書き込み Scalding Hadoop、1つのMapReduceジョブ
例えば
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
確実にするcat prefix/1
のは
a
b
そしてcat prefix/2
c
編集:最近、完全なインポート、ピンプ、圧縮コーデックを含む新しい回答を追加しました。https://stackoverflow.com/a/46118044/1586965、これは以前の回答に加えて役立つかもしれません。
ベストアンサー1
Spark 1.4以降を使用している場合は、データフレームAPI(DataFramesはSpark 1.3で導入されましたが、partitionBy()
必要なのは1.4で導入。
RDD から始める場合は、まずそれを DataFrame に変換する必要があります。
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
Python では、同じコードは次のようになります。
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
DataFrame を取得したら、特定のキーに基づいて複数の出力に書き込むのは簡単です。さらに、これが DataFrame API の優れた点ですが、コードは Python、Scala、Java、R でほぼ同じです。
people_df.write.partitionBy("number").text("people")
必要に応じて、他の出力形式も簡単に使用できます。
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
これらの各例では、Spark は DataFrame をパーティション分割したキーごとにサブディレクトリを作成します。
people/
_SUCCESS
number=1/
part-abcd
part-efgh
number=2/
part-abcd
part-efgh