キーによる複数の出力への書き込み Spark - 1 つの Spark ジョブ 質問する

キーによる複数の出力への書き込み Spark - 1 つの Spark ジョブ 質問する

単一のジョブで Spark を使用して、キーに応じて複数の出力に書き込む方法を教えてください。

関連している:キーによる複数の出力への書き込み Scalding Hadoop、1つのMapReduceジョブ

例えば

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

確実にするcat prefix/1のは

a
b

そしてcat prefix/2

c

編集:最近、完全なインポート、ピンプ、圧縮コーデックを含む新しい回答を追加しました。https://stackoverflow.com/a/46118044/1586965、これは以前の回答に加えて役立つかもしれません。

ベストアンサー1

Spark 1.4以降を使用している場合は、データフレームAPI(DataFramesはSpark 1.3で導入されましたが、partitionBy()必要なのは1.4で導入

RDD から始める場合は、まずそれを DataFrame に変換する必要があります。

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

Python では、同じコードは次のようになります。

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

DataFrame を取得したら、特定のキーに基づいて複数の出力に書き込むのは簡単です。さらに、これが DataFrame API の優れた点ですが、コードは Python、Scala、Java、R でほぼ同じです。

people_df.write.partitionBy("number").text("people")

必要に応じて、他の出力形式も簡単に使用できます。

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

これらの各例では、Spark は DataFrame をパーティション分割したキーごとにサブディレクトリを作成します。

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh

おすすめ記事