
The question here is: how do you reuse objects in a UDF while avoiding race conditions?

I am using a UDF within my Spark application, and the unit tests seem non-deterministic due to a race condition: sometimes they pass, sometimes they fail.

I have tried to force the reuse of objects by creating them once and passing them into the UDF for efficiency. However, it seems that separate tests sharing the same Spark context and JVM are using these objects and causing errors.

import java.text.SimpleDateFormat

import org.apache.spark.sql.functions.udf

def reformatDate(input: String, sdfIn: SimpleDateFormat, sdfOut: SimpleDateFormat): String =
  sdfOut.format(sdfIn.parse(input))

val datePartitionFormat = new SimpleDateFormat("yyyyMMdd")
val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd")
val validDateFormat = new SimpleDateFormat("yyyy-MM-dd")

val partitionToDateUDF = udf(reformatDate(_: String, datePartitionFormat, validDateFormat))
val dTStampToDateUDF = udf(reformatDate(_: String, dTStampFormat, validDateFormat))

Sometimes when I run my unit tests, I get the following error from this function:

17/01/13 11:45:45 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.NumberFormatException: multiple points
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890)
    at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
    at java.lang.Double.parseDouble(Double.java:538)
    at java.text.DigitList.getDouble(DigitList.java:169)
    at java.text.DecimalFormat.parse(DecimalFormat.java:2056)
    at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1867)
    at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
    at java.text.DateFormat.parse(DateFormat.java:364)
    at com.baesystems.ai.engineering.threatanalytics.microbatch.processor.transformers.metric.mDnsPreviouslySeenDomainsStartOfDayDF$.reformatDate(mDnsPreviouslySeenDomainsStartOfDayDF.scala:22)

I use the function like so:

val df = df2
  .filter(
    datediff(
      to_date(partitionToDateUDF($"dt"))
      ,to_date(dTStampToDateUDF($"d_last_seen"))
    ) < 90
  )

and upon debugging I have found the input df2 to be:

+-----------+--------+-------------------------+--------------------------------+
|d_last_seen|      dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")|
+-----------+--------+-------------------------+--------------------------------+
| 2016/11/02|20161102|2016-11-02               |2016-11-02                      |
| 2016/11/01|20161102|2016-11-02               |2016-11-01                      |
+-----------+--------+-------------------------+--------------------------------+

I use conf.setMaster("local[2]"). Could it be that Spark uses threads, and therefore shares the same JVM, when running locally, and that this wouldn't happen when deployed, since the separate executors would have their own JVMs and therefore their own instantiations of the objects?

1 Answer


SimpleDateFormat is not thread-safe (see for example Why is Java's SimpleDateFormat not thread-safe?). That means that if you use it in any UDF (even in one single Spark job) you might get unexpected results, because Spark runs your UDF in several tasks on separate threads, and multiple threads end up accessing the same formatter at the same time. This is true for both local mode and actual distributed clusters - on each executor, a single copy is shared by several threads.
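
The behaviour is easy to reproduce outside Spark. Here is a minimal, hypothetical standalone sketch (not from the question): several threads share one SimpleDateFormat instance, mimicking concurrent tasks on a single executor, and some parses typically fail or return wrong dates:

import java.text.SimpleDateFormat

// Several threads hammer one shared SimpleDateFormat, as Spark tasks would.
// Expect occasional exceptions (e.g. NumberFormatException: multiple points)
// or silently wrong results - SimpleDateFormat keeps mutable parse state.
val sdf = new SimpleDateFormat("yyyy/MM/dd")
val threads = (1 to 8).map { _ =>
  new Thread(new Runnable {
    def run(): Unit =
      for (_ <- 1 to 10000) {
        try sdf.parse("2016/11/02")
        catch { case e: Exception => println(e) }
      }
  })
}
threads.foreach(_.start())
threads.foreach(_.join())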

To overcome this, simply use a formatter that is thread-safe, e.g. Joda-Time's DateTimeFormatter.
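
As a minimal sketch, here is the question's code rewritten with Joda-Time (the patterns are taken from the question; everything else is unchanged). DateTimeFormat.forPattern returns an immutable, thread-safe DateTimeFormatter, so the shared instances are safe to use from concurrent tasks:

import org.apache.spark.sql.functions.udf
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

// DateTimeFormatter is immutable, so sharing these across tasks is safe.
def reformatDate(input: String, fmtIn: DateTimeFormatter, fmtOut: DateTimeFormatter): String =
  fmtOut.print(fmtIn.parseDateTime(input))

val datePartitionFormat = DateTimeFormat.forPattern("yyyyMMdd")
val dTStampFormat = DateTimeFormat.forPattern("yyyy/MM/dd")
val validDateFormat = DateTimeFormat.forPattern("yyyy-MM-dd")

val partitionToDateUDF = udf(reformatDate(_: String, datePartitionFormat, validDateFormat))
val dTStampToDateUDF = udf(reformatDate(_: String, dTStampFormat, validDateFormat))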

– Tzach Zohar
    Thank you Tzach. I would like to add that the answer to the overall question here is: you have to be thread-safe within Spark UDFs, as multiple tasks run on multiple threads on each executor. The solution Tzach has provided is thread-safe. – Adam Pitt Jan 13 '17 at 12:28