It seems that using spark.read.json within a function causes that function to run twice: once when the result is cached, and then again when the result is counted.
Example code:
import org.apache.spark.sql.DataFrame

def foo(df: DataFrame): DataFrame = {
  val barRdd = df.rdd.map { row =>
    println("Hello World!\n")
    row.getAs[String](0)
  }
  spark.read.json(barRdd).cache
}
val df = sc.parallelize(
  Seq("[{'col1': 'hello', 'col2': 'world'}, {'col1': 'foo', 'col2': 'bar'}]")
).toDF("data")
If I run the above in a spark-shell and then run:
val df2 = foo(df)
df2.count
I would expect to see 'Hello World' printed only once (because the DataFrame should only be evaluated when the count happens). However, 'Hello World' in fact appears twice: once when the function is initially run and once when the count happens.
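Roughly, the spark-shell session looks like this (a sketch of the behaviour described above; exact prompts, result values, and output interleaving may differ slightly):

scala> val df2 = foo(df)
Hello World!

df2: org.apache.spark.sql.DataFrame = [col1: string, col2: string]

scala> df2.count
Hello World!

res0: Long = 2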
If I remove spark.read.json and replace that line with barRdd.toDF("col1").cache, then 'Hello World' is printed only once, as I would expect.
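For reference, that variant looks roughly like this (I've called it fooLazy here only to distinguish it from foo above; the column name "col1" is arbitrary):

def fooLazy(df: DataFrame): DataFrame = {
  val barRdd = df.rdd.map { row =>
    println("Hello World!\n")
    row.getAs[String](0)
  }
  // With toDF the pipeline stays lazy: 'Hello World' is only
  // printed when the count action actually runs
  barRdd.toDF("col1").cache
}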
This is causing problems for us when calling an external API with a limited request quota. Why is this happening and what can we do to get around it?
I am on Spark 2.4.3, running this in a spark-shell.