Here is an end-to-end example that adapts the techniques mentioned in this previous question, Python read file as stream from HDFS, to fit your question more closely.

This is a small Python Hadoop Streaming application that reads key-value pairs, checks the key against an XML configuration file stored in HDFS, and then emits the value only if the key matches the configuration. The matching logic is off-loaded into a separate Process.py module, which reads the XML configuration file from HDFS by making an external call to hdfs dfs -cat.
First, we create a directory named pythonapp, containing the Python source files for our implementation. We'll see later, when we submit the streaming job, that we pass this directory in the -files argument.
Why do we put the files into an intermediate directory instead of listing each file separately in the -files argument? Because when YARN localizes the files for execution in containers, it introduces a layer of symlink indirection, and Python can't load the module correctly through the symlink. The solution is to package both files into the same directory; then YARN does the symlink indirection at the directory level instead of at the individual files. Since the main script and the module are physically in the same directory, Python can load the module correctly. This question explains the issue in more detail:
How to import a custom module in a MapReduce job?
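To make this concrete, here is a minimal sketch of the directory setup, assuming Mapper.py and Process.py (shown next) already exist in the current working directory:

> mkdir pythonapp
> mv Mapper.py Process.py pythonapp/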
Mapper.py
import sys

from Process import match

# Read tab-delimited key-value pairs from standard input and emit the
# value whenever the key matches the configured string.
for line in sys.stdin:
    key, value = line.split()
    if match(key):
        print value  # Python 2 syntax; use print(value) under Python 3
Process.py
import subprocess
import xml.etree.ElementTree as ElementTree

# Stream the configuration file out of HDFS and parse it once, at module
# import time, so every call to match() reuses the same parsed value.
hdfsCatProcess = subprocess.Popen(
    ['hdfs', 'dfs', '-cat', '/pythonAppConf.xml'],
    stdout=subprocess.PIPE)
pythonAppConfXmlTree = ElementTree.parse(hdfsCatProcess.stdout)
matchString = pythonAppConfXmlTree.find('./matchString').text.strip()

def match(key):
    return key == matchString
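As written, Process.py assumes the hdfs dfs -cat call always succeeds. If you want the task to fail fast with a clearer error when the configuration file is missing or unreadable, a variant along the following lines (a sketch, not something the job above requires) checks the subprocess exit code before parsing:

import subprocess
import xml.etree.ElementTree as ElementTree

# Read the whole file and confirm the hdfs command exited cleanly before
# parsing; otherwise raise an error that names the failing command.
hdfsCatProcess = subprocess.Popen(
    ['hdfs', 'dfs', '-cat', '/pythonAppConf.xml'],
    stdout=subprocess.PIPE)
xmlBytes = hdfsCatProcess.communicate()[0]
if hdfsCatProcess.returncode != 0:
    raise RuntimeError('hdfs dfs -cat exited with code %d'
                       % hdfsCatProcess.returncode)

matchString = ElementTree.fromstring(xmlBytes).find('./matchString').text.strip()

def match(key):
    return key == matchString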
Next, we put two files into HDFS: /testData is the input file, containing tab-delimited key-value pairs, and /pythonAppConf.xml is the XML file in which we configure the specific key to match.
/testData
foo 1
bar 2
baz 3
/pythonAppConf.xml
<pythonAppConf>
    <matchString>foo</matchString>
</pythonAppConf>
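Assuming both files exist locally under the same names (minus the leading slash), they can be uploaded with the standard put command:

> hdfs dfs -put testData /testData
> hdfs dfs -put pythonAppConf.xml /pythonAppConf.xml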
Since we have set matchString to foo, and since our input file contains only a single record with key foo, we expect the output of the job to be a single line containing the value corresponding to that key, which is 1. Taking it for a test run, we do get the expected results.
> hadoop jar share/hadoop/tools/lib/hadoop-streaming-*.jar \
-D mapreduce.job.reduces=0 \
-files pythonapp \
-input /testData \
-output /streamingOut \
-mapper 'python pythonapp/Mapper.py'
> hdfs dfs -cat /streamingOut/part*
1
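Note that if you rerun the job, as we do below for the alternative technique with the same /streamingOut path, the output directory must not already exist, so remove it between runs:

> hdfs dfs -rm -r /streamingOut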
An alternative way to do this is to specify the HDFS file in the -files argument. This way, YARN pulls the XML file to the individual nodes running the containers as a localized resource before the Python script launches, and the Python code can then open the XML file as if it were a local file in the working directory. For very large jobs running many tasks/containers, this technique is likely to outperform calling hdfs dfs -cat from each task.
To test this technique, we can try a different version of the Process.py module.
Process.py
import xml.etree.ElementTree as ElementTree

# YARN localizes hdfs:///pythonAppConf.xml into the task's working
# directory, so we can parse it as an ordinary local file.
pythonAppConfXmlTree = ElementTree.parse('pythonAppConf.xml')
matchString = pythonAppConfXmlTree.find('./matchString').text.strip()

def match(key):
    return key == matchString
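If this version of the module might also be imported outside of a YARN container, where no localization happens, a small defensive check (a sketch, assuming the same file name) gives a more actionable error than the bare parse failure:

import os
import xml.etree.ElementTree as ElementTree

# Fail with an actionable message when the localized file is absent,
# e.g. when the module is imported outside of a YARN container.
if not os.path.exists('pythonAppConf.xml'):
    raise RuntimeError(
        'pythonAppConf.xml not found; was it passed via -files?')

matchString = ElementTree.parse('pythonAppConf.xml') \
    .find('./matchString').text.strip()

def match(key):
    return key == matchString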
The command-line invocation changes to specify an HDFS path in -files, and once again, we see the expected results.
> hadoop jar share/hadoop/tools/lib/hadoop-streaming-*.jar \
-D mapreduce.job.reduces=0 \
-files pythonapp,hdfs:///pythonAppConf.xml \
-input /testData \
-output /streamingOut \
-mapper 'python pythonapp/Mapper.py'
> hdfs dfs -cat /streamingOut/part*
1
The Apache Hadoop documentation discusses usage of the -files option to pull HDFS files locally here:

http://hadoop.apache.org/docs/r2.7.1/hadoop-streaming/HadoopStreaming.html#Working_with_Large_Files_and_Archives
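One detail from that page worth knowing: a URI fragment controls the name of the localized symlink, so the file can be opened locally under a different name. A sketch of what that would look like with this job (only the fragment name, appConf.xml, is my addition):

> hadoop jar share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -D mapreduce.job.reduces=0 \
    -files pythonapp,hdfs:///pythonAppConf.xml#appConf.xml \
    -input /testData \
    -output /streamingOut \
    -mapper 'python pythonapp/Mapper.py'

Process.py would then call ElementTree.parse('appConf.xml') instead.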