
I have a dataframe with 2 or more columns and 1000 records. I want to split the data into chunks of 100 records each, randomly and without any conditions.

So the expected output, in terms of record counts, should be something like this:

[(1,2....100),(101,102,103...200),.....,(900,901...1000)]

Here's the solution that worked for my use case after trying different approaches:

https://stackoverflow.com/a/61276734/12322995

jay
    Does this answer your question? [Spark Data Frame Random Splitting](https://stackoverflow.com/questions/40293970/spark-data-frame-random-splitting) – Shaido Apr 15 '20 at 02:10
  • Thanks for suggesting a possible answer but unfortunately it doesn't answer my question. I have updated my question with a solution and also why the `randomSplit` doesn't work in this scenario – jay Apr 16 '20 at 17:10
  • No problem. I would recommend moving the solution part to a new answer instead of having it in the question (self-answered questions are fine), following this site's Q&A model. – Shaido Apr 17 '20 at 01:05

2 Answers


As @Shaido said, `randomSplit` is the popular approach for splitting a dataframe.

Another way to think about it is `repartitionByRange` (available since Spark 2.3).

From the API docs:

    public Dataset repartitionByRange(int numPartitions, scala.collection.Seq partitionExprs)

Returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is range partitioned. At least one partition-by expression must be specified. When no explicit sort order is specified, "ascending nulls first" is assumed. Since: 2.3.0

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.{Dataset, SparkSession}

object RepartitionByRange extends App {

  // Keep Spark's own logging quiet so the output below stays readable
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder().appName(getClass.getName).master("local").getOrCreate()
  val sc = spark.sparkContext

  import spark.implicits._

  // 1000 sample records with a single numeric "id" column
  val t1 = sc.parallelize(0 until 1000).toDF("id")

  // Range-partition by "id" into 10 partitions, then collapse each partition
  // into one comma-separated string so the chunk boundaries are visible.
  val repartitionedOrders: Dataset[String] = t1.repartitionByRange(10, $"id")
    .mapPartitions(rows => {
      val idsInPartition = rows.map(row => row.getAs[Int]("id")).toSeq.sorted.mkString(",")
      Iterator(idsInPartition)
    })

  repartitionedOrders.show(false)
  println("number of chunks or partitions :" + repartitionedOrders.rdd.getNumPartitions)

}

Result :

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                          |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99                                                                                                              |
|100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199|
|200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,220,221,222,223,224,225,226,227,228,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,249,250,251,252,253,254,255,256,257,258,259,260,261,262,263,264,265,266,267,268,269,270,271,272,273,274,275,276,277,278,279,280,281,282,283,284,285,286,287,288,289,290,291,292,293,294,295,296,297,298,299|
|300,301,302,303,304,305,306,307,308,309,310,311,312,313,314,315,316,317,318,319,320,321,322,323,324,325,326,327,328,329,330,331,332,333,334,335,336,337,338,339,340,341,342,343,344,345,346,347,348,349,350,351,352,353,354,355,356,357,358,359,360,361,362,363,364,365,366,367,368,369,370,371,372,373,374,375,376,377,378,379,380,381,382,383,384,385,386,387,388,389,390,391,392,393,394,395,396,397,398,399|
|400,401,402,403,404,405,406,407,408,409,410,411,412,413,414,415,416,417,418,419,420,421,422,423,424,425,426,427,428,429,430,431,432,433,434,435,436,437,438,439,440,441,442,443,444,445,446,447,448,449,450,451,452,453,454,455,456,457,458,459,460,461,462,463,464,465,466,467,468,469,470,471,472,473,474,475,476,477,478,479,480,481,482,483,484,485,486,487,488,489,490,491,492,493,494,495,496,497,498,499|
|500,501,502,503,504,505,506,507,508,509,510,511,512,513,514,515,516,517,518,519,520,521,522,523,524,525,526,527,528,529,530,531,532,533,534,535,536,537,538,539,540,541,542,543,544,545,546,547,548,549,550,551,552,553,554,555,556,557,558,559,560,561,562,563,564,565,566,567,568,569,570,571,572,573,574,575,576,577,578,579,580,581,582,583,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599|
|600,601,602,603,604,605,606,607,608,609,610,611,612,613,614,615,616,617,618,619,620,621,622,623,624,625,626,627,628,629,630,631,632,633,634,635,636,637,638,639,640,641,642,643,644,645,646,647,648,649,650,651,652,653,654,655,656,657,658,659,660,661,662,663,664,665,666,667,668,669,670,671,672,673,674,675,676,677,678,679,680,681,682,683,684,685,686,687,688,689,690,691,692,693,694,695,696,697,698,699|
|700,701,702,703,704,705,706,707,708,709,710,711,712,713,714,715,716,717,718,719,720,721,722,723,724,725,726,727,728,729,730,731,732,733,734,735,736,737,738,739,740,741,742,743,744,745,746,747,748,749,750,751,752,753,754,755,756,757,758,759,760,761,762,763,764,765,766,767,768,769,770,771,772,773,774,775,776,777,778,779,780,781,782,783,784,785,786,787,788,789,790,791,792,793,794,795,796,797,798,799|
|800,801,802,803,804,805,806,807,808,809,810,811,812,813,814,815,816,817,818,819,820,821,822,823,824,825,826,827,828,829,830,831,832,833,834,835,836,837,838,839,840,841,842,843,844,845,846,847,848,849,850,851,852,853,854,855,856,857,858,859,860,861,862,863,864,865,866,867,868,869,870,871,872,873,874,875,876,877,878,879,880,881,882,883,884,885,886,887,888,889,890,891,892,893,894,895,896,897,898,899|
|900,901,902,903,904,905,906,907,908,909,910,911,912,913,914,915,916,917,918,919,920,921,922,923,924,925,926,927,928,929,930,931,932,933,934,935,936,937,938,939,940,941,942,943,944,945,946,947,948,949,950,951,952,953,954,955,956,957,958,959,960,961,962,963,964,965,966,967,968,969,970,971,972,973,974,975,976,977,978,979,980,981,982,983,984,985,986,987,988,989,990,991,992,993,994,995,996,997,998,999|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

number of chunks or partitions : 10


UPDATE: `randomSplit` example:

  import spark.implicits._

  val t1 = sc.parallelize(0 until 1000).toDF("id")

  println("With Random Split ")
  // 10 equal weights => 10 splits of roughly (not exactly) equal size
  val dfarray = t1.randomSplit(Array(1, 1, 1, 1, 1, 1, 1, 1, 1, 1))
  println("number of dataframes " + dfarray.length + "element order is not guaranteed ")
  dfarray.foreach {
    df => df.show
  }

Result: the data will be split into 10 dataframes, and element order is not guaranteed.

With Random Split 
number of dataframes 10element order is not guaranteed 
+---+
| id|
+---+
|  2|
| 10|
| 16|
| 30|
| 36|
| 46|
| 51|
| 91|
|100|
|121|
|136|
|138|
|149|
|152|
|159|
|169|
|198|
|199|
|220|
|248|
+---+
only showing top 20 rows

+---+
| id|
+---+
| 26|
| 40|
| 45|
| 54|
| 63|
| 72|
| 76|
|107|
|129|
|137|
|142|
|145|
|153|
|162|
|173|
|179|
|196|
|208|
|214|
|232|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  7|
| 12|
| 31|
| 32|
| 38|
| 42|
| 53|
| 61|
| 68|
| 73|
| 80|
| 89|
| 96|
|115|
|117|
|118|
|131|
|132|
|139|
|146|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  0|
| 24|
| 35|
| 57|
| 58|
| 65|
| 77|
| 78|
| 84|
| 86|
| 90|
| 97|
|156|
|158|
|168|
|174|
|182|
|197|
|218|
|242|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  1|
|  3|
| 17|
| 18|
| 19|
| 33|
| 70|
| 71|
| 74|
| 83|
|102|
|104|
|108|
|109|
|122|
|128|
|143|
|150|
|154|
|157|
+---+
only showing top 20 rows

+---+
| id|
+---+
| 14|
| 15|
| 29|
| 44|
| 64|
| 75|
| 88|
|103|
|110|
|113|
|116|
|120|
|124|
|135|
|155|
|213|
|221|
|238|
|241|
|251|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  5|
|  9|
| 21|
| 22|
| 23|
| 25|
| 27|
| 47|
| 52|
| 55|
| 60|
| 62|
| 69|
| 93|
|111|
|114|
|141|
|144|
|161|
|164|
+---+
only showing top 20 rows

+---+
| id|
+---+
| 13|
| 20|
| 39|
| 41|
| 49|
| 56|
| 67|
| 85|
| 87|
| 92|
|105|
|106|
|126|
|127|
|160|
|165|
|166|
|171|
|175|
|184|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  4|
| 34|
| 50|
| 79|
| 81|
|101|
|119|
|123|
|133|
|147|
|163|
|170|
|180|
|181|
|193|
|202|
|207|
|222|
|226|
|233|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  6|
|  8|
| 11|
| 28|
| 37|
| 43|
| 48|
| 59|
| 66|
| 82|
| 94|
| 95|
| 98|
| 99|
|112|
|125|
|130|
|134|
|140|
|183|
+---+
only showing top 20 rows
Ram Ghadiyaram
  • Thanks for the answer. But what if I wanted it done dynamically? I don't know the total record count, but I still want chunks of 100. I guess repartitionByRange only helps when you know exactly how many records you have, and we divide into 10 partitions because we want 100 each – jay Apr 15 '20 at 14:59
  • Updated with a randomSplit example as well. AFAIK there is no other way to achieve this. – Ram Ghadiyaram Apr 15 '20 at 15:15
  • @RamGhadiyaram Thanks for sharing your thoughts on a possible solution. randomSplit is definitely not an option, and repartitionByRange can work if we change the partition count dynamically. But I have edited my question with the solution that worked for me https://stackoverflow.com/q/61219832/12322995 Thank you – jay Apr 16 '20 at 16:51
  • `df.collect.grouped(10)` is wrong: it may cause OOM for large data, since you are collecting. Your countApprox logic can be applied with repartitionByRange too: do a countApprox, divide by 100, and pass that as the number of partitions to repartitionByRange; that should work – Ram Ghadiyaram Apr 16 '20 at 17:11
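
A quick sketch of what that last comment suggests — deriving the partition count from an approximate count and passing it to `repartitionByRange` (illustrative only; it assumes a dataframe `df` with a numeric `id` column to range-partition on):

  import spark.implicits._ // for the $"id" column syntax

  // Approximate the total row count cheaply, target ~100 rows per chunk,
  // then range-partition so each partition holds a contiguous range of ids.
  val approxCount = df.rdd.countApprox(timeout = 1000L, confidence = 0.95).getFinalValue().high
  val numOfPartitions = Math.max(Math.round(approxCount / 100).toInt, 1)

  val chunked = df.repartitionByRange(numOfPartitions, $"id")
  println("number of chunks or partitions : " + chunked.rdd.getNumPartitions)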

Since I want the data to be evenly distributed and to be able to use the chunks separately or in an iterative manner, using `randomSplit` doesn't work, as it may leave empty dataframes or an unequal distribution.

So using `grouped` can be one of the most feasible solutions here, if you don't mind calling `collect` on your dataframe.

E.g.: `val newdf = df.collect.grouped(10)`

That gives an `Iterator[List[org.apache.spark.sql.Row]]` (a non-empty iterator). It can also be converted into a list by adding `.toList` at the end.
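
For illustration, here is a minimal sketch of consuming those chunks, using a chunk size of 100 to match the question (it assumes `spark` is the active SparkSession and `df` is the source dataframe; turning each chunk back into a dataframe is optional):

  import org.apache.spark.sql.{DataFrame, Row}

  // Collect the data to the driver (only safe for reasonably small datasets),
  // then group it into 100-record chunks.
  val chunks: Iterator[List[Row]] = df.collect().toList.grouped(100)

  // Optionally turn each chunk back into a DataFrame with the original schema.
  val chunkDfs: Iterator[DataFrame] = chunks.map { rows =>
    spark.createDataFrame(spark.sparkContext.parallelize(rows), df.schema)
  }

  chunkDfs.zipWithIndex.foreach { case (chunkDf, i) =>
    println(s"chunk $i has ${chunkDf.count()} rows")
  }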

Another possible solution, if we don't want array chunks of data from the dataframe but still want to partition the data into equal counts of records: we can try `countApprox`, adjusting the timeout and confidence as required, and then divide that by the number of records we need per partition. The result can later be used as the number of partitions for `repartition` or `coalesce`.

`countApprox` is used instead of `count` because it is a less expensive operation, and you can feel the difference when the data size is huge.

// Cheap approximate count (waits at most 1 second, with 95% confidence)
val approxCount = df.rdd.countApprox(timeout = 1000L, confidence = 0.95).getFinalValue().high

// Target ~100 records per partition, with a minimum of 1 partition
val numOfPartitions = Math.max(Math.round(approxCount / 100), 1).toInt

df.repartition(numOfPartitions)
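
To sanity-check the distribution, the rows per partition can be counted like this (an illustrative sketch reusing `numOfPartitions` from above; note that `repartition(n)` spreads rows roughly evenly across partitions rather than into exact blocks of 100):

// Count rows per partition to verify the chunk sizes.
df.repartition(numOfPartitions)
  .rdd
  .mapPartitionsWithIndex((idx, rows) => Iterator((idx, rows.size)))
  .collect()
  .foreach { case (idx, cnt) => println(s"partition $idx -> $cnt rows") }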
jay