I'm using jupyter on Ubuntu.

I'm having the following problem. This is my code:

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
ut = sc.textFile("hdfs://localhost:54310/hduser/firstnames")
rows = ut.map(lambda line: line.split(";"))
res = rows.filter(lambda row: row[2] >= "2000" and row[2] <= "2004")
res = res.map(lambda row: ({row[1],row[2]},int(row[3])))

Output:

[({'2001', 'Brussel'}, 9),
 ({'2001', 'Brussel'}, 104),
 ({'2001', 'Vlaanderen'}, 16),
 ({'2002', 'Brussel'}, 12), ...]

I need my output to be like:

[({'2001', 'Brussel'}, 113),
 ({'2001', 'Vlaanderen'}, 16),
 ({'2002', 'Brussel'}, 12)]

I've tried a couple of things with reduceByKey before and have seen a lot of questions about reduceByKey, but couldn't figure it out. Thanks in advance.

Matt

1 Answer

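As explained in "A list as a key for PySpark's reduceByKey" by zero323, keys passed to reduceByKey have to be hashable, that is, implement the __hash__ method. A set is mutable and therefore not hashable, so it cannot be used as a key. You can use tuples instead: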

>>> from operator import add
>>> sc.parallelize([
...     (('2001', 'Brussel'), 9), (('2001', 'Brussel'), 104),
...     (('2001', 'Vlaanderen'), 16), (('2002', 'Brussel'), 12)
... ]).reduceByKey(add).take(2)
[(('2002', 'Brussel'), 12), (('2001', 'Brussel'), 113)]
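
The hashability requirement can be checked in plain Python, without a Spark context. This is only a minimal sketch: the helper name is_hashable is made up for illustration, and the sample keys are taken from the question.

```python
def is_hashable(value):
    """Return True if value can be hashed, and so used as a dict or reduceByKey key."""
    try:
        hash(value)
    except TypeError:
        return False
    return True


set_key = {'Brussel', '2001'}      # what the question's map() produces
tuple_key = ('Brussel', '2001')    # the suggested replacement

print(is_hashable(set_key))    # False: sets are mutable, hash() raises TypeError
print(is_hashable(tuple_key))  # True: a tuple of strings is hashable
```

A tuple is hashable only if all of its elements are, which holds here because the elements are strings.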

Replace:

res.map(lambda row: ({row[1],row[2]},int(row[3])))

with

res.map(lambda row: ((row[1], row[2]), int(row[3])))

Or replace the set with a frozenset, which is immutable and therefore hashable:

>>> sc.parallelize([
...     (frozenset(['2001', 'Brussel']), 9), (frozenset(['2001', 'Brussel']), 104),
...     (frozenset(['2001', 'Vlaanderen']), 16), (frozenset(['2002', 'Brussel']), 12)
... ]).reduceByKey(add).take(2)

[(frozenset({'2002', 'Brussel'}), 12), (frozenset({'2001', 'Brussel'}), 113)]
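
The two fixes are not quite interchangeable: tuple keys are order-sensitive, while frozenset keys are not. The sketch below shows the difference in plain Python. Note that reduce_by_key is a hypothetical local stand-in for the result reduceByKey(add) computes, not Spark's implementation, and the rows are a simplified sample based on the values in the question.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey: combine values that share a key with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


# Simplified sample rows: (region, year, count)
rows = [
    ('Brussel', '2001', 9), ('Brussel', '2001', 104),
    ('Vlaanderen', '2001', 16), ('Brussel', '2002', 12),
]

by_tuple = reduce_by_key((((r[0], r[1]), r[2]) for r in rows), add)
by_frozenset = reduce_by_key(((frozenset(r[:2]), r[2]) for r in rows), add)

print(by_tuple[('Brussel', '2001')])                 # 113
print(by_frozenset[frozenset(['2001', 'Brussel'])])  # 113, element order is irrelevant
print(('2001', 'Brussel') in by_tuple)               # False: tuple order matters
```

If the position of each field carries meaning (region first, year second), the tuple is likely the safer key. A frozenset also collapses duplicate elements, so it only fits when the fields are unordered and distinct.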