Apache Spark のRDD
とDataFrame
(Spark 2.0.0 DataFrame は の単なる型エイリアスですDataset[Row]
)の違いは何でしょうか?
一方を他方に変換できますか?
ベストアンサー1
まず第一に、
DataFrame
は から進化しましたSchemaRDD
。
はい、Dataframe
と間の変換はRDD
絶対に可能です。
以下にいくつかのサンプルコードスニペットを示します。
df.rdd
はRDD[Row]
以下は、データフレームを作成するためのオプションの一部です。
1)
yourrddOffrow.toDF
は に変換されますDataFrame
。2)
createDataFrame
SQLコンテキストの使用val df = spark.createDataFrame(rddOfRow, schema)
スキーマは以下のオプションから選択できます素晴らしい SO の投稿で説明されているとおりです。
ScalaケースクラスとScalaリフレクションAPIからimport org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]
または使用
Encoders
import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema
スキーマは、およびを使用して作成することもできます
StructType
。StructField
val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("col1", DoubleType, true)) .add(StructField("col2", DoubleType, true)) etc...
実際のところ、Apache Spark API は現在 3 つあります。
RDD
API:
(Resilient Distributed Dataset) API
RDD
は、1.0 リリース以降、Spark に含まれています。APIには、データに対して計算を実行するための()、()、 ()
RDD
などの多くの変換メソッドが用意されています。これらの各メソッドは、変換されたデータを表す新しい を生成します。ただし、これらのメソッドは実行される操作を定義するだけであり、アクション メソッドが呼び出されるまで変換は実行されません。アクション メソッドの例には、() や() などがあります。map
filter
reduce
RDD
collect
saveAsObjectFile
RDDの例:
rdd.filter(_.age > 21) // transformation
.map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action
例: RDD で属性をフィルタリングする
rdd.filter(_.age > 21)
DataFrame
API
Spark 1.3 では、
DataFrame
Spark のパフォーマンスとスケーラビリティの向上を目指す Project Tungsten イニシアチブの一環として、新しい API が導入されました。このDataFrame
API では、データを記述するためのスキーマの概念が導入され、Spark がスキーマを管理し、Java シリアル化を使用するよりもはるかに効率的な方法でノード間でのみデータを渡すことができるようになりました。この
DataFrame
APIは、SparkのCatalystオプティマイザが実行できるリレーショナルクエリプランを構築するためのAPIであるため、APIとは根本的に異なりますRDD
。このAPIは、クエリプランの構築に慣れている開発者にとって自然なものです。
SQL スタイルの例:
df.filter("age > 21");
制限事項:コードがデータ属性を名前で参照しているため、コンパイラがエラーを検出することはできません。属性名が正しくない場合、エラーはクエリ プランが作成される実行時にのみ検出されます。
APIのもう 1 つの欠点DataFrame
は、非常に Scala 中心であり、Java をサポートしているものの、サポートが制限されていることです。
たとえば、DataFrame
既存RDD
の Java オブジェクトから を作成する場合、Spark の Catalyst オプティマイザーはスキーマを推測できず、DataFrame 内のオブジェクトはすべてscala.Product
インターフェースを実装していると想定します。Scala はcase class
、このインターフェースを実装しているため、そのまま使用できます。
Dataset
API
Dataset
Spark 1.6 で API プレビューとしてリリースされたこのAPI は、使い慣れたオブジェクト指向プログラミング スタイルとRDD
API のコンパイル時の型安全性、そして Catalyst クエリ オプティマイザーのパフォーマンス上の利点という両方の長所を提供することを目指しています。データセットも API と同じ効率的なオフヒープ ストレージ メカニズムを使用しますDataFrame
。データのシリアル化に関しては、 API には、JVM 表現 (オブジェクト) と Spark の内部バイナリ形式を変換するエンコーダー
Dataset
の概念があります。Spark には、オフヒープ データと対話するためのバイト コードを生成し、オブジェクト全体をデシリアル化することなく個々の属性へのオンデマンド アクセスを提供するという点で非常に高度な組み込みエンコーダーがあります。Spark はまだカスタム エンコーダーを実装するための API を提供していませんが、将来のリリースで提供される予定です。さらに、
Dataset
API は Java と Scala の両方で同様に機能するように設計されています。Java オブジェクトを操作する場合、それらが完全に Bean に準拠していることが重要です。
Dataset
API SQL スタイルの例:
dataset.filter(_.age < 21);
カタリストレベルフロー。(Spark Summit の DataFrame と Dataset の謎を解くプレゼンテーション)
さらに読む...データブリックス記事 - 3 つの Apache Spark API の物語: RDD とデータフレームおよびデータセット