
I am working on a MapReduce problem where I want to filter every map-partition's output. I want to keep only those keys that occur more than a threshold number of times within their map-partition.

So I have an RDD like (key, value<tuple>)

For every value in the tuple I want to get the count of its occurrences throughout the RDD, broken down by map-partition. Then I'll filter on that count.

E.g. RDD: {(key1, ("a","b","c")),
           (key2, ("a","d")),
           (key3, ("b","c"))}

Using flatMapValues I have been able to flatten this to

{(key1, a), (key1, b), (key1, c), (key2, a), (key2, d), (key3, b), (key3, c)}
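
For reference, a minimal sketch of this flattening step on a hard-coded toy RDD (the variable names here are illustrative assumptions, not my actual job):

toy = sc.parallelize([
    ("key1", ("a", "b", "c")),
    ("key2", ("a", "d")),
    ("key3", ("b", "c")),
])
# flatMapValues iterates each value tuple and pairs every element with its key
flattened = toy.flatMapValues(lambda vals: vals)
# flattened.collect() ->
# [('key1', 'a'), ('key1', 'b'), ('key1', 'c'),
#  ('key2', 'a'), ('key2', 'd'), ('key3', 'b'), ('key3', 'c')]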

Now, using a combineByKey step, I have been able to get the count of each value in its respective partition.

Suppose there were two partitions; then it returns something like:

("a", [1, 1]), ("b", [1,1]), ("c", [1,1]), ("d", 1)

Now I want to flatten this (key, value) output so that each element of the value tuple becomes an individual key-value pair, i.e. what flatMapValues did for me before, but this time I am unable to use flatMapValues.

Python code:

from itertools import count

import pyspark
import sys
import json
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.rdd import RDD

sc = SparkContext('local[*]', 'assignment2_task1')

RDD = sc.textFile(sys.argv[1])

rdd1_original = RDD.map(lambda x: x.split(",")).map(lambda x: (x[0], [x[1]])).reduceByKey(lambda x, y: x + y)


rdd3_candidate = rdd1_original.flatMapValues(lambda a: a) \
    .map(lambda x: (x[1], 1)) \
    .combineByKey(lambda value: (value),
                  lambda x, y: (x + y),
                  lambda x, y: (x, y))

new_rdd = rdd3_candidate.flatMapValues(lambda a: a)
print(new_rdd.collect())

Expected answer:

[("a",1),("a", 1), ("b", 1), ("b", 1), ("c", 1), ("c", 1), ("d", 1)

Current error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/Users/yashphogat/Python_Programs/lib/python3.6/site-packages/pyspark/rdd.py", line 1967, in <lambda>
    flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
TypeError: 'int' object is not iterable
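
(As a side note, the same TypeError can be reproduced with a one-line sketch; this is just an illustration of the failure mode, not part of my job:)

# flatMapValues has to iterate over each value, so a bare int value such as
# the 1 in ("d", 1) raises exactly this error (hypothetical reproduction)
sc.parallelize([("d", 1)]).flatMapValues(lambda a: a).collect()
# TypeError: 'int' object is not iterable
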
  • Without having run your code, my guess is that the problem is the `("d", 1)`. You need special treatment for that. – pault Jun 05 '19 at 13:56
  • @pault: Do you know how I can convert this integer to a list? I have been trying to typecast it, but it's not working. – Yash Jun 06 '19 at 04:05
  • something like `lambda x: [x] if not isinstance(x, list) else x` – pault Jun 06 '19 at 14:21
  • If you simplified your question into a small [mcve], it would be easier to give specific advice. At the moment, it's not clear what you're asking. Start with the inputs and the desired output. Show the output at each step. Explain in detail how you want to get there, with examples. [How to create good reproducible spark examples](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples). – pault Jun 06 '19 at 14:28
  • Does `RDD.flatMap(lambda row: [(x, 1) for x in row[1]]).collect()` give you the output you're looking for? – pault Jun 06 '19 at 14:31
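
Following up on pault's comments above, a hedged sketch of the suggested guard applied to the last step (a slight variant that also treats tuples as iterable; untested, and it only covers the two-partition case shown above):

# Wrap bare ints in a list so flatMapValues always receives something iterable
# (sketch based on the comment suggestion, not verified against my real data)
new_rdd = rdd3_candidate.flatMapValues(
    lambda x: x if isinstance(x, (list, tuple)) else [x]
)
print(new_rdd.collect())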

0 Answers