
For my use case, I am trying to read one big Oracle table using Spark JDBC. Since I do not have an integer-type column in my table, I am using ROWNUM as the partitionColumn.

Here is what my Spark query looks like (for testing I am using a table with only 22000 rows):

val df = spark.read.jdbc(url = url, table = "(select * from table1) t1",
                         columnName = "rownum", lowerBound = 0, upperBound = 22000,
                         numPartitions = 3, connectionProperties = oracleProperties)

Ideally, it should return 3 partitions with roughly 7000 rows in each. But when I ran a count on each partition of the DataFrame, I could see that only one partition has rows while the others have 0:

df.rdd.mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }.toDF().show()

output:

+---+----+
| _1|  _2|
+---+----+
|  0|7332|
|  1|   0|
|  2|   0|
+---+----+

Can you please suggest why it's only returning rows in one partition?

My source is an Oracle database, using the Oracle JDBC driver oracle.jdbc.driver.OracleDriver (ojdbc7.jar).

reference thread: http://apache-spark-user-list.1001560.n3.nabble.com/Question-on-using-pseudo-columns-in-spark-jdbc-options-td30154.html

  • Read [Pseudocolumn in Spark JDBC](https://stackoverflow.com/q/47615975/6910411) first. And show us actual code please. With this query (ignoring missing quotes) a whole code would fail on runtime. – zero323 Apr 12 '18 at 10:12

1 Answer


After some googling around I was able to make it work.

I was trying to read from an Oracle database using Spark JDBC, and I was able to use Oracle's pseudocolumn ROWNUM to parallelize the read with a small hack. The trick is that you have to alias the ROWNUM column in a subquery and then partition on that alias.

I wanted to query the entire "table1" and create multiple partitions in Spark for that table. My original attempt, which put all rows into a single partition, looked like this:

val df = spark.read.jdbc(url = url, table = "(select * from table1) t1",
                         columnName = "ROWNUM", lowerBound = 0, upperBound = 22000,
                         numPartitions = 3, connectionProperties = oracleProperties)
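
To see why this version collapses into a single partition, it helps to sketch the per-partition WHERE clauses Spark derives from the bounds (shown here approximately; the exact SQL depends on the Spark version) and how they interact with Oracle's ROWNUM semantics:

// Approximate predicates Spark builds from lowerBound = 0,
// upperBound = 22000, numPartitions = 3 (stride = 22000 / 3 = 7333);
// the exact SQL varies by Spark version:
val partitionPredicates = Seq(
  "rownum < 7333 or rownum is null",   // partition 0 -> rows 1..7332
  "rownum >= 7333 AND rownum < 14666", // partition 1 -> empty
  "rownum >= 14666"                    // partition 2 -> empty
)
// Oracle assigns ROWNUM to a row only as it passes the WHERE clause,
// so "ROWNUM >= 7333" can never be true: the first candidate row is
// numbered 1, fails the predicate, and the counter never advances.
// That is why only partition 0 comes back with rows (7332 of them).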

In order to partition on the pseudocolumn, alias it in a subquery and modify the query like the following:

val df = spark.read.jdbc(url = url,
                         table = "(select t1.*, ROWNUM as num_rows from (select * from table1) t1) oracle_table1",
                         columnName = "num_rows", lowerBound = 0, upperBound = 22000,
                         numPartitions = 3, connectionProperties = oracleProperties)

This way we turn the pseudocolumn into an actual column of the result set, so Spark's partition predicates are evaluated against real values and every partition matches its slice of rows.
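
To confirm the fix, you can re-run the per-partition count from the question; with 22000 rows and a stride of about 7333, each of the three partitions should now hold roughly a third of the rows:

// Count rows per partition of the fixed DataFrame.
df.rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
  .toDF("partition", "rows")
  .show()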
