The two approaches above may be right in general, but I would rather not collect the data to the driver, for performance reasons, especially when the data is huge.
org.apache.spark.util.CollectionAccumulator
is the right candidate for this kind of requirement; see the docs.
Also, if the data is huge, then foreachPartition
is the right candidate, again for performance reasons.
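For reference, here is a minimal standalone sketch of the accumulator mechanics on their own (the object and accumulator names here are mine, purely illustrative): register the accumulator on the driver, add to it from the executors, and read the merged value back on the driver after an action.

import org.apache.spark.sql.SparkSession

object AccumulatorSketch extends App {
  val spark = SparkSession.builder.appName("acc-sketch").master("local[*]").getOrCreate()

  // Register a named accumulator with the SparkContext. Executors can only add to it;
  // the merged value becomes visible on the driver once the action finishes.
  val acc = spark.sparkContext.collectionAccumulator[String]("demo accumulator")

  spark.sparkContext.parallelize(Seq("a", "b", "c"))
    .foreachPartition(_.foreach(acc.add))

  // acc.value is a java.util.List[String], readable on the driver only
  println(acc.value)
  spark.stop()
}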
Below is the implementation
package examples

import org.apache.log4j.Level
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.util.CollectionAccumulator

import scala.collection.JavaConverters._
import scala.collection.mutable

object TableTest extends App {

  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  import spark.implicits._

  // Lookup table: which column of which table should be checked for duplicates
  val lookup: DataFrame = Seq(
    ("db", "txn", "ID"),
    ("db", "sales", "ID"),
    ("db", "fee", "ID")
  ).toDF("DB", "TBL", "COL")

  // Accumulator that gathers the generated SQL strings from the executors
  val collAcc: CollectionAccumulator[String] =
    spark.sparkContext.collectionAccumulator[String]("mySQL Accumulator")

  // Build one query per lookup row. foreachPartition runs on the executors,
  // so the generated strings travel back to the driver through the accumulator.
  lookup.foreachPartition { partition: Iterator[Row] =>
    partition.foreach { record =>
      val selectString =
        s"select count(*), transaction_number from ${record.getAs[String]("DB")}.${record.getAs[String]("TBL")} " +
          s"group by ${record.getAs[String]("COL")} having count(*)>1"
      collAcc.add(selectString)
      println(selectString)
    }
  }

  // Back on the driver: run every generated query and union the results
  val mycollectionOfSelects: mutable.Seq[String] = collAcc.value.asScala
  val finaldf = mycollectionOfSelects.map(x => spark.sql(x)).reduce(_ union _)
  finaldf.show
}
Sample Result:
select count(*), transaction_number from db.txn group by ID having count(*)>1
select count(*), transaction_number from db.sales group by ID having count(*)>1
select count(*), transaction_number from db.fee group by ID having count(*)>1
Note: since those are pseudo tables, I have NOT displayed the resulting dataframe.
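One caveat as a side note: if these generated queries are executed by spark.sql against real tables, Spark's analyzer will reject transaction_number because it is neither in the GROUP BY nor aggregated. A minimal sketch of a variant it would accept (the helper name and the collect_list choice are mine, purely illustrative):

// Illustrative only: every selected column is either in GROUP BY or wrapped
// in an aggregate, so Spark SQL's analyzer accepts the statement.
def duplicateQuery(db: String, tbl: String, col: String): String =
  s"""select $col,
     |       count(*) as cnt,
     |       collect_list(transaction_number) as transaction_numbers
     |from $db.$tbl
     |group by $col
     |having count(*) > 1""".stripMargin

// e.g. duplicateQuery("db", "txn", "ID") yields one statement per lookup row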