
I need clarity about how Spark works under the hood when fetching data from external databases. From the Spark documentation I understand that if I do not specify attributes like "numPartitions", "lowerBound" and "upperBound", then the read via JDBC is not parallel. In that case, what happens? Is the data read by one particular executor which fetches all of it? How is parallelism achieved then? Does that executor later share the data with other executors? I believe executors cannot share data like this.

Please let me know if any of you have explored this.

Edit to my question: Hi Amit, thanks for your response, but that is not what I am looking for. Let me elaborate. Refer to this: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Refer to the code snippet below –

import org.apache.spark.sql.functions.spark_partition_id

val MultiJoin_vw = db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs, 5, "bu_id", 10, 9000)
println(MultiJoin_vw.explain(true)) // explain() returns Unit, hence the "()" printed in the output
println("Number of executors")
ss.sparkContext.statusTracker.getExecutorInfos.foreach(x => println(x.host(), x.numRunningTasks()))
println("Number of partitions:", MultiJoin_vw.rdd.getNumPartitions)
println("Number of records in each partition:")
MultiJoin_vw.groupBy(spark_partition_id()).count().show(10)

Output :

Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=5] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev18,0)
(ddlhdcdev41,0)
(Number of partitions:,5)
Number of records in each partition:
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   1|212267|
|                   3| 56714|
|                   4|124824|
|                   2|232193|
|                   0|627712|
+--------------------+------+
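
For reference, the helper is custom code; a minimal sketch of what it presumably does under the hood, using the standard JDBC options from the docs linked above (the URL, user, and password here are placeholders, not the real connection details):

val df = ss.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://greenplum-host:5432/mydb") // placeholder URL
  .option("dbtable", "(select * from mstrdata_rdl.cmmt_sku_bu_vw) as mytab")
  .option("user", "someuser")         // placeholder
  .option("password", "somepassword") // placeholder
  .option("partitionColumn", "bu_id") // numeric column used to split the read
  .option("lowerBound", "10")         // lowerBound/upperBound only set the stride,
  .option("upperBound", "9000")       // they do not filter rows
  .option("numPartitions", "5")       // 5 partitions => up to 5 parallel connections
  .load()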

Here I read the table using the custom function db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs, 5, "bu_id", 10, 9000), which asks for 5 partitions based on the field bu_id, whose lower bound is 10 and upper bound is 9000. See how Spark reads the data in 5 partitions with 5 parallel connections (as described in the Spark docs). Now let's read this table without specifying any of the parameters above –

I simply get the data using another function - val MultiJoin_vw = db.getDataFromGreenplum(ss, MultiJoin, bs)

Here I am only passing the Spark session (ss), the query for getting the data (MultiJoin), and another parameter for exception handling (bs). The output is like below –

Fetch Starts

== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=1] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev31,0)
(ddlhdcdev27,0)
(Number of partitions:,1)
Number of records in each partition:
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   0|1253710|
+--------------------+-------+

See how the data is read into one partition, which means only one connection is spawned. This partition will live on one machine only, and a single task will be assigned to it, so there is no parallelism here. How does the data get distributed to the other executors then?
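
My assumption (not something I found in the docs) is that the single partition simply sits with whichever executor ran that one task; to get parallelism downstream I would have to trigger a shuffle myself, for example with an explicit repartition. A sketch:

// Assumption for illustration: the JDBC fetch itself still uses one connection;
// repartition only spreads the already-fetched rows for parallel downstream work.
val distributed = MultiJoin_vw.repartition(5)
println(distributed.rdd.getNumPartitions) // 5 after the shuffle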

By the way, this is the spark-submit command I used for both scenarios –

spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --num-executors 1 --executor-cores 1 --executor-memory 1g --class jobs.memConnTest $home_directory/target/mem_con_test_v1-jar-with-dependencies.jar
  • You have configured executors to 1, and that is why there is no parallelism. I think by default it is set to 2. Try removing this explicit configuration and check. – Amit Apr 21 '20 at 14:36

1 Answer


Re:"to fetch data external databases" In your spark application this is generally the part of the code that will be executed on executors. Number of executors can be controlled by passing a spark configuration "num-executors". If you have worked with Spark and RDD/Dataframe, then one of the example from where you would connect to the database is the transformation functions such as map,flatmap,filter etc. These functions when getting executed on executors ( configured by num-executors) will establish the database connection and use it.

One important thing to note here: if you work with too many executors, your database server might get slower and slower and eventually become non-responsive. If you use too few executors, your Spark job might take more time to finish. Hence you have to find an optimum number based on your DB server's capacity.

Re:"How is parallelism achieved then? Does that executor share the data later to other executors?"

Parallelism, as mentioned above, is achieved by configuring the number of executors. Configuring the number of executors is just one way of increasing parallelism, and it is not the only way. Consider a case where you have a small amount of data resulting in fewer partitions; then you will see less parallelism. So you need a good number of partitions (these correspond to tasks) and then an appropriate number of executors (the exact number depends on the use case) to execute those tasks in parallel. As long as you can process each record individually it scales; however, as soon as you have an operation that causes a shuffle, you will see tasks and executors redistributing the work. Spark will try its best to distribute the data so that it can work at an optimum level.
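
For example, taking the single-partition dataframe from your edit: the JDBC scan runs as one task, but the first wide operation shuffles the rows into spark.sql.shuffle.partitions partitions, which the scheduler then spreads over the available executors. A sketch:

// Sketch: MultiJoin_vw was scanned as a single partition, but the groupBy below
// forces a shuffle; afterwards the data sits in spark.sql.shuffle.partitions
// partitions (200 by default) whose tasks can run on any executor.
val counts = MultiJoin_vw.groupBy("bu_id").count()
println(counts.rdd.getNumPartitions) // typically 200 unless reconfigured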

Please refer to https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-1/ and its subsequent parts to understand more about the internals.

Amit
  • Hi Amit, I have edited my question and given more details. Hopefully it will explain what exactly I am trying to ask. – Sukanta Nath Apr 21 '20 at 06:18