The question here is: how do you re-use objects within a UDF while avoiding race conditions?
I am using a UDF within my Spark application, and the unit tests appear to be non-deterministic due to a race condition: sometimes they pass, sometimes they fail...
I have tried to force the re-use of objects by creating them once and passing them into the UDF for efficiency. However, it seems that separate "tests" sharing the same SparkContext and JVM are using these objects concurrently and causing errors.
def reformatDate(input: String, sdfIn: SimpleDateFormat, sdfOut: SimpleDateFormat): String = {
  sdfOut.format(sdfIn.parse(input))
}

val datePartitionFormat = new SimpleDateFormat("yyyyMMdd")
val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd")
val validDateFormat = new SimpleDateFormat("yyyy-MM-dd")

val partitionToDateUDF = udf(reformatDate(_: String, datePartitionFormat, validDateFormat))
val dTStampToDateUDF = udf(reformatDate(_: String, dTStampFormat, validDateFormat))
Sometimes when I run my unit tests I get the following error with this function:
17/01/13 11:45:45 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.NumberFormatException: multiple points
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890)
    at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
    at java.lang.Double.parseDouble(Double.java:538)
    at java.text.DigitList.getDouble(DigitList.java:169)
    at java.text.DecimalFormat.parse(DecimalFormat.java:2056)
    at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1867)
    at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)
    at java.text.DateFormat.parse(DateFormat.java:364)
    at com.baesystems.ai.engineering.threatanalytics.microbatch.processor.transformers.metric.mDnsPreviouslySeenDomainsStartOfDayDF$.reformatDate(mDnsPreviouslySeenDomainsStartOfDayDF.scala:22)
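My suspicion is that the shared SimpleDateFormat instances are being corrupted by concurrent use, since SimpleDateFormat is not thread-safe. As a sanity check, here is a minimal standalone sketch (no Spark involved; the object name and the thread/iteration counts are just illustrative) that I could run to see whether hammering a single shared formatter from several threads produces this kind of exception:

import java.text.SimpleDateFormat

// Plain JVM check, no Spark: several threads parse through one shared
// SimpleDateFormat instance, which is documented as not thread-safe.
object SharedSdfCheck extends App {
  val sharedSdf = new SimpleDateFormat("yyyyMMdd")

  val threads = (1 to 4).map { i =>
    new Thread(new Runnable {
      override def run(): Unit = {
        for (_ <- 1 to 100000) {
          try {
            sharedSdf.parse("20161102")
          } catch {
            // If concurrent use corrupts the formatter's internal state,
            // exceptions like the NumberFormatException above should show up here.
            case e: Exception => println(s"thread-$i: $e")
          }
        }
      }
    })
  }

  threads.foreach(_.start())
  threads.foreach(_.join())
}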
I use the function like so:
val df = df2
.filter(
datediff(
to_date(partitionToDateUDF($"dt"))
,to_date(dTStampToDateUDF($"d_last_seen"))
) < 90
)
and upon debugging I have found the input "df2" to be:
+-----------+--------+-------------------------+--------------------------------+
|d_last_seen|      dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")|
+-----------+--------+-------------------------+--------------------------------+
| 2016/11/02|20161102|               2016-11-02|                      2016-11-02|
| 2016/11/01|20161102|               2016-11-02|                      2016-11-01|
+-----------+--------+-------------------------+--------------------------------+
I use conf.setMaster("local[2]"). Could it be that Spark uses threads, and therefore shares the same JVM, when running locally, whereas this wouldn't happen when deployed, because the separate executors would have their own JVMs and therefore their own instantiations of the objects?
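If that is the case, one workaround I am considering (only a sketch, not something I have verified yet; the object and method names are my own) is to keep re-using formatter instances but make them per-thread with ThreadLocal, so that no two threads ever touch the same SimpleDateFormat:

import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.udf

// Illustrative holder: each thread lazily gets its own SimpleDateFormat instances,
// so they are still re-used across rows on that thread but never shared between threads.
object SafeDateFormats {
  private val datePartitionFormat = new ThreadLocal[SimpleDateFormat] {
    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
  }
  private val dTStampFormat = new ThreadLocal[SimpleDateFormat] {
    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd")
  }
  private val validDateFormat = new ThreadLocal[SimpleDateFormat] {
    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
  }

  def partitionToDate(input: String): String =
    validDateFormat.get().format(datePartitionFormat.get().parse(input))

  def dTStampToDate(input: String): String =
    validDateFormat.get().format(dTStampFormat.get().parse(input))
}

val partitionToDateUDF = udf(SafeDateFormats.partitionToDate _)
val dTStampToDateUDF = udf(SafeDateFormats.dTStampToDate _)

Would that be the idiomatic approach, or is it more usual to simply construct the formatters inside the UDF body, or to switch to the thread-safe java.time.format.DateTimeFormatter?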