It looks like val1 and Value actually should be BigDecimal based on the type being decimal(38,0), so I am going to make that assumption in the code below.
The quick way to do this would be to use a good ol' if-else statement. Depending on your workload, this might also be the most performant way:
import java.math.BigDecimal

// Use java.lang.Integer rather than Int for the numeric parameters:
// Spark passes null for null columns, and a Scala Int can never be null.
def udfFn(val1: BigDecimal, powVal: Integer, mulVal: Integer): BigDecimal =
  if (val1 != null && powVal != null && mulVal != null) {
    val mul = new BigDecimal(mulVal.intValue)
    val1.scaleByPowerOfTen(-powVal.intValue).multiply(mul)
  } else {
    null
  }
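If you go this route, registering and applying it looks like any other UDF. Here is a minimal sketch, assuming a SparkSession in scope as spark and a DataFrame df with columns val1, powVal, and mulVal (df and the output column name finalValue are placeholders):
// Register the function so it can be called from SQL expressions.
spark.udf.register("udfFn", udfFn _)
val withFinal = df.selectExpr("*", "udfFn(val1, powVal, mulVal) AS finalValue")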
I think this looks a bit ugly myself; if you want to make it nicer to read, it's a job for functional programming! Option and for comprehensions to the rescue! (Note that if performance is an issue, the first solution may be your best option.)
You could do the following:
def udfFn(val1: BigDecimal, powVal: Integer, mulVal: Integer): Option[BigDecimal] =
  for {
    bd1 <- Option(val1)
    pow <- Option(powVal).map(_.intValue)
    mul <- Option(mulVal).map(_.intValue).map(new BigDecimal(_))
  } yield bd1.scaleByPowerOfTen(-pow).multiply(mul)
The for comprehension will yield an Option that is Some only if every input Option is Some; otherwise it will be None.
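Spark is also fine with the Option[BigDecimal] return type: it unwraps Some to the value and turns None into a null cell. Applying it might look like the sketch below, where df and computeFinal are again placeholder names:
import org.apache.spark.sql.functions.udf
import spark.implicits._ // for the $"..." column syntax
// udf() accepts a function returning Option; None becomes null in the column.
val computeFinal = udf(udfFn _)
val result = df.withColumn("finalValue", computeFinal($"val1", $"powVal", $"mulVal"))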
My personal preference would be to do this with Dataset instead of DataFrame, as I think it makes the transformations easier to understand, keeps the schema explicit at each step, and lets you write transformations without relying on UDFs. But it's definitely best to do whatever you and/or your organization are more comfortable with. For a Dataset solution I would make a couple of case classes:
case class NewData(name: Option[String], val1: Option[BigDecimal], powVal: Option[Int], mulVal: Option[Int], finalValue: Option[BigDecimal])

case class SomeData(name: Option[String], val1: Option[BigDecimal], powVal: Option[Int], mulVal: Option[Int]) {
  // Compute finalValue only when val1, powVal, and mulVal are all present.
  def toNewData: NewData = {
    val fv = for {
      bd1 <- val1
      pow <- powVal
      mul <- mulVal.map(new BigDecimal(_))
    } yield bd1.scaleByPowerOfTen(-pow).multiply(mul)
    NewData(name, val1, powVal, mulVal, fv)
  }
}
Then the code to do the transformation would be this:
import org.apache.spark.sql.Dataset
import spark.implicits._

val ds: Dataset[SomeData] = ... // Obtained however you wish
val finalDs: Dataset[NewData] = ds.map(_.toNewData)
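To make that concrete, here is a small sketch with hand-built sample rows (the values and names are made up purely for illustration):
// Hypothetical sample data, just to show the shapes involved.
val sample = Seq(
  SomeData(Some("a"), Some(new BigDecimal(12345)), Some(2), Some(3)),
  SomeData(Some("b"), None, Some(1), Some(2)) // missing val1, so finalValue is None
)
val sampleDs: Dataset[SomeData] = spark.createDataset(sample)
sampleDs.map(_.toNewData).show()
// Row "a": 12345 * 10^-2 * 3 = 370.35; row "b": finalValue is null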