
I am trying to read data from a Postgres table using Spark. Initially I was reading the data on a single thread, without using lowerBound, upperBound, partitionColumn and numPartitions. The data I'm reading is huge, around 120 million records. So I decided to read the data in parallel using partitionColumn. I am able to read the data, but it is taking more time to read with 12 parallel threads than with a single thread. I am unable to figure out how I can see the 12 SQL queries that get generated to fetch the data in parallel, one per partition.

The code that I am using is:

val query = s"(select * from db.testtable) as testquery" 
val df = spark.read
    .format("jdbc")
    .option("url", jdbcurl)
    .option("dbtable", query)
    .option("partitionColumn","transactionbegin")
    .option("numPartitions",12) 
    .option("driver", "org.postgresql.Driver")
    .option("fetchsize", 50000)  
    .option("user","user")
    .option("password", "password")
    .option("lowerBound","2019-01-01 00:00:00")
    .option("upperBound","2019-12-31 23:59:00")
    .load
df.count()

Where and how can I see the 12 parallel queries that are created to read the data in parallel, one per partition?
I can see in the Spark UI that 12 tasks are created, but I have not found a way to see the 12 separate queries that are generated to fetch the data in parallel from the Postgres table.

Is there any way I can push the filter down so that it reads only this year's worth of data, in this case 2019?


2 Answers


The SQL statement is printed at the "info" log level, see here. You need to change Spark's log level to "info" to see the SQL; the per-partition where conditions are printed separately as well, as here. You can also view the SQL on the Postgres side using the pg_stat_statements view, which requires the pg_stat_statements extension to be installed and enabled. There is also a way to log the statements in Postgres itself and inspect them, as mentioned here.
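
A minimal sketch of raising the log level at runtime, assuming the live SparkSession named spark from the question (the exact log lines vary by Spark version):

// Raise the log level so INFO messages from the JDBC data source,
// including the generated per-partition WHERE clauses, show up
// in the driver logs.
spark.sparkContext.setLogLevel("INFO")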

I suspect the parallel read is slow for you because there is no index on the "transactionbegin" column of your table. The partitionColumn should be indexed; otherwise each of those parallel sessions will scan the entire table, and together they will choke the database.
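
A minimal sketch of adding such an index over plain JDBC, assuming the same jdbcurl, user and password as in the question; the index name is made up for illustration:

import java.sql.DriverManager

// With an index on the partition column, each of the 12 partition
// queries can use an index range scan instead of a full table scan.
val conn = DriverManager.getConnection(jdbcurl, "user", "password")
try {
  conn.createStatement().execute(
    "CREATE INDEX IF NOT EXISTS idx_testtable_transactionbegin " +
    "ON db.testtable (transactionbegin)")
} finally {
  conn.close()
}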

Salim
  • Thanks for the answer @Salim. I had actually figured out some of this myself, related to pg_stat_statements, but what you have mentioned is the kind of information I was looking for. I will definitely try it out. – Nikunj Kakadiya Jan 20 '21 at 04:41

This won't show you the individual queries exactly, but Spark can show you the execution plan it has optimized for your query. The plan may not be complete, depending on which stages you have to execute.

You can build your DAG as a DataFrame and, before actually calling an action, call the explain() method on it. Reading the output can be challenging because it is upside down: the source is at the bottom. It may feel a little unusual at first, so if you are reading a plan for the first time, start with basic transformations and work through it step by step.
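
A minimal sketch, assuming the df from the question:

// explain(true) prints the parsed, analyzed, optimized and physical
// plans; the scan node at the bottom shows the JDBC relation and any
// PushedFilters that were sent down to Postgres.
df.explain(true)

In the physical plan, the JDBCRelation line should also show the number of partitions the read was split into.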

Piyush Patel
  • What would explain give? It just shows the plan but does not give more information about the queries. I wanted to know why it takes more time when reading in parallel and less time when reading on a single thread. – Nikunj Kakadiya Dec 14 '20 at 12:40
  • I don't know what you mean by more information, and I'm also not sure why it can take longer when reading in parallel. It also depends on the volume of the data: if the data is very small, for learning purposes, a single thread can be faster. Also, if you see an exchange operation in the plan, that could be a reason for such latency. You can also see in the plan by which column it partitions the data. I guess it will be RoundRobin, which means it's basically random. – Piyush Patel Dec 14 '20 at 16:37