I need to read Parquet files from AWS S3 that are partitioned by date and hour. The partition structure looks like this: year/month/day/hour, e.g. 2021/10/31/00.
The problem is that each hour directory contains multiple small Parquet files, each around 100-200 KB. So when I try to read the data for a single day, the Spark job starts failing with lost executors while reading all of those files. What would be the best approach to reading this many small files?
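To make the layout concrete, the keys for a single hour might look roughly like this (the bucket name, feed prefix, and file names below are placeholders, not taken from the actual data):

s3://some-bucket/some-feed/2021/10/31/00/part-00000-<uuid>.snappy.parquet
s3://some-bucket/some-feed/2021/10/31/00/part-00001-<uuid>.snappy.parquet
...

and a full day fans out into 24 such directories, each holding many of these tiny files.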
The Parquet file reading mechanism looks like this:
import org.apache.spark.sql.DataFrame
import org.joda.time.DateTime

// Builds the partition timestamp from the year/month/day/hour path segments.
private def buildDateTime(year: String, month: String, day: String, hour: String): DateTime = {
  new DateTime(year.toInt, month.toInt, day.toInt, hour.toInt, 0)
}

// Lists all .parquet objects under feedPath and keeps the ones whose partition
// timestamp (extracted with dataRegex, defined elsewhere) falls at or after
// startDate minus one hour.
private def fileObjectsParquet(feedName: String, startDate: DateTime, feedPath: String): List[FSObject] = {
  val fls = listFSObjects(feedPath)
    .filter(fs => fs.path.endsWith(".parquet"))

  fls.filter { fs =>
    fs.path match {
      case dataRegex(year, month, day, hour)
          if buildDateTime(year, month, day, hour).isEqual(startDate) ||
             buildDateTime(year, month, day, hour).isAfter(startDate.minusHours(1)) => true
      case _ => false
    }
  }
}

// Reads every matching file as its own DataFrame and unions them all into one result.
private def readParquetFiles(feedName: String, feedPath: String, startDate: DateTime): DataFrame = {
  fileObjectsParquet(feedName, startDate, feedPath).map { fs =>
    sparkSession
      .read
      .format("parquet")
      .load(fs.path)
  }.reduce(_ union _)
}
In this example, the listFSObjects method uses the AWS S3 client to get all the file paths to read.
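For completeness, here is a minimal sketch of what such a listing helper could look like, assuming the AWS SDK for Java v1 S3 client, an s3://bucket/prefix style feedPath, and a simple FSObject(path) case class; the real listFSObjects and FSObject are not shown in the question, so everything below is only an assumption:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsV2Request
import scala.collection.JavaConverters._

// Hypothetical stand-in for the FSObject type referenced above.
case class FSObject(path: String)

private def listFSObjects(feedPath: String): List[FSObject] = {
  // Split an s3://bucket/prefix path into bucket and key prefix.
  val uri = new java.net.URI(feedPath)
  val bucket = uri.getHost
  val prefix = uri.getPath.stripPrefix("/")

  val s3 = AmazonS3ClientBuilder.defaultClient()
  val request = new ListObjectsV2Request().withBucketName(bucket).withPrefix(prefix)
  val keys = scala.collection.mutable.ListBuffer[String]()

  // ListObjectsV2 returns at most 1000 keys per call, so page through the results.
  var result = s3.listObjectsV2(request)
  keys ++= result.getObjectSummaries.asScala.map(_.getKey)
  while (result.isTruncated) {
    request.setContinuationToken(result.getNextContinuationToken)
    result = s3.listObjectsV2(request)
    keys ++= result.getObjectSummaries.asScala.map(_.getKey)
  }

  keys.map(key => FSObject(s"s3://$bucket/$key")).toList
}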
Some of the Spark job configurations:
spark.driver.maxResultSize: 6g
spark.cores.max: '64'
spark.executor.cores: '8'
spark.executor.memory: 48g
spark.mesos.executor.memoryOverhead: '5000'
spark.memory.offHeap.enabled: 'true'
spark.memory.offHeap.size: 5g
spark.driver.memory: 5000m
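For reference, a sketch of how the same values could be applied when building the session (copied from the list above; the app name is a placeholder):

import org.apache.spark.sql.SparkSession

// Illustration only: the job presumably sets these through its deployment configuration;
// spark.driver.memory in particular normally has to be supplied at launch time
// (e.g. via spark-submit) because the driver JVM is already running at this point.
val sparkSession = SparkSession.builder()
  .appName("small-parquet-files-reader") // placeholder name
  .config("spark.driver.maxResultSize", "6g")
  .config("spark.cores.max", "64")
  .config("spark.executor.cores", "8")
  .config("spark.executor.memory", "48g")
  .config("spark.mesos.executor.memoryOverhead", "5000")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "5g")
  .config("spark.driver.memory", "5000m")
  .getOrCreate()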
The specific error in the logs looks like this:
ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 135519 ms