As the title describes, say I have two RDDs

rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,0,0])

or

rdd3 = sc.parallelize([("Id", 1),("Id", 2),("Id",3)])
rdd4 = sc.parallelize([("Result", 1),("Result", 0),("Result", 0)])

How can I create the following DataFrame?

Id    Result
1     1
2     0
3     0

If I could create the paired RDD [(1,1),(2,0),(3,0)], then sqlCtx.createDataFrame would give me what I want, but I don't know how to build it.

I'd appreciate any comment or help!

Ehsan M. Kermani

2 Answers

First, there is an RDD operation called RDD.zipWithIndex, which pairs each element with its zero-based index. Calling rdd2.zipWithIndex gives:

scala> rdd2.zipWithIndex().collect().foreach(println)
(1,0)
(0,1)
(0,2)

To make it match your desired output (a 1-based Id first, then the Result), swap each pair and add 1 to the index:

scala> rdd2.zipWithIndex().map(t => (t._2 + 1, t._1)).collect().foreach(println)
(1,1)
(2,0)
(3,0)
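
Since the question uses Python, here is a minimal plain-Python sketch of what the zipWithIndex-then-map step above computes. It models the RDD as a local list, so it illustrates the transformation only and is not Spark code; the helper names zip_with_index and to_id_result are made up for this example.

```python
def zip_with_index(values):
    """Local-list model of RDD.zipWithIndex: pair each value with its 0-based position."""
    return [(value, index) for index, value in enumerate(values)]


def to_id_result(indexed):
    """Turn each (value, index) pair into (index + 1, value), i.e. a 1-based Id first."""
    return [(index + 1, value) for value, index in indexed]


results = [1, 0, 0]                 # same data as rdd2
indexed = zip_with_index(results)
pairs = to_id_result(indexed)

print(indexed)  # [(1, 0), (0, 1), (0, 2)]
print(pairs)    # [(1, 1), (2, 0), (3, 0)]
```

PySpark RDDs also have a zipWithIndex method, so the equivalent call there would presumably be rdd2.zipWithIndex().map(lambda t: (t[1] + 1, t[0])).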

If you really need to zip the two RDDs, just use RDD.zip:

scala> rdd1.zip(rdd2).collect().foreach(println)
(1,1)
(2,0)
(3,0)
David Griffin
  • Nice! Very useful transformations! I guess I should accept Holden's answer since she answered first. Cheers! – Ehsan M. Kermani May 26 '15 at 18:39
  • I already posted this solution for DataFrames; maybe upvote that one too! http://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex – David Griffin May 26 '15 at 18:40

Provided that the two RDDs have the same number of partitions and the same number of elements in each partition, you can use the zip function, e.g.

case class Elem(id: Int, result: Int)
val df = sqlCtx.createDataFrame(rdd1.zip(rdd2).map(x => Elem(x._1, x._2)))
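
To see why zip comes with that precondition, here is a rough plain-Python model of partition-wise pairing. Spark's RDD.zip documentation states the requirement as the same number of partitions and the same number of elements in each partition. The nested lists standing in for partitions and the function name zip_partitions are assumptions made for illustration; this is not how Spark implements zip.

```python
def zip_partitions(left, right):
    """Pair two partitioned collections (lists of lists), one partition at a time."""
    if len(left) != len(right):
        raise ValueError("different number of partitions")
    pairs = []
    for part_left, part_right in zip(left, right):
        # Elements are matched by position inside each partition,
        # so the partitions must hold the same number of elements.
        if len(part_left) != len(part_right):
            raise ValueError("partitions differ in element count")
        pairs.extend(zip(part_left, part_right))
    return pairs


ids = [[1], [2, 3]]          # rdd1's values split over two partitions
results = [[1], [0, 0]]      # rdd2's values with the same layout
print(zip_partitions(ids, results))  # [(1, 1), (2, 0), (3, 0)]
```

With a mismatched layout such as [[1, 2], [3]] against [[1], [0, 0]], the model raises an error even though both sides hold three elements in total, which is the situation the precondition guards against.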
Holden