In my MapReduce jobs, I pass a product name to the Mapper as a string argument. The Mapper.py script imports a secondary script called Process.py that does something with the product name and returns some emit strings to the Mapper. The mapper then emits those strings to the Hadoop framework so they can be picked up by the Reducer. Everything works fine except for the following:

The Process.py script contains a dictionary of lookup values that I want to move out of the script and into an xml file for easier updating. I have tested this locally, and it works fine if I include the Windows path to the xml file in the Process.py script. However, the same approach does not work when I test it in the Hadoop MapReduce environment.

I have tried specifying the HDFS path to the xml document inside the Process.py script, and I have tried adding the name of the xml document as a -file argument in the MapReduce job command, but neither has worked.

For example, inside the Process.py, I have tried:
xml_file = r'appers@hdfs.network.com:/nfs_home/appers/cnielsen/product_lookups.xml'
and
xml_file = r'/nfs_home/appers/cnielsen/product_lookups.xml'

In the MapReduce command, I have included the name of the xml file as a -file argument. For example:
... -file product_lookups.xml -reducer ...

My question is: in the MapReduce environment, how do I allow the Process.py script to read this xml document that is stored on HDFS?

Chris Nielsen

2 Answers

Here is an end-to-end example that adapts the techniques mentioned in this previous question to fit your question more closely.

Python read file as stream from HDFS

This is a small Python Hadoop Streaming application that reads key-value pairs, checks the key against an XML configuration file stored in HDFS, and then emits the value only if the key matches the configuration. The matching logic is off-loaded into a separate Process.py module, which reads the XML configuration file from HDFS by using an external call to hdfs dfs -cat.

First, we create a directory named pythonapp, containing the Python source files for our implementation. We'll see later when we submit the streaming job that we'll pass this directory in the -files argument.

Why do we put the files into an intermediate directory instead of just listing each file separately in the -files argument? That's because when YARN localizes the files for execution in containers, it introduces a layer of symlink indirection. Python then can't load the module correctly through the symlink. The solution is to package both files into the same directory. Then, when YARN localizes the files, the symlink indirection is done at the directory level instead of the individual files. Since both the main script and the module are physically in the same directory, Python will be able to load the module correctly. This question explains the issue in more detail:
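The effect of the symlink indirection can be rehearsed locally. The following is a minimal sketch, not a reproduction of YARN itself: the cache and work directory names are hypothetical stand-ins for wherever the files get localized, and the helper names are made up for illustration. It relies on the interpreter resolving symlinks in the script path before it picks the directory to search for imports.

```python
import os
import tempfile


def module_search_dir(script_path):
    """Directory Python searches first for imports when running script_path.

    The interpreter resolves symlinks in the script path before taking
    its directory, which os.path.realpath imitates here.
    """
    return os.path.dirname(os.path.realpath(script_path))


def write(path):
    """Create an empty file, making parent directories as needed."""
    parent = os.path.dirname(path)
    if not os.path.isdir(parent):
        os.makedirs(parent)
    open(path, "w").close()


def can_import_process(layout):
    """Return True if Process.py sits next to the resolved Mapper.py."""
    root = tempfile.mkdtemp()
    work = os.path.join(root, "work")  # hypothetical container working directory
    os.makedirs(work)
    if layout == "separate":
        # Each file is cached in its own directory and linked individually.
        for i, name in enumerate(["Mapper.py", "Process.py"]):
            real = os.path.join(root, "cache%d" % i, name)
            write(real)
            os.symlink(real, os.path.join(work, name))
        script = os.path.join(work, "Mapper.py")
    else:
        # The whole directory is cached once and linked as a unit.
        real_dir = os.path.join(root, "cache", "pythonapp")
        for name in ["Mapper.py", "Process.py"]:
            write(os.path.join(real_dir, name))
        os.symlink(real_dir, os.path.join(work, "pythonapp"))
        script = os.path.join(work, "pythonapp", "Mapper.py")
    return os.path.exists(os.path.join(module_search_dir(script), "Process.py"))
```

Calling can_import_process("separate") models listing each file on its own, and can_import_process("directory") models passing the pythonapp directory. Only in the second layout does Process.py end up beside the resolved Mapper.py.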

How to import a custom module in a MapReduce job?

Mapper.py

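import sys
from Process import match

# Hadoop Streaming feeds input records to the mapper on standard input.
for line in sys.stdin:
    # Each record is a whitespace-separated key-value pair.
    key, value = line.split()
    # Emit the value only when the key matches the XML configuration.
    if match(key):
        print(value)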

Process.py

import subprocess
import xml.etree.ElementTree as ElementTree

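# Stream the XML configuration file out of HDFS through the hdfs CLI.
hdfsCatProcess = subprocess.Popen(
        ['hdfs', 'dfs', '-cat', '/pythonAppConf.xml'],
        stdout=subprocess.PIPE)
# Parse the XML directly from the child process's standard output.
pythonAppConfXmlTree = ElementTree.parse(hdfsCatProcess.stdout)
matchString = pythonAppConfXmlTree.find('./matchString').text.strip()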

def match(key):
    return key == matchString
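One possible variation on the module above, offered as a sketch rather than a replacement: subprocess.check_output raises an error when the command exits with a non-zero status, so a missing HDFS file fails loudly instead of surfacing as a confusing XML parse error. The function names read_xml_from_command and load_match_string are hypothetical, and the command is passed in as a parameter so the logic can be tried without a cluster.

```python
import subprocess
import sys
import xml.etree.ElementTree as ElementTree


def read_xml_from_command(command):
    """Run command and parse its standard output as XML.

    check_output raises CalledProcessError on a non-zero exit status.
    """
    xml_bytes = subprocess.check_output(command)
    return ElementTree.fromstring(xml_bytes)


def load_match_string(command):
    """Return the stripped text of the matchString element."""
    root = read_xml_from_command(command)
    return root.find('./matchString').text.strip()


# On the cluster, the command would be the same one used in Process.py:
#   load_match_string(['hdfs', 'dfs', '-cat', '/pythonAppConf.xml'])
```

For a quick local check, any command that prints the XML to standard output can stand in for hdfs dfs -cat.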

Next, we put 2 files into HDFS. /testData is the input file, containing tab-delimited key-value pairs. /pythonAppConf.xml is the XML file, where we can configure a specific key to match.

/testData

foo 1
bar 2
baz 3

/pythonAppConf.xml

<pythonAppConf>
    <matchString>foo</matchString>
</pythonAppConf>

Since we have set matchString to foo, and since our input file contains only a single record with key set to foo, we expect the output of running the job to be a single line containing the value corresponding to key foo, which is 1. Taking it for a test run, we do get the expected results.

> hadoop jar share/hadoop/tools/lib/hadoop-streaming-*.jar \
      -D mapreduce.job.reduces=0 \
      -files pythonapp \
      -input /testData \
      -output /streamingOut \
      -mapper 'python pythonapp/Mapper.py'

> hdfs dfs -cat /streamingOut/part*
1   
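The same expectation can be checked without a cluster. This is a minimal sketch that assumes the mapper loop is wrapped in a hypothetical generator named map_lines and that the match function is passed in, so a plain lambda can stand in for the XML-backed one. Skipping malformed lines is also an assumption; the mapper shown earlier would raise on them.

```python
def map_lines(lines, match):
    """Yield the value of each key-value line whose key satisfies match."""
    for line in lines:
        fields = line.split()
        if len(fields) != 2:
            continue  # assumption: silently skip blank or malformed lines
        key, value = fields
        if match(key):
            yield value


# The three records from /testData, with foo as the configured match string.
sample = ["foo\t1\n", "bar\t2\n", "baz\t3\n"]
result = list(map_lines(sample, lambda key: key == "foo"))
print(result)  # prints ['1']
```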

An alternative way to do this would be to specify the HDFS file in the -files argument. This way, YARN will pull the XML file as a localized resource to the individual nodes running the containers before the Python script launches. Then, the Python code can open the XML file as if it were a local file in the working directory. For very large jobs running multiple tasks/containers, this technique is likely to outperform calling hdfs dfs -cat from each task.

To test this technique, we can try a different version of the Process.py module.

Process.py

import xml.etree.ElementTree as ElementTree

pythonAppConfXmlTree = ElementTree.parse('pythonAppConf.xml')
matchString = pythonAppConfXmlTree.find('./matchString').text.strip()

def match(key):
    return key == matchString

The command line invocation changes to specify an HDFS path in -files, and once again, we see the expected results.

> hadoop jar share/hadoop/tools/lib/hadoop-streaming-*.jar \
      -D mapreduce.job.reduces=0 \
      -files pythonapp,hdfs:///pythonAppConf.xml \
      -input /testData \
      -output /streamingOut \
      -mapper 'python pythonapp/Mapper.py'

> hdfs dfs -cat /streamingOut/part*
1   

The Apache Hadoop documentation discusses using the -files option to pull HDFS files locally at the following link:

http://hadoop.apache.org/docs/r2.7.1/hadoop-streaming/HadoopStreaming.html#Working_with_Large_Files_and_Archives

Chris Nauroth
  • My problem is not the same as that asked in the previous question you provided. I am using streaming MapReduce on a large data set but instead of having the Mapper do all of the heavy lifting, it is using a helper script called Process.py. `Mapper <--> Process.py` The Mapper still streams output to the Reducer. I want the Process.py script to be able to read in a small xml file that resides on HDFS in order to build a Python dictionary in memory. This works locally on my PC because I can just provide the path to the xml file and use lxml to parse it. – Chris Nielsen Dec 19 '15 at 16:29
  • @ChrisNielsen, the same techniques can be applied to your question. I edited my answer to show a full end-to-end working example. I also described how you can use the `-files` argument to tell YARN to pull the file as a local resource before the Python script runs, which is perhaps simpler and likely to provide better performance for very large jobs. – Chris Nauroth Dec 21 '15 at 19:01
  • Excellent! Not only one detailed option but two! Thank you for taking the time to answer this! I have tried the second option and it works perfectly! – Chris Nielsen Dec 22 '15 at 16:37
Thanks to Chris Nauroth for the answers he provided above. With this post I want to summarize exactly what it was that solved my problem.

The second answer he provided is very close to what I was originally trying to do. What I found out is that a couple of small changes were all that I needed to make it work. For example, in the Process.py script, I was previously trying to include a full path to the small lookup xml like so:

xml_file = r'appers@hdfs.network.com:/nfs_home/appers/cnielsen/product_lookups.xml'
and
xml_file = r'/nfs_home/appers/cnielsen/product_lookups.xml'

It turns out that all I needed to do was provide the name of the file in my Process.py script, without a path. For example:
xml_file = 'product_lookups.xml'
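For illustration, here is a sketch of how Process.py might build its lookup dictionary from that bare file name. The XML layout (product elements carrying a name attribute, with the lookup value as text) and the function name load_lookups are assumptions made up for this example, since the real file is not shown, and it uses the standard library's ElementTree rather than lxml.

```python
import xml.etree.ElementTree as ElementTree


def load_lookups(xml_file='product_lookups.xml'):
    """Build a dict from a lookup file in the task's working directory.

    Assumed (hypothetical) layout:
        <products>
            <product name="WidgetA">Widgets</product>
        </products>
    """
    tree = ElementTree.parse(xml_file)
    return {
        product.get('name'): (product.text or '').strip()
        for product in tree.getroot().iter('product')
    }
```

Because the file name carries no path, the same code works on a PC and inside a task container, provided the xml file is in the current working directory.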

As for the actual Hadoop command, this is what I was previously trying without success, with -file product_lookups.xml listed after the -mapper option:

  > hadoop jar /share/hadoop/tools/lib/hadoop-streaming.jar \
  -file /nfs_home/appers/cnielsen/Mapper.py \
  -file /nfs_home/appers/cnielsen/Reducer.py \
  -mapper '/usr/lib/python_2.7.3/bin/python Mapper.py ProductName' \
  -file Process.py \
  -file product_lookups.xml \
  -reducer '/usr/lib/python_2.7.3/bin/python Reducer.py' \
  -input /nfs_home/appers/extracts/*/*.xml \
  -output /user/lcmsprod/output/cnielsen/test47

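The correct way to construct the Hadoop command is to use -files and to list the lookup file before any other file listings. The position matters because -files is a generic Hadoop option, and generic options have to come before streaming options such as -file and -mapper. For example, this worked: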

  > hadoop jar /share/hadoop/tools/lib/hadoop-streaming.jar \
  -files /nfs_home/appers/cnielsen/product_lookups.xml \
  -file /nfs_home/appers/cnielsen/Mapper.py \
  -file /nfs_home/appers/cnielsen/Reducer.py \
  -mapper '/usr/lib/python_2.7.3/bin/python Mapper.py ProductName' \
  -file Process.py \
  -reducer '/usr/lib/python_2.7.3/bin/python Reducer.py' \
  -input /nfs_home/appers/extracts/*/*.xml \
  -output /user/lcmsprod/output/cnielsen/test47
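The ordering rule can be captured in a small helper that assembles the argument list. This is only a sketch: build_streaming_command is a hypothetical function, not part of Hadoop, and it assumes every file to ship is passed through a single comma-delimited -files option.

```python
def build_streaming_command(jar, files, mapper, reducer, input_path, output_path):
    """Assemble a hadoop streaming argument list with -files placed first."""
    command = ['hadoop', 'jar', jar]
    # Generic options such as -files go before any streaming option.
    command += ['-files', ','.join(files)]
    # Streaming options follow.
    command += [
        '-mapper', mapper,
        '-reducer', reducer,
        '-input', input_path,
        '-output', output_path,
    ]
    return command
```

The resulting list could be handed to subprocess.check_call, which avoids shell quoting problems with the quoted -mapper and -reducer strings.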

Note: Even though this page says to construct the -files command like so:

-files hdfs://host:fs_port/user/testfile.txt

It did not work for me if I included hdfs:// or the host: portion, as can be seen from the actual command listed above.

Chris Nielsen
  • Thanks for summarizing here. I also meant to mention that there is a reason that I packaged the 2 Python files into an intermediate directory to pass to the `-files` argument. I just edited my answer to explain that and link to a previous answer that provides a more detailed explanation of that part. – Chris Nauroth Dec 22 '15 at 17:15
  • Yes, I noticed you have the name of the intermediate folder here, with a comma before listing the xml file: `-files pythonapp,hdfs:///pythonAppConf.xml` Should I be doing the same? – Chris Nielsen Dec 22 '15 at 17:23
  • Yes, I expect passing a comma-delimited list to `-files` would work. The `-file` option is deprecated in favor of using `-files`. – Chris Nauroth Dec 22 '15 at 17:52