
I am trying to split a dataset into training and non-training using

    inDataSet.randomSplit(weights.toArray, 0)

For every run I get different results. Is this expected? If so, how can I get the same percentage of rows every time?

E.g. the weights for the random split of Training Offer are ArrayBuffer(0.3, 0.7). For this I have a total of 72 rows, so for weight 0.3 I expect around 21 rows, but across runs I get 23, 29, 19, 4. Please guide.

Note: I made the weights sum to 1.0 (0.3 + 0.7) so that they would not be normalized.

-- The other question is useful, but that covers only a single execution. I am running my test N times, and every time I get different result sets.
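
For reference, a minimal hypothetical reproduction of the setup described above (the 72-row DataFrame and the variable names are illustrative only):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().appName("randomSplitRepro").getOrCreate()
    import spark.implicits._

    // 72 rows, as in the example above
    val inDataSet: DataFrame = (1 to 72).toDF("id")

    val weights = Seq(0.3, 0.7)
    // the second argument is the seed; 0 is what the call above passes
    val Array(trainingDs, nonTrainingDs) = inDataSet.randomSplit(weights.toArray, 0)

    // the counts hover around 21 / 51 instead of being exactly 21 and 51
    println(s"training: ${trainingDs.count()}, non-training: ${nonTrainingDs.count()}")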

Vijay
  • Possible duplicate of [How does Spark keep track of the splits in randomSplit?](https://stackoverflow.com/questions/38379522/how-does-spark-keep-track-of-the-splits-in-randomsplit) – Shaido Jun 22 '18 at 02:39
  • It's due to how the split is performed in Spark as can be seen in the answer to the question in the above comment. There is no guarantee on the number of rows in each part. The answers here could give an idea of how to always get the same number of rows: https://stackoverflow.com/questions/44135610/spark-scala-split-dataframe-into-equal-number-of-rows – Shaido Jun 22 '18 at 02:41
  • I had a look at the other answer, but my question is that every run should give the same weighted number of rows in each dataset, not a different number. – Vijay Jun 22 '18 at 05:34
  • The problem is in how Spark divides up the rows. It computes a random number between 0 and 1 for each row, and in this case, if the number is below 0.3 the row goes into the first group, otherwise into the second. This will of course give differently sized groups on each run (since random numbers are used); see the sketch after these comments. The second question I linked to contains information on how to get the same number of rows each time. – Shaido Jun 22 '18 at 05:37
  • Thanks, I see your point! I had implemented similar logic in my code, but it was not efficient when a "filter" and "union" followed it. I had to combine the datasets for some other % operation across the whole dataset. Please refer to the implementation I posted. – Vijay Jun 22 '18 at 05:50
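
Below is a rough sketch (plain Scala, not Spark's actual implementation) of the per-row sampling behaviour described in the comments above: each row draws a uniform random number and is bucketed against the cumulative weights, which is why the split sizes fluctuate from run to run.

    import scala.util.Random

    val weights = Seq(0.3, 0.7)
    val cumulative = weights.scanLeft(0.0)(_ + _).tail   // Seq(0.3, 1.0)
    val rows = (1 to 72).toList

    val rng = new Random()                               // unseeded, so results differ per run
    val assigned = rows.groupBy { _ =>
      val r = rng.nextDouble()
      cumulative.indexWhere(r <= _)                      // 0 => first split, 1 => second split
    }

    println(assigned.map { case (split, rs) => split -> rs.size })   // e.g. Map(0 -> 19, 1 -> 53)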

2 Answers


One possible implementation (similar to the link in the second comment) that I put in:

    import scala.collection.mutable

    import org.apache.spark.sql.{Dataset, Row}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, rand, rank}

    def doTrainingOffer(inDataSet: Dataset[Row],
                        fieldName: String,
                        training_offer_list: List[(Long, Long, Int, String, String)]):
        (Dataset[Row], Option[Dataset[Row]]) = {
      println("Doing Training Offer!")

      // Number every row (rank over the ordering on fieldName) and shuffle the physical order
      val randomDs = inDataSet
        .withColumn("row_id", rank().over(Window.partitionBy().orderBy(fieldName)))
        .orderBy(rand())

      randomDs.cache()
      val count = randomDs.count()
      println(s"The total no of rows for this use case is: ${count}")

      val trainedDatasets = new mutable.ArrayBuffer[Dataset[Row]]()
      var startPos = 0L
      var endPos = 0L
      for (trainingOffer <- training_offer_list) {
        // trainingOffer._3 holds the percentage of rows this offer should get
        val noOfRows = scala.math.round(count * trainingOffer._3 / 100.0)
        endPos += noOfRows
        println(s"for training offer id: ${trainingOffer._1} and percent of ${trainingOffer._3}, the start and end are ${startPos}, ${endPos}")
        // Slice the rows whose row_id falls in (startPos, endPos]
        trainedDatasets += addTrainingData(randomDs.where(col("row_id") > startPos && col("row_id") <= endPos), trainingOffer)
        startPos = endPos
      }

      val combinedDs = trainedDatasets.reduce(_ union _)
      // (left over for other offer, trainedOffer)
      (randomDs.join(combinedDs, Seq(fieldName), "left_anti"), Option(combinedDs))
    }
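
For context, a hypothetical call to the function above (the offer tuples, the field name "customer_id", and inDataSet are placeholders, not from the original code):

    // Hypothetical usage: offer 1 gets 30% of the rows, offer 2 gets 70%
    val offers = List(
      (1L, 100L, 30, "offerA", "descA"),
      (2L, 101L, 70, "offerB", "descB")
    )
    val (leftOverDs, trainedDsOpt) = doTrainingOffer(inDataSet, "customer_id", offers)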

And another possible implementation:

    val randomDs = inDataSet.orderBy(rand())
    randomDs.cache()
    val count = randomDs.count()
    println(s"The total no of rows for this use case is: ${count}")
    val trainedDatasets = new mutable.ArrayBuffer[Dataset[Row]]()

    for (trainingOffer <- training_offer_list) {
      // Share of rows this offer should get, rounded to a whole number
      val rowsForOffer = scala.math.round(count * trainingOffer._3 / 100.0).toInt
      if (trainedDatasets.length > 1) {
        // Exclude rows already assigned to earlier offers, then take this offer's share
        val combinedDs = trainedDatasets.reduce(_ union _)
        val remainderDs = randomDs.join(combinedDs, Seq(fieldName), "left_anti")
        trainedDatasets += addTrainingData(remainderDs.limit(rowsForOffer), trainingOffer)
      }
      else if (trainedDatasets.length == 1) {
        val remainderDs = randomDs.join(trainedDatasets(0), Seq(fieldName), "left_anti")
        trainedDatasets += addTrainingData(remainderDs.limit(rowsForOffer), trainingOffer)
      }
      else {
        val tDs = randomDs.limit(rowsForOffer)
        trainedDatasets += addTrainingData(tDs, trainingOffer)
      }
    }

    val combinedDs = trainedDatasets.reduce(_ union _)
    // (left over for other offer, trainedOffer)
    (randomDs.join(combinedDs, Seq(fieldName), "left_anti"), Option(combinedDs))
Vijay

If you pass a fixed seed (e.g. seed = 1234) to randomSplit, you can produce consistent results across runs. Caching the DataFrame first with dataframe.cache() may help as well.
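
For example, a small sketch of that suggestion (the dataset and variable names are illustrative):

    // Cache the input so it is not recomputed between actions, then pass a
    // fixed seed so the same rows land in the same split on every run.
    inDataSet.cache()
    val Array(trainingDs, nonTrainingDs) =
      inDataSet.randomSplit(Array(0.3, 0.7), seed = 1234)

Note that a fixed seed makes the split reproducible on the same data, but the per-split row counts are still only approximately proportional to the weights.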

AceRam