
I have several compressed CSV files in a Google Cloud Storage bucket. They are grouped into folders by hour, i.e. another application saves several of those files into folders whose names contain the hour.

I then have a Spark application reading all of those files (thousands of them) with simple code like the one below:

sparkSession.read
      .option("sep", "\t")
      .option("header", false)
      .option("inferSchema", false)
      .csv(path)

It takes more than an hour to read. Is that because the files are compressed?

I also noticed in the Spark UI that I only ever have one executor, never more than one. Can't I use several executors to read those files in parallel and finish the processing faster? How can I do that? I am basically trying to create a temp view from the files for further SQL statements in Spark.

I am running in Dataproc with the default Yarn configuration.

Filipe Miranda

2 Answers


According to this article, there are ten things you should take into consideration if you want to improve your cluster performance.

Perhaps it would be a good idea to let Spark scale the number of executors automatically by setting the spark.dynamicAllocation.enabled parameter to true. Please note that this also requires enabling the spark.shuffle.service.enabled parameter; refer to the documentation.
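As a sketch (the cluster name, jar path, and main class below are placeholders, not values from your setup), both properties can be passed per job when submitting to Dataproc via the `--properties` flag:

```shell
# Hypothetical example: enable dynamic executor allocation for one job.
# ${CLUSTER}, <CLASS> and the jar path are placeholders.
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class <CLASS> \
    --jars gs://${BUCKET_NAME}/<PATH>.jar \
    --properties=spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true \
    -- gs://${BUCKET_NAME}/input/
```

Setting them per job keeps the cluster defaults untouched; they can also be set cluster-wide at creation time with the `spark:` property prefix.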

A second approach to executors is explained here; if you want to try that configuration, another Stack Overflow thread explains how to configure the yarn.scheduler.capacity.resource-calculator parameter in Dataproc.
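For illustration only (a sketch, assuming you are creating a new cluster; the cluster name is a placeholder), Dataproc lets you set that YARN parameter at cluster creation time using the `capacity-scheduler:` property prefix:

```shell
# Hypothetical sketch: make YARN consider both CPU and memory when
# allocating containers, instead of memory alone.
gcloud dataproc clusters create ${CLUSTER} \
    --properties 'capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator'
```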

EDIT:

I have recreated your scenario by reading many files from a GCS bucket, and I was able to see that more than one executor was used to process this operation.

How?

Using RDD.

Resilient Distributed Datasets (RDDs) are collections of immutable JVM objects that are distributed across an Apache Spark cluster. Data in an RDD is split into chunks based on a key and then dispersed across all the executor nodes. RDDs are highly resilient, that is, they are able to recover quickly from any issues, as the same data chunks are replicated across multiple executor nodes. Thus, even if one executor fails, another will still process the data.

There are two ways to create RDDs: parallelizing an existing collection, or referencing a dataset in an external storage system (such as a GCS bucket). RDDs can be created using SparkContext’s textFile()/wholeTextFiles() methods.

SparkContext.wholeTextFiles lets you read a directory containing multiple small files, and returns each of them as (filename, content) pairs. This is in contrast with SparkContext.textFile, which would return one record per line in each file.

I wrote the code in Python and ran it as a PySpark job in Dataproc:

import pyspark

sc = pyspark.SparkContext()
# Read every CSV file in the bucket as (filename, content) pairs.
rdd_csv = sc.wholeTextFiles("gs://<BUCKET_NAME>/*.csv")
# collect() pulls all results back to the driver; fine for a quick test,
# but avoid it on large datasets.
rdd_csv.collect()

I see that you are using Scala. Please refer to the Spark documentation for Scala code snippets. I assume it will be similar to this:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object ReadManyFiles {
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      throw new IllegalArgumentException(
          "1 argument is required: <inputPath>")
    }

    val inputPath = args(0)

    val sc = new SparkContext(new SparkConf().setAppName("<APP_NAME>"))
    // Read all files under inputPath as (filename, content) pairs.
    val rdd_csv = sc.wholeTextFiles(inputPath)
    rdd_csv.collect()
  }
}

where inputPath can be specified when submitting the Dataproc job (or you can hardcode it in your .scala file):

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class <CLASS> \
    --jars gs://${BUCKET_NAME}/<PATH>.jar \
    -- gs://${BUCKET_NAME}/input/

I hope it will help you. If you have more questions, please ask.

aga
  • Those properties are both set to true already there. Thanks for the article, I have read it before, but still could not link my situation to those points. – Filipe Miranda Apr 14 '20 at 14:12
  • Can you tell what kind of machines you are using (by describing the specification)? – aga Apr 15 '20 at 08:16
  • The master node is n1-highmem-4, then I have two workers, also n1-highmem-4. – Filipe Miranda Apr 15 '20 at 11:37
  • Are you sure that you have enough resources to run the described job? I mean primary disk size and type (SSD, if you want high performance and lower latency). – aga Apr 21 '20 at 15:16

The resources should already have been scaled to your app dynamically; usually you don't need to set executor counts explicitly.

In your case, depending on how big your dataset is, the cluster or its VMs could be too small to handle the increased input data size. Maybe try increasing the number of VMs/nodes in your cluster, or use VMs with more RAM.
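As a sketch (the cluster name is a placeholder; note that changing the workers' machine type to get more RAM requires recreating the cluster, since machine types can't be updated in place), the primary worker count of an existing Dataproc cluster can be increased with:

```shell
# Hypothetical example: scale an existing cluster out to 4 primary workers.
gcloud dataproc clusters update ${CLUSTER} --num-workers 4
```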

Henry Gong