15

I have a DataFrame and wish to divide it into several DataFrames, each with an equal number of rows.

In other words, I want a list of DataFrames where each one is a disjoint subset of the original DataFrame.

Let's say the input DataFrame is the following:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+

I wish to split it into K equal-sized DataFrames. If K = 4, then a possible result would be:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+
Alessandro La Corte
  • Similar to `RDD`s, there is no split operation in a single transformation. The easiest way would be to use `COUNT` once in combination with `LIMIT` and `OFFSET`. But since there is also no `OFFSET` in Spark, you could use the workaround via sliding, see https://stackoverflow.com/questions/31685714/how-to-transform-data-with-sliding-window-over-time-series-data-in-pyspark – UninformedUser May 23 '17 at 13:39
  • Example with input and output will get you answers quickly. So update your question accordingly – Ramesh Maharjan May 23 '17 at 16:37
  • Could the .randomSplit() method be what you are looking for as proposed here: https://stackoverflow.com/questions/43567164/scala-how-so-i-split-dataframe-to-multiple-csv-files-based-on-number-of-rows ? – Steffen Schmitz May 23 '17 at 18:42
  • @SteffenSchmitz randomSplit gives an unbalanced distribution. – Alessandro La Corte May 24 '17 at 10:57
  • @RameshMaharjan I added input and output as you desired. – Alessandro La Corte May 24 '17 at 11:01

6 Answers

6

Another solution is to use limit and except. The following program returns an array of DataFrames that all have the same number of rows, except the first one, which may contain fewer rows.

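import spark.implicits._  // assumes a SparkSession named spark, as in spark-shell

var numberOfNew = 4
var input = List(1,2,3,4,5,6,7,8,9).toDF
// numberOfNew + 1 slots: index 0 receives the leftover rows
val newFrames = (0 to numberOfNew).map(_ => Seq.empty[Int].toDF).toArray
var size = input.count()
val limit = (size / numberOfNew).toInt

while (size > 0 && numberOfNew >= 0) {
    // Slot 0 takes whatever is left, so no row is dropped and the index never goes negative
    newFrames(numberOfNew) = if (numberOfNew == 0) input else input.limit(limit)
    // except is a set difference, so this assumes the rows are distinct
    input = input.except(newFrames(numberOfNew))
    size = size - limit
    numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)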

+-----+
|value|
+-----+
|    7|
+-----+

+-----+
|value|
+-----+
|    4|
|    8|
+-----+

+-----+
|value|
+-----+
|    5|
|    9|
+-----+

...
Steffen Schmitz
3

As I understand your input and required output, you can create row numbers by placing every row of the DataFrame in a single group (a constant groupId column) and numbering the rows within it.

You can then filter the DataFrame by row number and store each slice wherever you need it.

The following is a starting point that you can adapt to your needs. Note that here k is the number of rows per chunk, not the number of chunks.

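import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val k = 4  // number of rows per chunk

// A constant "grouped" column puts every row into one window partition
val windowSpec = Window.partitionBy("grouped").orderBy("original_dt")

val newDF = dataFrame.withColumn("grouped", lit("grouping"))

// Number the rows 1..N within that single partition
val latestDF = newDF.withColumn("row", row_number().over(windowSpec))

val totalCount = latestDF.count()
var lowLimit = 0
var highLimit = lowLimit + k

// Show consecutive slices of k rows: (0, k], (k, 2k], ...
while (lowLimit < totalCount) {
  latestDF.where(s"row <= ${highLimit} and row > ${lowLimit}").show(false)
  lowLimit = lowLimit + k
  highLimit = highLimit + k
}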

I hope this will give you a good start.
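
If the goal is a fixed number of chunks rather than a fixed chunk size, the same window idea can be sketched with `ntile`, which numbers the rows into k buckets (1 to k) whose sizes differ by at most one. This is only a sketch: the helper name `splitWithNtile` and the temporary column `_bucket` are made up for illustration, and it assumes the DataFrame has no column called `_bucket` already. Like the code above, a window without `partitionBy` moves all rows into a single partition.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, ntile}

// Hypothetical helper: split df into k DataFrames, ordered by orderColumn.
def splitWithNtile(df: DataFrame, k: Int, orderColumn: String): Seq[DataFrame] = {
  // No partitionBy: every row lands in one window partition
  val window = Window.orderBy(col(orderColumn))
  // ntile(k) labels each row with a bucket number from 1 to k
  val bucketed = df.withColumn("_bucket", ntile(k).over(window))
  // One filtered DataFrame per bucket, with the helper column removed
  (1 to k).map(b => bucketed.where(col("_bucket") === b).drop("_bucket"))
}
```

For the 17-row example in the question, `splitWithNtile(dataFrame, 4, "count")` would give one chunk of 5 rows and three chunks of 4 rows.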

Ramesh Maharjan
  • I am having a bit of trouble with this code. I have not used Window functions before. Also, is it required to have a HiveContext? – Alessandro La Corte May 30 '17 at 07:18
  • What is the exact trouble? I haven't used HiveContext yet, but as I understand it, HiveContext is used to create an environment (for reading, writing, and configuration settings). The rest of the work is transformations, which are the same in every context (sqlContext, sparkContext or HiveContext). The above solution consists of transformations only. – Ramesh Maharjan May 30 '17 at 07:25
  • I get this error: "Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;" with the following import: "import org.apache.spark.sql.functions.row_number" – Alessandro La Corte May 30 '17 at 07:31
  • That's because you haven't imported the Spark SQL library: `import org.apache.spark.sql.functions._` – Ramesh Maharjan May 30 '17 at 07:32
  • I apologize, as I forgot to mention that I am using Spark 1.6. From this post: https://stackoverflow.com/questions/40319126/spark-window-functions-requires-hivecontext it seems I do require the HiveContext – Alessandro La Corte May 30 '17 at 07:49
  • Yes, you are right. I didn't know that either. Why don't you upgrade your Spark then? – Ramesh Maharjan May 30 '17 at 08:03
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/145422/discussion-between-ramesh-maharjan-and-alessandro-la-corte). – Ramesh Maharjan May 30 '17 at 08:23
3

This is an improved version of Steffen Schmitz's answer, which is in fact incorrect. I have corrected and generalized it for posterity. I do wonder about its performance at scale, however.

import spark.implicits._  // assumes a SparkSession named spark, as in spark-shell

var numberOfNew = 4
var input = Seq((1,2),(3,4),(5,6),(7,8),(9,10),(11,12)).toDF
val newFrames = (0 until numberOfNew).map(_ => Seq.empty[(Int, Int)].toDF).toArray
val size = input.count()
val limit = (size / numberOfNew).toInt  // rows per frame, rounded down

while (numberOfNew > 0) {
    // The last frame filled (index 0) takes every remaining row,
    // so it also absorbs the remainder of size / numberOfNew
    newFrames(numberOfNew-1) = if (numberOfNew == 1) input else input.limit(limit)
    // except is a set difference, so this assumes the rows are distinct
    input = input.except(newFrames(numberOfNew-1))
    numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)

// Sanity check: the union of all frames should contain every input row
val singleDF = newFrames.reduce(_ union _)
singleDF.show(false)

returns individual dataframes:

+---+---+
| _1| _2|
+---+---+
|  7|  8|
|  3|  4|
| 11| 12|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  5|  6|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  9| 10|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  1|  2|
+---+---+
thebluephantom
1

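If you want to divide a dataset into n roughly equal datasets (randomSplit assigns rows at random, so the sizes are only approximately equal):

double[] arraySplit = new double[n];
java.util.Arrays.fill(arraySplit, 1.0); // n equal weights; change the numbers to split by ratio instead

// The second argument (1) is the random seed
List<Dataset<String>> datasetList = dataset.randomSplitAsList(arraySplit, 1);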
mustaccio
1

Dunno if this is performant compared to the other options, but I think it looks prettier at least:

import spark.implicits._
import org.apache.spark.sql.functions._

val df = Seq(1,2,3,4,5,6,7,8,9,0).toDF
val split_count = 4
// The generated ids are increasing but only consecutive within a partition,
// so the split sizes can drift apart when df has several partitions
val to_be_split = df.withColumn("split", monotonically_increasing_id() % split_count)
val dfs = (0 until split_count).map(n => to_be_split.where('split === n).drop('split))

dfs.foreach(_.show)

+-----+
|value|
+-----+
|    1|
|    5|
|    9|
+-----+

+-----+
|value|
+-----+
|    2|
|    6|
|    0|
+-----+

+-----+
|value|
+-----+
|    3|
|    7|
+-----+

+-----+
|value|
+-----+
|    4|
|    8|
+-----+
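
One caveat: `monotonically_increasing_id` only guarantees unique, increasing ids, not consecutive ones across partitions. A possible workaround, sketched below, is to go through the RDD API and use `zipWithIndex`, which does assign consecutive indices. The helper name `splitEvenly` and the temporary column `_idx` are made up for illustration, and the sketch assumes the DataFrame has no column named `_idx`.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField}

// Hypothetical helper: split df into k DataFrames whose sizes differ by at most one row.
def splitEvenly(spark: SparkSession, df: DataFrame, k: Int): Seq[DataFrame] = {
  // zipWithIndex numbers the rows 0, 1, 2, ... across all partitions
  val indexedRows = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val indexedSchema = df.schema.add(StructField("_idx", LongType, nullable = false))
  // Cache so the index is computed once and reused by all k filters
  val indexed = spark.createDataFrame(indexedRows, indexedSchema).cache()
  // Row i goes to split i % k; the helper column is dropped again
  (0 until k).map(n => indexed.where(indexed("_idx") % k === n).drop("_idx"))
}
```

Calling `splitEvenly(spark, df, split_count)` should give the same 3/3/2/2 row counts as above, regardless of how `df` is partitioned.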

Jeremy
0

You could use

val result = df.randomSplit(Array(0.25,0.25,0.25,0.25), 1)

to split the DataFrame into smaller chunks. The array can be extended to match the required number of splits. The second argument (1) is the random seed and can be changed if required.

To access the individual chunks, use

result(0).count

or

result(1).count

and so on, depending on how many splits were made.
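
Rather than writing the weights out by hand, the array can be generated for any number of splits. The snippet below is a minimal sketch with a made-up helper name, `randomChunks`. It relies on `randomSplit` normalizing the weights when they do not sum to 1. Keep in mind that `randomSplit` assigns rows at random, so the chunk sizes are only approximately equal.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: k equal weights instead of a hand-written array.
// The weights are normalized by randomSplit, so 1.0 each means 1/k each.
def randomChunks(df: DataFrame, k: Int, seed: Long = 1L): Array[DataFrame] =
  df.randomSplit(Array.fill(k)(1.0), seed)
```

For example, `randomChunks(df, 4).map(_.count)` shows how many rows each of the four chunks received.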