I am having an issue running a simple Python script inside Hadoop Streaming. I have tried all the suggestions from previous posts with a similar error and am still having the issue:

  1. added `#!/usr/bin/env python` to the top of the mapper and reducer
  2. ran `chmod a+x` on the mapper and reducer scripts
  3. quoted the mapper command: `-mapper "python mapper.py -n 1 -r 0.4"`

I have run the code outside Hadoop and it worked well.

UPDATE: I ran the code outside of Hadoop Streaming using the following command:

cat file | python mapper.py -n 5 -r 0.4 | sort | python reducer.py -f 3618

This works fine, but now I need to run it with Hadoop Streaming:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.reduces=5  \
-files lr \
-mapper "python lr/mapper.py -n 5 -r 0.4"  \
-reducer "python lr/reducer.py -f 3618"  \
-input training \
-output models 

Hadoop Streaming is the part that fails. I looked at the logs, but I did not see anything in them that told me why this is happening.

I have the following mapper.py:

#!/usr/bin/env python

import sys
import random

from optparse import OptionParser

parser = OptionParser()
parser.add_option("-n", "--model-num", action="store", dest="n_model",
                  help="number of models to train", type="int")
parser.add_option("-r", "--sample-ratio", action="store", dest="ratio",
                  help="ratio to sample for each ensemble", type="float")

options, args = parser.parse_args(sys.argv)

random.seed(8803)
r = options.ratio
for line in sys.stdin:
    # TODO
    # Note: The following lines are only there to help 
    #       you get started (and to have a 'runnable' program). 
    #       You may need to change some or all of the lines below.
    #       Follow the pseudocode given in the PDF.
    key = random.randint(0, options.n_model-1)
    value = line.strip()
    for i in range(1, options.n_model+1):
        m = random.random()
        if m < r:
            print "%d\t%s" % (i, value)
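
Worth checking: the mapper uses the Python 2 `print` statement, so if the task nodes' default `python` is Python 3 the subprocess dies with a syntax error before reading any input, which matches a streaming failure with no useful application log. A version-agnostic sketch of the same sampling loop (the helper name is assumed, not from the post):

```python
from __future__ import print_function  # no-op on Python 3, enables print() on Python 2
import random
import sys

def sample_lines(lines, n_model, ratio, seed=8803):
    """Emit each input line to each of the n_model reducer keys
    independently with probability `ratio` (same logic as the mapper above)."""
    random.seed(seed)
    records = []
    for line in lines:
        value = line.strip()
        for i in range(1, n_model + 1):
            if random.random() < ratio:
                records.append("%d\t%s" % (i, value))
    return records

if __name__ == "__main__":
    for record in sample_lines(sys.stdin, n_model=5, ratio=0.4):
        print(record)
```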

and my reducer.py:

#!/usr/bin/env python
import sys
import pickle
from optparse import OptionParser
from lrsgd import LogisticRegressionSGD
from utils import parse_svm_light_line

parser = OptionParser()
parser.add_option("-e", "--eta", action="store", dest="eta",
                  default=0.01, help="step size", type="float")
parser.add_option("-c", "--Regularization-Constant", action="store", dest="C",
                  default=0.0, help="regularization strength", type="float")
parser.add_option("-f", "--feature-num", action="store", dest="n_feature",
                  help="number of features", type="int")
options, args = parser.parse_args(sys.argv)

classifier = LogisticRegressionSGD(options.eta, options.C, options.n_feature)

for line in sys.stdin:
    key, value = line.split("\t", 1)
    value = value.strip()
    X, y = parse_svm_light_line(value)
    classifier.fit(X, y)

pickle.dump(classifier, sys.stdout)
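
A side note on that last line: `pickle.dump(classifier, sys.stdout)` writes a raw pickle, which can contain newline and tab bytes that Hadoop Streaming will reinterpret as record and key boundaries. A common workaround (a sketch, not from the original post) is to base64-encode the pickle so each model is one clean key/value line:

```python
import base64
import pickle

def serialize_model(model, key="model"):
    # Pickle the object, then base64-encode so the payload is a single
    # newline-free token that streaming cannot split or mangle.
    blob = base64.b64encode(pickle.dumps(model)).decode("ascii")
    return "%s\t%s" % (key, blob)

def deserialize_model(line):
    # Inverse of serialize_model: split off the key, decode, unpickle.
    key, blob = line.split("\t", 1)
    return key, pickle.loads(base64.b64decode(blob))
```

Whatever driver collects the `models` output would then call `deserialize_model` on each line.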

When I run it outside Hadoop Streaming the code runs OK, but inside Hadoop Streaming it gives me the error:

17/02/07 07:44:34 INFO mapreduce.Job: Task Id : attempt_1486438814591_0038_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
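
Exit code 2 here is the Python subprocess's own exit status; the streaming log never shows what the subprocess wrote to stderr. One way to surface it (a debugging sketch, reusing the `lr/` paths from the question) is to launch the script exactly as streaming does and print the captured stderr:

```python
import subprocess
import sys

def run_as_streaming_would(cmd, input_bytes=b""):
    # Launch the mapper/reducer as a child process the way PipeMapRed does,
    # feed it stdin, and capture stdout, stderr, and the exit code.
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate(input_bytes)
    return proc.returncode, out, err

if __name__ == "__main__":
    # Reproduce the failing mapper invocation locally and show its stderr:
    rc, out, err = run_as_streaming_would(
        [sys.executable, "lr/mapper.py", "-n", "5", "-r", "0.4"],
        input_bytes=b"1 2:0.5 7:1.0\n")
    print("exit code:", rc)
    print(err.decode("utf-8", "replace"))
```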
  • Is this a pseudo-distributed setup? If not, do you have the module `lrsgd` installed on all nodes? Also post the command you are using to submit the job. – franklinsijo Feb 07 '17 at 08:26
  • @franklinsijo I updated the post to show how I submit this. – E B Feb 10 '17 at 00:52
  • @franklinsijo, where do I check the number of datanodes of this test cluster that I am running? – E B Feb 10 '17 at 05:01
  • `hdfs dfsadmin -report` should give you the details of live Datanodes. – franklinsijo Feb 10 '17 at 05:02
  • @franklinsijo ... I only have 1 datanode and it is alive on the report, so I am confused about where the Python issue is. To add, I have lrsgd.py, mapper.py, and reducer.py all with mode 775 already too. – E B Feb 11 '17 at 20:10

3 Answers


Use the answer by Harishanker in the post - How to resolve java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2?

Make sure that both the mapper and reducer files are executable using chmod (e.g. `chmod 744 mapper.py`).

Then build the streaming command like this:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.reduces=5  \
-files lr \
-mapper "lr/mapper.py -n 5 -r 0.4"  \
-reducer "lr/reducer.py -f 3618"  \
-input training \
-output models 
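
Since this form drops the explicit `python` and relies on the worker executing the files directly, each script needs both the execute bit and a shebang as its first line. A quick local check (a sketch; the helper name and the `lr/` paths are assumptions, not part of the answer):

```python
import os

def is_directly_runnable(path):
    # Hadoop can only exec the file itself if it carries the execute bit
    # and its very first bytes are a "#!" shebang line.
    with open(path, "rb") as f:
        shebang_ok = f.readline().startswith(b"#!")
    return shebang_ok and os.access(path, os.X_OK)

if __name__ == "__main__":
    for script in ("lr/mapper.py", "lr/reducer.py"):
        if os.path.exists(script):
            print(script, "runnable:", is_directly_runnable(script))
```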

Now it should work. Please comment if it doesn't.

– Beck
hadoop jar /home/maria_dev/hadoop-streaming-2.7.3.jar \
-file ./mapper.py -mapper 'python mapper.py' \
-file ./reducer.py -reducer 'python reducer.py' \
-input /user/maria_dev/wordcount/worddata.txt \
-output /user/maria_dev/output 

This works for me. Make sure each file path is correct. Originally, I forgot to specify `-file` for both Python scripts, and it did not work.

– Jianjun Hu

For complete noobs (like me!), make sure you have this as the very first line of your .py files:

#!/usr/bin/env python

It's not just a comment, so don't accidentally delete it!
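
Building on that: the shebang only works if `#!` are literally the first two bytes of the file, so a leading blank line, a UTF-8 BOM, or Windows CRLF line endings will all break it even though the file looks fine in an editor. A small diagnostic sketch (helper name assumed):

```python
def shebang_problems(path):
    # Return a list of reasons the shebang line would fail on the worker.
    problems = []
    with open(path, "rb") as f:
        first = f.readline()
    if first.startswith(b"\xef\xbb\xbf"):
        problems.append("file starts with a UTF-8 BOM")
        first = first[3:]
    if not first.startswith(b"#!"):
        problems.append("first line is not a shebang")
    elif first.endswith(b"\r\n"):
        problems.append("CRLF ending: the kernel would look for 'python\\r'")
    return problems
```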