I have a large 3 GB CSV file that I converted to Parquet with snappy compression using pandas' built-in to_parquet method, then uploaded to S3.
I have a Spark workflow that reads the file with the following code:
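import org.apache.spark.sql.{DataFrame, DataFrameReader, Dataset, SparkSession}
// One row of the raw OpenAddresses data; optional columns may be null in the file.
case class RawOpenAddress(
  lon: Double,
  lat: Double,
  number: String,
  street: String,
  hash: String,
  unit: Option[String],
  city: Option[String],
  district: Option[String],
  region: Option[String],
  postcode: Option[String],
  id: Option[String]
)
// Reads the Parquet file at `path` into a typed Dataset.
def readRawOpenAddresses(spark: SparkSession, path: String): Dataset[RawOpenAddress] = {
  openAddressFromReader(spark, _.parquet(path))
}
// Builds a configured reader, lets the caller choose the source format, then maps to the case class.
def openAddressFromReader(
  spark: SparkSession,
  fromReader: DataFrameReader => DataFrame
): Dataset[RawOpenAddress] = {
  import spark.implicits._
  // header/quote/escape are CSV options; they should have no effect on the Parquet read.
  val reader = spark.read.options(Map("header" -> "true", "quote" -> "\"", "escape" -> "\""))
  val addresses = fromReader(reader)
  transformOpenAddresses(addresses).as[RawOpenAddress]
}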
def main(args: Array[String]): Unit = {
  val spark: SparkSession = <instantiate a new SparkSession>
  val openAddresses = readRawOpenAddresses(spark, arguments.path + "/combined.parquet.snappy")
  val count = openAddresses.mapPartitions(<some_map_function>).count()
}
I then submit the job to the cluster with spark-submit, using the following configuration:
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.appMasterEnv.PROFILE=production \
  --conf spark.default.parallelism=1000 \
  --executor-memory=8G \
  --driver-memory=16G
When the job runs, executors are added during the initial read of the file in stage 0, which takes a total of 30 seconds to complete. In the next stage, however, the executors are removed and the job continues to run indefinitely on a single executor. I can explicitly call repartition on the addresses variable to force more executors to be used, but this leads to a very expensive shuffle that causes my job to get stuck.
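To see how the rows are distributed across partitions without triggering a shuffle, something like the following could be run against the dataset. This is a minimal sketch: PartitionDiagnostics and rowsPerPartition are hypothetical names introduced here, and it relies only on the standard Dataset.rdd and RDD.mapPartitionsWithIndex calls.

```scala
import org.apache.spark.sql.Dataset

// Hypothetical helper, not part of the original job.
object PartitionDiagnostics {

  /** Returns (partitionIndex, rowCount) for every partition of `ds`.
    * Rows are counted inside each partition, so no shuffle is introduced.
    */
  def rowsPerPartition[T](ds: Dataset[T]): Array[(Int, Long)] =
    ds.rdd
      .mapPartitionsWithIndex { (index, rows) =>
        // Count with a Long accumulator so very large partitions cannot overflow an Int.
        Iterator((index, rows.foldLeft(0L)((n, _) => n + 1L)))
      }
      .collect()
}
```

Calling PartitionDiagnostics.rowsPerPartition(openAddresses) right after the read, and printing the result, would show whether many partitions exist but only one of them actually holds rows.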
Is this due to the file format? Is it possible that pandas is writing a Parquet file that Spark cannot properly split and distribute?
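For context on why the file layout might matter: as far as I understand, Spark plans a Parquet scan as byte ranges of the file (bounded by spark.sql.files.maxPartitionBytes, 128 MB by default), and the Parquet reader gives each row group to the range that contains the row group's midpoint. The sketch below is a plain Scala model of that rule under those assumptions, not Spark's actual code. RowGroupSplitModel and all sizes in it are made up for illustration.

```scala
// Toy model of midpoint-based row group assignment; names and sizes are illustrative assumptions.
object RowGroupSplitModel {

  /** A row group described by its starting byte offset and its size in bytes. */
  final case class RowGroup(start: Long, size: Long) {
    def midpoint: Long = start + size / 2
  }

  /** Cuts a file of `fileSize` bytes into [start, end) ranges of at most `splitSize` bytes. */
  def splits(fileSize: Long, splitSize: Long): Seq[(Long, Long)] =
    (0L until fileSize by splitSize).map(s => (s, math.min(s + splitSize, fileSize)))

  /** Counts, for each range, the row groups whose midpoint falls inside it. */
  def rowGroupsPerSplit(groups: Seq[RowGroup], ranges: Seq[(Long, Long)]): Seq[Int] =
    ranges.map { case (start, end) =>
      groups.count(g => g.midpoint >= start && g.midpoint < end)
    }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val fileSize = 1024 * mb                 // illustrative 1 GiB file
    val ranges = splits(fileSize, 128 * mb)  // eight 128 MiB ranges

    // Case 1: the whole file is a single row group.
    val single = Seq(RowGroup(0L, fileSize))
    // Case 2: the same file written as sixteen 64 MiB row groups.
    val many = (0 until 16).map(i => RowGroup(i * 64 * mb, 64 * mb))

    println(rowGroupsPerSplit(single, ranges))
    println(rowGroupsPerSplit(many, ranges))
  }
}
```

In this model the single-row-group file leaves seven of the eight ranges empty, while the sixteen-row-group file gives every range two row groups. If the pandas output really does contain one large row group, that would match the single busy executor I am seeing, but I have not confirmed the row group layout of my file.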