I'm using PySpark to query a collection of Parquet files stored on HDFS. However, the query response time is faster the second time it runs. Below are screenshots captured from the Spark UI; notice that in query 1 (i.e., the second run), the total duration is halved and the Parquet scan time is nearly halved as well.
There is a similar question (Why is execution time of spark sql query different between first time and second time of execution?), where shuffle file reuse in the exchange phase is mentioned. However, in my scenario the exchange only takes hundreds of ms, so I don't think it's the cause here.
Another similar post (Why does a Spark query run faster when it's executed a second time?) mentioned the saving of IO initialization. I used pyarrow to do a simple experiment (shown below): we save about 0.5s per file on the second open. However, in our scenario there are 68 files, so shouldn't the saving be much larger? Or does it mean the IO only needs to be initialized once for all 68 files? But then why do we save more than 0.5s overall (the Parquet scan time drops from 4.4s to 2.1s)?
import pyarrow as pa

fs = pa.hdfs.connect()           # connect to HDFS (legacy pyarrow API)
fw1 = fs.open(save_path, 'rb')   # first open of the file: about 0.5s
fw2 = fs.open(save_path, 'rb')   # second open of the same file: about 0.01s
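
To check whether that ~0.5s is paid once per connection or once per file, I plan to time the opens over all 68 files. A minimal sketch, where parquet_dir is a placeholder for the actual HDFS directory:

import time
import pyarrow as pa

fs = pa.hdfs.connect()
paths = [p for p in fs.ls(parquet_dir) if p.endswith('.parquet')]  # expecting 68 files

for label in ('first pass', 'second pass'):
    start = time.time()
    for p in paths:
        with fs.open(p, 'rb') as f:
            f.read(8)            # just touch each file to trigger the open/read path
    print(label, time.time() - start)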
I've checked the following cache mechanisms of Spark and HDFS:
- DataFrame cache (Un-persisting all dataframes in (py)spark), which does not make any difference;
- Parquet metadata cache (https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#metadata-refreshing), but I never created a table in the first place, so what metadata would I refresh? spark.catalog.listTables on the only database returns [] (see the sketch after this list);
- HDFS Centralized Cache Management (https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html), however, hdfs cacheadmin -listPools returns nothing.
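
For reference, this is roughly how I checked the Spark-side items; refreshByPath is my guess at the path-based equivalent of REFRESH TABLE, since there is no table to refresh (parquet_dir is again a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.catalog.clearCache()                  # drop any cached DataFrames / tables
print(spark.catalog.listTables())           # returns [], so nothing to REFRESH TABLE
spark.catalog.refreshByPath(parquet_dir)    # invalidate cached file listing / metadata for the path

spark.read.parquet(parquet_dir).count()     # re-running the query after this is still faster than the first run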
So none of the above works for me. Most importantly, I need a stable query response time: how can I disable whatever optimization this is? Thanks.
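
For completeness, the two runs are executed back-to-back in the same session, roughly like this (the count() is a placeholder for the actual, more complex query):

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

for run in (1, 2):
    start = time.time()
    spark.read.parquet(parquet_dir).count()              # placeholder for the real query
    print('run %d: %.1fs' % (run, time.time() - start))  # run 2 finishes in roughly half the time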