
I am executing multiple Hive queries in a loop from my Spark job using the following piece of code:

implicit val sparkSession = SparkSession
  .builder()
  .config(sparkConf)
  .enableHiveSupport()
  .getOrCreate()

val bizEventFolders = fs.listStatus(outputPath)
bizEventFolders.foreach(folder => {
  val filePath = folder.getPath().toString
  if (filePath.contains(outputDir + "biz_evnt_key=")) {
    val bizEventKey = filePath.replaceAll(outputDir + "biz_evnt_key=", "")
    val addPartitionHiveQuery = s"alter table $tableName add if not exists partition (year=$year, month=$month, day=$day, hr=$hour, biz_evnt_key=$bizEventKey) location '${outputDir}biz_evnt_key=$bizEventKey'"
    sparkSession.sql(addPartitionHiveQuery)
    logger.info(s"successfully ran add partition hive query $addPartitionHiveQuery")
  }
})

The problem is that I have to run many such queries, one after the other, to add all the partitions to the Hive table. How can I submit all the queries to Spark at once instead of firing them one by one?


1 Answer


You can use Scala Futures or another parallel API to run those queries in parallel.
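A sketch of the Futures approach, reusing the `sparkSession`, `bizEventFolders`, and query-building logic from the question (the 30-minute timeout is an arbitrary example value, not a recommendation):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Build one ALTER TABLE statement per matching folder first (sequentially),
// then submit them all as Futures and wait for every one to finish.
val addPartitionQueries: Seq[String] = bizEventFolders.toSeq
  .map(_.getPath().toString)
  .filter(_.contains(outputDir + "biz_evnt_key="))
  .map { filePath =>
    val bizEventKey = filePath.replaceAll(outputDir + "biz_evnt_key=", "")
    s"alter table $tableName add if not exists partition " +
      s"(year=$year, month=$month, day=$day, hr=$hour, biz_evnt_key=$bizEventKey) " +
      s"location '${outputDir}biz_evnt_key=$bizEventKey'"
  }

// Each Future submits one query; Future.sequence gathers them so we can
// block until all of them have completed (or the timeout expires).
val futures = addPartitionQueries.map(q => Future { sparkSession.sql(q) })
Await.result(Future.sequence(futures), 30.minutes)
```

Note that the queries now run concurrently on the default global execution context, so their completion order (and log order) is no longer deterministic.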

A simple solution might be using par:

bizEventFolders.par.foreach(folder => {
  val filePath = folder.getPath().toString
  if (filePath.contains(outputDir + "biz_evnt_key=")) {
    val bizEventKey = filePath.replaceAll(outputDir + "biz_evnt_key=", "")
    val addPartitionHiveQuery = s"alter table $tableName add if not exists partition (year=$year, month=$month, day=$day, hr=$hour, biz_evnt_key=$bizEventKey) location '${outputDir}biz_evnt_key=$bizEventKey'"
    sparkSession.sql(addPartitionHiveQuery)
    logger.info(s"successfully ran add partition hive query $addPartitionHiveQuery")
  }
})

By invoking the `par` method on a sequential collection (a list, for example), it becomes a parallel collection, which you can use in the same way as a sequential one, except that operations such as `foreach` and `map` are executed on a thread pool.
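For illustration, here is a self-contained sketch of the same idea on plain numbers. In Scala 2.12 and earlier `.par` is part of the standard library; from Scala 2.13 on it requires the separate `scala-parallel-collections` module:

```scala
// Scala 2.13+ only: provides the .par extension method.
// On 2.12 and earlier, remove this import.
import scala.collection.parallel.CollectionConverters._

val nums = (1 to 100).toList

// .par converts the List into a parallel sequence; map now runs
// its function on multiple threads from a fork-join pool.
val squares = nums.par.map(n => n * n)

// The result is the same as the sequential version, only computed in parallel.
println(squares.sum)
```

This is convenient precisely because, as above, the loop body did not have to change at all, only the collection it iterates over.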
