
I am running the below code in Spark using Java.

Code

Test.java

package com.sample;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Test {

    private static final String APP_NAME = "Test";
    private static final String LOCAL = "local";
    private static final String MASTER_IP = "spark://10.180.181.26:7077";

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName(APP_NAME).setMaster(MASTER_IP);
        String connection = "jdbc:oracle:thin:test/test@//xyz00aie.in.oracle.com:1521/PDX2600N";
        // Create Spark Context
        SparkContext context = new SparkContext(conf);
        // Create Spark Session

        SparkSession sparkSession = new SparkSession(context);
        long startTime = System.currentTimeMillis();
        System.out.println("Start time is : " + startTime);
        Dataset<Row> txnDf = sparkSession.read().format("jdbc").option("url", connection)
                .option("dbtable", "CI_TXN_DETAIL_STG_100M").load();

        // Count rows whose TXN_DETAIL_ID falls within the given range
        System.out.println(txnDf.filter(txnDf.col("TXN_DETAIL_ID").gt(1286001510)
                .and(txnDf.col("TXN_DETAIL_ID").lt(1303001510))).count());


        sparkSession.stop();
    }

}

I am simply trying to find the count of a range of rows. The range is 20 million rows.

Below is a snapshot of the Spark dashboard:

[Screenshot: Spark dashboard showing active tasks on only one executor]

Here I can see active tasks on only one executor, even though I have a total of 10 executors running.

My question

Why is my application showing active tasks on only one executor instead of distributing the work across all 10 executors?

Below is my spark-submit command:

./spark-submit --class com.sample.Test \
  --conf spark.sql.shuffle.partitions=5001 \
  --conf spark.yarn.executor.memoryOverhead=11264 \
  --executor-memory=91GB \
  --conf spark.yarn.driver.memoryOverhead=11264 \
  --driver-memory=91G \
  --executor-cores=17 \
  --driver-cores=17 \
  --conf spark.default.parallelism=306 \
  --jars /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar,/scratch/rmbbuild/spark_ormb/drools-jars/Addition-1.0.jar \
  --driver-class-path /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar \
  --master spark://10.180.181.26:7077 \
  "/scratch/rmbbuild/spark_ormb/POC-jar/Test-0.0.1-SNAPSHOT.jar" > /scratch/rmbbuild/spark_ormb/POC-jar/logs/log18.txt
A Learner

1 Answer


It looks like all the data is read into one partition, which goes to a single executor. To use more executors, more partitions have to be created. The "numPartitions" option can be used, along with a partition column and its bounds, as described here:

https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#jdbc-reads
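
For example, the read could be parallelized along these lines (a sketch only; the bounds and partition count below are illustrative assumptions and should be derived from the actual MIN/MAX of TXN_DETAIL_ID and the cluster size):

Dataset<Row> txnDf = sparkSession.read().format("jdbc")
        .option("url", connection)
        .option("dbtable", "CI_TXN_DETAIL_STG_100M")
        // Numeric column Spark uses to split the read into parallel queries
        .option("partitionColumn", "TXN_DETAIL_ID")
        // Bounds define the stride of the generated WHERE clauses; they do
        // not filter rows. Illustrative values; use the real MIN/MAX.
        .option("lowerBound", 1L)
        .option("upperBound", 1400000000L)
        // One JDBC connection per partition; illustrative count
        .option("numPartitions", 40)
        .load();

With these options Spark issues numPartitions concurrent queries, each covering a slice of roughly (upperBound - lowerBound) / numPartitions IDs, so the scan is spread across the executors.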

This link can also be useful:

Spark: Difference between numPartitions in read.jdbc(..numPartitions..) and repartition(..numPartitions..)
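
In short, the difference is sketched below: "numPartitions" in the JDBC options parallelizes the read itself, while repartition only shuffles data that has already been pulled through a single connection (illustrative values again):

// Read in parallel at the source: 40 concurrent JDBC queries
Dataset<Row> parallelRead = sparkSession.read().format("jdbc")
        .option("url", connection)
        .option("dbtable", "CI_TXN_DETAIL_STG_100M")
        .option("partitionColumn", "TXN_DETAIL_ID")
        .option("lowerBound", 1L)
        .option("upperBound", 1400000000L)
        .option("numPartitions", 40)
        .load();

// Read through one connection, then shuffle the rows into 40 partitions
Dataset<Row> readThenShuffle = sparkSession.read().format("jdbc")
        .option("url", connection)
        .option("dbtable", "CI_TXN_DETAIL_STG_100M")
        .load()
        .repartition(40);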

pasha701
  • Thanks, my table has 100 million rows. I am using Dataset<Row> txnDf = sparkSession.read().format("jdbc").option("url", connection).option("partitionColumn", "TXN_DETAIL_ID").option("numPartitions", 1000).option("lowerBound", 1L).option("upperBound", 100000L).option("dbtable", "CI_TXN_DETAIL_STG_100M").load(); as per your suggestion. Do you think it is optimal? – A Learner Nov 22 '18 at 11:28
  • It depends on several factors: how many connections the Oracle server can support at once, how many executors will be used, and how many processor cores each executor has; 2-4 partitions can be used per core. I guess you can experiment with this. – pasha701 Nov 22 '18 at 12:07
  • Sure, I will try. I have 10 executors, each executor runs 17 cores, and each executor has 91 GB of memory. One thing I wanted to understand: does this upper bound mean the total number of records to be read, or the maximum size of a partition? – A Learner Nov 22 '18 at 12:15
  • Saved me an incredible amount of time. – Rafael Barros Aug 27 '20 at 00:58