Spark 1.4.0 で Spark SQL と DataFrames を使い始めました。Scala で DataFrames にカスタム パーティショナーを定義したいのですが、その方法がわかりません。
私が作業しているデータ テーブルの 1 つには、次の例のようなアカウント別のトランザクションのリストが含まれています。
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
少なくとも最初は、計算のほとんどはアカウント内のトランザクション間で行われます。そのため、アカウントのすべてのトランザクションが同じ Spark パーティションに含まれるようにデータをパーティション分割する必要があります。
しかし、これを定義する方法が見つかりません。DataFrame クラスには、「repartition(Int)」というメソッドがあり、作成するパーティションの数を指定できます。しかし、RDD に指定できるような、DataFrame のカスタム パーティショナーを定義するために使用できるメソッドが見つかりません。
ソース データは Parquet に保存されます。DataFrame を Parquet に書き込むときに、パーティション分割する列を指定できることはわかりました。そのため、おそらく Parquet に、そのデータを「アカウント」列でパーティション分割するように指示できるでしょう。しかし、アカウントは数百万ある可能性があり、Parquet を正しく理解していれば、アカウントごとに個別のディレクトリが作成されるため、これは合理的な解決策とは思えませんでした。
アカウントのすべてのデータが同じパーティションに含まれるように、Spark でこの DataFrame をパーティション分割する方法はありますか?
ベストアンサー1
スパーク >= 2.3.0
スパーク-22614範囲パーティションを公開します。
val partitionedByRange = df.repartitionByRange(42, $"k")
partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
// +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
スパーク-22389外部フォーマットのパーティションを公開するデータ ソース API v2。
スパーク >= 1.6.0
Spark >= 1.6 では、クエリとキャッシュに列によるパーティション分割を使用できます。参照:スパーク-11410そしてスパーク-4849使用repartition
方法:
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- Scan PhysicalRDD[_1#5,_2#6]
RDDs
Spark Dataset
( Dataset[Row]
akaを含む) とは異なり、DataFrame
現時点ではカスタム パーティショナーは使用できません。通常は人工的なパーティション列を作成することで対処できますが、同じ柔軟性は得られません。
Spark < 1.6.0:
1つできることは、入力データを事前に分割してからDataFrame
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner
val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))
val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))
val partitioner = new HashPartitioner(5)
val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values
val df = sqlContext.createDataFrame(partitioned, schema)
DataFrame
からの作成にはRDD
単純なマップ フェーズのみが必要なので、既存のパーティション レイアウトは保持される必要があります*:
assert(df.rdd.partitions == partitioned.partitions)
同じ方法で既存のパーティションを再分割できますDataFrame
:
sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
どうやら不可能ではないようです。それが意味をなすかどうかという疑問が残ります。ほとんどの場合、意味をなさないと私は主張します。
再パーティション化はコストのかかるプロセスです。一般的なシナリオでは、ほとんどのデータをシリアル化、シャッフル、デシリアル化する必要があります。一方、事前にパーティション化されたデータからメリットを得られる操作の数は比較的少なく、内部 API がこの特性を活用するように設計されていない場合はさらに制限されます。
- いくつかのシナリオでは参加しますが、内部のサポートが必要になります。
- ウィンドウ関数は、一致するパーティショナーで呼び出されます。上記と同じですが、単一のウィンドウ定義に制限されます。ただし、内部的にすでにパーティション化されているため、事前のパーティション化は冗長になる可能性があります。
- 単純な集計- 一時バッファのメモリフットプリントを削減することは可能ですが、全体的なコストは大幅に高くなります。 (現在の動作) と(事前パーティション分割)
GROUP BY
とほぼ同等です。実際には役に立たない可能性があります。groupByKey.mapValues(_.reduce)
reduceByKey
- によるデータ圧縮
SqlContext.cacheTable
。ランレングス符号化を使用しているようなので、適用するとOrderedRDDFunctions.repartitionAndSortWithinPartitions
圧縮率が向上する可能性があります。
パフォーマンスはキーの分布に大きく依存します。分布が偏っていると、リソースの使用率が最適ではなくなります。最悪の場合、ジョブを完了することがまったく不可能になります。
- 高レベルの宣言型APIを使用する主な目的は、低レベルの実装の詳細から自分自身を分離することです。すでに述べたように、フォローそしてロミ・クンツマン最適化は触媒オプティマイザーこれはかなり洗練されたものであり、内部をもっと深く調べなければ、簡単に改善できるとは思えません。
関連概念
JDBC ソースによるパーティション分割:
JDBCデータソースのサポートpredicates
口論次のように使用できます。
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
述語ごとに 1 つの JDBC パーティションが作成されます。個々の述語を使用して作成されたセットが分離されていない場合は、結果のテーブルに重複が表示されることに注意してください。
partitionBy
方法DataFrameWriter
:
Sparkは、書き込み時にデータを「パーティション」するために使用できるメソッドをDataFrameWriter
提供します。提供された列セットを使用して書き込み時にデータを分離します。partitionBy
val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")
df.write.partitionBy("k").json("/tmp/foo.json")
これにより、キーに基づくクエリの読み取り時に述語プッシュダウンが有効になります。
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")
しかし、これは と同等ではありませんDataFrame.repartition
。特に次のような集計では:
val cnts = df1.groupBy($"k").sum()
引き続き以下が必要となりますTungstenExchange
:
cnts.explain
// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBy
方法DataFrameWriter
(Spark >= 2.0):
bucketBy
は と同様の用途がありますpartitionBy
が、テーブル ( ) に対してのみ使用できますsaveAsTable
。バケット情報は結合を最適化するために使用できます。
// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")
// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
// :- *Sort [k#41 ASC NULLS FIRST], false, 0
// : +- *Project [k#41, v#42]
// : +- *Filter isnotnull(k#41)
// : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
// +- *Sort [k#46 ASC NULLS FIRST], false, 0
// +- *Project [k#46, v2#47]
// +- *Filter isnotnull(k#46)
// +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
* によるパーティションレイアウト私が言いたいのは、データ分散だけです。RDDpartitioned
にはパーティショナーがなくなりました。** 早期投影がないと仮定します。集約が列の小さなサブセットのみをカバーする場合、おそらく何のメリットもありません。