
I was asked to load a table that resides in an Oracle database. I read the table as follows:

  val conf = new SparkConf()
    .setAppName("Spark-JDBC")
    .set("spark.executor.heartbeatInterval", "120s")
    .set("spark.network.timeout", "12000s")
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .set("spark.sql.orc.filterPushdown", "true")
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
    .set("spark.kryoserializer.buffer.max", "512m")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .set("spark.yarn.driver.memoryOverhead", "7168")
    .set("spark.yarn.executor.memoryOverhead", "7168")
    .set("spark.sql.shuffle.partitions", "61")
    .set("spark.default.parallelism", "60")
    .set("spark.memory.storageFraction", "0.5")
    .set("spark.memory.fraction", "0.6")
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "16g")
    .set("spark.dynamicAllocation.enabled", "false")

  val spark = SparkSession.builder()
    .config(conf)
    .master("yarn")
    .enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .getOrCreate()
  def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                     dataMapper: Map[String, String], partition_columns: Array[String],
                     spark: SparkSession): DataFrame = {
    val colList                 = allColumns.split(",").toList
    val (partCols, npartCols)   = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
    val queryCols               = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
    val execQuery               = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
    val yearDF                  = spark.read.format("jdbc")
                                       .option("url", connectionUrl)
                                       .option("dbtable", s"(${execQuery}) as year2017")
                                       .option("user", devUserName)
                                       .option("password", devPassword)
                                       .option("partitionColumn", "source_system_name")
                                       .option("lowerBound", 1)
                                       .option("upperBound", 200000)
                                       .option("numPartitions", 5)
                                       .load()
    val totalCols: List[String] = splitColumns ++ textList
    val cdt                     = new ChangeDataTypes(totalCols, dataMapper)
    hiveDataTypes               = cdt.gpDetails()
    val fc                      = prepareHiveTableSchema(hiveDataTypes, partition_columns)
    val allColsOrdered          = yearDF.columns.diff(partition_columns) ++ partition_columns
    val allCols                 = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
    val resultDF                = yearDF.select(allCols: _*)
    val stringColumns           = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
    val finalDF                 = stringColumns.foldLeft(resultDF) { (tempDF, colName) =>
      tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
    }
    finalDF
  }

  val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)

Spark-submit used:

SPARK_MAJOR_VERSION=2 spark-submit \
  --conf spark.ui.port=4090 \
  --driver-class-path /home/username/jars/postgresql-42.1.4.jar \
  --jars /home/username/jars/postgresql-42.1.4.jar \
  --num-executors 2 --executor-cores 3 --executor-memory 60g \
  --driver-memory 40g --driver-cores 3 \
  --class com.partition.source.YearPartition \
  --master=yarn --deploy-mode=cluster \
  --keytab /home/username/username.keytab --principal username@DEV.COM \
  --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
  --name Splinter \
  --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar \
  splinter_2.11-0.1.jar

The table is about 1 TB in size. I have seen the official Spark documentation on how to load data into Hive tables using DataFrames, but that is all an 'in-memory' process. One of my colleagues suggested the steps below instead of using DataFrames to save the data into Hive tables.

  1. Read the RDBMS table, make the required changes, and create a final DataFrame.
  2. Save the DataFrame as a file on HDFS.
  3. Load that file into a Hive table.

I am not sure which is the best way to proceed. Could anyone let me know which process is best suited for moving huge data into Hive tables on HDFS?
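
For reference, a minimal sketch of what steps 2 and 3 could look like with the `dataDF` built above; the staging path and table name are only placeholders, not objects that exist in my project:

  // Step 2: write the DataFrame to HDFS as Parquet, split by the partition columns.
  // stagingPath and the table name below are placeholders.
  val stagingPath = "hdfs:///user/username/staging/year2017"

  dataDF.write
    .mode("overwrite")
    .partitionBy(partition_columns: _*)
    .parquet(stagingPath)

  // Step 3: point a Hive table at those files (the schema is inferred from the Parquet
  // footers in Spark 2.x) and register the partitions that were just written.
  spark.sql(s"CREATE TABLE IF NOT EXISTS schema.tablename_staged USING PARQUET LOCATION '$stagingPath'")
  spark.sql("MSCK REPAIR TABLE schema.tablename_staged")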

  • Writing the file into HDFS and loading it into a Hive external table gives low query performance, in my experience. – Mansoor Baba Shaik Oct 01 '18 at 13:31
  • Use Sqoop to load the data directly into Hive from Oracle; no need to use Spark. – Gaurang Shah Oct 01 '18 at 18:31
  • @Gaurang, I need to do some transformations in between. Before that, I'm trying to see if I can move the data across successfully. Hence I can't use Sqoop. – Metadata Oct 01 '18 at 18:32
  • Then create an external table and load the files into its directory. This would be much faster. – Gaurang Shah Oct 01 '18 at 18:37
  • So: 1. Dump the data into a file on HDFS. 2. Load the same file into the Hive table. Did I understand it correctly? Because even saving it as a file on HDFS is giving me a GC overhead limit exceeded exception. – Metadata Oct 01 '18 at 18:40
  • @Gaurang Shah, I have posted the problem here: https://stackoverflow.com/questions/52463214/how-to-fix-the-exception-java-lang-outofmemoryerror-gc-overhead-limit-exceeded/52465485#52465485 but the given answer is also not working. – Metadata Oct 01 '18 at 18:48
  • @Metadata you should continue there. What I was suggesting is to save the file as CSV in HDFS rather than loading it into a table, something like `final_df.rdd.repartition(1).saveAsTextFile`. – Gaurang Shah Oct 01 '18 at 19:13
  • @GaurangShah `repartition(1)` and saving as text will negatively impact any future queries on that data, assuming it is less than the block size. Many files, in a columnar format (such as the default, Parquet), would be best. – OneCricketeer Oct 02 '18 at 08:05
  • In any case, I use this approach - https://stackoverflow.com/a/35478497/2308683 – OneCricketeer Oct 02 '18 at 08:09
  • The "better" solution would actually use Debezium Postgres Connector, pull the data out of the database, into Kafka, then stream it row-by-row into HDFS, and then partition the stream into HDFS in smaller chunks. From that, you can build a Hive schema. That way, you aren't polling a massive chunk of JDBC data into memory all at once, and hoping it'll all make it to the final destination – OneCricketeer Oct 02 '18 at 08:12
  • Well, we don't have Kafka in our project. I tried this from your suggestion and implemented it as: spark.sql(s"create table default.sample as select * from preparedDF"); Output: java.lang.OutOfMemoryError: Java heap space – Metadata Oct 02 '18 at 08:51
  • @cricket_007 it's my mistake. Please either remove the repartition or use a higher number, depending on your infrastructure. – Gaurang Shah Oct 02 '18 at 14:35
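
A minimal sketch of the approach suggested in the comments above (many Parquet files rather than `repartition(1)` + `saveAsTextFile`); the repartition count and output path are assumptions, not values from the discussion:

  // Sketch under the assumptions above: repartition into many tasks so each writes a
  // moderately sized file, and store the result as columnar Parquet instead of text.
  val outputPath = "hdfs:///user/username/staging/year2017_parquet"  // placeholder path

  dataDF
    .repartition(200)            // assumed count; tune to data volume and cluster size
    .write
    .mode("overwrite")
    .parquet(outputPath)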

0 Answers