I'm writing a Spark application using Scala. I have the following two RDDs:

(a, 1, some_values1)
(b, 1, some_values2)
(c, 1, some_values3)

and

(a, 2, some_values1)
(b, 2, some_values2)
(a, 3, some_values1)
(b, 3, some_values2)

I'm trying to get this output:

(a, 1, 2, computed_values1)
(b, 1, 2, computed_values2)
(c, 1, 2, None)
(a, 1, 3, computed_values1)
(b, 1, 3, computed_values2)
(c, 1, 3, None)

The letters are used to match each record from the first RDD with records from the second one. I tried using the join method, but it didn't work for record c. How can I achieve this?

UPDATE

Another example:

(a, 1, some_values1)
(b, 1, some_values2)
(c, 1, some_values3)

and

(a, 2, some_values1)
(b, 2, some_values2)
(a, 3, some_values1)
(b, 3, some_values2)
(c, 3, some_values2)

I'm trying to get this output:

(a, 1, 2, computed_values1)
(b, 1, 2, computed_values2)
(c, 1, 2, None)
(a, 1, 3, computed_values1)
(b, 1, 3, computed_values2)
(c, 1, 3, computed_values3)
boring91
  • An "outer" join can be used to preserve rows from both dataframes; some info here: https://stackoverflow.com/questions/45990633/what-are-the-various-join-types-in-spark – pasha701 Sep 11 '18 at 13:00
  • @pasha701 Yeah, I know. I actually tried using the outer join to achieve that, but it didn't work. – boring91 Sep 11 '18 at 13:15

2 Answers

If I understand your requirement correctly, here's an approach:

  1. Create an RDD, say rdd2c2, of the distinct values in the 2nd column of rdd2
  2. Perform a cartesian join on rdd1 and rdd2c2, then transform the result into an RDD[K,V] keyed by the 1st column of rdd1 and the rdd2c2 value
  3. Transform rdd2 into an RDD[K,V] keyed by its 1st and 2nd columns
  4. Perform a leftOuterJoin on the two RDD[K,V]s and transform the elements into the desired structure

Sample code:

val rdd1 = sc.parallelize(Seq(
  ("a", 1, "some_values1"),
  ("b", 1, "some_values2"),
  ("c", 1, "some_values3")
))

val rdd2 = sc.parallelize(Seq(
  ("a", 2, "some_values1"),
  ("b", 2, "some_values2"),
  ("a", 3, "some_values1"),
  ("b", 3, "some_values2"),
  ("c", 3, "some_values2")
))

val rdd2c2 = rdd2.map(_._2).distinct
// rdd2c2.collect: Array[Int] = Array(2, 3)

val rddKV1 = rdd1.cartesian(rdd2c2).
  map{ case (a, b) => ((a._1, b), (a._2, a._3))}
// rddKV1.collect: Array[((String, Int), (Int, String))] = Array(
//   ((a,2),(1,some_values1)),
//   ((a,3),(1,some_values1)),
//   ((b,2),(1,some_values2)),
//   ((b,3),(1,some_values2)),
//   ((c,2),(1,some_values3)),
//   ((c,3),(1,some_values3)))

val rddKV2 = rdd2.map(r => ((r._1, r._2), r._3))
// rddKV2.collect: Array[((String, Int), String)] = Array(
//   ((a,2),some_values1),
//   ((b,2),some_values2),
//   ((a,3),some_values1),
//   ((b,3),some_values2),
//   ((c,3),some_values2))

val rddJoined = rddKV1.leftOuterJoin(rddKV2).
  map{ case (k, v) => (k._1, v._1._1, k._2, v._2) }
// rddJoined.collect: Array[(String, Int, Int, Option[String])] = Array(
//   (a,1,3,Some(some_values1)),
//   (a,1,2,Some(some_values1)),
//   (c,1,2,None),
//   (b,1,2,Some(some_values2)),
//   (b,1,3,Some(some_values2)),
//   (c,1,3,Some(some_values2)))
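
The map above keeps only the value from rdd2, whereas the desired output shows computed_values derived from the matched records. Below is a minimal sketch of one way to carry both values through the join. It assumes a hypothetical compute function (not part of the question) that combines the two value strings. The object name, the app name, and the local master setting are likewise illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JoinWithComputedValues {
  // Hypothetical stand-in for whatever derives computed_values from the two records.
  def compute(left: String, right: String): String = s"$left|$right"

  def run(sc: SparkContext): Set[(String, Int, Int, Option[String])] = {
    val rdd1 = sc.parallelize(Seq(
      ("a", 1, "some_values1"),
      ("b", 1, "some_values2"),
      ("c", 1, "some_values3")
    ))
    val rdd2 = sc.parallelize(Seq(
      ("a", 2, "some_values1"),
      ("b", 2, "some_values2"),
      ("a", 3, "some_values1"),
      ("b", 3, "some_values2"),
      ("c", 3, "some_values2")
    ))

    // Distinct values of rdd2's 2nd column, paired with every rdd1 record.
    val rdd2c2 = rdd2.map(_._2).distinct()
    val rddKV1 = rdd1.cartesian(rdd2c2).map {
      case ((key, n1, v1), n2) => ((key, n2), (n1, v1))
    }
    val rddKV2 = rdd2.map { case (key, n2, v2) => ((key, n2), v2) }

    // Keep v1 so that compute can see both sides; unmatched keys stay None.
    rddKV1.leftOuterJoin(rddKV2).map {
      case ((key, n2), ((n1, v1), v2Opt)) =>
        (key, n1, n2, v2Opt.map(v2 => compute(v1, v2)))
    }.collect().toSet
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try run(sc).toSeq.sortBy(r => (r._3, r._1)).foreach(println)
    finally sc.stop()
  }
}
```

With this sample data the record for (c, 1, 2) still comes out as None, because rdd2 has no ("c", 2, ...) row, while (c, 1, 3) gets a computed value.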
Leo C
  • Thanks for the answer. This does produce the expected result for the given example. The issue is that if rdd2 has, say, "c" with "3", the resulting RDD will not include this: (c, 1, 2, None). I'll update my question to add this example as well. – boring91 Sep 11 '18 at 23:03
  • @m2008m1033m, please see revised solution based on my understanding of your clarified requirement. – Leo C Sep 12 '18 at 00:19
  • It worked, thanks! Regarding the note at the end, if the distinct list is very large (because it actually is), can we use sc.broadcast with it? – boring91 Sep 12 '18 at 01:07
  • In fact, using a suitably crafted `cartesian join` would remove the need for `collect`. Solution further revised. – Leo C Sep 12 '18 at 01:40

If "c" has to appear in the result only once (my guess is that the desired output contains a typo), this can be achieved with the following code:

val data1 = List(
  ("a", 1, "some_values1"),
  ("b", 1, "some_values2"),
  ("c", 1, "some_values3")
)

val data2 = List(
  ("a", 2, "some_values1"),
  ("b", 2, "some_values2"),
  ("a", 3, "some_values1"),
  ("b", 3, "some_values2")
)

val rdd1 = sparkContext.parallelize(data1)
val rdd2 = sparkContext.parallelize(data2)

val rdd1WithKey = rdd1.map(v => (v._1, (v._2, v._3)))
val rdd2WithKey = rdd2.map(v => (v._1, (v._2, v._3)))

val joined = rdd1WithKey.fullOuterJoin(rdd2WithKey)
joined.foreach(println)

Output:

(b,(Some((1,some_values2)),Some((2,some_values2))))
(a,(Some((1,some_values1)),Some((2,some_values1))))
(b,(Some((1,some_values2)),Some((3,some_values2))))
(a,(Some((1,some_values1)),Some((3,some_values1))))
(c,(Some((1,some_values3)),None))
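
The nested Options printed above can be flattened into the four-field shape used in the question. Below is a minimal sketch on a plain Scala collection (no cluster needed), assuming the same row type that fullOuterJoin returns here. The flatten helper and the object name are hypothetical.

```scala
object FlattenJoined {
  type Rec = (Int, String)
  type Joined = (String, (Option[Rec], Option[Rec]))

  // Hypothetical helper: keep the key, both numeric columns, and the right-hand value.
  def flatten(row: Joined): (String, Option[Int], Option[Int], Option[String]) = row match {
    case (key, (left, right)) =>
      (key, left.map(_._1), right.map(_._1), right.map(_._2))
  }

  // Two of the rows printed above, typed as fullOuterJoin returns them.
  val sample: Seq[Joined] = Seq(
    ("a", (Some((1, "some_values1")), Some((2, "some_values1")))),
    ("c", (Some((1, "some_values3")), None))
  )

  def main(args: Array[String]): Unit =
    sample.map(flatten).foreach(println)
}
```

On the RDD itself the same function could be passed as joined.map(FlattenJoined.flatten). Note that "c" still yields a single row with no second number, not the two rows (c, 1, 2, None) and (c, 1, 3, None) listed in the question.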
pasha701