I have written a Scala program that loads data from an MS SQL Server and writes it to BigQuery. I run it on a Spark cluster (Google Dataproc). The cluster has 64 cores, I specify the executor parameters when submitting the job, and I partition the data I'm reading, yet Spark ends up reading the data from a single executor. When I start the job I can see all the executors start up, and on the SQL Server I can see connections from all 4 workers. Within a minute they all shut down again, leaving only one, which then runs for over an hour before finishing.

The data set is 65 million records, and I'm trying to partition it into 60 partitions.

This is my cluster:

    gcloud dataproc clusters create my-cluster \
      --properties dataproc:dataproc.conscrypt.provider.enable=false,spark:spark.executor.userClassPathFirst=true,spark:spark.driver.userClassPathFirst=true \
      --region europe-north1 \
      --subnet my-subnet \
      --master-machine-type n1-standard-4 \
      --worker-machine-type n1-highmem-16 \
      --master-boot-disk-size 15GB \
      --worker-boot-disk-size 500GB \
      --image-version 1.4 \
      --master-boot-disk-type=pd-ssd \
      --worker-boot-disk-type=pd-ssd \
      --num-worker-local-ssds=1 \
      --num-workers=4

This is how I run the job:

    gcloud dataproc jobs submit spark \
      --cluster my-cluster \
      --region europe-north1 \
      --jars gs://mybucket/mycode.jar,gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar \
      --class Main \
      --properties \
      spark.executor.memory=19g, \
      spark.executor.cores=4, \
      spark.executor.instances=11 \
      -- yarn

This is the code I use to read the data:

val data = sqlQuery(ss,
                    serverName,
                    portNumber,
                    databaseName,
                    userName,
                    password,
                    tableName)

writeToBigQuery(
      bqConfig,
      data,
      dataSetName,
      replaceInvalidCharactersInTableName(r.getAs[String]("TableName")),
      "WRITE_TRUNCATE")

def sqlQuery(ss: SparkSession,
             hostName: String,
             port: String,
             databaseName: String,
             user: String,
             password: String,
             query: String): DataFrame = {
  val result = ss.read.format("jdbc")
    .option("url", getJdbcUrl(hostName, port, databaseName))
    .option("dbtable", query)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("numPartitions", 60)
    .option("partitionColumn", "entityid")
    .option("lowerBound", 1)
    .option("upperBound", 198012).load()

  result
}

def writeToBigQuery(bqConf: Configuration,
                    df: DataFrame,
                    dataset: String,
                    table: String,
                    writeDisposition: String = "WRITE_APPEND"): Unit = {

  //Convert illegal characters in column names
  var legalColumnNamesDf = df
  for (col <- df.columns) {
    legalColumnNamesDf = legalColumnNamesDf.withColumnRenamed(
      col,
      col
        .replaceAll("-", "_")
        .replaceAll("\\s", "_")
        .replaceAll("æ", "ae")
        .replaceAll("ø", "oe")
        .replaceAll("å", "aa")
        .replaceAll("Æ", "AE")
        .replaceAll("Ø", "OE")
        .replaceAll("Å", "AA")
    )
  }

  val outputGcsPath = s"gs://$bucket/" + HardcodedValues.SparkTempFolderRelativePath + UUID
    .randomUUID()
    .toString
  val outputTableId = s"$projectId:$dataset.$table"

  //Apply explicit schema to avoid creativity of BigQuery auto config
  val uniqBqConf = new Configuration(bqConf)

  BigQueryOutputConfiguration.configure(
    uniqBqConf,
    outputTableId,
    s"""{"fields":${Json(DefaultFormats).write(
      legalColumnNamesDf.schema.map(
        f =>
          Map(
            "name" -> f.name,
            "type" -> f.dataType.sql
              .replace("BIGINT", "INT")
              .replace("INT", "INT64")
              .replaceAll("DECIMAL\\(\\d+,\\d+\\)", "NUMERIC"),
            "mode" -> (if (f.nullable) "NULLABLE"
                       else "REQUIRED")
        ))
    )} }""",
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_, _]]
  )

  uniqBqConf.set(
    BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
    if (Array("WRITE_APPEND", "WRITE_TRUNCATE") contains writeDisposition)
      writeDisposition
    else "WRITE_APPEND"
  )

  //Save to BigQuery
  legalColumnNamesDf.rdd
    .map(
      row =>
        (null,
         Json(DefaultFormats).write(
           ListMap(row.schema.fieldNames.toSeq.zip(row.toSeq): _*))))
    .saveAsNewAPIHadoopDataset(uniqBqConf)

}

Any ideas would be appreciated.

Bjoern

2 Answers

If you look at the Spark UI, is there a lot of skew where one task is reading most of the data? My guess is that you're picking a poor partition key, so most of the data ends up in one partition.

This Stack Overflow answer provides a detailed explanation: "What is the meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?" Spark splits the range from lowerBound to upperBound into numPartitions equal-width strides, so I think your entity ids would need to be evenly distributed between 1 and 198012 for it to be a good column to partition on.
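
To make the point about even distribution concrete, here is a minimal sketch in plain Scala of how the partition options turn into one WHERE clause per partition. It is a simplified imitation of what Spark 2.x appears to do internally, not Spark's actual code. `partitionClauses` is a hypothetical helper, and it assumes at least two partitions and lowerBound < upperBound.

```scala
// Hypothetical helper (not a Spark API): builds one WHERE clause per partition
// from the same four options passed to the JDBC reader.
def partitionClauses(column: String,
                     lowerBound: Long,
                     upperBound: Long,
                     numPartitions: Int): Seq[String] = {
  require(numPartitions >= 2, "sketch assumes at least two partitions")
  require(lowerBound < upperBound, "lowerBound must be below upperBound")

  // Width of each partition's id range (integer division, as an assumption
  // about how Spark 2.x computes the stride).
  val stride = upperBound / numPartitions - lowerBound / numPartitions

  (0 until numPartitions).map { i =>
    val lo = lowerBound + i * stride
    val hi = lo + stride
    if (i == 0) s"$column < $hi or $column is null"   // first: open below, takes NULLs
    else if (i == numPartitions - 1) s"$column >= $lo" // last: open above
    else s"$column >= $lo AND $column < $hi"
  }
}

// Values taken from the question.
val clauses = partitionClauses("entityid", 1L, 198012L, 60)
println(clauses.head) // entityid < 3301 or entityid is null
println(clauses.last) // entityid >= 194701
```

The first and last partitions are open-ended: the bounds only decide the stride and do not filter rows. Any ids far outside 1 to 198012, or a large number of NULLs, would therefore all pile into a single task.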

Karthik Palaniappan
  • Thanks for the suggestion. While it is not a unique key, it is very evenly distributed, so there is hardly any skew. – Bjoern Aug 02 '19 at 08:42
  • Gotcha -- are you seeing skew when you look at the Spark UI though? (You can create a cluster with `--enable-component-gateway` to jump to the Spark UI from the "Web interfaces" tab -> YARN UI -> Spark ApplicationMaster in the GCP console). – Karthik Palaniappan Aug 02 '19 at 16:57
  • Thanks for your efforts. I stopped telling Spark how many executors to run and let it use dynamic allocation instead, and now it works. I'm still learning Spark, so maybe I missed something else, but in any case, my problem is solved now. Thanks again. – Bjoern Aug 03 '19 at 19:24

In the end I stopped telling Spark how many executors to run and let it use dynamic allocation instead, and now it works. I asked for 24 partitions, and it dynamically allocates 8 executors with 3 cores each, running 24 tasks in parallel.
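
As a rough sanity check on those numbers, the sketch below computes how many tasks can run at once and how many waves a stage needs. It assumes each task takes one core (the default `spark.task.cpus=1`) and that every requested executor is actually granted. `taskSlots` and `waves` are hypothetical helper names, not Spark APIs.

```scala
// Number of tasks that can run concurrently: one task per executor core,
// assuming spark.task.cpus is left at its default of 1.
def taskSlots(executors: Int, coresPerExecutor: Int): Int =
  executors * coresPerExecutor

// Number of scheduling waves needed to run every partition of a stage
// (ceiling division of partitions by available slots).
def waves(partitions: Int, slots: Int): Int = {
  require(partitions >= 0 && slots > 0)
  (partitions + slots - 1) / slots
}

val dynamicSlots = taskSlots(8, 3)    // what dynamic allocation gave me
val requestedSlots = taskSlots(11, 4) // what I originally asked for

println(waves(24, dynamicSlots))   // 1
println(waves(60, requestedSlots)) // 2
```

So 24 partitions fit in a single wave on 24 slots, and the original request of 60 partitions on 44 slots should have needed only two waves if all executors had stayed alive.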

Bjoern