
I have a list of lists in an RDD (A) and a plain list (B) to intersect. B needs to be intersected with every list in A.

A = [[a,b,c,d],[e,f,g,h]....]
B = [a,b,c,d,e,f,g,h]

I need to intersect these two to get the common letters. I used the following, but it failed with a TypeError:

pwords = A.intersection(B)

I then tried to use parallelize, based on a few suggestions on Stack Overflow, but got an error:

text_words = sc.parallelize(A)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark/python/pyspark/context.py", line 501, in parallelize
    c = list(c)    # Make it a list so we can compute its length
TypeError: 'PipelinedRDD' object is not iterable

When I tried to convert it into a list, as the error message suggests, I got the same error again:

TypeError: 'PipelinedRDD' object is not iterable 

I tried to follow Find intersection of two nested lists? and got this error:

TypeError: 'PipelinedRDD' object is not iterable
Saurabh

1 Answer


I am not 100% sure about your question, but I guess you have the nested list as an RDD and want to intersect it with a static list B. Then each item in the nested lists should be checked for existence in B, and only the items that exist in B should remain.

If the order of the elements does not matter, you can use the following code:

A = [["a","b","c","d"],["e","f","g","h"],["i","j","k","l"],["a","b","x","f","y"]]
B = ["a","b","c","d","e","f","g","h"]

text_words = sc.parallelize(A)
text_words.map(lambda x: list(set(x) & set(B))).collect()

Output:

[['a', 'c', 'b', 'd'], ['h', 'e', 'g', 'f'], [], ['a', 'b', 'f']]
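As a side note, `set(x) & set(B)` does not preserve element order, which is why the output above is shuffled. If order matters, a list comprehension over a precomputed set does the same intersection while keeping order. A plain-Python sketch of that logic (the same expression would work inside the `map` lambda on the RDD):

```python
A = [["a", "b", "c", "d"], ["e", "f", "g", "h"],
     ["i", "j", "k", "l"], ["a", "b", "x", "f", "y"]]
B = ["a", "b", "c", "d", "e", "f", "g", "h"]

B_set = set(B)  # build the set once for O(1) membership tests
result = [[x for x in row if x in B_set] for row in A]
print(result)
# [['a', 'b', 'c', 'd'], ['e', 'f', 'g', 'h'], [], ['a', 'b', 'f']]
```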
gaw
  • Both nested list and list are RDDs. When I use sc.parallelize as you have mentioned. I am getting an error as mentioned in the description. – Saurabh Oct 22 '18 at 16:43
  • To me it looks like your nested list A is already a pipelined RDD, which is why you get the error and can't parallelize it. As you can see in my code, if you just create the nested list it works fine – gaw Oct 23 '18 at 06:02
  • Yes, they both are RDDs but when I am trying to use map function as you suggested, I get this error: ------- pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. – Saurabh Oct 23 '18 at 14:59
  • This sounds like `A` is already an RDD and you don't need to call `parallelize` on it. You could just work with `A` directly. – gaw Oct 24 '18 at 07:24
  • Yes, I have NOT used parallelize. Even if I use the map function ONLY, I am getting the error mentioned in above comment. – Saurabh Oct 25 '18 at 16:06
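Regarding the SPARK-5063 error in the comments: an RDD cannot be referenced from inside another RDD's transformation, so when B is itself an RDD the usual fix is to `collect()` it on the driver (or wrap the collected list in `sc.broadcast`) before using it in the lambda. A minimal sketch of the pattern, using a hypothetical `FakeRDD` stand-in so it runs without a Spark cluster:

```python
class FakeRDD:
    """Tiny stand-in for a PySpark RDD (hypothetical, for illustration only)."""
    def __init__(self, data):
        self.data = list(data)

    def collect(self):
        return list(self.data)

    def map(self, f):
        return FakeRDD(f(x) for x in self.data)


A = FakeRDD([["a", "b", "c", "d"], ["e", "f", "g", "h"]])
B = FakeRDD(["a", "b", "c", "d", "x"])

# Bring B to the driver first -- referencing the RDD B directly inside
# A.map(...) is what triggers the SPARK-5063 PicklingError.
B_local = set(B.collect())
result = A.map(lambda row: [x for x in row if x in B_local]).collect()
print(result)  # [['a', 'b', 'c', 'd'], []]
```

With real PySpark RDDs the two lines at the bottom are the same; only `FakeRDD` is a placeholder.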