
I have a dataset like the following:

Input Dataset

Id, Parent_id, Data
-----------------------
1, NULL, favorite: 3
2, NULL, favorite: 4

Output Dataset

Id, Parent_Id, Data
------------------------
1, NULL, favorite: 3
1_t1, 1, favorite: 3
1_t2, 1, favorite: 3
1_t3, 1, favorite: 3
2, NULL, favorite: 4
2_t1, 2, favorite: 4
2_t2, 2, favorite: 4
2_t3, 2, favorite: 4
2_t4, 2, favorite: 4

As shown above, I am trying to explode the favorite count in the Data column into individual rows, one per count, and use the Parent_id column to point each new row back to its root record.

So far I have tried Spark SQL's explode function, but I wasn't able to get it working.

1 Answer

If I understand your question correctly, you want to generate new rows from each existing row: the number in the data column determines how many rows to create, and each new row gets a new id and a parent_id pointing to the original record.

If that's the case, you can do this by expanding each row on the underlying RDD, as below:

import org.apache.spark.sql.Row
import scala.collection.mutable.ArrayBuffer

// Assumes an existing SparkSession named sparkSession.
import sparkSession.implicits._

val input = Seq(("1", "NULL", "favorite:3"), ("2", "NULL", "favorite:4")).toDF("id", "parent_id", "data")

input.printSchema()
input.show(false)

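// For each input row, emit the row itself plus one child row per favorite count.
val resultRDD = input.rdd.flatMap { row =>
  val id = row.getAs[String]("id")
  val data = row.getAs[String]("data")

  // "favorite:3" -> 3 (trim also tolerates "favorite: 3")
  val count = data.split(":")(1).trim.toInt

  val rows = new ArrayBuffer[Row]()
  rows += row // keep the original (parent) row
  for (index <- 1 to count) {
    // Child id is "<parent id>_t<index>"; parent_id points back to the parent row.
    rows += Row(s"${id}_t$index", id, data)
  }
  rows
}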

val resultDF = sparkSession.createDataFrame(resultRDD, input.schema)
resultDF.show(false)

This prints the schema, the input, and the expanded result:

root
 |-- id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- data: string (nullable = true)

+---+---------+----------+
|id |parent_id|data      |
+---+---------+----------+
|1  |NULL     |favorite:3|
|2  |NULL     |favorite:4|
+---+---------+----------+

+----+---------+----------+
|id  |parent_id|data      |
+----+---------+----------+
|1   |NULL     |favorite:3|
|1_t1|1        |favorite:3|
|1_t2|1        |favorite:3|
|1_t3|1        |favorite:3|
|2   |NULL     |favorite:4|
|2_t1|2        |favorite:4|
|2_t2|2        |favorite:4|
|2_t3|2        |favorite:4|
|2_t4|2        |favorite:4|
+----+---------+----------+
Prasad Khode
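
Since the question asks about explode specifically, here is a minimal sketch of a DataFrame-only variant that avoids the round trip through an RDD. It is not part of the original answer. It assumes Spark 2.4 or later (where `sequence` is available) and data of the form `favorite:<n>`; the names `spark`, `count`, `idx`, and `children` are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat, explode, lit, sequence, split, trim}

// Local session for the sketch; a real job would reuse its existing session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("explode-favorites")
  .getOrCreate()
import spark.implicits._

val input = Seq(("1", "NULL", "favorite:3"), ("2", "NULL", "favorite:4"))
  .toDF("id", "parent_id", "data")

// "favorite:3" -> 3 (trim also tolerates "favorite: 3")
val count = trim(split($"data", ":").getItem(1)).cast("int")

// One child row per value in 1..count. The filter matters: sequence(1, 0)
// counts downward and would yield [1, 0] rather than an empty array.
val children = input
  .where(count >= 1)
  .withColumn("idx", explode(sequence(lit(1), count)))
  .select(
    concat($"id", lit("_t"), $"idx".cast("string")).as("id"),
    $"id".as("parent_id"),
    $"data")

// Parents plus generated children; row order is not guaranteed after the union.
val result = input.union(children)
result.show(false)
```

Because everything stays in the DataFrame API, no `Row` encoder is needed, and the work remains inside Spark's query optimizer.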
  • Would the RDD approach work if I have a table with 1 billion records? – Shivakanth Komatreddy Jun 17 '19 at 02:59
  • The problem I am having is converting it back to a DataFrame. It seems Spark is complaining about encoding when going from a Dataset to an RDD to a DataFrame. – Shivakanth Komatreddy Jun 18 '19 at 20:28
  • What's the error you are getting while converting the `RDD` to a `DataFrame`? Check that you are passing the proper schema when converting the RDD into the DataFrame. – Prasad Khode Jun 19 '19 at 06:09
  • This is the exact error I get: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. – Shivakanth Komatreddy Jun 19 '19 at 19:21
  • I think the problem is that your solution returns a Dataset of type Row, and my version of Spark expects a proper type: going from a Dataset to an RDD needs a way to convert, and I think the Dataset API requires an encoder for that. – Shivakanth Komatreddy Jun 19 '19 at 20:54
  • Can you check how to convert an `RDD` to a `DataFrame` [here](https://stackoverflow.com/a/37011786/1025328)? It's nicely explained. – Prasad Khode Jun 20 '19 at 06:44
  • I was able to convert it using a case class and providing that to the encoder, which worked, but now my Spark job fails when writing to files. – Shivakanth Komatreddy Jun 21 '19 at 18:16
  • You can create a new question describing the problem you are facing. I hope the first problem you mentioned is resolved by this answer; if it is, you can accept and upvote the answer. Happy to help :-) – Prasad Khode Jun 22 '19 at 03:40
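
The case-class route mentioned in the comments can be sketched as a plain function over a typed record. This is an illustration, not the code the commenter actually used: `Record` and `Expand.expand` are hypothetical names, and the fields are assumed to mirror the three string columns from the example.

```scala
// Hypothetical record type; field names match the example's column names.
final case class Record(id: String, parent_id: String, data: String)

object Expand {
  // Returns the original record followed by one child per favorite count.
  def expand(r: Record): Seq[Record] = {
    // "favorite:3" -> 3 (trim also tolerates "favorite: 3")
    val count = r.data.split(":")(1).trim.toInt
    r +: (1 to count).map(i => Record(s"${r.id}_t$i", r.id, r.data))
  }
}
```

With Spark, a function like this could likely be applied as `input.as[Record].flatMap(r => Expand.expand(r))` after `import spark.implicits._`, which supplies the encoder for the case class and so avoids the "Unable to find encoder" error that arises with untyped `Row` values.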