I have two implementations of a cost-calculating Spark job, both extending the following trait:
trait Cost {
  val calculateCost: Column
  def df: DataFrame

  lazy val aggregate = df
    .withColumn("cost", calculateCost)
}
Implementation 1:

case class CurrentCost(df: DataFrame) extends Cost {
  override val calculateCost = when(includeCost, $"c1" * $"c2").otherwise(lit(0))
}
Implementation 2 uses different Columns to calculate its cost:

case class PreviousCost(df: DataFrame) extends Cost {
  override val calculateCost = callSomeUdf($"c3", $"c4")
}
Both are called in a similar fashion:
val result = CurrentCost(df).aggregate
This generates a runtime exception, whose details depend on which implementation gets called.
For implementation 1:
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: CASE WHEN ((NOT dealerNameIsNull) AND (costType = CURRENT)) THEN (c1 * c2) ELSE 0 END)
- field (class: CostFeatures, name: calculateCost, type: class org.apache.spark.sql.Column)
For implementation 2:
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: UDF(c3, c4))
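For reference, one variant I have been sketching is to declare calculateCost as a def instead of a val, so the non-serializable Column is never stored as a field of the case class and is only built when aggregate is evaluated on the driver. The includeCost condition and the column names below are placeholders reconstructed from the logged expression, not my real code, and I am not sure this is the cleanest abstraction:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

trait Cost {
  def df: DataFrame
  def calculateCost: Column                 // def instead of val: no Column field to serialize

  lazy val aggregate: DataFrame = df.withColumn("cost", calculateCost)
}

case class CurrentCost(df: DataFrame) extends Cost {
  // placeholder condition standing in for the real includeCost logic
  private def includeCost: Column = col("costType") === "CURRENT"

  override def calculateCost: Column =
    when(includeCost, col("c1") * col("c2")).otherwise(lit(0))
}

This avoids holding a Column in the instance that Spark ends up serializing, but it feels like a workaround rather than a proper design.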
What is a better approach for abstracting the common functionality away from the concrete implementations without running into these serialization problems?