
I need to read parquet files from AWS S3 that are partitioned by date and hour. The partition structure looks like this: year/month/day/hour, e.g. 2021/10/31/00. The problem is that each hour directory contains multiple small parquet files, each around 100-200 KB, so when I try to read the data for a whole day the Spark job starts failing with lost executors while reading all of those files. What would be the best approach to reading this many small files?
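
For illustration, the layout is roughly the following (bucket, feed name and file names are made up; only the year/month/day/hour structure and the ~100-200 KB sizes come from the actual data):

s3://my-bucket/my-feed/2021/10/31/00/part-00000.parquet
s3://my-bucket/my-feed/2021/10/31/00/part-00001.parquet
...
s3://my-bucket/my-feed/2021/10/31/23/part-00123.parquet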

The parquet file reading mechanism looks like this:

// Builds a DateTime from the year/month/day/hour partition components.
private def buildDateTime(year: String, month: String, day: String, hour: String): DateTime =
  new DateTime(year.toInt, month.toInt, day.toInt, hour.toInt, 0)

// Lists the objects under feedPath and keeps only the .parquet files whose
// partition timestamp (extracted via dataRegex) is at or after startDate.
private def fileObjectsParquet(feedName: String, startDate: DateTime, feedPath: String): List[FSObject] = {
  val fls = listFSObjects(feedPath)
    .filter(fs => fs.path.endsWith(".parquet"))
  fls.filter { fs =>
    fs.path match {
      case dataRegex(year, month, day, hour)
        if buildDateTime(year, month, day, hour).isEqual(startDate) ||
          buildDateTime(year, month, day, hour).isAfter(startDate.minusHours(1)) => true
      case _ => false
    }
  }
}

// Reads each matching parquet file as its own DataFrame and unions them all together.
private def readParquetFiles(feedName: String, feedPath: String, startDate: DateTime): DataFrame = {
  fileObjectsParquet(feedName, startDate, feedPath).map { fs =>
    sparkSession
      .read
      .format("parquet")
      .load(fs.path)
  }.reduce(_ union _)
}

In the code above, the listFSObjects method uses the AWS S3 client to get all of the file paths to read.
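
listFSObjects itself is essentially a thin wrapper around the S3 client. A simplified sketch is below; the FSObject shape, the s3:// URI handling, and the lack of pagination are assumptions, not the exact implementation:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import scala.collection.JavaConverters._

// Assumed shape of the helper used above, shown here only for context.
case class FSObject(path: String)

private def listFSObjects(feedPath: String): List[FSObject] = {
  // feedPath is assumed to look like "s3://bucket/prefix"
  val uri = new java.net.URI(feedPath)
  val (bucket, prefix) = (uri.getHost, uri.getPath.stripPrefix("/"))
  val s3 = AmazonS3ClientBuilder.defaultClient()
  // Note: listObjectsV2 returns at most 1000 keys per call; a real
  // implementation would follow the continuation token to paginate.
  s3.listObjectsV2(bucket, prefix)
    .getObjectSummaries.asScala
    .map(summary => FSObject(s"s3://$bucket/${summary.getKey}"))
    .toList
}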

Some of the Spark job configurations:

spark.driver.maxResultSize: 6g
spark.cores.max: '64'
spark.executor.cores: '8'
spark.executor.memory: 48g
spark.mesos.executor.memoryOverhead: '5000'
spark.memory.offHeap.enabled: 'true'
spark.memory.offHeap.size: 5g
spark.driver.memory: 5000m

The specific error in the logs looks like this:

ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 135519 ms
  • how does this differ from https://stackoverflow.com/questions/43895728/apache-spark-on-hdfs-read-10k-100k-of-small-files-at-once – stevel Jan 27 '22 at 11:39
  • It's totally different. This question is not about whole text files and HDFS – Cassie Jan 27 '22 at 19:34
  • Are you sure that the files within the directory are not touched by some other process during the execution? Also, I would recommend that you concatenate the files using another process before reading them through spark – Amardeep Flora Jan 28 '22 at 04:57
  • Yes, they are not touched during the process. Can you provide some guidance regarding concatenation? Should I group somehow by hours right after reading or you mean something else? – Cassie Jan 28 '22 at 06:44
  • there is no difference when spark is reading files between s3 and hdfs except that s3 is slower – stevel Jan 28 '22 at 11:38
  • @stevel as a mechanism yes, but I've mentioned specific error I get not that it's just slow – Cassie Jan 28 '22 at 13:33
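
If the concatenation idea from the comments means compacting each hour (or day) into a few larger parquet files in a separate step before the main job reads them, it could look roughly like this sketch; the paths and the target file count are placeholders:

// Separate compaction step (as suggested in the comments), not part of the current job.
val hourPath = "s3://my-bucket/my-feed/2021/10/31/00"
sparkSession.read.parquet(hourPath)
  .coalesce(1)                      // collapse the many ~100-200 KB files into one larger file
  .write
  .mode("overwrite")
  .parquet(hourPath + "_compacted") // write to a new prefix and point readers at it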

0 Answers