
I am trying to use Sailfish, which takes multiple fastq files as arguments, in a ruffus pipeline. I execute Sailfish using the subprocess module in Python, but <() in the subprocess call does not work, even when I set shell=True.

This is the command I want to execute using python:

sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file]

or (preferably):

sailfish quant [options] -1 <(gunzip -c sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip -c sample2a.fastq.gz sample2b.fastq.gz) -o [output_file]

A generalization:

someprogram <(someprocess) <(someprocess)

How would I go about doing this in Python? Is subprocess the right approach?

Michelle
  • related: [Bash style process substitution with Python's Popen](http://stackoverflow.com/q/15343447/4279) – jfs Mar 03 '15 at 19:49
  • related to the title: [How do I use subprocess.Popen to connect multiple processes by pipes?](http://stackoverflow.com/q/295459/4279) – jfs Mar 05 '15 at 08:14

2 Answers


To emulate the bash process substitution:

#!/usr/bin/env python
from subprocess import check_call

check_call('someprogram <(someprocess) <(anotherprocess)',
           shell=True, executable='/bin/bash')
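
Applied to the question's command, this becomes something like the following (a sketch based on the call the asker later confirmed worked; `[options]` and `[output_file]` are placeholders to fill in, and `gunzip -c` is used so the decompressed data goes to stdout):

#!/usr/bin/env python
from subprocess import check_call

# bash (not /bin/sh) performs the <(...) process substitution
check_call('sailfish quant [options]'
           ' -1 <(gunzip -c sample1a.fastq.gz sample1b.fastq.gz)'
           ' -2 <(gunzip -c sample2a.fastq.gz sample2b.fastq.gz)'
           ' -o [output_file]',
           shell=True, executable='/bin/bash')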

In Python, you could use named pipes:

#!/usr/bin/env python
from subprocess import Popen

with named_pipes(n=2) as paths:
    # start the consumer first so it can open the FIFOs for reading
    someprogram = Popen(['someprogram'] + paths)
    processes = []
    for path, command in zip(paths, ['someprocess', 'anotherprocess']):
        # open() blocks here until someprogram opens the FIFO for reading
        with open(path, 'wb', 0) as pipe:
            processes.append(Popen(command, stdout=pipe, close_fds=True))
    for p in [someprogram] + processes:
        p.wait()  # wait for the consumer and all producers to finish

where named_pipes(n) is:

import os
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def named_pipes(n=1):
    """Create n named pipes (FIFOs) in a temporary directory; remove them on exit."""
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)
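
Putting the two pieces together for the question's command (a sketch only; the sailfish options, the file names, and `output_dir` are placeholders, and `gunzip -c` stands in for `someprocess`):

#!/usr/bin/env python
from subprocess import Popen

with named_pipes(n=2) as (pipe1, pipe2):
    # start sailfish first; it will open the FIFOs for reading
    sailfish = Popen(['sailfish', 'quant', '-1', pipe1, '-2', pipe2,
                      '-o', 'output_dir'])
    producers = []
    for path, files in [(pipe1, ['sample1a.fastq.gz', 'sample1b.fastq.gz']),
                        (pipe2, ['sample2a.fastq.gz', 'sample2b.fastq.gz'])]:
        # open() blocks until sailfish opens this FIFO for reading
        with open(path, 'wb', 0) as pipe:
            producers.append(Popen(['gunzip', '-c'] + files, stdout=pipe))
    for p in [sailfish] + producers:
        p.wait()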

Another, preferable way to implement the bash process substitution (it needs no named entry on disk) is to use /dev/fd/N filenames, if they are available, as suggested by @Dunes. On FreeBSD, fdescfs(5) (/dev/fd/#) creates entries for all file descriptors opened by the process. To test availability, run:

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available

If it fails, try symlinking /dev/fd to proc(5), as is done on some Linux systems:

$ ln -s /proc/self/fd /dev/fd
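
The same availability check can also be done from Python (a small sketch mirroring the shell test above; `dev_fd_works` is a name chosen here, not an existing API):

import os

def dev_fd_works():
    # open a real file descriptor, then check whether /dev/fd exposes it
    with open(os.devnull, 'rb') as f:
        return os.access('/dev/fd/%d' % f.fileno(), os.R_OK)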

Here's a /dev/fd-based implementation of the someprogram <(someprocess) <(anotherprocess) bash command:

#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE

def kill(process):
    if process.poll() is None: # still running
        process.kill()

with ExitStack() as stack: # for proper cleanup
    processes = []
    for command in [['someprocess'], ['anotherprocess']]:  # start child processes
        processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
        stack.callback(kill, processes[-1]) # kill on someprogram exit

    fds = [p.stdout.fileno() for p in processes]
    someprogram = stack.enter_context(
        Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))
    for p in processes: # close pipes in the parent
        p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0: # errors shouldn't go unnoticed
    raise CalledProcessError(someprogram.returncode, someprogram.args)

Note: on my Ubuntu machine, the subprocess code works only in Python 3.4+, despite pass_fds being available since Python 3.2.
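
For a self-contained smoke test of the same pattern (with hypothetical stand-ins: `echo` children as the producers and `cat` as the consumer; it should print hello and world):

#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import Popen, PIPE

with ExitStack() as stack:
    # two producers whose stdout pipes are passed by /dev/fd/N name
    children = [stack.enter_context(Popen(cmd, stdout=PIPE))
                for cmd in (['echo', 'hello'], ['echo', 'world'])]
    fds = [c.stdout.fileno() for c in children]
    consumer = stack.enter_context(
        Popen(['cat'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))
    for c in children:
        c.stdout.close()  # close the parent's copies so cat sees EOF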

jfs
  • Thanks J.F. Sebastian! It actually worked with the simple subprocess argument `executable='/bin/bash'` that I was missing before. It works now with this call: `check_call('sailfish quant [options] <(gunzip -c file1 file2) <(gunzip -c file3 file4)', shell=True, executable='/bin/bash')`. Thank you so much for your help! You really went above and beyond in your answer--you not only helped me solve my problem but also helped me better understand piping in Python. – Michelle Mar 05 '15 at 02:10

Whilst J.F. Sebastian has provided an answer using named pipes, it is possible to do this with anonymous pipes.

import shlex
from subprocess import Popen, PIPE

inputcmd0 = "zcat hello.gz" # gzipped file containing "hello"
inputcmd1 = "zcat world.gz" # gzipped file containing "world"

def get_filename(file_):
    return "/dev/fd/{}".format(file_.fileno())

def get_stdout_fds(*processes):
    return tuple(p.stdout.fileno() for p in processes)

# setup producer processes
inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE)
inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE)

# setup consumer process
# pass input processes pipes by "filename" eg. /dev/fd/5
cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout), 
    file1=get_filename(inputproc1.stdout))
print("command is:", cmd)
# the pass_fds argument tells Popen to let the child process inherit the pipes' fds
someprogram = Popen(shlex.split(cmd), stdout=PIPE, 
    pass_fds=get_stdout_fds(inputproc0, inputproc1))

# close the parent's copies of the pipe read ends; otherwise the script could
# hang on wait() if someprogram exited prematurely (see comments below)
inputproc0.stdout.close()
inputproc1.stdout.close()

output, error = someprogram.communicate()

for p in [inputproc0, inputproc1, someprogram]:
    p.wait()

assert output == b"hello\nworld\n"
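
To run the example end to end, the two gzipped inputs can be created first (a setup step assumed here, not part of the original answer):

import gzip

# create the two test inputs that the example above reads
for name, data in [('hello.gz', b'hello\n'), ('world.gz', b'world\n')]:
    with gzip.open(name, 'wb') as f:
        f.write(data)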
Dunes
  • your code does: `inputcmd | someproc` -- it is different from `someproc <(inputcmd)`. btw, you should call `inputproc.communicate()` instead of `inputproc.wait()`, to close `inputproc.stdout` in the parent so that `inputproc` won't hang if `someproc` exits prematurely. It is not clear what you are trying to achieve with `StreamConnector`, but it seems bloated. – jfs Mar 04 '15 at 09:44
  • My mistake. I thought `<(cmdlist)` connected a series of commands' stdout to the stdin of the consumer process. The class was meant to be a cat-like utility for streams rather than files. The answer is much simpler now. – Dunes Mar 04 '15 at 13:08
  • `/dev/fd/#` (or named pipes if the former is unavailable) is exactly how bash implements process substitution. You should close the pipes in the parent so that if `inputproc0` or `inputproc1` dies prematurely, `someprogram` can exit sooner. Otherwise *the solution should work on Python 3.4+*. I've added an exception-safe version of the code to [my answer](http://stackoverflow.com/a/28840955/4279) (just as an exercise). – jfs Mar 04 '15 at 16:34
  • about pipes I meant the other way around: if `someprogram` dies prematurely then the parent Python script will hang on `p.wait()` after `inputproc[01]` fill their OS stdout pipe buffers (~65K on my machine) if you don't close the pipes (`inputproc[01].stdout`) in the parent. – jfs Mar 05 '15 at 07:53