
I'm using PySpark to query a collection of parquet files stored on HDFS. However, the query response time is faster the second time it runs. Below are screenshots captured from the Spark UI; notice that in query 1 (i.e., the second run), the total duration is halved and the parquet scan time is nearly halved too.

There is a similar question (Why is execution time of spark sql query different between first time and second time of execution?), where they mention shuffle file reuse in the exchange phase. However, in my scenario the exchange only takes hundreds of ms, so I don't think it's the cause here.

Another similar post (Why does a Spark query run faster when it's executed a second time?) mentioned saving the IO initialization. I used pyarrow to do a simple experiment (as shown below): we save about 0.5s per file on the second open. However, in our scenario there are 68 files; shouldn't it save more? Does it mean we only need to initialize the IO once for all 68 files? But in that case, why do we save more than 0.5s (i.e., the parquet scan time drops from 4.4s to 2.1s)?

import pyarrow as pa

fs = pa.hdfs.connect()
fw1 = fs.open(save_path, 'rb')  # first open: about 0.5s
fw2 = fs.open(save_path, 'rb')  # second open: about 0.01s
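
To see whether that initialization cost is paid once per connection or once per file, one could time the open of every file in the directory. A minimal sketch, assuming the 68 parquet files live under a hypothetical /path/to/parquet_dir:

import time
import pyarrow as pa

fs = pa.hdfs.connect()
paths = fs.ls("/path/to/parquet_dir")  # hypothetical directory holding the 68 parquet files

start = time.time()
for p in paths:
    with fs.open(p, 'rb') as f:
        f.read(4)  # read only the 4-byte magic header, so the timing is dominated by open()
print("opened %d files in %.2fs" % (len(paths), time.time() - start))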

I've checked the following cache mechanisms of Spark and HDFS (a sketch of the Spark-side checks follows the list):

  1. DataFrame cache (Un-persisting all dataframes in (py)spark), which did not make any difference
  2. parquet metadata cache (https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#metadata-refreshing), but I never created a table in the first place, so how could I refresh table metadata? (spark.catalog.listTables on the only database returns [])
  3. HDFS Centralized Cache Management (https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html); however, hdfs cacheadmin -listPools returns nothing
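
For reference, a minimal sketch of the Spark-side checks above (the parquet path is hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 1. drop every cached/persisted DataFrame and table from the in-memory cache
spark.catalog.clearCache()

# 2. invalidate any cached parquet metadata for the given path
spark.catalog.refreshByPath("hdfs:///path/to/parquet_dir")  # hypothetical path

# confirm there is no registered table whose metadata could be cached
print(spark.catalog.listTables())  # returns [] in my case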

So, none of the above works for me. Most importantly, I need a stable query response time; how can I disable this optimization, whatever it is? Thanks.
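
For what it's worth, a minimal sketch of how one might force a "cold" measurement, assuming the speed-up comes from state held inside the Spark application (it will not help if the real cause is the OS page cache on the HDFS datanodes):

from pyspark.sql import SparkSession

def cold_run(parquet_path):
    # start a fresh application so no cached data, cached metadata, shuffle files,
    # or open HDFS connections survive from the previous run
    spark = SparkSession.builder.appName("cold-run").getOrCreate()
    df = spark.read.parquet(parquet_path)  # hypothetical path, e.g. "hdfs:///path/to/parquet_dir"
    result = df.count()
    spark.stop()  # tear the application down before the next measurement
    return result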

[Screenshot: Spark UI, first run of the query]
[Screenshot: Spark UI, second run of the query]

  • One thing you haven't written about: Spark doesn't have to figure out where the data is in subsequent runs. Also, if the session isn't terminated and the garbage collector hasn't run, Spark will just check whether the data has changed and otherwise take the previous file. – Equinox Nov 04 '20 at 10:35
  • Any difference under 5s seems very harsh tbh. You could maybe just select the upper limit of the timing as your "stable query response time". – Equinox Nov 04 '20 at 10:36
  • @venky__ I think Spark may not cache all the files. As shown in the screenshot, after the parquet scan it reads 21.8G of files and outputs 6M rows; even an output cache should occupy about 1GB of storage given our schema, but my physical storage only increases slightly (like 0.1G). In addition, the OP in the first link mentions that parquet is not cacheable (but I'm not sure whether that's true; I can't find any official doc mentioning it). – cloudray Nov 04 '20 at 11:51

0 Answers