Spark: DataFrame の UDF ではタスクがシリアル化できない 質問する

Spark: DataFrame の UDF ではタスクがシリアル化できない 質問する

org.apache.spark.SparkException: Task not serializableSpark 1.4.1 で以下を実行しようとすると、次のエラーが発生します。

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat

object ConversionUtils {
  val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")

  def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime)

  val castTS = udf[Timestamp, String](tsUTC _)
}

val df = frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str")))
df.first

ここで、は 内に存在するframeです。そのデータ フレームには問題はありません。DataFrameHiveContext

整数用の同様のUDFを持っていますが、問題なく動作します。しかし、タイムスタンプ付きのUDFは問題を引き起こすようです。ドキュメンテーションjava.sql.TimeStampは を実装しているので、Serializableそれは問題ではありません。 についても同じことが言えますSimpleDateFormatここ

これにより、問題の原因は UDF であると考えられます。ただし、何をどのように修正すればよいかはわかりません。

トレースの関連セクション:

Caused by: java.io.NotSerializableException: ...
Serialization stack:
        - object not serializable (class: ..., value: ...$ConversionUtils$@63ed11dd)
        - field (class: ...$ConversionUtils$$anonfun$3, name: $outer, type: class ...$ConversionUtils$)
        - object (class ...$ConversionUtils$$anonfun$3, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, name: func$2, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf, scalaUDF(ts_str#2683))
        - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Alias, scalaUDF(ts_str#2683) AS ts#7146)
        - element of array (index: 35)
        - array (class [Ljava.lang.Object;, size 36)
        - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
        - object (class scala.collection.mutable.ArrayBuffer,

ベストアンサー1

試す:

object ConversionUtils extends Serializable {
  ...
}

おすすめ記事