According to this article, there are ten things you should take into consideration if you want to improve your cluster performance.
Perhaps it would be a good idea to let Spark scale the number of executors automatically by setting the spark.dynamicAllocation.enabled parameter to true. Please note that this configuration also requires enabling the spark.shuffle.service.enabled parameter; refer to the documentation.
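For example, both properties can be passed at job submission time with the --properties flag of gcloud (a minimal sketch; ${CLUSTER}, <CLASS> and the jar path are placeholders, as in the submit command at the end of this answer):

# Sketch: enable dynamic allocation and the external shuffle service for this job
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class <CLASS> \
    --jars gs://${BUCKET_NAME}/<PATH>.jar \
    --properties=spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true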
A second approach to executors is explained here. If you want to try this configuration, another Stack Overflow thread explains how to configure the yarn.scheduler.capacity.resource-calculator parameter in Dataproc.
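On Dataproc that parameter is usually set at cluster creation time via the capacity-scheduler: properties prefix (a sketch only; using DominantResourceCalculator here is my assumption, check the linked thread for the value that fits your case):

# Example value only; pick the calculator the linked thread recommends for your case
gcloud dataproc clusters create ${CLUSTER} \
    --properties 'capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator'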
EDIT:
I have recreated your scenario by reading many files from a GCS bucket, and I was able to see that more than one executor was used to process this operation.
How?
Using RDDs.
Resilient Distributed Datasets (RDDs) are collections of immutable JVM objects that are distributed across an Apache Spark cluster. Data in an RDD is split into chunks based on a key and then dispersed across all the executor nodes. RDDs are highly resilient, that is, they are able to recover quickly from issues because the same data chunks are replicated across multiple executor nodes. Thus, even if one executor fails, another will still process the data.
There are two ways to create RDDs: parallelizing an existing collection, or referencing a dataset in an external storage system (such as a GCS bucket). RDDs can be created using SparkContext's textFile()/wholeTextFiles() methods.
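For completeness, the first way (parallelizing) would look roughly like this in PySpark, assuming sc is a SparkContext created as in the example further down; the list is just illustrative:

# Sketch: create an RDD from an in-memory collection; the data is split
# into partitions and distributed across the executors.
rdd_numbers = sc.parallelize([1, 2, 3, 4, 5])
rdd_numbers.count()

For reading files from a GCS bucket, though, the second way is the one you need.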
SparkContext.wholeTextFiles lets you read a directory containing multiple small files, and returns each of them as (filename, content) pairs. This is in contrast with SparkContext.textFile, which would return one record per line in each file.
I wrote code in Python and ran a PySpark job in Dataproc:
import pyspark

sc = pyspark.SparkContext()
# Read every CSV file in the bucket as (filename, content) pairs
rdd_csv = sc.wholeTextFiles("gs://<BUCKET_NAME>/*.csv")
rdd_csv.collect()
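A PySpark job like this one can be submitted with gcloud, for example (a sketch; the wholetextfiles.py file name and ${CLUSTER} are placeholders of mine):

# wholetextfiles.py is a placeholder name for the script above, uploaded to GCS
gcloud dataproc jobs submit pyspark gs://${BUCKET_NAME}/wholetextfiles.py \
    --cluster=${CLUSTER}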
I see that you are using Scala. Please refer to the Spark documentation for Scala code snippets. I assume it will be similar to this:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object ReadManyFiles {
  def main(args: Array[String]) {
    if (args.length != 1) {
      throw new IllegalArgumentException("1 argument is required: <inputPath>")
    }
    val inputPath = args(0)
    val sc = new SparkContext(new SparkConf().setAppName("<APP_NAME>"))
    // Read every file under inputPath as (filename, content) pairs
    val rdd_csv = sc.wholeTextFiles(inputPath)
    rdd_csv.collect()
  }
}
where inputPath can be specified when submitting the Dataproc job (or you can hardcode it in your .scala file):
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class <CLASS> \
    --jars gs://${BUCKET_NAME}/<PATH>.jar \
    -- gs://${BUCKET_NAME}/input/
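The path after the -- separator is passed to the main class as a job argument, so it ends up in args(0) and becomes inputPath in the code above.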
I hope this helps. If you have more questions, please ask.