
It seems that using spark.read.json within a function causes the work in that function to run twice: once when the function is called and the result cached, and again when the result is counted.

Example code:

import org.apache.spark.sql.DataFrame

def foo(df: DataFrame): DataFrame = {
  val barRdd = df.rdd.map { row =>
    println("Hello World!\n")
    row.getAs[String](0)
  }
  spark.read.json(barRdd).cache
}

val df = sc.parallelize(
    Seq("[{'col1': 'hello', 'col2': 'world'}, {'col1': 'foo', 'col2': 'bar'}]")
).toDF("data")

If I paste the above into a spark-shell and then run:

val df2 = foo(df)
df2.count

I would expect to see 'Hello World' printed only once, since the DataFrame should only be materialized when the count happens. In fact, 'Hello World' appears twice: once when the function is initially run and once when the count happens.

If I remove spark.read.json and replace that line with barRdd.toDF("col1").cache, then 'Hello World' is printed only once, as I would expect.
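
For reference, a minimal sketch of that variant (fooToDf is just a name for this example; the body is identical to foo apart from the last line):

def fooToDf(df: DataFrame): DataFrame = {
  val barRdd = df.rdd.map { row =>
    println("Hello World!\n")
    row.getAs[String](0)
  }
  barRdd.toDF("col1").cache  // toDF stays lazy, so the map only runs at the action
}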

This is causing problems for us when calling an external API with a limited request quota. Why is this happening and what can we do to get around it?

I am on Spark 2.4.3, running this in a spark-shell.

    Does this answer your question? [Why does SparkSession execute twice for one action?](https://stackoverflow.com/questions/38924623/why-does-sparksession-execute-twice-for-one-action) – user10938362 Feb 06 '20 at 14:02
  • @user10938362 Thanks, this is the answer I needed! Couldn't find this in the search earlier. – Megdatronica Feb 06 '20 at 14:43
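
Update, per the linked question: spark.read.json on an RDD runs an eager Spark job to infer the JSON schema, and that inference pass is the first execution. A minimal sketch of the usual workaround, assuming the two string columns from the example above, is to supply the schema explicitly so the inference job never runs (fooWithSchema and jsonSchema are names invented for this sketch):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Schema assumed from the example JSON; adjust for the real data.
val jsonSchema = StructType(Seq(
  StructField("col1", StringType),
  StructField("col2", StringType)
))

def fooWithSchema(df: DataFrame): DataFrame = {
  val barRdd = df.rdd.map { row =>
    println("Hello World!\n")
    row.getAs[String](0)
  }
  // With an explicit schema there is no eager inference job, so the map
  // (and any API call inside it) runs only when an action is triggered.
  spark.read.schema(jsonSchema).json(barRdd).cache
}

With the schema supplied up front, the map over the RDD should execute only when an action such as count is triggered.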

0 Answers