データセットにカスタムオブジェクトを保存するにはどうすればいいですか? 質問する

データセットにカスタムオブジェクトを保存するにはどうすればいいですか? 質問する

によるとSparkデータセットの紹介:

Spark 2.0 を楽しみにしながら、データセットにいくつかのエキサイティングな改善を計画しています。具体的には、... カスタム エンコーダー - 現在、さまざまなタイプのエンコーダーを自動生成していますが、カスタム オブジェクト用の API を公開したいと考えています。

カスタム型を保存しようとすると、Dataset次のようなエラーが発生します。

データセットに格納されている型のエンコーダーが見つかりません。プリミティブ型 (Int、String など) と製品型 (ケース クラス) は、sqlContext.implicits をインポートすることでサポートされます。_ 他の型のシリアル化のサポートは、将来のリリースで追加される予定です。

または:

Java.lang.UnsupportedOperationException: ... のエンコーダーが見つかりません。

既存の回避策はありますか?


この質問はコミュニティ Wiki の回答のエントリ ポイントとしてのみ存在することに注意してください。質問と回答の両方を自由に更新/改善してください。

ベストアンサー1

アップデート

この回答は今でも有効で参考になりますが、2.2/2.3 以降では、、、、、、およびSetの組み込みエンコーダー サポートが追加され、状況は改善されています。ケース クラスと通常の Scala 型のみを使用して型を作成するSeqことにこだわる場合は、 の暗黙の だけで十分です。MapDateTimestampBigDecimalSQLImplicits


残念ながら、これを解決するのに役立つものはほとんど追加されていません@since 2.0.0Encoders.scalaまたはSQLImplicits.scala主にプリミティブ型に関連するもの(およびケース クラスの調整)を見つけます。まず最初に言うべきことは、現時点ではカスタムクラスエンコーダーに対する優れたサポートはない. それで、次に、現在利用できるものを考慮して、期待できる限りの成果を上げるいくつかのトリックを紹介します。 事前に免責事項として、これは完璧に機能するわけではありません。すべての制限を明確かつ前もって説明するために最善を尽くします。

問題は一体何なのか

データセットを作成する場合、Sparkは「エンコーダ(T型のJVMオブジェクトを内部Spark SQL表現に変換)を必要とします。エンコーダは通常、から暗黙的に自動的に作成されるかSparkSession、またはEncoders「(ドキュメントcreateDatasetエンコーダーはEncoder[T]という形式をとりますT。 はエンコードする型です。最初の提案は を追加することですimport spark.implicits._(これによりこれら2つ目の提案は、暗黙のエンコーダを明示的に渡すことです。これエンコーダ関連の関数のセット。

通常の授業ではエンコーダーは利用できないので、

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

次のような暗黙的なコンパイル時エラーが発生します。

データセットに格納されている型のエンコーダーが見つかりません。プリミティブ型 (Int、String など) と製品型 (ケース クラス) は、sqlContext.implicits をインポートすることでサポートされます。_ 他の型のシリアル化のサポートは、将来のリリースで追加される予定です。

しかし、上記のエラーの原因となった型を を拡張するクラスでラップするとProduct、エラーは実行時に遅延されてしまうため、

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

コンパイルは問題なく完了するが、実行時にエラーが発生する。

java.lang.UnsupportedOperationException: MyObj のエンコーダが見つかりません

The reason for this is that the encoders Spark creates with the implicits are actually only made at runtime (via scala relfection). In this case, all Spark checks at compile time is that the outermost class extends Product (which all case classes do), and only realizes at runtime that it still doesn't know what to do with MyObj (the same problem occurs if I tried to make a Dataset[(Int,MyObj)] - Spark waits until runtime to barf on MyObj). These are central problems that are in dire need of being fixed:

  • some classes that extend Product compile despite always crashing at runtime and
  • there is no way of passing in custom encoders for nested types (I have no way of feeding Spark an encoder for just MyObj such that it then knows how to encode Wrap[MyObj] or (Int,MyObj)).

Just use kryo

The solution everyone suggests is to use the kryo encoder.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

This gets pretty tedious fast though. Especially if your code is manipulating all sorts of datasets, joining, grouping etc. You end up racking up a bunch of extra implicits. So, why not just make an implicit that does this all automatically?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

And now, it seems like I can do almost anything I want (the example below won't work in the spark-shell where spark.implicits._ is automatically imported)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Or almost. The problem is that using kryo leads to Spark just storing every row in the dataset as a flat binary object. For map, filter, foreach that is enough, but for operations like join, Spark really needs these to be separated into columns. Inspecting the schema for d2 or d3, you see there is just one binary column:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Partial solution for tuples

So, using the magic of implicits in Scala (more in 6.26.3 Overloading Resolution), I can make myself a series of implicits that will do as good a job as possible, at least for tuples, and will work well with existing implicits:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Then, armed with these implicits, I can make my example above work, albeit with some column renaming

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

I haven't yet figured out how to get the expected tuple names (_1, _2, ...) by default without renaming them - if someone else wants to play around with this, this is where the name "value" gets introduced and this is where the tuple names are usually added. However, the key point is that that I now have a nice structured schema:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

So, in summary, this workaround:

  • allows us to get separate columns for tuples (so we can join on tuples again, yay!)
  • we can again just rely on the implicits (so no need to be passing in kryo all over the place)
  • is almost entirely backwards compatible with import spark.implicits._ (with some renaming involved)
  • does not let us join on the kyro serialized binary columns, let alone on fields those may have
  • has the unpleasant side-effect of renaming some of the tuple columns to "value" (if necessary, this can be undone by converting .toDF, specifying new column names, and converting back to a dataset - and the schema names seem to be preserved through joins, where they are most needed).

Partial solution for classes in general

This one is less pleasant and has no good solution. However, now that we have the tuple solution above, I have a hunch the implicit conversion solution from another answer will be a bit less painful too since you can convert your more complex classes to tuples. Then, after creating the dataset, you'd probably rename the columns using the dataframe approach. If all goes well, this is really an improvement since I can now perform joins on the fields of my classes. If I had just used one flat binary kryo serializer that wouldn't have been possible.

Here is an example that does a bit of everything: I have a class MyObj which has fields of types Int, java.util.UUID, and Set[String]. The first takes care of itself. The second, although I could serialize using kryo would be more useful if stored as a String (since UUIDs are usually something I'll want to join against). The third really just belongs in a binary column.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Now, I can create a dataset with a nice schema using this machinery:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

And the schema shows me I columns with the right names and with the first two both things I can join against.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)

おすすめ記事