9

I'm porting a bash script to python 2.6, and want to replace some code:

cat $( ls -tr xyz_`date +%F`_*.log ) | filter args | bzip2

I guess I want something similar to the "Replacing shell pipe line" example at http://docs.python.org/release/2.6/library/subprocess.html, ala...

p1 = Popen(["filter", "args"], stdin=*?WHAT?*, stdout=PIPE)
p2 = Popen(["bzip2"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]

But, I'm not sure how best to provide p1's stdin value so it concatenates the input files. Seems I could add...

p0 = Popen(["cat", "file1", "file2"...], stdout=PIPE)
p1 = ... stdin=p0.stdout ...

...but that seems to go beyond using (slow, inefficient) pipes only to call external programs with significant functionality. (Any decent shell performs the cat internally.)

So, I can imagine a custom class that satisfies the file object API requirements and can therefore be used for p1's stdin, concatenating arbitrary other file objects. (EDIT: existing answers explain why this isn't possible)

Does python 2.6 have a mechanism addressing this need/want, or might another Popen to cat be considered perfectly fine in python circles?

Thanks.

Tony Delroy
  • What does `filter` do? Do you need to call an external program for this functionality? – Sven Marnach May 18 '11 at 09:08
  • @Sven: filter is a few-hundred-line C++ program processing the ~10GB input... it's a bit much to rewrite, and it's convenient having it at C++ speeds to get quick edit/test turnaround. So that said, one extra `Popen/cat` in the context of a non-trivial job isn't a significant problem, it just feels sloppy :-}. – Tony Delroy May 18 '11 at 09:13

4 Answers

6

You can replace everything you're doing with Python code, except for your external utility. That way your program stays portable as long as the external utility is portable. You could also consider turning the C++ program into a library and using Cython to interface with it.

As Messa showed, date is replaced with time.strftime, globbing is done with glob.glob, and cat can be replaced by reading each file in the list and writing it to the input of your program. The call to bzip2 can be replaced with the bz2 module, but that complicates the program because you have to read and write simultaneously. To do that, either use p.communicate or, if the data is huge, a thread (select.select would be a better choice, but on Windows it only works on sockets, not pipes).

import sys
import bz2
import glob
import time
import threading
import subprocess

output_filename = '../whatever.bz2'
input_filenames = glob.glob(time.strftime("xyz_%F_*.log"))
p = subprocess.Popen(['filter', 'args'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
output = open(output_filename, 'wb')
output_compressor = bz2.BZ2Compressor()

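def data_reader():
    # Feed every input file to the filter in 8 KiB blocks, then signal EOF.
    for filename in input_filenames:
        with open(filename, 'rb') as f:  # close each file once it is copied
            p.stdin.writelines(iter(lambda: f.read(8192), ''))
    p.stdin.close()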

input_thread = threading.Thread(target=data_reader)
input_thread.start()

with output:
    for chunk in iter(lambda: p.stdout.read(8192), ''):
        output.write(output_compressor.compress(chunk))

    output.write(output_compressor.flush())

input_thread.join()
p.wait()
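
One detail the shell version relies on is that `ls -tr` lists the logs oldest first, whereas glob.glob returns names in arbitrary order. If the order of concatenation matters, something like the following sketch could be used to build input_filenames; logs_oldest_first is a hypothetical helper name, not part of any library.

```python
import glob
import os


def logs_oldest_first(pattern):
    """Return the paths matching pattern, oldest modification time first.

    This mirrors `ls -tr`; glob.glob on its own gives no ordering guarantee.
    """
    return sorted(glob.glob(pattern), key=os.path.getmtime)
```

It would be called as logs_oldest_first(time.strftime("xyz_%F_*.log")) in place of the bare glob.glob call.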

Addition: How to detect file input type

You can use either the file extension or the Python bindings for libmagic to detect how a file is compressed. The example below does both, preferring magic when it is available; take whichever part suits your needs. open_autodecompress detects the MIME type and opens the file with the matching decompressor, falling back to a plain open when none matches.

import os
import gzip
import bz2
try:
    import magic
except ImportError:
    has_magic = False
else:
    has_magic = True


mime_openers = {
    'application/x-bzip2': bz2.BZ2File,
    'application/x-gzip': gzip.GzipFile,
}

ext_openers = {
    '.bz2': bz2.BZ2File,
    '.gz': gzip.GzipFile,
}


def open_autodecompress(filename, mode='r'):
    if has_magic:
        ms = magic.open(magic.MAGIC_MIME_TYPE)
        ms.load()
        mimetype = ms.file(filename)
        opener = mime_openers.get(mimetype, open)
    else:
        basepart, ext = os.path.splitext(filename)
        opener = ext_openers.get(ext, open)
    return opener(filename, mode)
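
If libmagic is not installed and the extensions cannot be trusted, a third possibility is to look at the first bytes of the file: gzip data starts with the bytes 1f 8b and bzip2 data with "BZh". The following is only a sketch under that assumption; open_by_signature and SIGNATURES are hypothetical names, and an uncompressed file that happens to begin with one of these byte sequences would be misdetected.

```python
import bz2
import gzip

# Leading bytes that identify each format (gzip: 1f 8b, bzip2: "BZh").
SIGNATURES = [
    (b'\x1f\x8b', gzip.GzipFile),
    (b'BZh', bz2.BZ2File),
]


def open_by_signature(filename):
    """Open filename for reading, decompressing if its header says so."""
    with open(filename, 'rb') as f:
        header = f.read(3)
    for signature, opener in SIGNATURES:
        if header.startswith(signature):
            return opener(filename, 'rb')
    # No known signature: treat the file as uncompressed.
    return open(filename, 'rb')
```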
Rosh Oxymoron
  • Hi Rosh. Thanks again for this - elegant and concise code. Still, I've found some of my inputs on some hosts are compressed (gzip or bzip2) earlier in the day, so it's actually conveniently concise to use `bzcat --force` in the pipeline (though I'm sure with a few more tweaks the bz2 library you use above could satisfy this requirement at the input end too). But - straw that broke the camel's back and all that. I will try this out later for my own interest, but prefer the reassurance of something dead-simple for this production use. Hoping others will find this question/answer useful. Cheers. – Tony Delroy May 19 '11 at 07:58
  • I added an example of how you could detect the file type of the input files when reading, if you might find it useful. – Rosh Oxymoron May 19 '11 at 13:15
2

If you look inside the subprocess module implementation, you will see that std{in,out,err} are expected to be file objects supporting the fileno() method, so a simple concatenating file-like object with a Python interface (or even a StringIO object) is not suitable here.
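
To illustrate the fileno() requirement, here is a minimal sketch in which the data goes through a real temporary file, which does have a descriptor the child can inherit. The child is a stand-in Python one-liner that copies stdin to stdout (an assumption made so the example does not depend on any particular external program), and run_with_file_stdin is a hypothetical name.

```python
import subprocess
import sys
import tempfile

# Stand-in for an external filter: a child Python process that copies
# its stdin to its stdout (an assumption, chosen for portability).
ECHO_CMD = [sys.executable, '-c',
            'import sys; sys.stdout.write(sys.stdin.read())']


def run_with_file_stdin(data):
    """Feed bytes to the child through a real file, which has fileno()."""
    with tempfile.TemporaryFile() as f:
        f.write(data)
        f.seek(0)  # rewind so the child reads from the start
        p = subprocess.Popen(ECHO_CMD, stdin=f, stdout=subprocess.PIPE)
        return p.communicate()[0]
```

An in-memory object offers no usable fileno(), so passing one as stdin fails when Popen tries to obtain the descriptor.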

If it were iterators, not file objects, you could use itertools.chain.

Of course, at the cost of holding all the input in memory, you can do something like this:

import itertools, os

# ...

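files = [f for f in os.listdir(".") if os.path.isfile(f)]
# chain.from_iterable flattens the open files into one stream of lines;
# plain chain(...) would yield the file objects themselves and join would fail
data = ''.join(itertools.chain.from_iterable(open(name) for name in files))
p2.communicate(data)  # p2 must have been created with stdin=PIPE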
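
When the input is too large to join in memory (the question mentions ~10GB), the same chaining idea can be kept lazy by chaining iterators of fixed-size blocks instead of whole files. This is only a sketch; file_chunks and concatenated_chunks are hypothetical helper names.

```python
import itertools


def file_chunks(path, size=8192):
    """Yield successive blocks of one file, closing it when exhausted."""
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(size), b''):
            yield chunk


def concatenated_chunks(paths, size=8192):
    """Lazily chain the blocks of several files, in the order given."""
    return itertools.chain.from_iterable(
        file_chunks(path, size) for path in paths)
```

Each block could then be written to a Popen object's stdin in a loop, for example `for chunk in concatenated_chunks(files): p.stdin.write(chunk)`. If the child's stdout is also a pipe, that loop would need to run in a separate thread, or the output drained concurrently, to avoid blocking.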
abbot
  • Thanks for the explanation re `fileno()`. With so much input (~10GB), an in-memory join's probably not suitable but thanks for illustrating the technique. So, I know more about what not to try! Cheers. – Tony Delroy May 18 '11 at 09:26
  • @Tony, my opinion: if you have large files, just go with Popen(cat). Of course, you can reimplement this in python, but what for? It will be a separate process anyway (if you want to autofeed its output to Popen object), so why not use standard tools? – abbot May 18 '11 at 11:04
  • interesting... I can't see a way to do it yet, so it's not clear to me that if there is a way then it'd involve a separate process. But, I'm feeling increasingly comfortable with the `Popen(["cat"...])` anyway... not a big deal for my specific application, but I just found it hard to imagine that in general an external process would be best practice for this. On the plus side, `Popen(["cat"...)` is concise and easily understood.... – Tony Delroy May 18 '11 at 11:18
  • @Tony, it is not the most effective method indeed, but `subprocess.Popen` objects do not provide a convenient API for an effective solution for your case. Check the `subprocess` module sources, implementation of the `communicate()` method. For an efficient solution you need to reimplement it, replacing its string `input` argument with an iterator returning strings. – abbot May 18 '11 at 11:55
1

This should be easy. First, create a pipe using os.pipe, then Popen the filter with the read end of the pipe as standard input. Then, for each file in the directory whose name matches the pattern, write its contents to the write end of the pipe. This is exactly what the shell command cat ..._*.log | filter args does.

Update: Sorry, the pipe from os.pipe is not needed; I forgot that subprocess.Popen(..., stdin=subprocess.PIPE) actually creates one for you. Also, a pipe cannot be stuffed with an unlimited amount of data: once its buffer is full, more can be written only after the earlier data has been read.

So the solution (for example with wc -l) would be:

import glob
import subprocess

p = subprocess.Popen(["wc", "-l"], stdin=subprocess.PIPE)

processDate = "2011-05-18"  # or time.strftime("%F")
for name in glob.glob("xyz_%s_*.log" % processDate):
    f = open(name, "rb")
    # copy all data from f to p.stdin
    while True:
        data = f.read(8192)
        if not data:
            break  # reached end of file
        p.stdin.write(data)
    f.close()

p.stdin.close()
p.wait()

Usage example:

$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_a.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_b.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_c.log 
$ ./example.py 
   30000
Messa
  • Thanks Messa - will give it a try, although I'm worried that if I try to write all the ~10GB input to the pipe before calling `p2.communicate()[0]` it will either block or fail after 64 kilobytes or whatever the buffer size is. Maybe I'll need another thread to feed the pipe while `communicate` runs? Will see how it goes. Thanks for the avenue of exploration. – Tony Delroy May 18 '11 at 09:31
  • I've been playing with this: my observations seem confirmed by the accepted answer to http://stackoverflow.com/questions/163542/python-how-do-i-pass-a-string-into-subprocess-popen-using-the-stdin-argument - that the filter must have `stdin=subprocess.PIPE` rather than a readable fd returned from `os.pipe()`. Weird thing is, I can only `communicate()` *between* `filter_pipe.stdin.write(...)` and ...`.close()`, which suggests `communicate()` is non-blocking. That seems to rule out the thread approach - `communicate()` won't wait until the other thread finishes writing. Too much data for mem.... – Tony Delroy May 18 '11 at 10:50
  • You can shorten that to `for chunk in iter(lambda: f.read(8192), ''): p.stdin.write(chunk)` or `p.stdin.writelines(iter(lambda: f.read(8192), ''))`. – Rosh Oxymoron May 18 '11 at 12:13
  • @Messa, @Rosh: both your solutions look very promising - I'll check them out in the morning and report back on whether they work well with the input volumes in question. Thank you both for your time and effort. – Tony Delroy May 18 '11 at 13:35
  • @Messa, it is not that easy and it will certainly block if the subprocess you open creates a large output which does not terminate with a file descriptor which 'writes' data automatically (e.g. is a pipe). Proof: https://gist.github.com/979549. Such cases require a more complex machinery with threads or something like that to drain the other end of the output pipe simultaneously with writing to the input pipe. Check `communicate()` implementation in `subprocess.py`. – abbot May 18 '11 at 21:03
  • @Messa: thanks again for your efforts and insights on this. I think abbot's right about this, and an approach like Rosh's is necessary... bit more than it's worth for my current needs but good to know about. Cheers. – Tony Delroy May 19 '11 at 08:00
1

When using subprocess, keep in mind that Popen internally takes the file descriptor (handle) of each of stdin, stdout and stderr and calls os.dup2() on it before passing them to the child process it creates.

So, if you don't want to run the pipeline through the system shell, you can chain Popen objects:

p0 = Popen(["cat", "file1", "file2"...], stdout=PIPE)
p1 = Popen(["filter", "args"], stdin=p0.stdout, stdout=PIPE)

...

I think your other option is to write a cat function in Python that generates a file in a cat-like way, and pass that file to p1's stdin. Don't bother with a class that implements the io API: as I said, it will not work, because internally the child process just gets the file descriptors.

That said, I think your better option is the Unix pipe approach shown in the subprocess docs.
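
As a sketch of that approach, the chaining can be wrapped in a small helper that connects each command's stdout to the next command's stdin and closes the parent's copy of every intermediate pipe, as the subprocess docs recommend. run_pipeline is a hypothetical name, and the helper collects the final output in memory, so it assumes that output is of manageable size.

```python
import subprocess


def run_pipeline(commands):
    """Chain commands stdout-to-stdin and return the last one's output."""
    procs = []
    prev_stdout = None
    for argv in commands:
        p = subprocess.Popen(argv, stdin=prev_stdout, stdout=subprocess.PIPE)
        if prev_stdout is not None:
            # Drop the parent's copy so the upstream process can receive
            # SIGPIPE if this stage exits early.
            prev_stdout.close()
        prev_stdout = p.stdout
        procs.append(p)
    output = procs[-1].communicate()[0]
    for p in procs[:-1]:
        p.wait()  # reap the upstream processes
    return output
```

For the pipeline in the question, the call might look like run_pipeline([["cat"] + filenames, ["filter", "args"], ["bzip2"]]), where filenames is the list of log files.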

mouad
  • I'm with you up until the last line... by "first solution" do you mean the Popen of cat that you show above as "the trivial solution"? The link you've provided discusses the importance of closing the `stdout` for earlier pipes... I'd missed that so thanks! I'm starting to think this might be as good as it gets, but will leave the question open a while longer.... Cheers. – Tony Delroy May 18 '11 at 09:40
  • @Tony: yes by "first solution" i was referring to the "trivial solution" :) , but after your comment i don't think it was good to call "trivial solution" for the pipe-line solution :) . edited – mouad May 18 '11 at 09:47
  • BTW / I shouldn't really create a temporary file with the concatenated output, as I want to ensure the program operates even when there's minimal free space on the disks (all too common, seems big banks all have people whose job is to constrain disk usage to frustratingly silly amounts, even on prod machines :-/). – Tony Delroy May 18 '11 at 10:59
  • @Tony: yes, that's another reason to use the cat and PIPE :) – mouad May 18 '11 at 11:07