
I want to split an RDD into multiple RDDs based on a value in each row. The row values are known in advance and fixed.

For example:

source_rdd = sc.parallelize([('a',1),('a',2),('a',3),('b',4),('b',5),('b',6)])

should be split into two RDDs, one containing only rows with key 'a' and the other only rows with key 'b'.

  1. I have tried the groupByKey method and was able to make it work, but only after calling collect() on the grouped RDD, which I cannot do in production due to memory constraints:
a_rdd, b_rdd = source_rdd.keyBy(lambda row: row[0]).groupByKey().collect()
  2. The current implementation applies a separate filter operation to get each RDD:
a_rdd = source_rdd.filter(lambda row: row[0] == 'a')
b_rdd = source_rdd.filter(lambda row: row[0] == 'b')

Can this be optimized further? What would be the best way to do this in production with data that cannot fit in memory?
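
For reference, a variant of the filter approach with explicit caching (a sketch; whether it actually helps depends on available executor memory and disk): persisting the source once lets each filter scan the cached partitions instead of recomputing the full lineage per output.

from pyspark import StorageLevel

# Persist once so every subsequent filter scans the cached partitions
# instead of recomputing the source; MEMORY_AND_DISK spills partitions
# that do not fit in memory to disk rather than failing.
source_rdd.persist(StorageLevel.MEMORY_AND_DISK)

a_rdd = source_rdd.filter(lambda row: row[0] == 'a')
b_rdd = source_rdd.filter(lambda row: row[0] == 'b')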

Usage: These RDDs will be converted into different DataFrames (one per key), each with a different schema, and stored in S3 as output.
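
If the per-key schemas can be reconciled into a single DataFrame (an assumption; the bucket path and column names below are hypothetical), a single-pass alternative is to let Spark split the data at write time with partitionBy, so each key value becomes its own S3 prefix:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One scan of the data; rows are routed to key=a/ and key=b/ prefixes at write time.
df = source_rdd.toDF(["key", "value"])
df.write.partitionBy("key").mode("overwrite").parquet("s3a://my-bucket/output/")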

Note: I would prefer a PySpark implementation. I have read a lot of Stack Overflow answers and blog posts, and have not yet found an approach that works for me.

I have already seen the question this one was marked as a duplicate of, which I mentioned above. I asked this question because the provided solution does not seem to be the most optimized way and is three years old.

  • @eliasah I have already seen the question this was marked as a duplicate of, which I have already mentioned in my question. I asked this question because the provided solution does not seem to be the most optimized way and is 3 years old. – Sumit Nov 26 '18 at 10:09
  • Whoever downvoted the question, could you please comment with the reason, so I can either improve or defend my question? – Sumit Nov 26 '18 at 11:50
  • If the duplicate has outdated answers, don't ask a new question, instead you can place a [bounty](https://stackoverflow.com/help/bounty) on the older question. *Current answers are outdated* is a valid and preselectable [bounty reason](https://stackoverflow.blog/2011/09/23/bounty-reasons-and-post-notices/). There are also high-rep users who are willing to spend their rep on such things if you ask nicely, you just have to find them in chat. – Max Vollmer Nov 26 '18 at 22:39

1 Answer


You can use toDF too. Also, a_rdd and b_rdd are not RDDs in your code, as they have been collected!

# groupByKey yields an RDD of (key, iterable-of-rows) pairs, not a DataFrame
grouped_rdd = source_rdd.keyBy(lambda row: row[0]).groupByKey()
a_rdd = grouped_rdd.filter(lambda row: row[0] == 'a')
b_rdd = grouped_rdd.filter(lambda row: row[0] == 'b')
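
For completeness, a sketch of the toDF step mentioned above (the column names and an active SparkSession are assumptions): after groupByKey each element is a (key, iterable) pair, so the values need to be flattened back into rows first.

# a_rdd contains ('a', <iterable of original rows>) pairs after groupByKey;
# flatten the iterable back to the original ('a', n) rows, then convert.
a_df = a_rdd.flatMap(lambda kv: kv[1]).toDF(["key", "value"])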
  • This method still needs to scan the whole DataFrame each time a new condition is required, which I want to avoid – Sumit Nov 26 '18 at 10:14
  • @SumitKumar If you don't want DataFrames, use a filter over the RDD directly (the answer is updated). – OmG Nov 26 '18 at 10:24
  • Now it is the same as my implementation, which I want to optimise further if possible. Details are in my question body – Sumit Nov 26 '18 at 10:29
  • @SumitKumar Nope, it is not the same. You collect it, but I do not; I apply the filter directly over the RDD. – OmG Nov 26 '18 at 10:31
  • collect is one of the methodologies; I am already using the second one, the same as yours, which does not use any `collect` or `groupBy` – Sumit Nov 26 '18 at 10:54