
The goal is to sort (key, value) pairs by value. The input is a JSON file. I have four methods: two mapper/reducer pairs.

Input is similar to

{
  "id": 1,
  "user": {
    "friends_count": 1
  }
}

Output of first stage of mapper and reducer is something like

A 1
B 2
C 3
D 4

What I want is

1 A
2 B
3 C
4 D

In the first stage, sorting by key works fine, but in the second stage, where I try to make the value the key, an error is thrown:

TypeError: <generator object ... at 0x7fa43ea615a0> is not JSON serializable

The code I am using is:

from mrjob.job import MRJob
from mrjob.step import MRStep
import json

class MRFrnsCounter(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(mapper=self.mapper_two,
                   reducer=self.reducer_two)
        ]

    def mapper(self, _, line):
        f = json.loads(line)
        (uid, frns) = f["id"], f["user"]["friends_count"]
        yield (uid), (frns)

    def reducer(self, uid, frns):
        yield uid, sum(frns)

    def mapper_two(self, uid, frns):
        yield (frns), (uid)

    def reducer_two(self, frns, uid):
        yield (frns), uid

if __name__ == '__main__':
    MRFrnsCounter.run()

The code breaks in the second mapper when the key and value are reversed. Any opinions would be appreciated.

Binary Nerd
Sid

1 Answer


Why not just yield sum(frns), uid in the first reducer?

However, in your second mapper you are trying to yield a generator, not an integer. Iterate through the generator to yield frns, uid. Something like this:

for num in frns:
    yield num, uid
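
A hedged note on where the generator appears, offered as a sketch rather than a confirmed diagnosis: in the posted job, mapper_two receives the integer produced by sum(frns) in the first reducer, so the only generator in the second step is the values argument of reducer_two (named uid in the question). The plain-Python stand-ins below use hypothetical names (reducer_two_broken, reducer_two_fixed, encode), and encode only imitates JSON output encoding; none of this is mrjob code. They show why yielding that argument directly fails and why looping over it works.

```python
import json


def reducer_two_broken(frns, uids):
    # uids stands in for the generator of values a reducer receives;
    # yielding it as-is hands a generator to the JSON encoder
    yield frns, uids


def reducer_two_fixed(frns, uids):
    # emit one (count, uid) pair per value instead of the generator itself
    for uid in uids:
        yield frns, uid


def encode(pairs):
    # hypothetical stand-in for JSON-encoding each (key, value) output pair
    return [json.dumps(k) + "\t" + json.dumps(v) for k, v in pairs]


def sample_values():
    # assumed sample data: two users that share the same friends count
    return (u for u in ["A", "E"])


try:
    encode(reducer_two_broken(1, sample_values()))
    broken_error = None
except TypeError as exc:
    broken_error = str(exc)

fixed_lines = encode(reducer_two_fixed(1, sample_values()))
print(broken_error)
print(fixed_lines)
```

Looping over frns inside reducer_two would instead fail with 'int' object is not iterable, because there frns is the key, a single integer.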
Bill
  • I tried that, but I read somewhere that reversing the keys should happen in the second stage. However, I get the same error. – Sid Jun 23 '17 at 05:46
  • In mapper_two, you are trying to yield the generator object, which is not json serializable. You need to iterate through the generator in order to yield frns, uid. See the above edit. – Bill Jun 23 '17 at 13:57
  • I changed the second reducer to `def reducer_two(self, frns, uid): for num in frns: yield num, uid` but now it throws the error **TypeError: 'int' object is not iterable** – Sid Jun 24 '17 at 09:27
  • However, if I make uid the key, the output is sorted, but in a very strange order: uid, 1 uid, 123 uid, 13 uid, 235 uid, 28 and so on. I'm confused about what is happening under the hood. – Sid Jun 24 '17 at 09:34
  • MapReduce does not sort reducer output. Try here: https://stackoverflow.com/questions/14322381/mapreduce-job-output-sort-order – Bill Jun 26 '17 at 13:44
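
The order reported in the comments (1, 123, 13, 235, 28) is what you get when numbers are compared as text rather than as integers. The sketch below reproduces that order in plain Python and shows one possible workaround, zero-padding the key to a fixed width so that text order and numeric order agree. The width of 6 digits is an assumption for illustration, not something taken from the thread.

```python
counts = [1, 123, 13, 235, 28]

# Comparing the numbers as strings reproduces the "strange" order
as_text = sorted(str(c) for c in counts)

# Zero-padding to an assumed fixed width makes text order match numeric order
WIDTH = 6
padded = sorted(str(c).zfill(WIDTH) for c in counts)

# Convert back to integers once the sorted keys have been read
restored = [int(p) for p in padded]

print(as_text)   # ['1', '123', '13', '235', '28']
print(padded)    # ['000001', '000013', '000028', '000123', '000235']
print(restored)  # [1, 13, 28, 123, 235]
```

In a job like the one above, the padded string would be the key yielded by the second mapper, with the padding stripped again when the final output is written.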