
I can read the Oracle table using this simple Scala program:

    val spark = SparkSession
      .builder
      .master("local[4]")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
      .config("spark.executor.memory", "8g")
      .config("spark.executor.cores", 4)
      .config("spark.task.cpus", 1)
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@x.x.x.x:1521:orcl")
      .option("dbtable", "big_table")
      .option("user", "test")
      .option("password", "123456")
      .load()

    jdbcDF.show()

However, the table is huge, and reading it through a single JDBC connection like this is too slow; each node should read only part of it. So I need a hash function to distribute the rows among the Spark nodes, and for that Spark supports read predicates. I have already done this in Python. The table has a column named NUM; the hash function takes each value and returns an integer between 0 and num_partitions. The predicate list is built as follows:

    hash_function = lambda x: 'ora_hash({}, {})'.format(x, num_partitions)
    hash_df = connection.read_sql_full(
        'SELECT distinct {0} hash FROM {1}'.format(hash_function(var.hash_col), source_table_name))
    hash_values = list(hash_df.loc[:, 'HASH'])

hash_values for num_partitions=19 is (ora_hash(expr, N) returns a bucket between 0 and N inclusive, hence the 20 distinct values):

    hash_values = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

    predicates = [
        "to_date({0},'YYYYMMDD','nls_calendar=persian') = to_date({1},'YYYYMMDD','nls_calendar=persian') "
        "and ora_hash({2},{3}) = {4}".format(partition_key, current_date, hash_col, num_partitions, hash_val)
        for hash_val in hash_values]
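
For illustration, with hash_col = NUM, num_partitions = 19, and made-up values for partition_key and current_date (say SALE_DATE and 14000101), each entry of predicates is a SQL fragment like:

    to_date(SALE_DATE,'YYYYMMDD','nls_calendar=persian') = to_date(14000101,'YYYYMMDD','nls_calendar=persian') and ora_hash(NUM,19) = 0

Spark runs one JDBC query per such fragment, so each partition reads a disjoint slice of the table.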

Then I read the table based on the predicates like this:

    dataframe = spark.read \
        .option('driver', 'oracle.jdbc.driver.OracleDriver') \
        .jdbc(url=spark_url,
              table=table_name,
              predicates=predicates)

Would you please guide me on how to create the predicates list in Scala, the way I did it in Python?

Any help is really appreciated.


1 Answer


Problem Solved.

I changed the code as follows, and now it works:

    import org.apache.spark.sql.SparkSession

    object main extends App {

      def read_spark(): Unit = {
        val numPartitions = 19
        val partitionColumn = "name"
        val field_date = "test"
        val current_date = "********"
        // Define the JDBC connection properties (the url is passed directly to jdbc() below)
        val url = "jdbc:oracle:thin:@//x.x.x.x:1521/orcl"
        val properties = new java.util.Properties()
        properties.put("user", "user")
        properties.put("password", "pass")
        // Define the WHERE clauses that assign each row to a partition.
        // ora_hash(col, N) returns buckets 0..N inclusive, so build N + 1 predicates
        // to cover every row.
        val predicateFct = (partition: Int) => s"""ora_hash("$partitionColumn",$numPartitions) = $partition"""
        val predicates = (0 to numPartitions).map(partition => predicateFct(partition)).toArray

        // Push the date filter down to Oracle as a derived table
        val test_table = s"(SELECT * FROM table where $field_date=$current_date) dbtable"

        // Load the table into Spark, one JDBC query per predicate
        val df = spark.read
          .option("driver", "oracle.jdbc.driver.OracleDriver")
          .jdbc(url, test_table, predicates, properties)
        df.show()
      }

      val spark = SparkSession
        .builder
        .master("local[4]")
        .config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
        .config("spark.executor.memory", "8g")
        .config("spark.executor.cores", 4)
        .config("spark.task.cpus", 1)
        .appName("Spark SQL basic example")
        .config("spark.some.config.option", "some-value")
        .getOrCreate()

      read_spark()
    }
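
As a side note, when the partitioning column is numeric and roughly evenly distributed, Spark's JDBC source can also split the read by a value range instead of hand-built predicates. A minimal sketch, assuming NUM is numeric, that its values fall somewhere between 0 and 1,000,000 (both bounds are made up here), and reusing the connection details above:

    // Range-based partitioning: Spark derives numPartitions WHERE clauses on
    // partitionColumn between lowerBound and upperBound and issues one query per slice.
    val rangeDf = spark.read
      .format("jdbc")
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .option("url", "jdbc:oracle:thin:@//x.x.x.x:1521/orcl")
      .option("dbtable", "big_table")
      .option("user", "user")
      .option("password", "pass")
      .option("partitionColumn", "NUM")
      .option("lowerBound", "0")
      .option("upperBound", "1000000")
      .option("numPartitions", "19")
      .load()

The ora_hash predicates remain the better choice when the column's distribution is skewed or unknown, since the hash spreads rows evenly across buckets.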