1

I have the following rdd data.

[(13, 'Munich@en'), (13, 'Munchen@de'), (14, 'Vienna@en'), (14, 'Wien@de'),(15, 'Paris@en')]

I want to combine the above rdd , using reduceByKey method, that would result the following output, i.e to join the entries into a dictionary based on entry's language.

[
    (13, {'en':'Munich','de':'Munchen'}), 
    (14, {'en':'Vienna', 'de': 'Wien'}), 
    (15, {'en':'Paris', 'de':''})
]

The examples for reduceByKey were all numerical operations such as addition, so I am not very sure how to go about updating a dictionary in each reduce step.

This is my code:

rd0 = sc.parallelize(
    [(13, 'munich@en'),(13, 'munchen@de'), (14, 'Vienna@en'),(14,'Wien@de'),(15,'Paris@en')]
)

def updateDict(x,xDict):
    xDict[x[:-3]]=x[-2:]

rd0.map(lambda x: (x[0],(x[1],{'en':'','de':''}))).reduceByKey(updateDict).collect()

I am getting the following error message but not sure what I am doing wrong.

    return f(*args, **kwargs)
  File "<ipython-input-209-16cfa907be76>", line 2, in ff
TypeError: 'tuple' object does not support item assignment

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
pault
  • 41,343
  • 15
  • 107
  • 149
user1848018
  • 1,086
  • 1
  • 14
  • 33

1 Answers1

2

There are some problems with your code - for instance, your updateDict does not return a value. Here is a different approach:

First, map the values into dictionaries. One way is to split on "@", reverse, and pass the result into the dict constructor.

rd1 = rd0.mapValues(lambda x: dict([reversed(x.split("@"))]))
print(rd1.collect())
#[(13, {'en': 'munich'}),
# (13, {'de': 'munchen'}),
# (14, {'en': 'Vienna'}),
# (14, {'de': 'Wien'}),
# (15, {'en': 'Paris'})]

Now you can call reduceByKey and merge the two dictionaries. Finally add in the missing keys with a dictionary comprehension over the required keys, defaulting to empty string if the key is missing.

def merge_two_dicts(x, y):
    # from https://stackoverflow.com/a/26853961/5858851
    # works for python 2 and 3
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z

rd2 = rd1.reduceByKey(merge_two_dicts)\
    .mapValues(lambda x: {k: x.get(k, '') for k in ['en', 'de']})
print(rd2.collect())
#[(14, {'de': 'Wien', 'en': 'Vienna'}),
# (13, {'de': 'munchen', 'en': 'munich'}),
# (15, {'de': '', 'en': 'Paris'})]
pault
  • 41,343
  • 15
  • 107
  • 149
  • Is updating dictionaries, an expensive operation for a large scale data? – user1848018 Sep 17 '19 at 22:09
  • @user1848018 the performance varies, depending on how you do it. Take a look at the answer on the post I linked - there's a lot of discussion about performance. If you're using python 3.5+, you can use `.reduceByKey(lambda a, b: {**a, **b})`. Without knowing anything else about your problem, I would say that this step is unlikely to be your bottle neck unless your dictionaries are HUGE (tens to hundreds of thousands of keys). – pault Sep 17 '19 at 22:13