
I am writing a reducer (Python 3) for Hadoop streaming, but it doesn't work properly. For example, for the input below:

data = 'dog\t1\t1\ndog\t1\t1\ndog\t0\t1\ndog\t0\t1\ncat\t0\t1\ncat\t0\t1\ncat\t1\t1\n'

import re
import sys

# initialize trackers
current_word = None

spam_count, ham_count = 0,0

# read from standard input
# Substitute read from a file


for line in data.splitlines():
#for line in sys.stdin:
# parse input
    word, is_spam, count = line.split('\t')
    count = int(count)

    if word == current_word:

        if is_spam == '1':
            spam_count += count
        else:
            ham_count += count
    else:
        if current_word:
        # word to emit...
            if spam_count:
               print("%s\t%s\t%s" % (current_word, '1', spam_count))
            print("%s\t%s\t%s" % (current_word, '0', ham_count))

        if is_spam == '1':
            current_word, spam_count = word, count
        else:
            current_word, ham_count = word, count



if current_word == word:
    if is_spam == '1':
        print(f'{current_word}\t{is_spam}\t{spam_count}')
    else:
        print(f'{current_word}\t{is_spam}\t{spam_count}')

I got:

#dog    1   2
#dog    0   2
#cat    1   3

The 2 'spam' dogs are OK, as well as the two 'ham' dogs. Cats are not doing so well. It should be:

#dog    1   2
#dog    0   2
#cat    0   2
#cat    1   1
I cannot find a bug here.

1 Answer


The reason is: when you switch to a new word, you should also reset ham_count to zero, not only set spam_count, and vice versa. In your run, when the input switches from dog to cat, spam_count is still 2 left over from the dogs, so the single cat spam line pushes it to 3.

Rewrite this:

if is_spam == '1':
    current_word, spam_count = word, count
else:
    current_word, ham_count = word, count

as this:

if is_spam == '1':
    current_word, spam_count = word, count
    ham_count = 0
else:
    current_word, ham_count = word, count
    spam_count = 0

Nevertheless, the output still won't be exactly like your expected output:
1) because you always print spam_count first (but in your expected output, "cat ham" is emitted earlier);
2) because the final block emits only spam or only ham, depending on the current state of the is_spam variable, but I guess you're planning to emit both, right?

The output: 
dog 1   2
dog 0   2
cat 1   1

- there's a correct count for "cat spam", but no "cat ham" at all - so I suppose you should at least print something like this:

Rewrite this code

if current_word == word:
    if is_spam == '1':
        print(f'{current_word}\t{is_spam}\t{spam_count}')
    else:
        print(f'{current_word}\t{is_spam}\t{spam_count}')

as this:

print(f'{current_word}\t{1}\t{spam_count}')
print(f'{current_word}\t{0}\t{ham_count}')

- and the complete output will be:

dog 1   2
dog 0   2
cat 1   1
cat 0   2
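
Putting both changes together, a minimal sketch of the corrected reducer could look like this (it reuses your own variable names and sample data string; for the real streaming job you'd read sys.stdin instead, as in your commented-out line):

import sys

data = 'dog\t1\t1\ndog\t1\t1\ndog\t0\t1\ndog\t0\t1\ncat\t0\t1\ncat\t0\t1\ncat\t1\t1\n'

current_word = None
spam_count, ham_count = 0, 0

for line in data.splitlines():
# for line in sys.stdin:
    word, is_spam, count = line.strip().split('\t')
    count = int(count)

    if word == current_word:
        if is_spam == '1':
            spam_count += count
        else:
            ham_count += count
    else:
        if current_word:
            # emit both totals for the previous word
            print(f'{current_word}\t1\t{spam_count}')
            print(f'{current_word}\t0\t{ham_count}')
        # start fresh counters for the new word
        current_word = word
        spam_count = count if is_spam == '1' else 0
        ham_count = count if is_spam == '0' else 0

# emit both totals for the last word
if current_word:
    print(f'{current_word}\t1\t{spam_count}')
    print(f'{current_word}\t0\t{ham_count}')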

Itertools
Also, the itertools module is great for similar tasks:

import itertools    

splitted_lines = map(lambda x: x.split('\t'), data.splitlines())
grouped = itertools.groupby(splitted_lines, lambda x: x[0])

grouped is an itertools.groupby object, and it's a generator - so be careful: it's lazy and it returns its values only once (I show the output here just as an example, because it consumes the generator's values).

[(gr_name, list(gr)) for gr_name, gr in grouped] 
Out:
[('dog',
  [['dog', '1', '1'],
   ['dog', '1', '1'],
   ['dog', '0', '1'],
   ['dog', '0', '1']]),
 ('cat', [['cat', '0', '1'], ['cat', '0', '1'], ['cat', '1', '1']])]
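
As a quick illustration of that one-pass behaviour (just a sketch on the same data), building the groups and then trying to consume them a second time yields nothing:

import itertools

splitted_lines = map(lambda x: x.split('\t'), data.splitlines())
grouped = itertools.groupby(splitted_lines, lambda x: x[0])

first_pass = [(name, list(rows)) for name, rows in grouped]
second_pass = [(name, list(rows)) for name, rows in grouped]

print(len(first_pass))   # 2 -- the 'dog' and 'cat' groups
print(second_pass)       # [] -- the generator is already exhausted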

OK, now each group can be grouped again by its is_spam feature:

import itertools    

def sum_group(group):
    """
    Sum the count column of one sub-group of rows.

    >>> sum_group([['dog', '1', '1'], ['dog', '1', '1']])
    2
    """
    return sum([int(i[-1]) for i in group])

splitted_lines = map(lambda x: x.split('\t'), data.splitlines())
grouped = itertools.groupby(splitted_lines, lambda x: x[0])

[(name, [(tag_name, sum_group(sub_group))
         for tag_name, sub_group 
         in itertools.groupby(group, lambda x: x[1])])
 for name, group in grouped]
Out:
[('dog', [('1', 2), ('0', 2)]), ('cat', [('0', 2), ('1', 1)])]

Complete example via itertools:

import itertools 


def emit_group(name, tag_name, group):
    tag_sum = sum([int(i[-1]) for i in group])
    print(f"{name}\t{tag_name}\t{tag_sum}")  # emit here
    return (name, tag_name, tag_sum)  # return the same data


splitted_lines = map(lambda x: x.split('\t'), data.splitlines())
grouped = itertools.groupby(splitted_lines, lambda x: x[0])


emitted = [[emit_group(name, tag_name, sub_group) 
            for tag_name, sub_group 
            in itertools.groupby(group, lambda x: x[1])]
            for name, group in grouped]
Out:
dog 1   2
dog 0   2
cat 0   2
cat 1   1

- emitted contains a list of tuples with the same data. As it's a lazy approach, it works perfectly with streams; a good itertools tutorial is worth a look if you're interested.
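
Since this is meant to be a Hadoop streaming reducer, here is a minimal sketch of the same itertools approach reading from sys.stdin instead of the in-memory string. It makes the same assumption as the grouped example above: lines arrive sorted so that each word (and each is_spam value within a word) forms one contiguous block.

import sys
import itertools


def emit_group(name, tag_name, group):
    # sum the count column for one (word, is_spam) sub-group and emit it
    tag_sum = sum(int(i[-1]) for i in group)
    print(f"{name}\t{tag_name}\t{tag_sum}")


# strip the trailing newline before splitting each stdin line
splitted_lines = map(lambda x: x.rstrip('\n').split('\t'), sys.stdin)
grouped = itertools.groupby(splitted_lines, lambda x: x[0])

for name, group in grouped:
    for tag_name, sub_group in itertools.groupby(group, lambda x: x[1]):
        emit_group(name, tag_name, sub_group)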

Mikhail Stepanov
  • You are right. You have to reset the other class's count to "0" when switching to another word. It was not an issue when dealing with just a key-value pair. Whether the 'cat ham' or 'cat spam' count comes first, it will further aggregate to 2 partitions by class and will be just fine. Sorry, the last lines of printouts in the code were a typo (wrong cut and paste at midnight, I was super tired); of course, I meant to print the last spam and ham counts! – user8734221 Feb 03 '19 at 21:56
  • I hope my suggestion helps to solve the problem. Besides, I advise using `itertools` - because it's easier to debug: there are no counters and states which can cause a problem - although it seems sophisticated at first. If there are still unresolved issues, please let me know – Mikhail Stepanov Feb 03 '19 at 22:07
  • Also, a great example of itertools; a couple of articles referred to it in connection with streaming jobs but did not provide good examples. Thank you! :)