
I have two RDDs which have this form:

RDD A: (Columns: icchID, X_Coord)

[ICCH 1,10.0]
[ICCH 2,10.0]
[ICCH 4,100.0]
[ICCH 4,100.0]
[ICCH 2,13.0]

RDD B: (Columns: Y_Coord, Class)

[10.0,A]
[64.0,B]
[39.0,A]
[9.0,C]
[80.0,D]

I would like to combine those two RDDs so that I have an RDD which has this form:

[ICCH 1,10.0,10.0,A]
[ICCH 2,10.0,64.0,B]
[ICCH 4,100.0,39.0,A]
[ICCH 4,100.0,9.0,C]
[ICCH 2,13.0,80.0,D]

Note that both RDDs have the same number of rows (and each has two columns). Is it possible to do this using Scala?

P.S. I am a noob at Scala. I am using Databricks.


1 Answer


You may consider using the RDD zip method along with a transformation via map:

val rddA = sc.parallelize(Seq(
  ("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0), ("ICCH 5", 100.0), ("ICCH 2", 13.0)
))

val rddB = sc.parallelize(Seq(
  (10.0, "A"), (64.0, "B"), (39.0, "A"), (9.0, "C"), (80.0, "D")
))

// Pair the two RDDs element-by-element, then flatten each pair of tuples into a 4-tuple
val zippedRDD = (rddA zip rddB).map{ case ((id, x), (y, c)) => (id, x, y, c) }
// zippedRDD: org.apache.spark.rdd.RDD[(String, Double, Double, String)] = ...

zippedRDD.collect
// Array[(String, Double, Double, String)] = Array(
//   (ICCH 1,10.0,10.0,A), (ICCH 2,10.0,64.0,B), (ICCH 4,100.0,39.0,A), (ICCH 4,100.0,9.0,C), (ICCH 2,13.0,80.0,D)
// )

Be cautioned, though: maintaining the ordering between two RDDs is a tricky matter, and the ordering issue has been discussed on Stack Overflow before.
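If strict row pairing matters and you can't rely on zip's preconditions, one common workaround (a sketch, not part of the original answer) is to index both RDDs with zipWithIndex and join on the index:

// Key each RDD by its element index; zipWithIndex assigns stable
// per-element indices (it triggers a Spark job to compute partition sizes)
val indexedA = rddA.zipWithIndex.map { case (v, i) => (i, v) }
val indexedB = rddB.zipWithIndex.map { case (v, i) => (i, v) }

// Join on the index and flatten; this pairs elements by position
// regardless of partitioning (though the output order is not guaranteed)
val joinedRDD = (indexedA join indexedB).map { case (_, ((id, x), (y, c))) => (id, x, y, c) }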

And below is what the Spark API doc says about the RDD zip method:

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
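In other words, zip requires both RDDs to have identical "shape". If in doubt, you could verify that precondition first; below is a minimal sketch (sameShape is a hypothetical helper, not a Spark API):

import org.apache.spark.rdd.RDD

// Compare per-partition element counts of two RDDs; zip is only safe
// when both the partition count and each partition's size match
def sameShape[T, U](a: RDD[T], b: RDD[U]): Boolean = {
  def shape[X](r: RDD[X]): Seq[(Int, Int)] =
    r.mapPartitionsWithIndex((i, it) => Iterator((i, it.size))).collect.toSeq
  shape(a) == shape(b)
}

// sameShape(rddA, rddB)  // true for the parallelized Seqs above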

Leo C
  • Hello and sorry for the late answer. I tested your recommendation and I get this error: `error: constructor cannot be instantiated to expected type/found : (T1, T2)/ required: org.apache.spark.sql.Row`. Do you know what could cause it? – Aris Kantas Jul 16 '19 at 21:10
  • 1
    @Aris Kantas, maybe your rddA and rddB are `RDD[Row]` (which is essentially a DataFrame) rather than the general `RDD[T]`. If so, the `zip/map` code line can be changed to `(rddA zip rddB).map{ case (Row(id: String, x: Double), Row(y: Double, c: String)) => (id, x, y, c) }`. – Leo C Jul 16 '19 at 22:55
  • This is it! Thank you so much for your help! Hope you have a nice day! – Aris Kantas Jul 16 '19 at 23:00
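For completeness, here is Leo C's Row-based variant from the comments as a self-contained sketch. The DataFrame names dfA and dfB are assumed for illustration (in a Databricks notebook, spark and its implicits are already in scope):

import org.apache.spark.sql.Row
import spark.implicits._

// Hypothetical DataFrames holding the question's data; .rdd yields RDD[Row]
val dfA = Seq(("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0),
              ("ICCH 4", 100.0), ("ICCH 2", 13.0)).toDF("icchID", "X_Coord")
val dfB = Seq((10.0, "A"), (64.0, "B"), (39.0, "A"),
              (9.0, "C"), (80.0, "D")).toDF("Y_Coord", "Class")

// Pattern-match each zipped pair as Rows to extract typed fields;
// the same ordering caveat from the answer applies here as well
val zipped = (dfA.rdd zip dfB.rdd).map {
  case (Row(id: String, x: Double), Row(y: Double, c: String)) => (id, x, y, c)
}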