I need clarity about how Spark works under the hood when it comes to fetching data from external databases. What I understood from the Spark documentation is that if I do not mention attributes like "numPartitions", "lowerBound" and "upperBound", then the read via JDBC is not parallel. In that case what happens? Is the data read by one particular executor which fetches all of it? How is parallelism achieved then? Does that executor later share the data with the other executors? But I believe executors cannot share data like this.
Please let me know if any one of you has explored this.
Edit to my question: Hi Amit, thanks for your response, but that is not what I am looking for. Let me elaborate. Refer to this: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
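For reference, the partitioned read described on that page looks roughly like this (a minimal sketch; the JDBC URL and credentials are placeholders, not my actual setup):

// Sketch of the documented partitioned JDBC read: 5 parallel connections,
// each reading one range of bu_id between lowerBound and upperBound.
val df = ss.read.format("jdbc")
  .option("url", "jdbc:postgresql://<host>:<port>/<db>")   // placeholder
  .option("dbtable", "(select * from mstrdata_rdl.cmmt_sku_bu_vw) as mytab")
  .option("user", "<user>")                                // placeholder
  .option("password", "<password>")                        // placeholder
  .option("partitionColumn", "bu_id")
  .option("lowerBound", "10")
  .option("upperBound", "9000")
  .option("numPartitions", "5")
  .load()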
Refer to the code snippet below –
import org.apache.spark.sql.functions.spark_partition_id

// Read with 5 partitions on bu_id, lower bound 10, upper bound 9000
val MultiJoin_vw = db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs, 5, "bu_id", 10, 9000)
MultiJoin_vw.explain(true)   // explain() prints the plan itself and returns Unit
println("Number of executors")
ss.sparkContext.statusTracker.getExecutorInfos.foreach(x => println(x.host(), x.numRunningTasks()))
println("Number of partitions:", MultiJoin_vw.rdd.getNumPartitions)
println("Number of records in each partition:")
MultiJoin_vw.groupBy(spark_partition_id()).count().show(10)
Output:
Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=5] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
Number of executors
(ddlhdcdev18,0)
(ddlhdcdev41,0)
(Number of partitions:,5)
Number of records in each partition:
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
| 1|212267|
| 3| 56714|
| 4|124824|
| 2|232193|
| 0|627712|
+--------------------+------+
Here I read the table using the custom function db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs, 5, "bu_id", 10, 9000), which asks for 5 partitions based on the field bu_id, whose lower bound is 10 and upper bound is 9000. See how Spark read the data into 5 partitions over 5 parallel connections (as mentioned by the Spark doc); my understanding of how it splits that range is sketched below.
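As far as I understand, the bounds are only used to compute a stride and turn each partition into a WHERE clause on bu_id. A rough reconstruction of that logic (my own sketch, not the actual Spark source):

// Rough reconstruction of how Spark derives per-partition predicates
// from (lowerBound, upperBound, numPartitions). Variable names are mine.
val lower = 10L
val upper = 9000L
val numPartitions = 5
val stride = (upper - lower) / numPartitions   // 1798 here

val predicates = (0 until numPartitions).map { i =>
  val start = lower + i * stride
  val end   = start + stride
  if (i == 0) s"bu_id < $end OR bu_id IS NULL"           // first range also catches NULLs
  else if (i == numPartitions - 1) s"bu_id >= $start"    // last range is open-ended
  else s"bu_id >= $start AND bu_id < $end"
}
predicates.foreach(println)   // one WHERE clause per JDBC connection

Rows outside [lowerBound, upperBound] are still read (the first and last ranges are open-ended), which matches the skew visible in the partition counts above.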
Now let's read this table without mentioning any of the parameters above. I simply get the data using another function:

val MultiJoin_vw = db.getDataFromGreenplum(ss, MultiJoin, bs)

Here I am only passing the Spark session (ss), the query for getting the data (MultiJoin), and another parameter for exception handling (bs). The output is like below:

Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=1] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
Number of executors
(ddlhdcdev31,0)
(ddlhdcdev27,0)
(Number of partitions:,1)
Number of records in each partition:
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   0|1253710|
+--------------------+-------+
See how the data is read into one partition, meaning only one connection is spawned. This partition will be on one machine only, and only one task will be assigned to it, so there is no parallelism here. How does the data get distributed to the other executors then?
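To illustrate what I mean: the only way I can see the data spreading out afterwards is an explicit shuffle, e.g. a repartition right after the read (a sketch on the same MultiJoin_vw; the target count of 5 is arbitrary):

// Single-connection read followed by an explicit shuffle: only after
// repartition() triggers a shuffle do other executors hold any of the data.
val distributed = MultiJoin_vw.repartition(5)
println(distributed.rdd.getNumPartitions)            // now 5
distributed.groupBy(spark_partition_id()).count().show()

But that shuffle happens after the single JDBC connection has already pulled everything, so it does not explain how the initial fetch itself could be parallel.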
By the way, this is the spark-submit command I used for both scenarios –
spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --num-executors 1 --executor-cores 1 --executor-memory 1g --class jobs.memConnTest $home_directory/target/mem_con_test_v1-jar-with-dependencies.jar