
Say I have a dataframe with multiple columns of possibly various types. I need to write a UDF that takes inputs from multiple columns, does a fairly complicated computation, and returns a result (say, a string).

val dataframe = Seq( (1.0, Array(0, 2, 1), Array(0, 2, 3), 23.0, 21.0),
                     (1.0, Array(0, 7, 1), Array(1, 2, 3), 42.0, 41.0)).toDF(
                     "c", "a1", "a2", "t1", "t2")

E.g.: ("c" * sum("a1") + sum("a2")).toString + "t1".toString

In actuality, the computation is lengthy and the arrays have about a million elements. I am fairly new to Spark and would be grateful for sample code or a pointer to a resource with Scala examples.

TIA

xinit
  • Computation is the sum of the length of your 2 array-columns? – Cesar A. Mostacero Feb 11 '20 at 17:38
  • Summation of all elements of the array. Apologies for the vagueness. It's only for proof of concept. – xinit Feb 11 '20 at 17:40
  • Take a look at [this answer](https://stackoverflow.com/a/43799760/2751573). – Andrew Feb 11 '20 at 17:58
  • Thank you. I tried that earlier, but as warned in the linked answer, I suspect deserialization caused the performance problems I am trying to solve. Also, it does not have examples for working with multiple columns. [link](https://stackoverflow.com/questions/60169876/does-spark-data-set-method-serialize-the-computation-itself) – xinit Feb 11 '20 at 18:11

1 Answer


Here is an example UDF:

import org.apache.spark.sql.functions.udf

val udf_doComputation = udf((c: Double, a1: Seq[Int], a2: Seq[Int], t1: Double) => {
  // your complex computation goes here
  (c * a1.sum + a2.sum).toString + t1.toString
})

dataframe
  .withColumn("result", udf_doComputation($"c", $"a1", $"a2", $"t1"))
  .show()

gives:

+---+---------+---------+----+----+--------+
|  c|       a1|       a2|  t1|  t2|  result|
+---+---------+---------+----+----+--------+
|1.0|[0, 2, 1]|[0, 2, 3]|23.0|21.0| 8.023.0|
|1.0|[0, 7, 1]|[1, 2, 3]|42.0|41.0|14.042.0|
+---+---------+---------+----+----+--------+
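Since the actual computation is lengthy, it can also help to factor the body out into a plain Scala function and wrap that with udf; the logic is then unit-testable without a Spark session. A minimal sketch (the standalone function name is illustrative):

```scala
// The same computation as inside the UDF above, as a plain function.
def doComputation(c: Double, a1: Seq[Int], a2: Seq[Int], t1: Double): String =
  (c * a1.sum + a2.sum).toString + t1.toString

// Wrap it for Spark: val udf_doComputation = udf(doComputation _)
```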

Note that the variable names of the UDF don't need to match the column names, but the types must match:

  • primitives of type A map directly to A. There are several valid mappings, e.g. a double column in the dataframe maps to either Double or java.lang.Double etc. But you cannot map to Option[A]! So if your input may be null, you need to use the corresponding types from java.lang.*...
  • arrays of primitives of type A map to Seq[A], e.g. array<int> maps to Seq[Int]. The concrete runtime type will be WrappedArray, so mapping to that or to IndexedSeq would also work. The important point is that the runtime type is indexed.
  • struct maps to Row
  • array<struct> maps to Seq[Row]
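For the nullable case from the first bullet, here is a hedged sketch: assuming t1 may be null, the parameter is declared as the boxed java.lang.Double (which is what Spark passes for a nullable double column), since a primitive Scala Double would throw a NullPointerException. The function name and "missing" placeholder are illustrative:

```scala
// Hypothetical null-tolerant variant: java.lang.Double can hold null,
// a primitive Scala Double cannot.
def safeFormat(t1: java.lang.Double): String =
  if (t1 == null) "missing" else t1.toString

// Wrap it for Spark: val udf_safeFormat = udf(safeFormat _)
```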
Raphael Roth