4

It is understood that when migrating/loading data from an Oracle DB to HDFS/Parquet, Sqoop is generally preferred over Spark with a JDBC driver.

Spark is supposed to be 100x faster at processing, right? So what is wrong with Spark? Why do people prefer Sqoop when loading data from Oracle DB tables?

Please suggest what I need to do to make Spark faster when loading data from Oracle.

sandy kay
BdEngineer
    Possible duplicate of [How to improve performance for slow Spark jobs using DataFrame and JDBC connection?](https://stackoverflow.com/questions/32188295/how-to-improve-performance-for-slow-spark-jobs-using-dataframe-and-jdbc-connecti) – user10938362 May 24 '19 at 09:59

2 Answers

4

Spark is fast when it knows how to parallelize queries. If you're just executing a single query, Spark doesn't know what to parallelize. You can improve speed by using the lowerBound, upperBound, and numPartitions parameters when reading data with spark.read.jdbc, but it really depends on the design of your tables.

You can find more documentation here.
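
For example, a minimal PySpark sketch of such a partitioned Oracle read (the connection URL, table, split column, bounds and credentials below are placeholders, not values from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("oracle-parallel-read").getOrCreate()

# Placeholder connection details -- replace with your own host, service name and credentials
url = "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1"
props = {"user": "scott", "password": "*****", "driver": "oracle.jdbc.OracleDriver"}

# Without the partitioning options Spark opens a single JDBC connection and reads the
# whole table through it; with them it runs numPartitions queries in parallel, each
# covering one slice of the split column's range.
df = spark.read.jdbc(
    url,
    "employees",            # hypothetical table
    column="employee_id",   # numeric column to split on
    lowerBound=1,
    upperBound=1000000,
    numPartitions=8,
    properties=props,
)

df.write.parquet("hdfs:///data/employees_parquet")

Note that lowerBound and upperBound only control how the range is split into partitions; rows outside that range are still read, they just all land in the first or last partition.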

Alex Ott
2

The major point is already covered in Alex's answer.

I just wanted to add an example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").appName("Test-JDBC").getOrCreate()

# Boundary query: fetch the MIN and MAX of the split column first
ds = spark.read.jdbc("jdbc:mysql://localhost:3306/stackexchange", "(select min(id), max(id) from post_history) as ph",
                     properties={"user": "devender", "password": "*****", "driver": "com.mysql.jdbc.Driver"})

r = ds.head()
minId = r[0]
maxId = r[1]

# Partitioned read: Spark issues numPartitions parallel queries, each covering a slice of the id range
ds = spark.read.jdbc("jdbc:mysql://localhost:3306/stackexchange", "(select * from post_history) as ph",
                     properties={"user": "devender", "password": "*****", "driver": "com.mysql.jdbc.Driver"},
                     numPartitions=4, column="id", lowerBound=minId, upperBound=maxId)

count = ds.count()
print(count)

For more details, see https://gist.github.com/devender-yadav/5c4328918602b7910ba883e18b68fd87


Note: Sqoop automatically executes a boundary query to fetch the MIN and MAX values for the split-by column (that query can also be overridden).

Dev
  • Thank you, but is there any way we can derive numPartitions dynamically? – BdEngineer May 28 '19 at 09:42
  • No generic way. You need to create an algorithm based on the number of records on the source side, the maximum number of concurrent Spark jobs (YARN), and the maximum number of concurrent database connections. – Dev May 28 '19 at 11:20
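
Following up on the comment above about deriving numPartitions dynamically, here is a minimal sketch of that kind of heuristic (the count query, the thresholds and the choose_num_partitions helper are illustrative assumptions, not anything built into Spark or Sqoop):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").appName("Test-JDBC").getOrCreate()

url = "jdbc:mysql://localhost:3306/stackexchange"
props = {"user": "devender", "password": "*****", "driver": "com.mysql.jdbc.Driver"}

def choose_num_partitions(row_count, rows_per_partition=1000000,
                          max_spark_tasks=32, max_db_connections=16):
    # One partition per ~1M rows, capped by the concurrent tasks the cluster
    # can run and by the connections the database is willing to accept.
    wanted = max(1, row_count // rows_per_partition)
    return min(wanted, max_spark_tasks, max_db_connections)

# One small query for the row count and the split-column bounds,
# then the partitioned read reuses those values.
stats = spark.read.jdbc(
    url, "(select count(*) as cnt, min(id) as min_id, max(id) as max_id from post_history) as s",
    properties=props).head()

num_partitions = choose_num_partitions(stats["cnt"])

ds = spark.read.jdbc(url, "(select * from post_history) as ph", properties=props,
                     numPartitions=num_partitions, column="id",
                     lowerBound=stats["min_id"], upperBound=stats["max_id"])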