
Using Spark 1.6.1, I want to count the number of times a UDF is called. I want to do this because I have a very expensive UDF (~1 sec per call) and I suspect the UDF is being called more often than there are records in my dataframe, making my Spark job slower than necessary.

Although I could not reproduce this situation, I came up with a simple example showing that the number of calls to the UDF seems to be different (here: lower) than the number of rows. How can that be?

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf

object Demo extends App {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._


  val callCounter = sc.accumulator(0)

  val df = sc.parallelize(1 to 10000, numSlices = 100).toDF("value")

  println(df.count) //  gives 10000

  val myudf = udf((d: Int) => { callCounter.add(1); d })

  val res = df.withColumn("result", myudf($"value")).cache

  println(res.select($"result").collect().size) // gives 10000
  println(callCounter.value) // gives 9941

}

If using an accumulator is not the right way to count the calls to the UDF, how else could I do it?

Note: In my actual Spark job, I get a call count which is about 1.7 times higher than the actual number of records.
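
In case it helps with diagnosing this: giving the accumulator a name also makes its value show up per stage in the Spark UI, so one can at least see which stage produces the extra calls. Below is a minimal sketch of that variant, reusing sc, df and the imports from the example above; the name "udf calls" is just an illustration, and the driver-side total is read the same way as before.

  // named accumulator: its value also appears in the Spark UI for the stage that updates it
  val namedCounter = sc.accumulator(0, "udf calls")

  val countingUdf = udf((d: Int) => { namedCounter.add(1); d })
  val withCount = df.withColumn("result", countingUdf($"value"))

  withCount.collect()
  println(namedCounter.value) // driver-side total, same idea as callCounter above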

Raphael Roth
  • I tried the same code and it prints 10000 as the call counter; all the `println`s print the same number. I am using Spark 2.0 – Shankar Oct 29 '16 at 06:12
  • I can reproduce it when I change my master to `local[*]` instead of `local`; with `local` it prints correctly. With `local[*]` it printed 9996 instead of 10000 – Shankar Oct 29 '16 at 06:14
  • Is this a known problem when using an accumulator for this kind of case? With `local[*]`, why doesn't it count properly? – Shankar Oct 29 '16 at 06:26

1 Answer


Spark applications should define a main() method instead of extending scala.App; subclasses of scala.App may not work correctly. The usual explanation is that scala.App relies on DelayedInit, so vals defined in the object body (such as your accumulator) may not be initialized yet when the closures referencing them are shipped to the executors.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
    val sc = new SparkContext(conf)
    // [...]
  }
}

This should solve your problem.
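
For completeness, here is the example from the question restructured that way (a sketch with the same logic, just moved inside main()):

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // the accumulator is now created inside main(), so it is fully initialized
    // before any closure that references it is serialized to the executors
    val callCounter = sc.accumulator(0)

    val df = sc.parallelize(1 to 10000, numSlices = 100).toDF("value")
    val myudf = udf((d: Int) => { callCounter.add(1); d })
    val res = df.withColumn("result", myudf($"value")).cache

    println(res.select($"result").collect().size) // 10000
    println(callCounter.value)                    // should now match the row count
  }
}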

eliasah
  • thanks, this solves the issue. But in my real application I don't have a main method (we use Spark Notebook). Maybe I can reproduce this behaviour in a small example; I will then ask a new question. In short, I join two dataframes (standard inner join) and then call my udf using withColumn. The udf gets called multiple times for the same row – Raphael Roth Oct 29 '16 at 08:37
  • I'm not sure how Spark Notebook works, to be honest. I'll need to take a look at it when I have some time. – eliasah Oct 29 '16 at 08:39
  • here is the new question: http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns – Raphael Roth Oct 29 '16 at 15:18