
I am trying to fetch the rows of a lookup table (3 rows, 3 columns), iterate over them row by row, and pass the values in each row as parameters to a Spark SQL query.

DB | TBL   | COL
----------------
db | txn   | ID
db | sales | ID
db | fee   | ID

I tried this in the spark-shell for one row and it worked, but I am finding it difficult to iterate over the rows.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val db_name: String = "db"
val tbl_name: String = "transaction"
val unique_col: String = "transaction_number"

val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1")

How can I iterate over the rows and pass the values as parameters?

vvazza

3 Answers


The other two approaches here may be right in general, but somehow I don't like collecting the data, for performance reasons, especially if the data is huge.

org.apache.spark.util.CollectionAccumulator is the right candidate for this kind of requirement; see the docs.

Also, if the data is huge, then foreachPartition is the right candidate, again for performance reasons.
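As a quick illustration of that accumulator lifecycle, here is a minimal, self-contained sketch (the object, app, and accumulator names are only illustrative, and a local SparkSession is assumed): register the accumulator on the SparkContext, call add inside an action on the executors, and read value back on the driver once the action has finished.

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator

import scala.collection.JavaConverters._

object AccumulatorSketch extends App {
  val spark = SparkSession.builder.appName("acc-sketch").master("local[*]").getOrCreate()

  // Register the accumulator on the driver via the SparkContext
  val acc: CollectionAccumulator[String] =
    spark.sparkContext.collectionAccumulator[String]("collected strings")

  // add() is called on the executors, inside an action
  spark.sparkContext.parallelize(Seq("a", "b", "c")).foreach(x => acc.add(x))

  // value is read back on the driver after the action has completed
  println(acc.value.asScala)   // e.g. Buffer(a, b, c) -- order is not guaranteed

  spark.stop()
}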

Below is the implementation

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.CollectionAccumulator

import scala.collection.JavaConverters._
import scala.collection.mutable

object TableTest extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  import spark.implicits._

  // The lookup table that drives the queries
  val lookup =
    Seq(("db", "txn", "ID"), ("db", "sales", "ID"), ("db", "fee", "ID"))
      .toDF("DB", "TBL", "COL")

  // Accumulator that gathers the generated SQL strings from the executors
  val collAcc: CollectionAccumulator[String] =
    spark.sparkContext.collectionAccumulator[String]("mySQL Accumulator")

  // Build one query per lookup row, partition by partition
  lookup.foreachPartition { partition: Iterator[Row] =>
    partition.foreach { record =>
      val selectString =
        s"select count(*), transaction_number from ${record.getAs[String]("DB")}.${record.getAs[String]("TBL")} group by ${record.getAs[String]("COL")} having count(*)>1"
      collAcc.add(selectString)
      println(selectString)
    }
  }

  // Back on the driver: run each generated query and union the results
  val mycollectionOfSelects: mutable.Seq[String] = collAcc.value.asScala
  val finaldf = mycollectionOfSelects.map(x => spark.sql(x)).reduce(_ union _)
  finaldf.show

}

Sample result:

[2019-08-13 12:11:16,458] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)

select count(*), transaction_number from db.txn group by ID having count(*)>1
select count(*), transaction_number from db.sales group by ID having count(*)>1
select count(*), transaction_number from db.fee group by ID having count(*)>1


Note: since those are pseudo tables, I have not displayed the resulting dataframe.

Ram Ghadiyaram
  • Hello Ram! Thank you so much for the details; I will apply the solution. I have one question: in your solution you have hard-coded the table columns, which I wanted to be dynamic. How can I make them dynamic? – vvazza Aug 14 '19 at 18:14
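Regarding that follow-up comment, here is a minimal sketch (not part of the original answer) of one way to avoid hard-coding the selected column: assume the lookup table carries an extra, hypothetical SEL_COL column holding the column to select, so every piece of the SQL string comes from the lookup row.

package examples

import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.JavaConverters._

object DynamicColumnsSketch extends App {
  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate
  import spark.implicits._

  // Hypothetical fourth column SEL_COL drives the select list as well
  val lookup = Seq(
    ("db", "txn",   "ID", "transaction_number"),
    ("db", "sales", "ID", "transaction_number"),
    ("db", "fee",   "ID", "transaction_number")
  ).toDF("DB", "TBL", "COL", "SEL_COL")

  val collAcc = spark.sparkContext.collectionAccumulator[String]("mySQL Accumulator")

  // Every piece of the query now comes from the lookup row
  lookup.foreachPartition { partition: Iterator[Row] =>
    partition.foreach { record =>
      collAcc.add(
        s"select count(*), ${record.getAs[String]("SEL_COL")} " +
          s"from ${record.getAs[String]("DB")}.${record.getAs[String]("TBL")} " +
          s"group by ${record.getAs[String]("COL")} having count(*)>1")
    }
  }

  // The generated queries can then be run and unioned on the driver, as in the answer above
  collAcc.value.asScala.foreach(println)
}

Whether those values come from a real lookup table or are hard-coded as above does not matter; the point is only that the select list is no longer fixed in the code.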
// Collect the (small) lookup table to the driver, build one query per row,
// run each query with an existing SparkSession (referenced here as sparkSession), and union the results
val lookup =
  Seq(("db", "txn", "ID"), ("db", "sales", "ID")).toDF("DB", "TBL", "COL")

val data = lookup
  .collect()
  .map(
    x =>
      (x.getAs[String]("DB"), x.getAs[String]("TBL"), x.getAs[String]("COL"))
  )
  .map(
    y =>
      sparkSession.sql(
        s"select count(*), transaction_number from ${y._1}.${y._2} group by ${y._3} having count(*)>1"
    )
  )
  .reduce(_ union _)
Gforz

Change the DataFrame into arrays. From that point you can iterate through the string objects and build the string input query for the spark.sql command. Below is a quick excerpt of how you would do it; however, it is fairly complex.

//Pull in the needed columns, remove all duplicates
val inputDF = spark.sql("select * from " + dbName + "." + tableName).selectExpr("DB", "TBL", "COL").distinct

//Hold all of the columns as arrays
////dbArray(0) is the first element of the DB column
////dbArray(n-1) is the last element of the DB column
val dbArray = inputDF.selectExpr("DB").rdd.map(x=>x.mkString).collect
val tableArray  = inputDF.selectExpr("TBL").rdd.map(x=>x.mkString).collect
val colArray  = inputDF.selectExpr("COL").rdd.map(x=>x.mkString).collect

//Need to hold all the dataframe objects and values as we build insert and union them as we progress through loop
var dupDF = spark.sql("select 'foo' as bar")
var interimDF = dupDF
var initialDupDF = dupDF
var iterator = 1

//Run until we reach end of array
while (iterator <= dbArray.length)
{
  //on each run insert the array elements into string call
  initialDupDF = spark.sql("select count(*), transaction_number from " + dbArray(iterator - 1)  + "." + tableArray(iterator - 1) + " group by " + colArray(iterator - 1) + " having count(*)>1") 

  //on run 1 overwrite the variable, else union
  if (iterator == 1) {
    interimDF = initialDupDF
  } else {
    interimDF = dupDF.unionAll(initialDupDF)
  }

  //This is needed because you can't do DF = DF.unionAll(newDF)
  dupDF = interimDF
  iterator = iterator + 1
}
afeldman