
I have a near-identical problem to one asked several years ago: Python subprocess with two inputs, which received one answer but no implementation. I'm hoping that this repost may help clear things up for me and others.

As in that question, I would like to use subprocess to wrap a command-line tool that takes multiple inputs. In particular, I want to avoid writing the input files to disk and would rather use e.g. named pipes, as alluded to there. That should read "learn how to use named pipes", as I admittedly have never tried them before. I'll further state that the two inputs I have are currently pandas dataframes, and I'd like to get one back as output.

The generic command-line implementation:

/usr/local/bin/my_command inputfileA.csv inputfileB.csv -o outputfile

My current implementation, predictably, doesn't work: I don't see how or when the dataframes get sent to the command process through the named pipes, and I'd appreciate some help!

import os
import StringIO
import subprocess
import pandas as pd
dfA = pd.DataFrame([[1,2,3],[3,4,5]], columns=["A","B","C"])
dfB = pd.DataFrame([[5,6,7],[6,7,8]], columns=["A","B","C"]) 

# make two FIFOs to host the dataframes
fnA = 'inputA'; os.mkfifo(fnA); ffA = open(fnA,"w")
fnB = 'inputB'; os.mkfifo(fnB); ffB = open(fnB,"w")

# don't know if I need to make two subprocesses to pipe inputs 
ppA  = subprocess.Popen("echo", 
                    stdin =subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE)
ppB  = subprocess.Popen("echo", 
                    stdin =subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE)

ppA.communicate(input = dfA.to_csv(header=False,index=False,sep="\t"))
ppB.communicate(input = dfB.to_csv(header=False,index=False,sep="\t"))


pope = subprocess.Popen(["/usr/local/bin/my_command",
                        fnA,fnB,"stdout"],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
(out,err) = pope.communicate()

try:
    out = pd.read_csv(StringIO.StringIO(out), header=None,sep="\t")
except ValueError: # fail
    out = ""
    print("\n###command failed###\n")

os.unlink(fnA); os.remove(fnA)
os.unlink(fnB); os.remove(fnB)
blackgore

2 Answers


You don't need additional processes to pass data to a child process without writing it to disk:

#!/usr/bin/env python
import os
import shutil
import subprocess
import tempfile
import threading
from contextlib import contextmanager    
import pandas as pd

@contextmanager
def named_pipes(count):
    dirname = tempfile.mkdtemp()
    try:
        paths = []
        for i in range(count):
            paths.append(os.path.join(dirname, 'named_pipe' + str(i)))
            os.mkfifo(paths[-1])
        yield paths
    finally:
        shutil.rmtree(dirname)

def write_command_input(df, path):
    df.to_csv(path, header=False,index=False, sep="\t")

dfA = pd.DataFrame([[1,2,3],[3,4,5]], columns=["A","B","C"])
dfB = pd.DataFrame([[5,6,7],[6,7,8]], columns=["A","B","C"])

with named_pipes(2) as paths:
    p = subprocess.Popen(["cat"] + paths, stdout=subprocess.PIPE)
    with p.stdout:
        for df, path in zip([dfA, dfB], paths):
            t = threading.Thread(target=write_command_input, args=[df, path]) 
            t.daemon = True
            t.start()
        result = pd.read_csv(p.stdout, header=None, sep="\t")
p.wait()

cat is used for demonstration; you should substitute your command ("/usr/local/bin/my_command"). I assume that you can't pass the data via standard input and have to pass it via files. The result is read from the subprocess's standard output.
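The key idea above (a writer must be attached to the other end of each FIFO while the command reads from it) can be reduced to a minimal sketch without pandas. This is Python 3 syntax, assuming a POSIX system with cat available; note that the pipe data comes back as bytes:

```python
import os
import subprocess
import tempfile
import threading

fifo_dir = tempfile.mkdtemp()
path = os.path.join(fifo_dir, "pipe0")
os.mkfifo(path)

def writer():
    # open() blocks here until the child process opens the FIFO for reading
    with open(path, "w") as f:
        f.write("hello\n")

t = threading.Thread(target=writer)
t.daemon = True
t.start()

# cat stands in for the real command; opening the FIFO unblocks the writer thread
p = subprocess.Popen(["cat", path], stdout=subprocess.PIPE)
out, _ = p.communicate()
```

After `communicate()` returns, `out` holds the bytes the thread pushed through the FIFO, which is exactly where `pd.read_csv(p.stdout, ...)` picks up in the full answer.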

jfs
  • Thanks JF. The implementation is quite clear, it would not have occurred to me to use the threading module to pass the data rather than subprocess. I'm not sure I understand your comment that I do not require additional processes to pass data - doesn't this solution essentially create child processes through the threading module as an alternative to subprocess? – blackgore Jul 28 '15 at 16:06
  • @blackgore: 1. `threading` does not create new processes 2. `threading` is used here to perform async I/O, but it is not necessary; you could use a select loop instead – jfs Jul 28 '15 at 16:12

So there are a couple of things going on that might be tripping you up. The important point from the previous post is to treat these FIFOs as you would normal files, except that a read from the pipe in one process blocks until another process opens the other end for writing (and vice versa). Here is how I might approach the situation; I'll do my best to describe my thinking.


First off, when you're in the main process and you call ffA = open(fnA, 'w'), you run into the issue described above: no one is reading from the other end of the pipe yet, so the main process just blocks on the open() call. To account for this, you'll want to change the code to remove the open() calls:

# make two FIFOs to host the dataframes
fnA = './inputA';
os.mkfifo(fnA);
fnB = './inputB';
os.mkfifo(fnB);

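That blocking behaviour can be observed directly. A non-blocking open of a FIFO's write end fails with ENXIO while no reader exists, which is the same condition that makes a plain open(path, 'w') hang. A minimal sketch (Python 3 syntax, POSIX system assumed):

```python
import errno
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo_fifo")
os.mkfifo(path)

# With no reader attached, a non-blocking write-open is refused immediately;
# a blocking open(path, 'w') would sit here forever instead.
try:
    os.open(path, os.O_WRONLY | os.O_NONBLOCK)
    no_reader_error = None
except OSError as e:
    no_reader_error = e.errno
```

This is why the writes have to happen in a separate process (or thread): something must hold the read end open before the write-side open() can complete.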
Okay, so we have the pipes 'inputA' and 'inputB' made and ready to be opened for reading/writing. To prevent the blocking described above, we need to start a couple of subprocesses to call open(). Since I'm not particularly familiar with the subprocess library, I'll resort to just forking a couple of child processes.

for x in xrange(2):

    pid = os.fork()
    if pid == 0:
            if x == 0:
                    dfA.to_csv(open(fnA, 'w'), header=False, index=False, sep='\t')
            else:
                    dfB.to_csv(open(fnB, 'w'), header=False, index=False, sep='\t')
            exit()
    else:
            continue

Okay so now we'll have these two child processes blocking while waiting to write to their respective FIFOs. Now we can run our command to connect to the other end of the pipe and start reading.

pope = subprocess.Popen(["./my_cmd.sh",
                        fnA,fnB],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
(out,err) = pope.communicate()

try:
    out = pd.read_csv(StringIO.StringIO(out), header=None,sep="\t")
except ValueError: # fail
    out = ""
    print("\n###command failed###\n")

The last thing I found is that unlinking the pipe deletes it, so there's no need to call remove() as well.

os.unlink(fnA); 
os.unlink(fnB);
print "out: ", out

On my machine the print statement yields:

out:     0  1  2
0  1  2  3
1  3  4  5
2  5  6  7
3  6  7  8
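The point about unlink can be checked directly; in CPython, os.remove is documented as semantically identical to os.unlink, so calling both is redundant (Python 3 syntax):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "fifo")
os.mkfifo(path)

os.unlink(path)  # unlink alone removes the FIFO entry...
removed = not os.path.exists(path)  # ...so a follow-up os.remove() would raise FileNotFoundError
```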

My command, by the way, is just a couple of cat statements:

#!/bin/bash

cat $1
cat $2
rabbit
  • Thanks NBartley. I recreated this using a direct 'cat' command in place of the shell script you included. I'm not familiar enough with os.fork to understand the presence of the exit() function - when I implement this, it raises a SystemExit each time. Is that expected behaviour, or a typo on my part? – blackgore Jul 28 '15 at 14:38
  • Hmm. Does the code work otherwise? If not, then I imagine a typo. – rabbit Jul 28 '15 at 15:20
  • But it could be an unexpected system difference -- I am using Python 2.7.3 on an Ubuntu machine. An alternative you might try is to replace exit() with os._exit(os.EX_OK). – rabbit Jul 28 '15 at 15:21
  • mine is a similar setup (ubuntu 14.10, python 2.7.8), so I'd have thought a systematic difference would be unlikely, but that's exactly what I ended up using as a fix ( os._exit(1) ) and it worked just fine without raising anything. – blackgore Jul 29 '15 at 09:57
  • 1
    thanks again for the help. I've implemented and tested both answers, using shared code where possible, and can confirm that both work as intended. I'm choosing J.F. Sebastian's implementation as the selected answer largely because it worked out as slightly more efficient (~1.2x faster) when tested on my system, using the 'cat' example. That said, it should be pointed out that NBartley's answer is probably closer in spirit and syntax to my posted code, and so any inefficiencies may be OP's own! – blackgore Jul 29 '15 at 10:30
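On the exit() vs os._exit() point discussed in the comments above: exit() works by raising SystemExit, which surrounding code (or an interactive session) can catch and report, while os._exit() terminates the forked child immediately without running that machinery. A minimal sketch of the os._exit() pattern, assuming a POSIX system (Python 3 syntax):

```python
import os

pid = os.fork()
if pid == 0:
    # Child: os._exit() ends the process right here, without raising
    # SystemExit or running atexit handlers, so nothing can intercept it.
    os._exit(0)
else:
    # Parent: reap the child and check it exited cleanly with status 0
    _, status = os.waitpid(pid, 0)
    child_ok = os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
```

This is why swapping exit() for os._exit() in the forked children made the SystemExit noise go away.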