
I am trying to run a chain of existing scripts in Python using subprocess. The chain works as expected when I use this code:

p1 = subprocess.Popen(samtoolsSortArguments, stdout=subprocess.PIPE)
p2 = subprocess.Popen(samtoolsViewArguments, stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
p3 = subprocess.Popen(htseqCountArguments, stdin=p2.stdout, stdout=file_out)
p2.stdout.close()
p3.communicate()
file_out.close()

The output looks like this:

100000 GFF lines processed.
[bam_sort_core] merging from 2 files...
200000 GFF lines processed.
300000 GFF lines processed.
400000 GFF lines processed.
500000 GFF lines processed.
600000 GFF lines processed.
700000 GFF lines processed.
800000 GFF lines processed.
900000 GFF lines processed.
1000000 GFF lines processed.
1100000 GFF lines processed.
1200000 GFF lines processed.
1300000 GFF lines processed.
1400000 GFF lines processed.
1500000 GFF lines processed.
1600000 GFF lines processed.
1700000 GFF lines processed.
1800000 GFF lines processed.
1900000 GFF lines processed.
2000000 GFF lines processed.
2100000 GFF lines processed.
2200000 GFF lines processed.
2300000 GFF lines processed.
2400000 GFF lines processed.
2500000 GFF lines processed.
2600000 GFF lines processed.
2700000 GFF lines processed.
2764635 GFF lines processed.
100000 SAM alignment records processed.
200000 SAM alignment records processed.
300000 SAM alignment records processed.
400000 SAM alignment records processed.
500000 SAM alignment records processed.
600000 SAM alignment records processed.
700000 SAM alignment records processed.
800000 SAM alignment records processed.
900000 SAM alignment records processed.
1000000 SAM alignment records processed.
1100000 SAM alignment records processed.
1200000 SAM alignment records processed.
1300000 SAM alignment records processed.
1400000 SAM alignment records processed.
1500000 SAM alignment records processed.
1600000 SAM alignment records processed.
1700000 SAM alignment records processed.
1800000 SAM alignment records processed.
1900000 SAM alignment records processed.
2000000 SAM alignment records processed.
2100000 SAM alignment records processed.
2200000 SAM alignment records processed.
2300000 SAM alignment records processed.
2400000 SAM alignment records processed.
2500000 SAM alignment records processed.
2600000 SAM alignment records processed.
2700000 SAM alignment records processed.
2800000 SAM alignment records processed.
2900000 SAM alignment records processed.

All of this output comes from stderr, and I'd like to write it to both the terminal and a logfile. To accomplish this I am running the Unix tee command as a subprocess in Python and passing it the stderr from the previous subprocess command. The code looks like this:

p1 = subprocess.Popen(samtoolsSortArguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
tee = subprocess.Popen(['tee', logfile], stdin=p1.stderr)
p1.stderr.close()

p2 = subprocess.Popen(samtoolsViewArguments, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p1.stdout.close()
tee = subprocess.Popen(['tee', logfile], stdin=p2.stderr)
p2.stderr.close()

p3 = subprocess.Popen(htseqCountArguments, stdin=p2.stdout, stdout=file_out, stderr=subprocess.PIPE)
p2.stdout.close()
tee = subprocess.Popen(['tee', logfile], stdin=p3.stderr)

p3.communicate()
p3.stderr.close()
tee.communicate()
file_out.close()

The stdout from this code, written to my file_out handle, is correct. Even the stderr printed to the screen and the logfile seems to be the right information. However, the stderr output is truncated on some lines and I can't figure out why. Here's what my logfile and terminal look like (they match):

 GFF lines processed.
[bam_sort_core] merging from 2 files...
 GFF lines processed.
300000 GFF lines processed.
400000 GFF lines processed.
500000 GFF lines processed.
600000 GFF lines processed.
700000 GFF lines processed.
800000 GFF lines processed.
900000 GFF lines processed.
1000000 GFF lines processed.
1100000 GFF lines processed.
1200000 GFF lines processed.
1300000 GFF lines processed.
1400000 GFF lines processed.
1500000 GFF lines processed.
1600000 GFF lines processed.
1700000 GFF lines processed.
1800000 GFF lines processed.
1900000 GFF lines processed.
 GFF lines processed.
GFF lines processed.
FF lines processed.
F lines processed.
 lines processed.
ines processed.
700000 GFF lines processed.
2764635 GFF lines processed.
nt records processed.
 records processed.
300000 SAM alignment records processed.
cords processed.
ds processed.
processed.
essed.
d.
000000 SAM alignment records processed.
00 SAM alignment records processed.
 alignment records processed.
1500000 SAM alignment records processed.
1600000 SAM alignment records processed.
1800000 SAM alignment records processed.
1900000 SAM alignment records processed.
2000000 SAM alignment records processed.
2100000 SAM alignment records processed.
2200000 SAM alignment records processed.
2500000 SAM alignment records processed.
2600000 SAM alignment records processed.
2700000 SAM alignment records processed.
2900000 SAM alignment records processed.

Why is the output truncated when it is passed to tee? Is this just a column shift? Is there a way to fix this, or am I just trying to do too much with subprocess?

EDIT: Here's an SSCCE of @tdelaney's code. It reproduces the same error that I was having when using it in my broader context. This example should be run from a folder containing a file called test.txt, which should read as follows (or anything similar, so long as some lines contain "test"):

test
not
test

And here's the toy code (make sure to change the shebang to point to your Python):

#!/usr/local/bin/python2

import sys
import subprocess
import threading

logfile = "./testlog.txt"

arg1 = ["ls", "-l"]
arg2 = ["find", "-name", "test.txt"]
arg3 = ["xargs", "grep", "-i", "-n", "test"]

def log_writer(pipe, log_fp, lock):
    for line in pipe:
        with lock:
            log_fp.write(line)
            sys.stdout.write(line)

with open(logfile, 'w') as log_fp:
    lock = threading.Lock()
    threads = []
    p1 = subprocess.Popen(arg1, stdout=subprocess.PIPE)
    threads.append(threading.Thread(target=log_writer, args=(p1.stdout, log_fp, lock)))

    p2 = subprocess.Popen(arg2, stdin=p1.stdout, stdout=subprocess.PIPE)
    p1.stdout.close()
    threads.append(threading.Thread(target=log_writer, args=(p2.stdout, log_fp, lock)))

    p3 = subprocess.Popen(arg3, stdin=p2.stdout, stdout=subprocess.PIPE)
    p2.stdout.close()
    threads.append(threading.Thread(target=log_writer, args=(p3.stdout, log_fp, lock)))

    for t in threads:
        t.start()

    p3.communicate()

    for t in threads:
        t.join()

Note: if I comment out the close() and communicate() lines, the code runs. I'm a little concerned about doing so, though, since I expect I'll then hit all kinds of other problems in my broader context.

sage88
  • don't call `p1.stdout.close()`, `p2.stdout.close()`, `p3.communicate()` if you want to read from the pipes in the same process (you do in the code with threads). Use `for p in [p1, p2, p3]: p.wait()` to reap the child processes. Add `with pipe` at the top of `log_writer()` to close the pipe on EOF or an error. See [Python subprocess get children's output to file and terminal?](http://stackoverflow.com/a/4985080/4279) – jfs Apr 01 '15 at 09:52

1 Answer


The problem is that you have multiple tees writing to a single file. Each tee has its own file descriptor and its own current offset into the file, so they overwrite bits of each other's output (a short illustration of the effect follows). One solution, shown in the code after the illustration, is to implement the log-file write in Python using threads and a mutex.
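
As a quick illustration of the effect, here is a minimal sketch (plain file objects and standard file behaviour only, not part of the posted answer): two writers open the same path, each keeps its own offset, so the later writer's bytes land on top of the earlier writer's and leave truncated remnants much like the log above.

# Two independent writers to the same file, each with its own offset.
with open("demo.txt", "w") as a, open("demo.txt", "w") as b:
    a.write("[bam_sort_core] merging from 2 files...\n")
    a.flush()                   # a's line lands at offset 0
    b.write("100000 GFF lines processed.\n")
    b.flush()                   # b also starts at offset 0 and clobbers the start of a's line

with open("demo.txt") as f:
    print(f.read())             # b's line, followed by the leftover tail of a's longer line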

#!/usr/bin/env python

import sys
import subprocess
import threading

logfile = "./testlog.txt"
file_out = open("./test.output.txt", "w")

arg1 = ["ls", "-l"]
arg2 = ["find", "-name", "test.txt"]
arg3 = ["xargs", "grep", "-i", "-n", "test"]

# Copy each line from a child's pipe to both the logfile and the console,
# holding the lock so writes from different threads don't interleave.
def log_writer(pipe, log_fp, lock):
    for line in pipe:
        with lock:
            log_fp.write(line)
            sys.stdout.write(line)

with open(logfile, 'w') as log_fp:
    lock = threading.Lock()
    threads = []
    processes = []
    p1 = subprocess.Popen(arg1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    threads.append(threading.Thread(target=log_writer, args=(p1.stderr, log_fp, lock)))
    processes.append(p1)

    p2 = subprocess.Popen(arg2, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    p1.stdout.close()  # p2 now owns the read end of p1's stdout
    threads.append(threading.Thread(target=log_writer, args=(p2.stderr, log_fp, lock)))
    processes.append(p2)

    p3 = subprocess.Popen(arg3, stdin=p2.stdout, stdout=file_out, stderr=subprocess.PIPE)
    p2.stdout.close()  # p3 now owns the read end of p2's stdout
    threads.append(threading.Thread(target=log_writer, args=(p3.stderr, log_fp, lock)))
    processes.append(p3)

    file_out.close()  # p3 holds its own descriptor for file_out, so the parent's copy can be closed

    for t in threads:
        t.start()

    for p in processes:
        p.wait()  # reap each child; the log_writer threads drain stderr, so communicate() is not needed

    for t in threads:
        t.join()
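
Mapped back onto the original samtools/htseq-count chain, the same pattern might look roughly like the sketch below. This is an adaptation, not part of the answer as posted, and only an outline rather than a tested drop-in: it assumes the question's samtoolsSortArguments, samtoolsViewArguments, htseqCountArguments, logfile and file_out variables are already defined.

import sys
import subprocess
import threading

# samtoolsSortArguments, samtoolsViewArguments, htseqCountArguments, logfile and
# file_out are the question's own variables and are assumed to be defined above.

def log_writer(pipe, log_fp, lock):
    with pipe:                          # let the reader thread close the pipe on EOF
        for line in pipe:
            with lock:                  # one writer at a time, one shared offset
                log_fp.write(line)
                sys.stdout.write(line)

with open(logfile, 'w') as log_fp:
    lock = threading.Lock()

    p1 = subprocess.Popen(samtoolsSortArguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    p2 = subprocess.Popen(samtoolsViewArguments, stdin=p1.stdout,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    p1.stdout.close()                   # p2 now owns the read end of p1's stdout
    p3 = subprocess.Popen(htseqCountArguments, stdin=p2.stdout,
                          stdout=file_out, stderr=subprocess.PIPE)
    p2.stdout.close()                   # p3 now owns the read end of p2's stdout

    threads = [threading.Thread(target=log_writer, args=(p.stderr, log_fp, lock))
               for p in (p1, p2, p3)]
    for t in threads:
        t.start()

    for p in (p1, p2, p3):              # reap the children; the threads drain stderr
        p.wait()
    for t in threads:
        t.join()

file_out.close()
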
tdelaney
  • Looking at this I don't see how it could write to both the console and the logfile? I'm also running it right now, it doesn't seem to be working, but let me play with it for a bit. – sage88 Mar 31 '15 at 16:35
  • Looks like I forgot a line: `sys.stdout.write(line)`. I'll add that into the sample. – tdelaney Mar 31 '15 at 16:39
  • So it doesn't completely fail, in that stdout is still written to my output file correctly. However, the log and console are blank and it throws an error: `IOError: close() called during concurrent operation on the same file object.` Then `Exception in thread Thread-3: ValueError: I/O operation on closed file`, and the line in that thread throwing the error is `log_fp.write(line)`. Did you test a variation of this code (just curious)? – sage88 Mar 31 '15 at 16:42
  • Like your original post, it's an unrunnable example. If you want to post a running example, I'd be happy to try it. You would get the error you describe if you close `logfile` before joining the writer threads. I am surprised that you don't see anything on the console. – tdelaney Mar 31 '15 at 17:00
  • Hey I added a toy example of your code to show the error it's hitting. It just runs some basic unix commands and all of it is run through stdout rather than both stdout and stderr for simplicity. See my edit to the question above. – sage88 Mar 31 '15 at 17:40
  • That was interesting. The bug in my code was that `p3.communicate()` tries to read and close `stderr` also. It can just be removed. Your working example tried to log `stdout` instead of `stderr`, but that's the handle being passed to the child and also closed in the parent, so you got more crashes. I'll update my solution to your example plus a few fixes to make it go. – tdelaney Mar 31 '15 at 18:09
  • I actually am not sure that I can remove p3.communicate(). While it fixes the bug in your code, it also makes it so that the subprocesses don't make the parent thread wait. All of this is occurring in a method and that method needs to wait until these commands have been run. I don't think that will happen without communicate. – sage88 Mar 31 '15 at 18:21
  • @sage88 - I put all of the subprocesses in a list and did a wait on them. This means there are no orphaned zombie processes and that the code waits until they are all done. – tdelaney Mar 31 '15 at 18:32
  • Seems like this goes against the recommendation in the Python subprocess docs to avoid using wait() and instead use communicate()? – sage88 Mar 31 '15 at 18:39
  • @sage88 - I don't know of such warnings. `communicate` is used when you want to read all of `stdout` and `stderr` into memory and wait for the process to complete. It implements background threads to avoid deadlocks and calls `wait`. But since we want to do things differently, we have our own threads to avoid deadlocks and do the wait ourselves. – tdelaney Mar 31 '15 at 19:04
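
To make that distinction concrete, a minimal sketch using only standard subprocess calls (an added illustration, not taken from the thread):

import subprocess

# communicate(): reads all of stdout/stderr into memory, then waits for the child.
p = subprocess.Popen(["echo", "hello"], stdout=subprocess.PIPE)
out, err = p.communicate()      # out holds the child's full output; err is None (no stderr pipe)

# wait(): only reaps the child; use it when your own threads are draining the pipes.
q = subprocess.Popen(["sleep", "1"])
q.wait()                        # blocks until the child exits and returns its exit code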