I've run into something odd with how Spark 2.2 deserializes case classes. For these examples, assume this case class:

case class X(a:Int, b:Int) {
  println("in the constructor!!!")
}

If I have the following map operation, I see both my constructor message and the value of 'a' in the executor logs.

ds.map(x => {
  val x = X(1, 2)
  println(s"a=${x.a}")
})

With the following map operation, I do not see my constructor message in the executor logs, but I do see the value of 'a' there. The constructor message shows up in the driver logs.

val x = X(1, 2)
ds.map(x => println(s"a=${x.a}"))

And I get the same behavior if I use a broadcast variable.

val xBcast = sc.broadcast(X(1, 2))
ds.map(x => println(s"a=${xBcast.value.a}"))

Any idea what's going on? Is Spark serializing each field as needed? I would have expected the whole object to be shipped over and deserialized. With that deserialization I'd expect a constructor call.

When I looked at the encoder code for Products, it looks like it gets the necessary fields from the constructor. I had assumed it would use those encoders for this kind of thing.
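A quick way to see what the Product encoder derives from the constructor is to build one directly; this is a minimal sketch (it only assumes Spark SQL on the classpath, and the printed schema shape is approximate):

import org.apache.spark.sql.Encoders

case class X(a: Int, b: Int) {
  println("in the constructor!!!")
}

// Building the encoder reflects over X's primary constructor to derive the
// fields, but it does not instantiate X, so no constructor message prints here.
val xEncoder = Encoders.product[X]
println(xEncoder.schema)  // roughly: StructType(StructField(a,IntegerType,false), StructField(b,IntegerType,false))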

I even decompiled my case class's class file and the constructor generated seems reasonable.

  • why are you defining an object in the driver as `val x` but then also referencing elements inside ds as `x`? I'm surprised that works at all, unless the elements in your ds also have a field 'a'. I'd rework that to `ds.map(_ => println(s"a=..."))` or more clearly `ds.foreach(_ => println(s"a=.."))` since your output type is Unit anyway. Lastly... try messing around with `val x` vs `def x` vs `lazy val x`. I'd bet the constructor for 'val' runs in the driver, for 'def' in the executors, and I'm not sure where lazy val ends up... probably the driver also, but it's an interesting exercise (see the sketch after these comments). – kmh Oct 09 '18 at 17:28
  • The Unit return type is just laziness on my part in these examples; assume I'm outputting actual values. I focused on showing how I use the object. It finds x because you can reference outer-scope variables in an inner scope in Scala, and Spark knows how to interpret that. If you read up on broadcast variables, what I have is what you're supposed to do. The only difference between using broadcast and not is that Spark can be much smarter about not re-serializing with broadcast. – Mike Hurley Oct 09 '18 at 18:16
  • Could you provide some details? How do you test this code? In what mode? What is the type of `ds` and how is it created? – zero323 Oct 09 '18 at 19:24
  • ds is a Dataset. It's created with the "session.read.parquet(file).as[CaseClass]" pattern. I test it on the cluster in yarn/cluster mode. – Mike Hurley Oct 09 '18 at 20:17
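To make the val / def / lazy val suggestion from the first comment concrete, here is a minimal non-Spark sketch; it only shows when the constructor runs, not where, so the cluster version of the experiment is still worth running:

case class X(a: Int, b: Int) {
  println("in the constructor!!!")
}

val xVal  = X(1, 2)        // constructor message prints right here, at definition time
def xDef  = X(1, 2)        // constructor message prints on every call to xDef
lazy val xLazy = X(1, 2)   // constructor message prints on the first access only

println(xDef.a)   // constructor message, then 1
println(xDef.a)   // constructor message again, then 1
println(xLazy.a)  // constructor message once, then 1
println(xLazy.a)  // just 1
println(xVal.a)   // just 1 (the constructor already ran at the definition above)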

1 Answer

Spark is using Java serialization (available because case classes extend Serializable) by default, which does not require the use of a constructor to deserialize. See this StackOverflow question for details on Java serialization/deserialization.
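The no-constructor-on-deserialize behavior is easy to reproduce outside of Spark; a minimal sketch with plain Java serialization (runnable in a bare Scala REPL):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

case class X(a: Int, b: Int) {
  println("in the constructor!!!")
}

val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(X(1, 2))  // "in the constructor!!!" prints here, when the instance is built
out.close()

val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val copy = in.readObject().asInstanceOf[X]  // no constructor message: readObject rebuilds the object without calling X's constructor
println(s"a=${copy.a}")                     // prints a=1

If the driver-built X in the question's second and third examples reaches the executors this way, that matches what the logs show: the value of 'a' appears without a second constructor message.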

Note that this reliance on Java serialization can cause issues, as the internal serialization format is not set in stone, so JVM version differences can cause deserialization to fail.

Levi Ramsey
  • That doesn't sound right. Even with RDD API Spark doesn't necessarily use Java serialization, and with Dataset API it shouldn't use it for data serialization at all. – zero323 Oct 09 '18 at 19:22
  • Using Java serialization seems right. The Scala @transient annotation, which works with Java serialization, correctly keeps fields from being serialized. – Mike Hurley Oct 09 '18 at 20:18
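A minimal sketch of the @transient behavior mentioned in the comment above, again with plain Java serialization (Holder and its fields are made-up names for illustration):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class Holder extends Serializable {
  val kept = 42
  @transient val skipped: String = "not serialized"
}

val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(new Holder)
out.close()

val restored = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  .readObject().asInstanceOf[Holder]

println(restored.kept)     // 42
println(restored.skipped)  // null: the @transient field was skipped and, with no constructor call, never re-initialized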