One possible implementation, similar to the one linked in the second comment, that I put in:
/** Randomly splits `inDataSet` into consecutive slices, one per training offer, and passes
  * each slice to `addTrainingData`.
  *
  * @param inDataSet           rows to distribute across the offers
  * @param fieldName           column used to left-anti-join the trained rows against the
  *                            input to find the rows no offer claimed
  *                            (presumably a unique key — TODO confirm)
  * @param training_offer_list offers to apply, in order; `_1` is logged as the offer id and
  *                            `_3` is the percentage of the total row count given to the offer
  * @return (rows left over for other offers, union of all trained slices, or `None` when
  *         `training_offer_list` is empty)
  */
def doTrainingOffer(inDataSet: Dataset[Row],
                    fieldName: String,
                    training_offer_list: List[(Long, Long, Int, String, String)]):
    (Dataset[Row], Option[Dataset[Row]]) = {
  import org.apache.spark.sql.functions.row_number

  println("Doing Training Offer!")
  // Number the rows 1..count in a random order.
  // - row_number (not rank): ids are unique and gap-free even when values tie, so the
  //   (start, end] slices below are disjoint and exact.
  // - Ordering by a random key (not fieldName): each offer gets a random sample rather than
  //   the rows with the smallest keys.
  // A window without partitionBy pulls every row into a single partition.
  val randomDs = inDataSet
    .withColumn("rand_key", rand())
    .withColumn("row_id", row_number().over(Window.orderBy("rand_key")))
    .drop("rand_key")
  randomDs.cache()
  val count = randomDs.count()
  println(s"The total no of rows for this use case is: ${count}")

  // Cumulative boundaries: offer i receives row_id in (bounds(i), bounds(i + 1)].
  val bounds = training_offer_list.scanLeft(0L) { (end, offer) =>
    end + scala.math.round(count * offer._3 / 100.0)
  }
  val trainedDatasets = training_offer_list.zip(bounds.zip(bounds.tail)).map {
    case (trainingOffer, (startPos, endPos)) =>
      println(s"for training offer id: ${trainingOffer._1} and percent of ${trainingOffer._3}, the start and end are ${startPos}, ${endPos}")
      addTrainingData(randomDs.where(col("row_id") > startPos && col("row_id") <= endPos), trainingOffer)
  }

  // reduceOption: an empty offer list yields None instead of throwing.
  val combinedDs = trainedDatasets.reduceOption(_ union _)
  // (left over for other offer, trainedOffer); with no offers every row is left over.
  val leftOver = combinedDs.fold(randomDs) { trained =>
    randomDs.join(trained, Seq(fieldName), "left_anti")
  }
  (leftOver, combinedDs)
}
And another possible implementation:
// Alternative: carve each offer's share out of the rows no earlier offer has claimed, using a
// left_anti join against the union of the offers built so far, followed by limit().
// Spark does not guarantee row order after a join, so limit() directly on the join output may
// select different rows each time the lazy plan is evaluated, and the offers could then overlap.
// Materialising a random sort key once, and ordering by it before every limit(), keeps each
// slice's selection stable and random.
val randomDs = inDataSet.withColumn("rand_key", rand())
randomDs.cache()
val count = randomDs.count()
println(s"The total no of rows for this use case is: ${count}")
val trainedDatasets = new mutable.ArrayBuffer[Dataset[Row]]()
for (trainingOffer <- training_offer_list) {
  // Divide by 100.0 so the share is rounded rather than truncated by Long division.
  // .toInt is needed because Dataset.limit takes an Int.
  val noOfRows = scala.math.round(count * trainingOffer._3 / 100.0).toInt
  // Rows not yet claimed by any earlier offer (all rows for the first offer).
  val remainderDs = trainedDatasets
    .reduceOption(_ union _)
    .fold(randomDs)(claimed => randomDs.join(claimed, Seq(field_name), "left_anti"))
  val slice = remainderDs.orderBy("rand_key").limit(noOfRows).drop("rand_key")
  trainedDatasets += addTrainingData(slice, trainingOffer)
}
// reduceOption: an empty offer list yields None instead of throwing.
val combinedDs = trainedDatasets.reduceOption(_ union _)
// (left over for other offer, trainedOffer)
(combinedDs.fold(randomDs)(c => randomDs.join(c, Seq(field_name), "left_anti")).drop("rand_key"), combinedDs)