
The Spark docs say:

By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task.

If I create a Java SimpleDateFormat and use it in RDD operations, I get an exception: NumberFormatException: multiple points.

I know SimpleDateFormat is not thread-safe. But according to the Spark docs, this SimpleDateFormat object is copied to each task, so there should not be multiple threads accessing the same object.

I speculate that all tasks in one executor share the same SimpleDateFormat object. Am I right?


This program prints the same object reference, java.text.SimpleDateFormat@f82ede60, on the driver and in every task:

import java.text.SimpleDateFormat

import org.apache.spark.{SparkConf, SparkContext}

object NormalVariable {

  // creating dateFormat here, as a field of the singleton object,
  // doesn't change the behavior
  // val dateFormat = new SimpleDateFormat("yyyy.MM.dd")

  def main(args: Array[String]) {

    val dateFormat = new SimpleDateFormat("yyyy.MM.dd")

    val conf = new SparkConf().setAppName("Spark Test").setMaster("local[*]")
    val spark = new SparkContext(conf)

    val dates = Array[String]("1999.09.09", "2000.09.09", "2001.09.09", "2002.09.09", "2003.09.09")

    println(dateFormat)

    val results = spark.parallelize(dates).map { i =>
      println(dateFormat)
      dateFormat.parse(i)
    }.collect()

    println(results.mkString(" "))
    spark.stop()
  }
}
moshangcheng
  • Could you add the code to the question ? – maasg Mar 23 '15 at 12:32
  • I added my code, could you check my code, thanks. – moshangcheng Mar 23 '15 at 12:55
  • I ran your code in Spark shell without exceptions. Where do you get an exception, exactly? – pzecevic Mar 23 '15 at 15:50
  • @pzecevic it's a 'heisenbug' a concurrent change to the non-thread safe `SimpleDateFormat` – maasg Mar 23 '15 at 15:57
  • I presume that this has to do with how Spark serializes the closures, the [ClosureCleaner](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala) and singleton Scala objects in the JVM, but exactly what's going on goes beyond me. Probably @joshrosen could provide an authoritative answer – maasg Mar 24 '15 at 10:01
  • In the meantime, I think you want to create an instance of `SimpleDateFormat` within the closure to solve this problem. `SimpleDateFormat` uses a mutable `Calendar` instance inside. It's 'mutable state' in disguise. – maasg Mar 24 '15 at 10:03
  • Thanks for the response @maasg. We did fix the problem that way. I'm just curious about Spark's behavior, which is not consistent with its documentation. – moshangcheng Mar 24 '15 at 11:03
  • I found this question+response to be quite helpful in understanding what's going on: http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers – maasg Mar 24 '15 at 12:04
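The fix discussed in the comments can be sketched without Spark at all (the class name PerCallFormat is illustrative, not from the original code): construct the SimpleDateFormat inside the function the closure calls, so every invocation gets a fresh, unshared instance.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch of the suggested fix: build the SimpleDateFormat inside the
// function (i.e. inside the Spark map closure) instead of capturing a
// shared instance from the enclosing object.
public class PerCallFormat {

  public static Date parse(String source) throws ParseException {
    // A fresh formatter per call: slightly more allocation,
    // but no mutable state can be shared between threads.
    return new SimpleDateFormat("yyyy.MM.dd").parse(source);
  }

  public static void main(String[] args) throws ParseException {
    System.out.println(parse("1999.09.09"));
  }
}
```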

1 Answer


As you know, SimpleDateFormat is not thread safe.

If Spark is using a single core per executor (--executor-cores 1), everything should work fine. But as soon as you configure more than one core per executor, your code runs multi-threaded: the SimpleDateFormat is shared by multiple Spark tasks concurrently, which is likely to corrupt the data and throw various exceptions.

To fix this, you can use the same approach as for non-Spark code: ThreadLocal, which ensures you get one copy of the SimpleDateFormat per thread.

In Java, this looks like:

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateFormatTest {

  private static final ThreadLocal<DateFormat> df = new ThreadLocal<DateFormat>(){
    @Override
    protected DateFormat initialValue() {
        return new SimpleDateFormat("yyyyMMdd");
    }
  };

  public Date convert(String source) throws ParseException{
    Date d = df.get().parse(source);
    return d;
  }
}

and the equivalent code in Scala works just the same - shown here as a spark-shell session:

import java.text.SimpleDateFormat

object SafeFormat extends ThreadLocal[SimpleDateFormat] {
  override def initialValue = {
    new SimpleDateFormat("yyyyMMdd HHmmss")
  }
}

sc.parallelize(Seq("20180319 162058")).map(SafeFormat.get.parse(_)).collect

    res6: Array[java.util.Date] = Array(Mon Mar 19 16:20:58 GMT 2018)

So you would define the ThreadLocal at the top level of your job class or object, then call df.get to obtain the SimpleDateFormat within your RDD operations.
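As a quick sanity check (a hypothetical driver, not part of the answer's code), you can hammer the ThreadLocal pattern from several threads and verify that every parse round-trips cleanly:

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalFormatCheck {

  // One SimpleDateFormat per thread, as in the answer's pattern.
  private static final ThreadLocal<DateFormat> df =
      ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd"));

  // Parse and re-format; with correct per-thread formatters the
  // result always equals the input string.
  static String roundTrip(String source) throws Exception {
    return df.get().format(df.get().parse(source));
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<String>> futures = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      futures.add(pool.submit(() -> roundTrip("20180319")));
    }
    for (Future<String> f : futures) {
      if (!"20180319".equals(f.get())) {
        throw new AssertionError("corrupted parse");
      }
    }
    pool.shutdown();
    System.out.println("all parses round-tripped cleanly");
  }
}
```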


DNA