
I have a 30 TB file in HDFS and I am reading that file in Spark. After the file is read, where will that data be stored? Suppose:

val customerDF = spark.read.format("csv").load("/path/to/file.csv")

Where will customerDF be stored?

swcraft

2 Answers


It won't be stored anywhere until you need to process it; this is called lazy evaluation. Spark builds a graph (a DAG) of all the transformations it needs to perform, and only when you persist the dataframe or perform an action on it is the data loaded into memory and processed.
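For example, a minimal sketch of that behaviour (the header option and the amount column are hypothetical, added just for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("lazy-demo").getOrCreate()

// Transformations only: nothing is read from HDFS yet, Spark just
// records these steps in its DAG (logical plan).
val customerDF = spark.read
  .format("csv")
  .option("header", "true")               // assumed: the file has a header row
  .load("/path/to/file.csv")
val bigSpenders = customerDF.filter(col("amount") > 1000) // hypothetical column

// An action: only now does Spark plan stages, read the file and process it.
println(bigSpenders.count())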

You also have the persist method on a dataframe to make it persistent; there you can select a different StorageLevel:

import org.apache.spark.storage.StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK) // any StorageLevel can be passed here

More info about storage levels is in the Spark documentation on RDD persistence (StorageLevel).
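A sketch of the full persist lifecycle (the StorageLevel choice here is just an illustration; for a 30 TB input a serialized, disk-backed level is more realistic than pure memory):

import org.apache.spark.storage.StorageLevel

// persist is itself lazy: it only marks the dataframe for caching.
customerDF.persist(StorageLevel.MEMORY_AND_DISK_SER)

// The first action fills the cache (possibly only the partitions it touches).
customerDF.show(10)

// Release executor memory/disk once the cached data is no longer needed.
customerDF.unpersist()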

SCouto
  • .persist will not force a DF to be evaluated either. You need to call an action such as count at which point the DF will be evaluated and cached. 30TB is a hell of a lot to cache though.... – Terry Dactyl Oct 19 '18 at 10:13
  • @TerryDactyl I don't think `df.count` is enough to cache the entire dataframe, you would need `df.rdd.count` – Raphael Roth Oct 19 '18 at 11:25
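To make the distinction in these comments concrete, a small sketch (assuming customerDF was persisted as in the answer above):

// customerDF.count() can be answered without deserializing every column,
// so the cache may end up only partially populated. Going through the
// RDD forces each row to be fully materialized:
customerDF.rdd.count()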

Based on your example, the file will not be read yet, and nothing is stored anywhere at that point in time. Spark is lazy: it only reads data when an action like write, count, collect, etc. is called.

If you do not cache the dataframe (via cache or persist), then what is read from the file, and how much of it, depends on the operations that follow and the projections they cause: select, groupBy, join, etc. If you use shuffle operations (groupBy, window functions, joins), then the projected data is written to temporary folders on the worker/data nodes to facilitate communication between the stages.

Example:

val customerDF = spark.read.format("csv").load("/path") //Files are not read yet
val customerStats = customerDF.groupBy("customer_id").count() //Files are not read yet
customerStats.show(100, false)

In the above example, the files are read only when show is called. Only customer_id is extracted and, because of the count, stage 1 stores partial counts in SPARK_LOCAL_DIRS and sends them to stage 2, which does the final rollup and displays 100 lines on the screen.
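You can inspect this two-stage shape yourself with explain(), which prints the physical plan. A sketch of the typical output; exact operator names vary by Spark version, but the partial/final aggregation split around the shuffle (Exchange) is visible:

customerStats.explain()
// Typical plan shape (names vary by version):
// HashAggregate(keys=[customer_id], functions=[count(1)])    <- stage 2: final counts
// +- Exchange hashpartitioning(customer_id, 200)             <- shuffle via local dirs
//    +- HashAggregate(keys=[customer_id], functions=[partial_count(1)])  <- stage 1
//       +- FileScan csv ...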

alexeipab
  • Thank you.. got it.. @alexeipab – swcraft Oct 19 '18 at 10:30
  • @swcraft If you liked the answer, could you accept it? – alexeipab Oct 19 '18 at 10:45
  • Wouldn't the DF be evaluated when you call count()? – Terry Dactyl Oct 19 '18 at 10:48
  • @TerryDactyl In the case of customerDF.count -> yes, as count is then an action on the Dataset. In the case of customerDF.groupBy("customer_id").count() -> no, as count is called on the object returned by *groupBy*, which is a *RelationalGroupedDataset*, and there it is a transformation. – alexeipab Oct 19 '18 at 10:51
  • I hope count() is an action, so it would be evaluated @TerryDactyl. Please correct me if I'm wrong. – swcraft Oct 19 '18 at 10:58
  • @swcraft You need to do `df.rdd.count` to make sure the entire file is read, see https://stackoverflow.com/questions/42714291/how-to-force-dataframe-evaluation-in-spark – Raphael Roth Oct 19 '18 at 11:27