
I want to split up a dataframe of 2.7 million rows into smaller dataframes of 100,000 rows each, so I end up with about 27 dataframes, which I also want to store as CSV files.

I already took a look at partitionBy and groupBy, but I don't need to worry about any conditions, except that the rows have to be ordered by date. I am trying to write my own code to make this work, but if you know of any Scala (Spark) functions I could use, that would be great!

Thank you all for the suggestions!

Eve
  • There are a couple of answers [here](https://stackoverflow.com/questions/44135610/spark-scala-split-dataframe-into-equal-number-of-rows) that could get you started. – Travis Hegner Mar 14 '19 at 13:25
  • Thanks, I did start with those before I asked the question, but the solutions there were not really helpful to me :) – Eve Mar 14 '19 at 14:14
  • @Eve, if your goal is to break the data up in order to save smaller csv files, you can just do `df.repartition(27).write.csv("/path")`. You will have part-00000, part-00001, ..., part-00026 files under the "/path" folder (see the sketch after these comments). – C.S.Reddy Gadipally Mar 14 '19 at 15:01
  • I will give this a try too, it seems a very simple way to do it. If it takes care of the ordering, it should be enough! The only problem is that when the dataframe grows, the repartition count has to be changed every time... But for quick results it looks good! Thanks :) – Eve Mar 14 '19 at 15:34
  • Repartitioning can be costly (it moves around all your data) and will not preserve the order. – Oli Mar 14 '19 at 16:12
  • Can you add examples of records? – joumvaer92 Mar 14 '19 at 22:09
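
For reference, here is a minimal sketch of the repartition approach suggested in the comments, assuming `df` is the dataframe from the question and that it has a `date` column (a placeholder name, since the question does not show the schema). As noted above, `repartition` shuffles the data, so the row order inside each output file is not guaranteed.

// sketch only: the "date" column and "/path" are placeholders
val sortedDf = df.orderBy("date")

sortedDf
    .repartition(27) // ~2.7 million rows / 100,000 rows per file
    .write
    .csv("/path")    // writes part-00000 ... part-00026 under /path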

1 Answer


You could use zipWithIndex from the RDD API (there is unfortunately no equivalent in SparkSQL), which maps each row to an index ranging from 0 to rdd.count - 1.

So, assuming your dataframe is sorted accordingly (by date in your case), you would need to go back and forth between the two APIs as follows:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import spark.implicits._ // needed for the 'id column syntax below

// creating mock data
val df = spark.range(100).withColumn("test", 'id % 10)

// zipping the data: each row gets the index of the chunk it belongs to (rowIndex / partitionSize)
val partitionSize = 5 // I use 5 here, but you would use 100000 in your case
val zipped_rdd = df.rdd
    .zipWithIndex.map{ case (row, id) =>
        Row.fromSeq(row.toSeq :+ id / partitionSize)
    }

// back to a dataframe, appending the new "partition" column to the original schema
val newField = StructField("partition", LongType, false)
val zipped_df = spark
    .createDataFrame(zipped_rdd, df.schema.add(newField))

Let's have a look at the data: we have a new column called `partition` that corresponds to the way you want to split your data.

zipped_df.show(15) // 5 rows per partition
+---+----+---------+
| id|test|partition|
+---+----+---------+
|  0|   0|        0|
|  1|   1|        0|
|  2|   2|        0|
|  3|   3|        0|
|  4|   4|        0|
|  5|   5|        1|
|  6|   6|        1|
|  7|   7|        1|
|  8|   8|        1|
|  9|   9|        1|
| 10|   0|        2|
| 11|   1|        2|
| 12|   2|        2|
| 13|   3|        2|
| 14|   4|        2|
+---+----+---------+

// using partitionBy to write the data
zipped_df.write
    .partitionBy("partition")
    .csv(".../testPart.csv")
Oli
  • I am trying to test it in Databricks, but at this point which exact library do I have to import? `val zipped_df = spark.createDataFrame(rdd, df.schema.add(newField))` for the rdd? – Eve Mar 14 '19 at 14:25
  • I'm not very familiar with Databricks. Is the SparkSession created for you? If it is, I think you only need to import `org.apache.spark.sql.types._` to be able to create the `StructField`, and `org.apache.spark.sql.Row`. Let me update my answer :) – Oli Mar 14 '19 at 15:15
  • These imports do not resolve the issue with the `createDataFrame(rdd...)` part: _error: not found: value rdd_ – Eve Mar 15 '19 at 07:51
  • Oh right, it's just a variable naming problem: I changed the name of one but not the other. Sorry, I'll fix the answer right away. – Oli Mar 15 '19 at 07:58
  • And yes, the SparkSession is created! – Eve Mar 15 '19 at 08:07
  • Oh, I thought it was specifying the way of creating it :D Thank you! – Eve Mar 15 '19 at 08:08