
I have input data in the following format:

RDD[
(Map1, RecordA),
(Map2, RecordX),
(Map1, RecordB),
(Map2, RecordY),
(Map1, RecordC),
(Map2, RecordZ)
]

The expected output format is a List of RDDs:

List[
RDD[RecordA, RecordB, RecordC],
RDD[RecordX, RecordY, RecordZ]
]

I want the inner RDDs to be grouped by the key (Map1, Map2), and I want to create an outer List as a collection of those inner RDDs.

I have tried the reduceByKey and aggregateByKey APIs but have not been successful so far.

Real-world example:

RDD[
(Map("a"->"xyz", "b"->"per"), CustomRecord("test1", 1, "abc")),
(Map("a"->"xyz", "b"->"per"), CustomRecord("test2", 1, "xyz")),
(Map("a"->"xyz", "b"->"lmm"), CustomRecord("test3", 1, "blah")),
(Map("a"->"xyz", "b"->"lmm"), CustomRecord("test4", 1, "blah"))
]

final case class CustomRecord(
string1: String,
int1: Int,
string2: String)
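
For reference, a minimal sketch (object and app names are just placeholders) of what groupByKey gives for this input: a single RDD of (key, Iterable[CustomRecord]) pairs rather than a List of RDDs, since RDDs cannot be nested inside other RDDs.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// CustomRecord as defined above.
final case class CustomRecord(string1: String, int1: Int, string2: String)

object GroupByKeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("group-by-key-sketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // Input shaped like the example above; the key is a Map[String, String].
    val input: RDD[(Map[String, String], CustomRecord)] = sc.parallelize(Seq(
      (Map("a" -> "xyz", "b" -> "per"), CustomRecord("test1", 1, "abc")),
      (Map("a" -> "xyz", "b" -> "per"), CustomRecord("test2", 1, "xyz")),
      (Map("a" -> "xyz", "b" -> "lmm"), CustomRecord("test3", 1, "blah")),
      (Map("a" -> "xyz", "b" -> "lmm"), CustomRecord("test4", 1, "blah"))
    ))

    // groupByKey yields one RDD of (key, Iterable[CustomRecord]) pairs,
    // not a List of RDDs; this is the closest shape a single RDD can hold.
    val grouped: RDD[(Map[String, String], Iterable[CustomRecord])] = input.groupByKey()
    grouped.collect().foreach(println)

    spark.stop()
  }
}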

Appreciate your help.

abc123
  • You can't create an RDD of RDDs; maybe an RDD of Lists or Iterables? – Psidom Dec 03 '18 at 19:15
  • List[ RDD[RecordA, RecordB, RecordC], RDD[RecordX, RecordY, RecordZ] ] is also acceptable. – abc123 Dec 03 '18 at 19:17
  • I updated the question to reflect that. – abc123 Dec 03 '18 at 19:18
  • Can you give a real example instead of a schema? – Psidom Dec 03 '18 at 19:18
  • refer to https://stackoverflow.com/a/37580350/2204206 – Lior Chaga Dec 03 '18 at 19:32
  • @Psidom Done. I updated the question with my data. key is a Map[String, String] and value is a CustomRecord. – abc123 Dec 03 '18 at 19:32
  • @LiorChaga I don't want a List[List[], List[]]; I was able to achieve that with aggregateByKey. I need List[RDD[], RDD[]]. – abc123 Dec 03 '18 at 19:36
  • you can't do that directly in Spark. You can take the List[List], iterate over the outer list and use sc.parallelize to convert each inner list to an RDD, then collect them back into a List to get what you want. Or you can take the original RDD, collect the keys, and then iterate over the keys: for each key, map the original RDD to get an RDD of the values for that key, and add those RDDs one by one to a list (sketched below)... – Lior Chaga Dec 03 '18 at 19:51
  • I tried doing that and I am running into org.apache.spark.SparkException: Task not serializable, caused by java.io.NotSerializableException: org.apache.spark.SparkContext. So I was wondering if I could directly convert it to an RDD. – abc123 Dec 03 '18 at 19:59
  • Spark functions should be serializable, as well as any constructor args you are passing to them. You can post a code snippet and I guess someone would spot the problem right away (I'm less of a Scala person myself). – Lior Chaga Dec 04 '18 at 05:51
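
A minimal sketch of the second approach Lior Chaga describes above: collect the distinct keys, then build one RDD per key by filtering the original RDD on the driver side. The object and method names here are hypothetical, and it assumes the number of distinct keys is small enough to collect to the driver.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object SplitByKey {
  // Hypothetical helper: turns RDD[(K, V)] into a List of per-key RDD[V].
  // All of the work below runs on the driver, so SparkContext is never
  // referenced inside a closure shipped to executors (which is what triggers
  // the NotSerializableException mentioned in the comments above).
  def splitByKey[K: ClassTag, V: ClassTag](input: RDD[(K, V)]): List[RDD[V]] = {
    // Collect the distinct keys to the driver.
    val keys = input.keys.distinct().collect().toList

    // Build one lazily evaluated RDD per key; the key k is captured in the
    // filter closure, so it must be serializable (a Map[String, String] is).
    keys.map { k =>
      input.filter { case (key, _) => key == k }.values
    }
  }
}

Usage would look like val result: List[RDD[CustomRecord]] = SplitByKey.splitByKey(inputRdd). Note that each resulting RDD re-scans the input when materialised, so caching the input RDD first is worth considering if there are many keys.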

0 Answers