0

I have an example case:

val df = ... // read from HDFS / file / ...
println(df.count)
val newDf = df.select // Other transformations... Keep processing the original df.

My question is, do I need to cache the original dataframe if I'm counting it in the middle of my process? I mean, I count the df, and then keep transforming it and processing it.
Does the .count means the df will be computed twice?

pault
  • 41,343
  • 15
  • 107
  • 149
J. Doe
  • 11
  • 1
  • 2
  • 1
    Yes, the DF will be evaluated twice. If you are going to re-use a DF then cache/persist it. If you do not cache it then it will be re-evaluated each time you use it. There is however a chance that it will be evicted from the cache before then... – Terry Dactyl Nov 01 '18 at 16:21
  • Short answer, yes. An Spark Dataframe is at the end just an RDD, and as any RDD they are lazy. Therefore, they get computed _(evaluating the DAG)_ every time you perform an action over them _(like count)_. Thus, I would call cache before counting, to ensure the same Dataframe is reused for the following computations. – Luis Miguel Mejía Suárez Nov 01 '18 at 16:23
  • @LuisMiguelMejíaSuárez+ Terry: Thank you. Is caching + unchaching a DF is expansive? – J. Doe Nov 01 '18 at 16:32
  • @J.Doe it may be, but probably less expensive than recomputing everything every time. Also, unless you really need to be sure the memory is free to continue, you can mark the uncaching as non-blocking and Spark will do it without stopping your application. – Luis Miguel Mejía Suárez Nov 01 '18 at 16:39
  • @LuisMiguelMejíaSuárez I see. Can you explain more about the non-blocking function? Thank you in advance. – J. Doe Nov 01 '18 at 16:47
  • @J.Doe Look at the [**unpersist** _scaladoc_](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@unpersist(blocking:Boolean):Dataset.this.type), you can set the operation as blocking or non-blocking. If blocking spark will pause its execution until all blocks are released, if non-blocking spark will continue the execution and the blocks will be released latter. In any case if spark gets out of memory it will reclaim it. – Luis Miguel Mejía Suárez Nov 01 '18 at 17:54

1 Answers1

4

Without knowing more about your use case and resources it is hard to give you a definitive answer. However it is most likely negative, despite the fact that Spark will access the source twice.

Overall there are multiple factors to consider:

  • How much data will be loaded in the first pass. With efficient on-disk input format Spark (like Parquet) Spark has no need to fully load dataset at all. This also applies to a number of other input formats, including but not limited to JDBC reader.
  • Cost of caching the data with Dataset API is quite high (that's why default storage mode is MEMORY_AND_DISK) and can easily exceed the cost of loading the data.
  • Impact on the subsequent processing. In general caching will interfere with partition pruning, predicate pushdown and projections (see Any performance issues forcing eager evaluation using count in spark?).

So...

Does the .count means the df will be computed twice?

To some extent depending on the input format. Plain text formats (like JSON or CSV) will require more repeated work than binary sources.

do I need to cache the original dataframe

Typically not, unless you know that cost of fetching data from the storage justifies disadvantages listed above.

Making the final decision requires a good understanding of your pipeline (primarily how data will be processed downstream and how caching affects this) and metrics you want to optimize (latency, total execution time, monetary cost of running required resources).

You should also consider alternatives to count, like processing InputMetrics or using Accumulators.

zero323
  • 322,348
  • 103
  • 959
  • 935
  • Thank you for your answer. My original DF will not always be read from storage - it can be created from transforming other DF. My original cause for using "count" on the DF, is, that I want to force spark to create a job from this transformation. Do you have idea for a better way for creating a job (consisting of stages) without count? – J. Doe Nov 01 '18 at 17:55
  • There is no universal answer (as you see even when you restrict possible scenarios to very specific use case there are different considerations), however forcing execution is usually a bad idea / sign that one misunderstood how Spark works (as per [answer](https://stackoverflow.com/a/50380109/6910411) by [user8371915](https://stackoverflow.com/users/8371915/user8371915)) or sign of a [XY problem](https://meta.stackexchange.com/questions/66377/what-is-the-xy-problem). If you want to eagerly cache (useful in some categories of semi-interactive apps) use `CACHE TABLE` in SQL. – zero323 Nov 02 '18 at 21:31