I have two implementations of a cost-calculating Spark job, both extending the following trait:

trait Cost {

  val calculateCost: Column

  def df: DataFrame

  lazy val aggregate = df
    .withColumn("cost", calculateCost)

}

Implementation 1:

case class CurrentCost(df: DataFrame) extends Cost {
  override val calculateCost = when(includeCost, $"c1" * $"c2").otherwise(lit(0))
}

Implementation 2 uses different `Column`s to calculate its cost:

case class PreviousCost(df: DataFrame) extends Cost {
  override val calculateCost = callSomeUdf($"c3", $"c4")
}

Both are called in a similar fashion:

val result = CurrentCost(df).aggregate

At runtime this fails with a NotSerializableException whose details depend on which implementation is called.

For implementation 1:

Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: CASE WHEN ((NOT dealerNameIsNull) AND (costType = CURRENT)) THEN (c1 * c2) ELSE 0 END)
    - field (class: CostFeatures, name: calculateCost, type: class org.apache.spark.sql.Column)

For implementation 2:

Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: UDF(c3, c4))
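For context: a `Column` is a driver-side expression tree and is not `Serializable`, so a trace like this usually means the whole instance holding the `Column` field is being captured by a task closure and shipped to the executors. A stripped-down sketch of the kind of pattern that can trigger it (simplified; the `threshold` field and `flagExpensive` UDF are purely illustrative):

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// Illustrative sketch: a class that stores a non-serializable Column
// in a field.
class CostFeatures(df: DataFrame) {
  val calculateCost: Column = lit(0) // Column is not Serializable
  val threshold = 10.0               // assumed field, for illustration

  // The lambda reads `threshold`, so it closes over `this`; Spark then
  // tries to serialize the whole CostFeatures instance for the task
  // closure and fails on the Column field.
  val flagExpensive = udf { cost: Double => cost > threshold }
}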

What is a better approach for abstracting common functionality away from the concrete implementations?

  • Hi Raf, can you share a bit more code to show how you're using your case classes? You might need the Datasets API or just case objects, depending on your usage. – baitmbarek Dec 14 '19 at 08:54
  • I see. Both of these case classes take a whole bunch of `DataFrame`s as input, so I cannot use case objects – Raf Dec 14 '19 at 12:47
  • Thank you Raf for the clarification. If so, why use case classes instead of simple classes? They're only used by the driver to create the right topology, so they don't need to be compared or serialized, right? – baitmbarek Dec 14 '19 at 12:49
  • I edited the question a bit to make it clear how the case classes are used. I expect similar issues with plain classes, but I should try that out – Raf Dec 14 '19 at 12:58
  • Does this answer your question? [Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects](https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou) – cchantep Dec 14 '19 at 13:03

1 Answer

Raf, I can't reproduce the serialization problem with the code below:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._

sealed trait Cost {
  val calculateCost: Column
  def df: DataFrame
  def aggregate = df.withColumn("cost", calculateCost)
}

case class CurrentCost(df: DataFrame) extends Cost {
  import df.sparkSession.implicits._
  override val calculateCost = when($"includeCost", $"c1" * $"c2").otherwise(lit(0))
}
case class PreviousCost(df: DataFrame) extends Cost {
  import df.sparkSession.implicits._
  override val calculateCost = MyUdfs.myAwesomeUdf($"c1")
}

object MyUdfs {
  val myAwesomeUdf = udf { i: Int => i * 1.1F }
}

object MyMain extends App {

  val spark = SparkSession.builder().config(new SparkConf().setMaster("local[4]").setAppName("azeaz")).getOrCreate

  import spark.implicits._

  val costDF = Seq(
    (1,1.2,true),
    (2,1.1,true),
    (3,1.1,false)
  ).toDF("c1","c2","includeCost")

  CurrentCost(costDF).aggregate.show(false)
  PreviousCost(costDF).aggregate.show(false)

}

Getting both outputs:

+---+---+-----------+----+
|c1 |c2 |includeCost|cost|
+---+---+-----------+----+
|1  |1.2|true       |1.2 |
|2  |1.1|true       |2.2 |
|3  |1.1|false      |0.0 |
+---+---+-----------+----+

+---+---+-----------+---------+
|c1 |c2 |includeCost|cost     |
+---+---+-----------+---------+
|1  |1.2|true       |1.1      |
|2  |1.1|true       |2.2      |
|3  |1.1|false      |3.3000002|
+---+---+-----------+---------+

Did I miss something?
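One difference from the question is that `aggregate` here is a plain `def` rather than a `lazy val`. If the exception still shows up in your setup, a defensive variant worth trying (a sketch assuming your original trait; making `calculateCost` a `def` is a suggestion, not something tested against your job) is to compute the `Column` on demand, so no instance ever carries a non-serializable field:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// Sketch: exposing the Column as a def means there is no Column field
// to serialize if an instance is ever captured by a task closure.
sealed trait Cost {
  def calculateCost: Column
  def df: DataFrame
  def aggregate: DataFrame = df.withColumn("cost", calculateCost)
}

case class CurrentCost(df: DataFrame) extends Cost {
  import df.sparkSession.implicits._
  override def calculateCost: Column =
    when($"includeCost", $"c1" * $"c2").otherwise(lit(0))
}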
