You may consider using the RDD zip method along with a transformation via map:
val rddA = sc.parallelize(Seq(
  ("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0), ("ICCH 5", 100.0), ("ICCH 2", 13.0)
))

val rddB = sc.parallelize(Seq(
  (10.0, "A"), (64.0, "B"), (39.0, "A"), (9.0, "C"), (80.0, "D")
))
val zippedRDD = (rddA zip rddB).map{ case ((id, x), (y, c)) => (id, x, y, c) }
// zippedRDD: org.apache.spark.rdd.RDD[(String, Double, Double, String)] = ...
zippedRDD.collect
// Array[(String, Double, Double, String)] = Array(
// (ICCH 1,10.0,10.0,A), (ICCH 2,10.0,64.0,B), (ICCH 4,100.0,39.0,A), (ICCH 5,100.0,9.0,C), (ICCH 2,13.0,80.0,D)
// )
Be cautioned, though, that maintaining the ordering between two RDDs is a tricky matter. Here's a relevant SO link
about the ordering issue.
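If you can't satisfy zip's alignment requirements, one common workaround (a sketch of my own, not part of the API contract) is to index both RDDs explicitly with zipWithIndex and join on the index; this trades zip's same-partitioning requirement for a shuffle:

// index each element; note zipWithIndex triggers a Spark job
// when the RDD has more than one partition
val indexedA = rddA.zipWithIndex.map { case (v, i) => (i, v) }
val indexedB = rddB.zipWithIndex.map { case (v, i) => (i, v) }

// pair elements by index rather than by partition layout
val joined = indexedA.join(indexedB).map {
  case (_, ((id, x), (y, c))) => (id, x, y, c)
}
// joined: org.apache.spark.rdd.RDD[(String, Double, Double, String)]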
And below is what the Spark API doc says about the RDD zip method:
def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
Zips this RDD with another one, returning key-value pairs with the
first element in each RDD, second element in each RDD, etc. Assumes
that the two RDDs have the same number of partitions and the same
number of elements in each partition (e.g. one was made through a map
on the other).
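To illustrate the safe case the doc describes, here's a minimal sketch (my own example, not from the doc) where one RDD is derived from the other via map, so the partitioning requirement holds by construction:

val rdd1 = sc.parallelize(Seq("ICCH 1", "ICCH 2", "ICCH 4"), 2)
val rdd2 = rdd1.map(_.length)  // same partition count and per-partition sizes

rdd1.zip(rdd2).collect
// Array[(String, Int)] = Array((ICCH 1,6), (ICCH 2,6), (ICCH 4,6))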