3

Spark is lazy, right? So what does load() do?

import timeit

start = timeit.default_timer()
df = sqlContext.read.option(
    "es.resource", indexes
).format("org.elasticsearch.spark.sql")
end = timeit.default_timer()
print('without load: ', end - start)  # almost instant

start = timeit.default_timer()
df = df.load()
end = timeit.default_timer()
print('load: ', end - start)  # takes 1 sec

start = timeit.default_timer()
df.show()
end = timeit.default_timer()
print('show: ', end - start)  # takes 4 sec

If show() were the only action, I wouldn't expect load() to take as long as 1 second. So I'm concluding that load() is an action (as opposed to a transformation in Spark).

Does load() actually load the whole dataset into memory? I don't think so, but then what does it do?

I've searched and looked at the docs (https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html) but they don't help.

thebluephantom
eugene
  • load() will just give a pointer to the data locations, which creates a DataFrame. Spark's RDD model is lazy: until you perform an action on an RDD/DataFrame/Dataset, it won't load the data completely into memory. If you understand Spark's RDD lineage you will see this. – Ram Ghadiyaram Jun 29 '19 at 16:21
  • OK... why does it take a whole 1 sec to `load()` if it is lazy? – eugene Jun 29 '19 at 23:57
  • Due to some other reasons; it might take time, but it's not an action. – Ram Ghadiyaram Jun 30 '19 at 06:23

2 Answers

4

tl;dr: load() is a DataFrameReader API (org.apache.spark.sql.DataFrameReader#load), as seen in the code below; it returns a DataFrame, on top of which Spark transformations can be applied.

/**
   * Loads input in as a `DataFrame`, for data sources that support multiple paths.
   * Only works if the source is a HadoopFsRelationProvider.
   *
   * @since 1.6.0
   */
  @scala.annotation.varargs
  def load(paths: String*): DataFrame

One needs to create a DataFrame before transformations can be applied.
To create a DataFrame from a path (HDFS, S3, etc.), users can use spark.read.format("<format>").load(). (There are datasource-specific APIs as well that load the files automatically, like spark.read.parquet(<path>).)
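As an illustration, the reader/frame split can be sketched as a toy model in plain Python (this is not Spark itself; ToyReader/ToyFrame are made-up stand-ins): format() and option() only configure the reader, load() only returns a frame that remembers where the data lives, transformations only extend the plan, and nothing "runs" until an action:

```python
class ToyReader:
    """Toy stand-in for DataFrameReader: collects format/options; load() builds a frame."""
    def __init__(self):
        self._format, self._options = None, {}

    def format(self, fmt):
        self._format = fmt
        return self

    def option(self, key, value):
        self._options[key] = value
        return self

    def load(self):
        # Not an action: just returns a frame describing the source.
        return ToyFrame(plan=[("scan", self._format, dict(self._options))])


class ToyFrame:
    """Toy stand-in for DataFrame: transformations append to the plan; show() executes it."""
    def __init__(self, plan):
        self.plan = plan

    def filter(self, cond):
        # "Transformation": returns a new frame with a longer plan; no data touched.
        return ToyFrame(self.plan + [("filter", cond)])

    def show(self):
        # "Action": only here does the whole plan get executed.
        return " -> ".join(step[0] for step in self.plan)


df = ToyReader().format("es").option("es.resource", "my-index").load()
df2 = df.filter("x > 1")
print(df2.show())  # scan -> filter
```

In this toy, load() is cheap because it only records the scan; the real connector additionally does setup work at that point, which is where the observed 1 second goes.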

Why does it take a whole 1 second?

For file-based sources, this time can be attributed to the listing of files. In HDFS this listing is not expensive, whereas for cloud storage like S3 the listing is very expensive and takes time proportional to the number of files.
In your case the datasource is Elasticsearch, so the time can be attributed to connection establishment, collecting the metadata needed to plan a distributed scan, etc., which depends on the Elasticsearch connector implementation. We can enable debug logs to get more information. If Elasticsearch has a way to log the requests it receives, we could also check the Elasticsearch logs for requests made right after load() was fired.
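To separate those phases when measuring, the repeated timeit boilerplate from the question can be factored into a small helper (plain Python; the commented-out usage against the question's reader is hypothetical and not run here):

```python
import timeit

def timed(label, fn, *args, **kwargs):
    """Run fn once, print its wall-clock time, and return its result."""
    start = timeit.default_timer()
    result = fn(*args, **kwargs)
    elapsed = timeit.default_timer() - start
    print(f"{label}: {elapsed:.3f}s")
    return result

# Hypothetical usage against the question's reader:
# df = timed("load", df.load)   # connector setup + metadata collection
# timed("show", df.show)        # the actual distributed scan
```

If "load" stays roughly constant while "show" grows with the data size, that supports the answer's point: load() pays a fixed setup/metadata cost, not a full scan.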

DaRkMaN
  • I cannot find load as a transformation in the Databricks documentation. – thebluephantom Jul 02 '19 at 05:59
  • load is not a transformation; it is a DataFrameReader API used to create a DataFrame from data sources. You can apply transformations once you `load`. – DaRkMaN Jul 02 '19 at 06:01
  • My fault, it is not a transformation; it is an API on DataFrameReader, `org.apache.spark.sql.DataFrameReader#load`. @thebluephantom, thanks for letting me know. I have fixed the answer now. – DaRkMaN Jul 02 '19 at 06:30
1

It does nothing by itself. It is just the tail end of the sqlContext.read call chain: read allows the data format (and options such as the path) to be specified, and load() hands back the DataFrame. The DataFrame, and the underlying RDD, are evaluated lazily as they say.

thebluephantom