I am working on a MapReduce problem where I want to filter the output of every map partition. I want to keep only those keys which occur more than a threshold number of times within their map partition.
So I have an RDD like (key, value<tuple>).
For every value in the tuple, I want to get the count of its occurrences across the RDD, broken down by map partition. Then I'll filter on this count.
Eg: RDD: {(key1, ("a","b","c")),
(key2, ("a","d")),
(key3, ("b","c"))}
Using flatMapValues I have been able to flatten this to
{(key1, a), (key1, b), (key1, c), (key2, a), (key2, d), (key3, b), (key3, c)}
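For reference, here is a minimal sketch of that flattening step on the toy RDD above (assuming an existing SparkContext named sc; the variable names are made up for illustration):

```python
# Illustrative sketch: flatten (key, tuple_of_values) into one (key, value) pair per element
toy = sc.parallelize([
    ("key1", ("a", "b", "c")),
    ("key2", ("a", "d")),
    ("key3", ("b", "c")),
])
flattened = toy.flatMapValues(lambda vals: vals)  # emit (key, v) for every v in the tuple
print(flattened.collect())
# [('key1', 'a'), ('key1', 'b'), ('key1', 'c'),
#  ('key2', 'a'), ('key2', 'd'), ('key3', 'b'), ('key3', 'c')]
```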
Now, using a combineByKey step, I have been able to get the count of each value in its respective partition. Supposing there were two partitions, it returns something like
("a", [1, 1]), ("b", [1,1]), ("c", [1,1]), ("d", 1)
Now I want to filter these (key, value) pairs so that each element of the value tuple becomes an individual key-value pair, i.e. what flatMapValues did for me before, but I am unable to use flatMapValues here.
```python
from itertools import count
import pyspark
import sys
import json
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.rdd import RDD

sc = SparkContext('local[*]', 'assignment2_task1')

# Read the input and collect all values under their key: (key, [v1, v2, ...])
RDD = sc.textFile(sys.argv[1])
rdd1_original = RDD.map(lambda x: x.split(",")).map(lambda x: (x[0], [x[1]])).reduceByKey(lambda x, y: x + y)

# Flatten to (value, 1) pairs, then count each value per partition with combineByKey
rdd3_candidate = rdd1_original.flatMapValues(lambda a: a) \
    .map(lambda x: (x[1], 1)) \
    .combineByKey(lambda value: value,      # createCombiner
                  lambda x, y: x + y,       # mergeValue
                  lambda x, y: (x, y))      # mergeCombiners

# Flatten the per-partition counts into individual (value, count) pairs -- this is where it fails
new_rdd = rdd3_candidate.flatMapValues(lambda a: a)
print(new_rdd.collect())
```
Expected answer:
[("a",1),("a", 1), ("b", 1), ("b", 1), ("c", 1), ("c", 1), ("d", 1)
Current error:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
process()
File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
return f(*args, **kwargs)
File "/Users/yashphogat/Python_Programs/lib/python3.6/site-packages/pyspark/rdd.py", line 1967, in <lambda>
flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
TypeError: 'int' object is not iterable
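From the traceback, the failure appears to come from the values that are plain ints, like the ("d", 1) entry above: flatMapValues needs every value to be iterable. A minimal reproduction with made-up data (assuming the same SparkContext):

```python
# Illustrative reproduction: values that are tuples flatten fine, a bare int does not.
mixed = sc.parallelize([("a", (1, 1)), ("d", 1)])
mixed.flatMapValues(lambda a: a).collect()
# ("a", (1, 1)) would flatten to ("a", 1), ("a", 1), but ("d", 1) raises
# TypeError: 'int' object is not iterable -- the same error as above.
```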