1

I am doing a data cleaning with very simple logic.

    val inputData= spark.read.parquet(inputDataPath)
    val viewMiddleTable = sdk70000DF.where($"type" === "0").select($"field1", $"field2", $field3)
      .groupBy($"field1", $"field2", $field3)
      .agg(count(lit(1)))

Read parquet data from hdfs, filter, select target fields and group by all fields, then count.

When I check the UI, below things happended.

Input 81.2 GiB Shuffle Write 645.7 GiB

enter image description here

How can the shuffle write data so much bigger than the originally read data? It should be just a little expanded in this situation. Can anyone explain this? Thanks.

Machi
  • 403
  • 2
  • 14
  • 1
    This is because parquet is encoded and compressed. The shuffle occur after the data is inflated – Lior Chaga Jun 24 '21 at 05:53
  • I referred this answer https://stackoverflow.com/questions/48847660/spark-parquet-snappy-overall-compression-ratio-loses-after-spark-shuffles-d?rq=1 and made some optimization as below: select... .repartition(1000, $"field1") .sortWithinPartitions() .group... New input and shuffle write data is:input 40.2Gib,shuffle write 77.3Gib,shuffle write/input is always about 2. Much better than the unoptimized , which is 40.7 vs. 334.9, with a ratio of 8. The shuffle data should still be parquet+snappy, but how data organized may affected the shuffle write data size. Maybe. – Machi Jun 24 '21 at 08:48

1 Answers1

1

Deserialization of parquet files that are columnar and compressed, means they expand in size in memory or on disk(spill). It's a well-known aspect of Spark - as you candidly observe yourself now.

thebluephantom
  • 16,458
  • 8
  • 40
  • 83