6

I need to Read Data from DB2 Database using Spark SQL (As Sqoop is not present)

I know about this function which will read data in parellel by opening multiple connections

jdbc(url: String, table: String, columnName: String, lowerBound: Long,upperBound: Long, numPartitions: Int, connectionProperties: Properties)

My issue is that I don't have a column which is incremental like this. Also I need to read data through Query only as my table is quite large. Does anybody know about way to read data through API or I have to create something on my own

Nikunj Kakadiya
  • 2,689
  • 2
  • 20
  • 35
Saurabh Sharma
  • 325
  • 2
  • 4
  • 15
  • What you mean by "incremental column"? You just give Spark the JDBC address for your server – OneCricketeer Aug 11 '16 at 20:50
  • Refer here. The examples don't use the column or bound parameters. https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases – OneCricketeer Aug 11 '16 at 20:53
  • That is correct. The issue is i wont have more than two executionors. That means a parellelism of 2. If i add these variables in test (String, lowerBound: Long,upperBound: Long, numPartitions)one executioner is creating 10 partitions. But if i dont give these partitions only two pareele reading is happening. Do we have any other way to do this? – Saurabh Sharma Aug 12 '16 at 20:00
  • I'm not sure. I'm not too familiar with the JDBC options for Spark – OneCricketeer Aug 12 '16 at 20:23

3 Answers3

9

Saurabh, in order to read in parallel using the standard Spark JDBC data source support you need indeed to use the numPartitions option as you supposed.

But you need to give Spark some clue how to split the reading SQL statements into multiple parallel ones. So you need some sort of integer partitioning column where you have a definitive max and min value.

If your DB2 system is MPP partitioned there is an implicit partitioning already existing and you can in fact leverage that fact and read each DB2 database partition in parallel:

var df = spark.read.
format("jdbc").
option("url", "jdbc:db2://<DB2 server>:<DB2 port>/<dbname>").
option("user", "<username>").
option("password", "<password>").
option("dbtable", "<your table>").
option("partitionColumn", "DBPARTITIONNUM(<a column name>)").
option("lowerBound", "<lowest partition number>").
option("upperBound", "<largest partition number>").
option("numPartitions", "<number of partitions>").
load()

So as you can see the DBPARTITIONNUM() function is the partitioning key here.

Just in case you don't know the partitioning of your DB2 MPP system, here is how you can find it out with SQL:

SELECT min(member_number), max(member_number), count(member_number) 
FROM TABLE(SYSPROC.DB_MEMBERS())

In case you use multiple partition groups and different tables could be distributed on different set of partitions you can use this SQL to figure out the list of partitions per table:

SELECT t2.DBPARTITIONNUM, t3.HOST_NAME
 FROM SYSCAT.TABLESPACES as t1,  SYSCAT.DBPARTITIONGROUPDEF as t2,
      SYSCAT.TABLES t4, TABLE(SYSPROC.DB_MEMBERS()) as t3 
 WHERE t1.TBSPACEID = t4.TBSPACEID AND
       t4.TABSCHEMA='<myschema>' AND
       t4.TABNAME='<mytab>' AND
       t1.DBPGNAME = t2.DBPGNAME AND
       t2.DBPARTITIONNUM = t3.PARTITION_NUMBER;
  • If your DB2 system is dashDB (a simplified form factor of a fully functional DB2, available in cloud as managed service, or as docker container deployment for on prem), then you can benefit from the built-in Spark environment that gives you partitioned data frames in MPP deployments automatically. All you need to do then is to use the special data source spark.read.format("com.ibm.idax.spark.idaxsource")... See also demo notebook here: https://github.com/ibmdbanalytics/dashdb_analytic_tools/blob/master/dashdblocal_notebooks/Tornado_Clustering.ipynb – Torsten Steinbach Dec 19 '16 at 12:59
  • Torsten, this issue is more complicated than that. I know what you are implying here but my usecase was more nuanced.For example, I have a query which is reading 50,000 records . It has subsets on partition on index, Lets say column A.A range is from 1-100 and 10000-60100 and table has four partitions. Ans above will read data in 2-3 partitons where one partition has 100 rcd(0-100),other partition based on table structure. This would lead to max 5 conn for data reading.I did this by extending the Df class and creating partition scheme , which gave me more connections and reading speed. – Saurabh Sharma Dec 22 '16 at 18:31
  • Sarabh, my proposal applies to the case when you have an MPP partitioned DB2 system. In this case don't try to achieve parallel reading by means of existing columns but rather read out the existing hash partitioned data chunks in parallel. Not sure wether you have MPP tough. I am not sure I understand what four "partitions" of your table you are referring to? Are these logical ranges of values in your A.A column? – Torsten Steinbach Dec 24 '16 at 13:15
  • Hi Torsten, Our DB is MPP only. We have four partitions in the table(As in we have four Nodes of DB2 instance). – Saurabh Sharma Dec 27 '16 at 03:45
  • @TorstenSteinbach Is there any way the jar file containing _com.ibm.idax.spark.idaxsource_ can be found outside dashDB local installation? I believe [this](https://github.com/SparkTC/spark-db2) github project might be of some relevance here but I reckon it has been out-of-support for some time already. – kasur May 23 '17 at 13:18
  • com.ibm.idax.spark.idaxsource is not distributed separately. It is available in dashDB local and not also in IBM Data Science Experience notebooks (datascience.ibm.com), where it can be used to work with the Bluemix Spark service with any dashDB system to read and write data in an optimised fashion. – Torsten Steinbach May 24 '17 at 13:55
  • you do not need numPartitions, but it should be the first choice if available. Using the predicates option as I describe in my answer will also provide parallel reads and improve performance significantly. – AdrianVeidt Jan 10 '18 at 15:01
5

You don't need the identity column to read in parallel and the table variable only specifies the source. After registering the table, you can limit the data read from it using your Spark SQL query using aWHERE clause. If this is not an option, you could use a view instead, or as described in this post, you can also use any arbitrary subquery as your table input.

val dataframe = sqlContext.read.format("jdbc").option("url", "jdbc:db2://localhost/sparksql").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "table").option("user", "root").option("password", "root").load()
dataframe.registerTempTable("table")
dataframe.sqlContext.sql("select * from table where dummy_flag=1").collect.foreach(println)
Community
  • 1
  • 1
Alex
  • 21,273
  • 10
  • 61
  • 73
  • As per this http://stackoverflow.com/questions/37468418/regarding-spark-dataframereader-jdbc the data will be read totally. Limiting data does not work in reading data – Saurabh Sharma Aug 12 '16 at 20:02
  • 1
    Can please you confirm this is indeed the case? As per zero323 comment and http://stackoverflow.com/questions/32573991/does-spark-predicate-pushdown-work-with-jdbc, there should be predicate push in this case assuming you are using a `where` predicate in your query. Let me know if this is not the case for DB2, in that case I shall change my answer to use a view, or alternatively you can post your own answer and I shall delete mine. – Alex Aug 12 '16 at 20:31
0

When you do not have some kind of identity column, the best option is to use the "predicates" option as described (

https://spark.apache.org/docs/2.2.1/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,predicates:Array[String],connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame

Each predicate should be built using indexed columns only and you should try to make sure they are evenly distributed. Spark will create a task for each predicate you supply and will execute as many as it can in parallel depending on the cores available.

Typical approaches I have seen will convert a unique string column to an int using a hash function, which hopefully your db supports (something like https://www.ibm.com/support/knowledgecenter/en/SSEPGG_9.7.0/com.ibm.db2.luw.sql.rtn.doc/doc/r0055167.html maybe). Then you can break that into buckets like

mod(abs(yourhashfunction(yourstringid)),numOfBuckets) + 1 = bucketNumber

If you have composite uniqueness, you can just concatenate them prior to hashing.

you can also improve your predicate by appending conditions that hit other indexes or partitions (i.e. AND partitiondate = somemeaningfuldate).

Lastly it should be noted that this is typically not as good as an identity column because it probably requires a full or broader scan of your target indexes - but it still vastly outperforms doing nothing else.

AdrianVeidt
  • 625
  • 5
  • 6