30

Here is my problem: I have a file in HDFS which can potentially be huge (i.e., too big to fit entirely in memory).

What I would like to do is avoid having to cache this file in memory, and only process it line by line like I would do with a regular file:

for line in open("myfile", "r"):
    # do some processing

I am looking to see if there is an easy way to get this done right without using external libraries. I can probably make it work with libpyhdfs or python-hdfs, but I'd like, if possible, to avoid introducing new dependencies and untested libraries into the system, especially since neither of these seems heavily maintained and both state that they shouldn't be used in production.

I was thinking of doing this with the standard "hadoop" command-line tools and the Python subprocess module, but I can't seem to do what I need, since there are no command-line tools that would do my processing and I would like to execute a Python function for every line in a streaming fashion.

Is there a way to apply Python functions as right operands of the pipes using the subprocess module? Or even better, open it like a file as a generator so I could process each line easily?

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)

If there is another way to achieve what I described above without using an external library, I'm also pretty open to it.

Thanks for any help!

Charles Menguy
  • You could use https://bitbucket.org/turnaev/cyhdfs; I released it a few days ago and it is used in production. The PyPI link http://pypi.python.org/pypi/cyhdfs/0.1.2 is slightly older but OK. – SanityIO Sep 19 '12 at 06:13

4 Answers

47

You want `xreadlines`; it reads lines from a file without loading the whole file into memory.

Edit:

Now that I see your question: you just need to get the stdout pipe from your `Popen` object:

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in cat.stdout:
    print line
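
Going a step further, here is a minimal sketch of wrapping that pipe in a small generator helper so the child process is reaped and its exit status checked once the whole file has been read (this assumes Python 2 and a working `hadoop` CLI on the PATH; the `hdfs_lines` name is made up):

import subprocess

def hdfs_lines(path):
    # Stream an HDFS file line by line through `hadoop fs -cat`.
    cat = subprocess.Popen(["hadoop", "fs", "-cat", path],
                           stdout=subprocess.PIPE)
    for line in cat.stdout:
        yield line
    # Only reached if the caller consumes every line.
    cat.stdout.close()
    if cat.wait() != 0:
        raise IOError("hadoop fs -cat failed for %s" % path)

for line in hdfs_lines("/path/to/myfile"):
    print line,  # trailing comma: the line already ends with a newline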
Keith Randall
  • 1
    How is this different from doing a `for line in open("myfile")`? I think the difficulty here is that I'm not dealing with a regular file, but a file that is in HDFS and I'm wondering if there's a way I can make the subprocess module (or something else) process it line by line with some Python code. (I didn't downvote) – Charles Menguy Sep 18 '12 at 22:08
  • I guess I misunderstood your question then. Don't you want to process it line by line *in python*? Doesn't HDFS have a way to `cat` a file? (I hope it does.) Just call that as a subprocess and wrap the result in an `xreadlines` or a `for line in...`. – Keith Randall Sep 18 '12 at 22:12
  • Yes you can cat a file with `hadoop fs -cat /path/to/myfile`, as I've written in the little subprocess statement above. Maybe I'm missing something, but objects returned by `subprocess.Popen` are not iterable file-like objects, right? I've tried doing a `for line in ...` on my `cat` object above but all I get is `TypeError: 'Popen' object is not iterable`. Am I misunderstanding something? It would be awesome if you could show a little example of what you're thinking of. – Charles Menguy Sep 18 '12 at 22:18
  • That works great, I had no idea the `stdout` and `stderr` of a `Popen` object could be used like files, easier than I thought, thanks for the help! – Charles Menguy Sep 18 '12 at 22:27
  • 4
    Note that if you replace -cat with -text it will handle compression too. – Joe K Sep 18 '12 at 22:52
  • 3
    Note that, since 2.3, [`xreadlines` is deprecated](http://docs.python.org/release/2.3/lib/module-xreadlines.html) (just use `for line in file`, as in your **Edit**). – Adam Monsen Feb 15 '13 at 18:27
  • 1
    @CharlesMenguy: On Python 2, you could use `for line in iter(cat.stdout.readline, ''): print line,` (note: comma at the end): it fixes two issues with the answer: 1. there is a read-ahead bug in Python 2 that delays output from the `cat` command 2. `print line` (no comma) may introduce unnecessary newlines – jfs Oct 04 '14 at 12:26
  • Instead of forking a separate process to call `hadoop fs -cat`, it may be preferable to specify an HDFS path in the `-files` argument when you run the application. This tells YARN to pull the file as a local resource to the nodes running the containers. The Python code can then load it as a local file in the working directory. This technique can provide better performance in very large jobs. More details are in this question: http://stackoverflow.com/questions/34362331/mapreduce-how-to-allow-mapper-to-read-an-xml-file-for-lookup/34366077#34366077 – Chris Nauroth Dec 21 '15 at 19:07
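
Putting the suggestions from the comments above together: `-text` in place of `-cat` also decodes compressed files, and `iter(readline, '')` sidesteps the Python 2 read-ahead buffering so each line is handled as soon as it arrives. A small sketch (again Python 2, `hadoop` CLI on the PATH):

import subprocess

# -text behaves like -cat but also decompresses/decodes supported formats.
cat = subprocess.Popen(["hadoop", "fs", "-text", "/path/to/myfile"],
                       stdout=subprocess.PIPE)
for line in iter(cat.stdout.readline, ''):
    print line,  # trailing comma keeps the line's own newline
cat.wait()
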
32

If you want to avoid adding external dependencies at any cost, Keith's answer is the way to go. Pydoop, on the other hand, could make your life much easier:

import pydoop.hdfs as hdfs
with hdfs.open('/user/myuser/filename') as f:
    for line in f:
        do_something(line)

Regarding your concerns, Pydoop is actively developed and has been used in production for years at CRS4, mostly for computational biology applications.

Simone

simleo
1

In the last two years, there has been a lot of activity around Hadoop Streaming. It is pretty fast, according to Cloudera: http://blog.cloudera.com/blog/2013/01/a-guide-to-python-frameworks-for-hadoop/ I've had good success with it.
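
For reference, a streaming mapper is just a script that reads the input's lines from stdin and prints key/value pairs to stdout; a minimal sketch (the word-count style output is only illustrative):

#!/usr/bin/env python
# Minimal Hadoop Streaming mapper: the framework pipes the HDFS input's lines
# to stdin and collects whatever is printed to stdout as key<TAB>value pairs.
import sys

for line in sys.stdin:
    for word in line.split():
        print "%s\t%s" % (word, 1)

It would be launched through the hadoop-streaming jar, roughly `hadoop jar hadoop-streaming.jar -input <in> -output <out> -mapper mapper.py -file mapper.py` (the jar's location varies by distribution).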

Brian Dolan
1

You can use the WebHDFS Python Library (built on top of urllib3):

from hdfs import InsecureClient
from json import dump  # or: from pickle import dump

client_hdfs = InsecureClient('http://host:port', user='root')
with client_hdfs.write(access_path) as writer:
    dump(records, writer)  # tested for pickle and json (doesn't work for joblib)
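
Since the question is about reading, the same library can also stream a file back line by line; a sketch, assuming the read()/delimiter API as I recall it from the HdfsCLI docs (worth verifying against the version you install):

from hdfs import InsecureClient

client_hdfs = InsecureClient('http://host:port', user='root')
# Stream the file line by line instead of pulling it all into memory.
with client_hdfs.read('/path/to/myfile', encoding='utf-8', delimiter='\n') as reader:
    for line in reader:
        do_something(line)  # placeholder, as in the Pydoop answer above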

Or you can use the requests package in Python as follows:

import requests
from json import dumps
params = (('op', 'CREATE'),
          ('buffersize', 256))
data = dumps(file)  # some file or object - also tested for the pickle library
response = requests.put('http://host:port/path', params=params, data=data)  # response 200 = successful
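
For reading the file back line by line over plain WebHDFS with requests, a sketch (host, port and the path are placeholders; op=OPEN and the /webhdfs/v1 prefix are the standard WebHDFS REST conventions):

import requests

# Stream the response so large files are not held in memory at once;
# requests follows the namenode's redirect to the datanode automatically.
url = 'http://host:port/webhdfs/v1/path/to/myfile'
response = requests.get(url, params={'op': 'OPEN'}, stream=True)
for line in response.iter_lines():
    process(line)  # placeholder for the per-line work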

Hope this helps!

Ramsha Siddiqui