
I have created an RDD from GraphX which looks like this:

import org.apache.spark.graphx.{GraphLoader, VertexId, VertexRDD}

val graph = GraphLoader.edgeListFile(spark.sparkContext, fileName)
val s: VertexRDD[VertexId] = graph.connectedComponents().vertices

import java.util.UUID.randomUUID
import org.apache.spark.rdd.RDD

val nodeGraph: RDD[(String, Iterable[VertexId])] = s.groupBy(_._2).map { case (x, y) =>
  val rand = randomUUID().toString                       // random id per connected component
  val clusterList: Iterable[VertexId] = y.map(_._1)      // the vertices in that component
  (rand, clusterList)
}

nodeGraph is of type RDD[(String, Iterable[VertexId])], and the data inside is of the form:

(abc-def11, Iterable(1,2,3,4)), 
(def-aaa, Iterable(10,11)), 
...

What I want to do now is create a DataFrame out of it that should look like this:

col1        col2
abc-def11   1
abc-def11   2
abc-def11   3
abc-def11   4
def-aaa     10
def-aaa     11

How can I do this in Spark?

Shaido
Aamir

1 Answer


First, convert the RDD to a DataFrame using toDF() with your wanted column names. This is most easily done by first changing the Iterable[VertexId] into a Seq[Long].

import spark.implicits._  // needed for toDF and the $ column syntax

val df = nodeGraph.map(x => (x._1, x._2.map(_.toLong).toSeq)).toDF("col1", "col2")

Note that this could be done while creating nodeGraph to save a step. Next, use the explode function to flatten the DataFrame:
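As an illustration of the "save a step" remark, a sketch of a fused version, assuming the same `s` VertexRDD and `spark` session from the question (not tested, the exact column names are just the ones you asked for):

```scala
// Hypothetical fused version: build the (uuid, Seq[Long]) pairs and the
// DataFrame in one pass, skipping the intermediate Iterable[VertexId].
import java.util.UUID.randomUUID
import spark.implicits._

val df = s.groupBy(_._2)
  .map { case (_, pairs) => (randomUUID().toString, pairs.map(_._1.toLong).toSeq) }
  .toDF("col1", "col2")
```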

import org.apache.spark.sql.functions.explode

val df2 = df.withColumn("col2", explode($"col2"))

which will give you the desired output.
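If it helps to see what explode does to each row, here is a plain-Scala analogy using flatMap on ordinary collections (no Spark needed, with your sample data hard-coded):

```scala
// Plain-Scala analogy for explode: every (key, seq) pair becomes
// one output row per element of the seq.
val data = Seq(
  ("abc-def11", Seq(1L, 2L, 3L, 4L)),
  ("def-aaa", Seq(10L, 11L))
)

val exploded: Seq[(String, Long)] = data.flatMap { case (key, ids) =>
  ids.map(id => (key, id))   // duplicate the key once per element
}
// exploded contains (abc-def11,1), (abc-def11,2), ..., (def-aaa,11)
```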

Shaido
  • I have one question: is `.toSeq` going to be a problem? Is it going to explode or fail if the Iterable has a billion records when calling `toSeq` on it? – Aamir Feb 08 '19 at 09:55
  • @Aamir: Seq is an extension of Iterable, so I don't think it would have any performance impact. The reason behind the conversion is that Spark does not supply any built-in encoder for Iterable. It's possible to add one with Kryo, but it is not as straightforward (see: https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset). – Shaido Feb 08 '19 at 10:07