DataFrame のパーティションをどのように定義しますか? 質問する

DataFrame のパーティションをどのように定義しますか? 質問する

Spark 1.4.0 で Spark SQL と DataFrames を使い始めました。Scala で DataFrames にカスタム パーティショナーを定義したいのですが、その方法がわかりません。

私が作業しているデータ テーブルの 1 つには、次の例のようなアカウント別のトランザクションのリストが含まれています。

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

少なくとも最初は、計算のほとんどはアカウント内のトランザクション間で行われます。そのため、アカウントのすべてのトランザクションが同じ Spark パーティションに含まれるようにデータをパーティション分割する必要があります。

しかし、これを定義する方法が見つかりません。DataFrame クラスには、「repartition(Int)」というメソッドがあり、作成するパーティションの数を指定できます。しかし、RDD に指定できるような、DataFrame のカスタム パーティショナーを定義するために使用できるメソッドが見つかりません。

ソース データは Parquet に保存されます。DataFrame を Parquet に書き込むときに、パーティション分割する列を指定できることはわかりました。そのため、おそらく Parquet に、そのデータを「アカウント」列でパーティション分割するように指示できるでしょう。しかし、アカウントは数百万ある可能性があり、Parquet を正しく理解していれば、アカウントごとに個別のディレクトリが作成されるため、これは合理的な解決策とは思えませんでした。

アカウントのすべてのデータが同じパーティションに含まれるように、Spark でこの DataFrame をパーティション分割する方法はありますか?

ベストアンサー1

スパーク >= 2.3.0

スパーク-22614範囲パーティションを公開します。

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

スパーク-22389外部フォーマットのパーティションを公開するデータ ソース API v2

スパーク >= 1.6.0

Spark >= 1.6 では、クエリとキャッシュに列によるパーティション分割を使用できます。参照:スパーク-11410そしてスパーク-4849使用repartition方法:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

RDDsSpark Dataset( Dataset[Row]akaを含む) とは異なり、DataFrame現時点ではカスタム パーティショナーは使用できません。通常は人工的なパーティション列を作成することで対処できますが、同じ柔軟性は得られません。

Spark < 1.6.0:

1つできることは、入力データを事前に分割してからDataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

DataFrameからの作成にはRDD単純なマップ フェーズのみが必要なので、既存のパーティション レイアウトは保持される必要があります*:

assert(df.rdd.partitions == partitioned.partitions)

同じ方法で既存のパーティションを再分割できますDataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

どうやら不可能ではないようです。それが意味をなすかどうかという疑問が残ります。ほとんどの場合、意味をなさないと私は主張します。

  1. 再パーティション化はコストのかかるプロセスです。一般的なシナリオでは、ほとんどのデータをシリアル化、シャッフル、デシリアル化する必要があります。一方、事前にパーティション化されたデータからメリットを得られる操作の数は比較的少なく、内部 API がこの特性を活用するように設計されていない場合はさらに制限されます。

    • いくつかのシナリオでは参加しますが、内部のサポートが必要になります。
    • ウィンドウ関数は、一致するパーティショナーで呼び出されます。上記と同じですが、単一のウィンドウ定義に制限されます。ただし、内部的にすでにパーティション化されているため、事前のパーティション化は冗長になる可能性があります。
    • 単純な集計- 一時バッファのメモリフットプリントを削減することは可能ですが、全体的なコストは大幅に高くなります。 (現在の動作) と(事前パーティション分割)GROUP BYとほぼ同等です。実際には役に立たない可能性があります。groupByKey.mapValues(_.reduce)reduceByKey
    • によるデータ圧縮SqlContext.cacheTable。ランレングス符号化を使用しているようなので、適用するとOrderedRDDFunctions.repartitionAndSortWithinPartitions圧縮率が向上する可能性があります。
  2. パフォーマンスはキーの分布に大きく依存します。分布が偏っていると、リソースの使用率が最適ではなくなります。最悪の場合、ジョブを完了することがまったく不可能になります。

  3. 高レベルの宣言型APIを使用する主な目的は、低レベルの実装の詳細から自分自身を分離することです。すでに述べたように、フォローそしてロミ・クンツマン最適化は触媒オプティマイザーこれはかなり洗練されたものであり、内部をもっと深く調べなければ、簡単に改善できるとは思えません。

関連概念

JDBC ソースによるパーティション分割:

JDBCデータソースのサポートpredicates口論次のように使用できます。

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

述語ごとに 1 つの JDBC パーティションが作成されます。個々の述語を使用して作成されたセットが分離されていない場合は、結果のテーブルに重複が表示されることに注意してください。

partitionBy方法DataFrameWriter:

Sparkは、書き込み時にデータを「パーティション」するために使用できるメソッドをDataFrameWriter提供します。提供された列セットを使用して書き込み時にデータを分離します。partitionBy

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

これにより、キーに基づくクエリの読み取り時に述語プッシュダウンが有効になります。

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

しかし、これは と同等ではありませんDataFrame.repartition。特に次のような集計では:

val cnts = df1.groupBy($"k").sum()

引き続き以下が必要となりますTungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy方法DataFrameWriter(Spark >= 2.0):

bucketByは と同様の用途がありますpartitionByが、テーブル ( ) に対してのみ使用できますsaveAsTable。バケット情報は結合を最適化するために使用できます。

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* によるパーティションレイアウト私が言いたいのは、データ分散だけです。RDDpartitionedにはパーティショナーがなくなりました。** 早期投影がないと仮定します。集約が列の小さなサブセットのみをカバーする場合、おそらく何のメリットもありません。

おすすめ記事