クロージャの外で関数を呼び出すと奇妙な動作が発生します:
- 関数がオブジェクト内にある場合、すべてが機能する
- 関数がクラス内にある場合は以下を取得します:
タスクはシリアル化できません: java.io.NotSerializableException: テスト
問題は、コードをオブジェクトではなくクラスに必要とすることです。なぜこのようなことが起こるのか、何か考えはありますか? Scala オブジェクトはシリアル化されていますか (デフォルト?)?
これは動作するコード例です:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
これは動作しない例です:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
ベストアンサー1
RDDはSerialisableインターフェースを拡張しているので、これがタスクの失敗の原因ではありません。これは、RDD
Sparkでシリアル化して回避できるという意味ではありません。NotSerializableException
Spark は分散コンピューティング エンジンであり、その主な抽象化は分散コレクションとして見ることができる復元力のある分散データセット ( RDD ) です。基本的に、RDD の要素はクラスターのノード間で分割されますが、Spark はこれをユーザーから抽象化し、ユーザーが RDD (コレクション) をローカルのコレクションであるかのように操作できるようにします。
あまり詳しく説明するつもりはありませんが、RDD でさまざまな変換 ( map
、flatMap
などfilter
) を実行する場合、変換コード (クロージャ) は次のようになります。
- ドライバーノード上でシリアル化され、
- クラスター内の適切なノードに送信され、
- デシリアライズされた、
- そして最後にノード上で実行される
もちろん、これをローカルで実行することもできます (例のように)。ただし、これらのフェーズはすべて (ネットワーク経由での送信を除いて) 発生します。[これにより、本番環境にデプロイする前にバグをキャッチできます]
2 番目のケースでは、testing
マップ関数内からクラスで定義されたメソッドを呼び出しています。Spark はこれを認識し、メソッドは単独ではシリアル化できないため、クラス全体 testing
をシリアル化しようとします。そのため、別の JVM で実行された場合でもコードは機能します。次の 2 つの可能性があります。
クラス テストをシリアル化可能にして、クラス全体を Spark でシリアル化できるようにします。
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
または、someFunc
メソッドの代わりに関数を作成します (関数は Scala ではオブジェクトです)。これにより、Spark はそれをシリアル化できるようになります。
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
クラスのシリアル化に関する、似ているが同じではない問題が興味深いかもしれません。その問題については、この Spark Summit 2013 プレゼンテーションで読むことができます。
ちなみに、rddList.map(someFunc(_))
を に書き直すこともできますrddList.map(someFunc)
が、どちらもまったく同じです。通常は、冗長性が少なく読みやすい 2 番目の方が好まれます。
編集 (2015-03-15): SPARK-5307でSerializationDebuggerが導入され、Spark 1.3.0 がこれを使用する最初のバージョンです。これは、 NotSerializableException にシリアル化パスを追加します。 NotSerializableException が発生すると、デバッガーはオブジェクト グラフにアクセスしてシリアル化できないオブジェクトへのパスを見つけ、ユーザーがオブジェクトを見つけるのに役立つ情報を構築します。
OP の場合、stdout に出力されるのは次のようになります。
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)