
I have a DataFrame and the following code:

import org.apache.spark.sql.functions.{col, udf}

// UDF that logs each invocation and returns a Map
def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

Now, checking the logs, I found out that the UDF is executed 3 times for each row. If I add a "test3" column from "test.three", then the UDF is executed once more.
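The same duplication can also be seen in the physical plan (an illustrative check, assuming the same df and testUDF as above):

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))
  .explain()
// the optimized plan contains the UDF expression once per column derived from "test"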

Can someone explain to me why?

Can this be avoided properly (without caching the DataFrame after "test" is added, even though that works)?
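For reference, the caching workaround looks roughly like this (a sketch, assuming the same df as above; it works, but I would like to avoid it):

// cache after "test" is added so the UDF result is materialised once
// and the later column accesses reuse it
val withTest = df
  .withColumn("test", testUDF(col("lat"), col("lon")))
  .cache()

withTest
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))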

Rolintocour
  • What do you mean? You're calling the test function three times. That's why it's being executed three times. Not sure why you're making it a UDF. Why not just make the Map a val? – user4601931 Nov 04 '19 at 15:49
  • This is just an example to show the behavior of Spark. For me, "test" is a new column that contains a structure, so accessing any part of the structure shouldn't execute the UDF again. Where am I wrong? – Rolintocour Nov 04 '19 at 16:03
  • I tried to print the schema; the DataType of "test" is `Map` and not a Struct. Now, if instead of returning a Map the UDF returns a case class like Test(one: String, two: String), then `test` is indeed a Struct, but there are still just as many executions of the UDF. – Rolintocour Nov 04 '19 at 16:21
  • related : https://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns – Raphael Roth Nov 04 '19 at 19:27
  • caching should work according to this answer : https://stackoverflow.com/a/40962714/1138523 – Raphael Roth Nov 04 '19 at 19:28
  • You have adopted a workaround with side effects. The correct solution is to cache. Why? Think in terms of transformations and actions. Every time you call 'test' or its nested fields, Spark re-calculates its value based on the previous transformations. Caching is the recommended way to avoid that. asNondeterministic is meant for completely different scenarios, where calling your udf twice returns wrong results rather than causing a performance issue. – Alon Nov 05 '19 at 09:04

1 Answer


If you want to avoid multiple calls to a udf (which is especially useful if the udf is a bottleneck in your job), you can do it as follows:

val testUDF = udf(test _).asNondeterministic()

Basically, you tell Spark that your function is not deterministic, and Spark then makes sure it is called only once, because it is not safe to call it multiple times (each call could possibly return a different result).

Also be aware that this trick is not free: by doing this you are putting some constraints on the optimizer. One side effect, for example, is that the Spark optimizer does not push filters through expressions that are not deterministic, so you become responsible for the optimal position of the filters in your query.
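For example (a sketch reusing the lat column from the question; the threshold is made up), with the udf marked non-deterministic you would write the filter before the udf call yourself rather than relying on the optimizer to push it down:

// filter first, then apply the non-deterministic udf
df.filter(col("lat") > 0)
  .withColumn("test", testUDF(col("lat"), col("lon")))

// written the other way round, the optimizer will NOT move the filter
// below the non-deterministic projection for you:
// df.withColumn("test", testUDF(col("lat"), col("lon"))).filter(col("lat") > 0)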

David Vrba
  • nice! this answer also belongs here : https://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns – Raphael Roth Nov 04 '19 at 19:26
  • In my case, the `asNondeterministic` forces the UDF to execute only once. With the `explode(array(myUdf($"id")))` solution, it still gets executed twice. – Rolintocour Nov 05 '19 at 07:59
  • @David Vrba What do you mean by "so you become responsible for the optimal position of the filters in your query"? – thebluephantom Feb 03 '21 at 10:39
  • @thebluephantom If your expressions are deterministic, the Spark optimizer will push the filters through them. If the expression is non-deterministic (which is the case with udf.asNondeterministic()) the optimizer will not push it, so you better call the filter as soon as possible. – David Vrba Feb 04 '21 at 05:31
  • Can you give an example, please? – thebluephantom Feb 04 '21 at 07:24