13

I am using Python 2 subprocess with threading threads to take standard input, process it with binaries A, B, and C and write modified data to standard output.

This script (let's call it: A_to_C.py) is very slow and I'd like to learn how to fix it.

The general flow is as follows:

A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin))

B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin))

C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE)
convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin)) 

produce_A_thread.start()
convert_A_to_B_thread.start()
convert_B_to_C_thread.start()

produce_A_thread.join()
convert_A_to_B_thread.join()
convert_B_to_C_thread.join()

A_process.wait()
B_process.wait()
C_process.wait()

The idea is that standard input goes into A_to_C.py:

  1. The A binary processes a chunk of standard input and creates A-output with the function produceA.
  2. The B binary processes a chunk of A's standard output and creates B-output via the function produceB.
  3. The C binary processes a chunk of B's standard output via the function produceC and writes C-output to standard output.

I did profiling with cProfile and nearly all of the time in this script appears to be spent in acquiring thread locks.

For instance, in a test 417s job, 416s (>99% of the total runtime) is spent on acquiring thread locks:

$ python                                                                                                                                                                                                                                         
Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32)                                                                                                                                                                                                                                              
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2                                                                                                                                                                                                                                              
Type "help", "copyright", "credits" or "license" for more information.                                                                                                                                                                                                                        
>>> import pstats                                                                                                                                                                                                                                                                             
>>> p = pstats.Stats('1.profile')                                                                                                                                                                                                                                                             
>>> p.sort_stats('cumulative').print_stats(10)                                                                                                                                                                                                                                                
Thu Jun 12 22:19:07 2014    1.profile                                                                                                                                                                                                                                                         

         1755 function calls (1752 primitive calls) in 417.203 CPU seconds                                                                                                                                                                                                                    

   Ordered by: cumulative time                                                                                                                                                                                                                                                                
   List reduced from 162 to 10 due to restriction <10>                                                                                                                                                                                                                                        

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)                                                                                                                                                                                                                       
        1    0.020    0.020  417.203  417.203 A_to_C.py:90(<module>)                                                                                                                                                                                  
        1    0.000    0.000  417.123  417.123 A_to_C.py:809(main)                                                                                                                                                                                     
        6    0.000    0.000  416.424   69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait)                                                                                                                                                                         
       32  416.424   13.013  416.424   13.013 {method 'acquire' of 'thread.lock' objects}                                                                                                                                                                                                     
        3    0.000    0.000  416.422  138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join)                                                                                                                                                                         
        3    0.000    0.000    0.498    0.166 A_to_C.py:473(which)                                                                                                                                                                                    
       37    0.000    0.000    0.498    0.013 A_to_C.py:475(is_exe)                                                                                                                                                                                   
        3    0.496    0.165    0.496    0.165 {posix.access}                                                                                                                                                                                                                                  
        6    0.000    0.000    0.194    0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call)                                                                                                                                                           
        3    0.000    0.000    0.191    0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait)

What am I doing wrong with my threading.Thread and/or subprocess.Popen arrangement which is causing this issue?

dano
  • 91,354
  • 19
  • 222
  • 219
Alex Reynolds
  • 95,983
  • 54
  • 240
  • 345
  • 4
    It could be caused by the `.join()` call as written [here](http://hg.python.org/cpython/file/2.7/Lib/threading.py#l949) – Maciej Gol Jun 13 '14 at 08:11
  • Have you tried writing your code purely as subprocesses, with no threading? – mpez0 Jun 15 '14 at 16:40
  • Yes, and it locks up between the first two subprocesses. The point of using threads was to avoid I/O blocking. – Alex Reynolds Jun 15 '14 at 16:47
  • I'm not sure if I understand the three threads, do they only forward between the two streams that are passed as argument? In that case, putting the three executables into one subprocess call and chaining them (`exe1|exe2|exe3`) would be preferable. If that is what "locks up", there is something else broken. Also, as kroolik suggests, time spent waiting for another process/thread isn't really something you can optimize in the calling process, so those numbers could be misleading. – Ulrich Eckhardt Jun 15 '14 at 17:09
  • The functions I describe (`produceA`, `produceB` and `produceC`) modify the data between processes/threads. If I just needed to pipe from one executable to the next, I would have used a `bash` script and called it a day. Here, I need to modify the data using Python or any scripting or shell environment that provides encapsulation, data containers, etc. – Alex Reynolds Jun 15 '14 at 18:18
  • 3
    You are mis-reading the profiler output. cProfile sees that your *main thread* has to wait for the `.join()` calls to complete. The `416.424` total time is *just the main thread waiting* for the worker threads to do their work. The rest of your program is hardly doing work, as you are I/O bound (e.g. waiting for external processes to do their job). **How is this a problem**? – Martijn Pieters Jun 16 '14 at 11:21
  • 1
    The problem is that if I do something like `A|B|C` with, say, `awk` to simulate some of what the production functions do, the time taken to complete the task is greatly reduced. Something about this Python-based threading approach appears to consume a great deal more time than a shell-based or other approach. The problem is that Python scripting offers some features that would be difficult to emulate with a shell-based approach, but the performance makes the script unusable; my goal is to figure out why this is consuming so much time, or find an alternative way to manage standard I/O streams. – Alex Reynolds Jun 16 '14 at 17:57
  • 1
    Could you post the complete code? This is missing `produceA` `produceB` `produceC` functions. (Or at least, show it with dummy functions that pass through) – Alex I Jun 16 '14 at 22:34

5 Answers5

11

I think you are just being mislead by the way cProfile works. For example, here's a simple script that uses two threads:

#!/usr/bin/python

import threading
import time

def f():
    time.sleep(10)


def main():
    t = threading.Thread(target=f)
    t.start()
    t.join()

If I test this using cProfile, here's what I get:

>>> import test
>>> import cProfile
>>> cProfile.run('test.main()')
         60 function calls in 10.011 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   10.011   10.011 <string>:1(<module>)
        1    0.000    0.000   10.011   10.011 test.py:10(main)
        1    0.000    0.000    0.000    0.000 threading.py:1008(daemon)
        2    0.000    0.000    0.000    0.000 threading.py:1152(currentThread)
        2    0.000    0.000    0.000    0.000 threading.py:241(Condition)
        2    0.000    0.000    0.000    0.000 threading.py:259(__init__)
        2    0.000    0.000    0.000    0.000 threading.py:293(_release_save)
        2    0.000    0.000    0.000    0.000 threading.py:296(_acquire_restore)
        2    0.000    0.000    0.000    0.000 threading.py:299(_is_owned)
        2    0.000    0.000   10.011    5.005 threading.py:308(wait)
        1    0.000    0.000    0.000    0.000 threading.py:541(Event)
        1    0.000    0.000    0.000    0.000 threading.py:560(__init__)
        2    0.000    0.000    0.000    0.000 threading.py:569(isSet)
        4    0.000    0.000    0.000    0.000 threading.py:58(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:602(wait)
        1    0.000    0.000    0.000    0.000 threading.py:627(_newname)
        5    0.000    0.000    0.000    0.000 threading.py:63(_note)
        1    0.000    0.000    0.000    0.000 threading.py:656(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:709(_set_daemon)
        1    0.000    0.000    0.000    0.000 threading.py:726(start)
        1    0.000    0.000   10.010   10.010 threading.py:911(join)
       10   10.010    1.001   10.010    1.001 {method 'acquire' of 'thread.lock' objects}
        2    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        4    0.000    0.000    0.000    0.000 {method 'release' of 'thread.lock' objects}
        4    0.000    0.000    0.000    0.000 {thread.allocate_lock}
        2    0.000    0.000    0.000    0.000 {thread.get_ident}
        1    0.000    0.000    0.000    0.000 {thread.start_new_thread}

As you can see, it says that almost all of the time is spent acquiring locks. Of course, we know that's not really an accurate representation of what the script was doing. All the time was actually spent in a time.sleep call inside f(). The high tottime of the acquire call is just because join was waiting for f to finish, which means it had to sit and wait to acquire a lock. However, cProfile doesn't show any time being spent in f at all. We can clearly see what is actually happening because the example code is so simple, but in a more complicated program, this output is very misleading.

You can get more reliable results by using another profiling library, like yappi:

>>> import test
>>> import yappi
>>> yappi.set_clock_type("wall")
>>> yappi.start()
>>> test.main()
>>> yappi.get_func_stats().print_all()

Clock type: wall
Ordered by: totaltime, desc

name                                    #n         tsub      ttot      tavg
<stdin>:1 <module>                      2/1        0.000025  10.00801  5.004003
test.py:10 main                         1          0.000060  10.00798  10.00798
..2.7/threading.py:308 _Condition.wait  2          0.000188  10.00746  5.003731
..thon2.7/threading.py:911 Thread.join  1          0.000039  10.00706  10.00706
..ython2.7/threading.py:752 Thread.run  1          0.000024  10.00682  10.00682
test.py:6 f                             1          0.000013  10.00680  10.00680
..hon2.7/threading.py:726 Thread.start  1          0.000045  0.000608  0.000608
..thon2.7/threading.py:602 _Event.wait  1          0.000029  0.000484  0.000484
..2.7/threading.py:656 Thread.__init__  1          0.000064  0.000250  0.000250
..on2.7/threading.py:866 Thread.__stop  1          0.000025  0.000121  0.000121
..lib/python2.7/threading.py:541 Event  1          0.000011  0.000101  0.000101
..python2.7/threading.py:241 Condition  2          0.000025  0.000094  0.000047
..hreading.py:399 _Condition.notifyAll  1          0.000020  0.000090  0.000090
..2.7/threading.py:560 _Event.__init__  1          0.000018  0.000090  0.000090
..thon2.7/encodings/utf_8.py:15 decode  2          0.000031  0.000071  0.000035
..threading.py:259 _Condition.__init__  2          0.000064  0.000069  0.000034
..7/threading.py:372 _Condition.notify  1          0.000034  0.000068  0.000068
..hreading.py:299 _Condition._is_owned  3          0.000017  0.000040  0.000013
../threading.py:709 Thread._set_daemon  1          0.000018  0.000035  0.000035
..ding.py:293 _Condition._release_save  2          0.000019  0.000033  0.000016
..thon2.7/threading.py:63 Thread._note  7          0.000020  0.000020  0.000003
..n2.7/threading.py:1152 currentThread  2          0.000015  0.000019  0.000009
..g.py:296 _Condition._acquire_restore  2          0.000011  0.000017  0.000008
../python2.7/threading.py:627 _newname  1          0.000014  0.000014  0.000014
..n2.7/threading.py:58 Thread.__init__  4          0.000013  0.000013  0.000003
..threading.py:1008 _MainThread.daemon  1          0.000004  0.000004  0.000004
..hon2.7/threading.py:569 _Event.isSet  2          0.000003  0.000003  0.000002

With yappi, it's much easier to see that the time is being spent in f.

I suspect that you'll find that in reality, most of your script's time is spent doing whatever work is being done in produceA, produceB, and produceC.

dano
  • 91,354
  • 19
  • 222
  • 219
  • "I suspect that you'll find that in reality, most of your script's time is spent doing whatever work is being done in produceA, produceB, and produceC." - that is a good assumption, however, it happens to not be true. the total amount of cpu time in the OP's script (in one particular test) is ~6s, but it takes ~14s wall clock time to run (and <0.1s to just transfer data). So, while there are problems with the interpretation of profiler output, this really *is* unexpectedly slow. – Alex I Jun 23 '14 at 04:05
  • Hi @dano, how can I retrieve top 10 time consuming methods? I tried yappi.get_func_stats(yappi.SORTTYPE_TSUB, yappi.SORTORDER_DESC, 10).print_all() but I get an error that yappi.SORTTYPE_TSUB doesn't existt – Dejell Feb 14 '17 at 13:37
6

TL;DR If your program runs slower than expected, it is probably due to the details of what the intermediate functions do rather than due to IPC or threading. Test with mock functions and processes (as simple as possible) to isolate just the overhead of passing data to/from subprocesses. In a benchmark based closely on your code (below), the performance when passing data to/from subprocesses seems to be roughly equivalent to using shell pipes directly; python is not particularly slow at this task.

What is going on with the original code

The general form of the original code is:

def produceB(from_stream, to_stream):
    while True:
        buf = from_stream.read()
        processed_buf = do_expensive_calculation(buf)
        to_stream.write(processed_buf)

Here the calculation between read and write takes about 2/3 of the total cpu time of all processes (main and sub) combined - this is cpu time, not wall clock time btw.

I think that this prevents the I/O from running at full speed. Reads and writes and the calculation each need to have their own thread, with queues to provide buffering between the read and calculation and between the calculation and write (since the amount of buffering that pipes provide is insufficient, I believe).

I show below that if there is no processing in between read and write (or equivalently: if the intermediate processing is done in separate thread), then the throughput from threads + subprocess is very high. It is also possible to have separate threads for reads and writes; this adds a bit of overhead but makes writes not block reads and vice versa. Three threads (read, write and processing) is even better, then neither step blocks the others (within the limits of the queue sizes, of course).

Some benchmarks

All benchmarking below is on python 2.7.6 on Ubuntu 14.04LTS 64bit (Intel i7, Ivy Bridge, quad core). The test is to transfer approx 1GB of data in 4KB blocks between two dd processes, and pass the data through python as an intermediary. The dd processes use medium sized (4KB) blocks; typical text I/O would be smaller (unless it is cleverly buffered by the interpreter, etc), typical binary I/O would of course be much larger. I have one example based on exactly how you did this, and one example based on an alternate approach I had tried some time ago (which turns out to be slower). By the way, thanks for posting this question, it is useful.

Threads and blocking I/O

First, let's convert the original code in the question into a slightly simpler self-contained example. This is just two processes communicating with a thread that pumps data from one to the other, doing blocking reads and writes.

import subprocess, threading

A_process = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
B_process = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)

def convert_A_to_B(src, dst):
    read_size = 8*1024
    while True:
        try:
            buf = src.read(read_size)
            if len(buf) == 0:  # This is a bit hacky, but seems to reliably happen when the src is closed
                break
            dst.write(buf)
        except ValueError as e: # Reading or writing on a closed fd causes ValueError, not IOError
            print str(e)
            break

convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, args=(A_process.stdout, B_process.stdin))

convert_A_to_B_thread.start()

# Here, watch out for the exact sequence to clean things up
convert_A_to_B_thread.join()

A_process.wait()
B_process.stdin.close()
B_process.wait()

Results:

244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.638977 s, 1.6 GB/s
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.635499 s, 1.6 GB/s

real    0m0.678s
user    0m0.657s
sys 0m1.273s

Not bad! It turns out that the ideal read size in this case is roughly 8k-16KB, much smaller and much larger sizes are somewhat slower. This is probably related to the 4KB block size we asked dd to use.

Select and non-blocking I/O

When I was looking at this type of problem before, I headed in the direction of using select(), nonblocking I/O, and a single thread. An example of that is in my question here: How to read and write from subprocesses asynchronously?. That was for reading from two processes in parallel, which I have extended below to reading from one process and writing to another. The nonblocking writes are limited to PIPE_BUF or less in size, which is 4KB on my system; for simplicity, the reads are also 4KB although they could be any size. This has a few weird corner cases (and inexplicable hangs, depending on the details) but in the form below it works reliably.

import subprocess, select, fcntl, os, sys

p1 = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)

def make_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

make_nonblocking(p1.stdout)
make_nonblocking(p2.stdin)

print "PIPE_BUF = %d" % (select.PIPE_BUF)

read_size = select.PIPE_BUF
max_buf_len = 1 # For reasons which I have not debugged completely, this hangs sometimes when set > 1
bufs = []

while True:
    inputready, outputready, exceptready = select.select([ p1.stdout.fileno() ],[ p2.stdin.fileno() ],[]) 

    for fd in inputready: 
        if fd == p1.stdout.fileno():
            if len(bufs) < max_buf_len:
                data = p1.stdout.read(read_size)
                bufs.append(data)
    for fd in outputready: 
        if fd == p2.stdin.fileno() and len(bufs) > 0:
            data = bufs.pop(0)
            p2.stdin.write(data)

    p1.poll()
    # If the first process is done and there is nothing more to write out
    if p1.returncode != None and len(bufs) == 0:
        # Again cleanup is tricky.  We expect the second process to finish soon after its input is closed
        p2.stdin.close()
        p2.wait()
        p1.wait()
        break

Results:

PIPE_BUF = 4096
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 3.13722 s, 319 MB/s
244133+0 records in
244133+0 records out
999968768 bytes (1.0 GB) copied, 3.13599 s, 319 MB/s

real    0m3.167s
user    0m2.719s
sys 0m2.373s

This is however significantly slower than the version above (even if the read/write size is made 4KB in both for an apples-to-apples comparison). I'm not sure why.

P.S. Late addition: It appears that it is ok to ignore or exceed PIPE_BUF. This causes an IOError exception to be thrown much of the time from p2.stdin.write() (errno=11, temporarily unavailable), presumably when there is enough room in the pipe to write something, but less than the full size we are requesting. The same code above with read_size = 64*1024, and with that exception caught and ignored, runs at 1.4GB/s.

Pipe directly

Just as a baseline, how fast is it to run this using the shell version of pipes (in subprocess)? Let's have a look:

import subprocess
subprocess.call("dd if=/dev/zero bs=4k count=244140 | dd of=/dev/null bs=4k", shell=True)

Results:

244140+0 records in
244140+0 records out
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.425261 s, 2.4 GB/s
999997440 bytes (1.0 GB) copied, 0.423687 s, 2.4 GB/s

real    0m0.466s
user    0m0.300s
sys 0m0.590s

This is notably faster than the threaded python example. However, this is just one copy, while the threaded python version is doing two (into and out of python). Modifying the command to "dd if=/dev/zero bs=4k count=244140 | dd bs=4k | dd of=/dev/null bs=4k" bring the performance to 1.6GB, in line with the python example.

How to run a comparison in a complete system

Some additional thoughts on how to run a comparison in a complete system. Again for simplicity this is just two processes, and both scripts have the exact same convert_A_to_B() function.

Script 1: Pass data in python, as above

A_process = subprocess.Popen(["A", ...
B_process = subprocess.Popen(["B", ...
convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, ...

Script 2: Comparison script, pass data in shell

convert_A_to_B(sys.stdin, sys.stdout)

run this in the shell with: A | python script_2.py | B

This allows an apples-to-apples comparison in a complete system, without using mock functions/processes.

How does block read size affect the results

For this test, the code from the first (threaded) example above is used, and both dd and the python script are set to use the same block size reads/writes.

| Block size | Throughput |
|------------|------------|
| 1KB | 249MB/s |
| 2KB | 416MB/s |
| 4KB | 552MB/s |
| 8KB | 1.4GB/s |
| 16KB | 1.8GB/s |
| 32KB | 2.9GB/s |
| 64KB | 3.0GB/s |
| 128KB | 1.0GB/s |
| 256KB | 600MB/s |

In theory there should be better performance with larger buffers (perhaps up to cache effects) but in practice Linux pipes slow down with very large buffers, even when using pure shell pipes.

Community
  • 1
  • 1
Alex I
  • 19,689
  • 9
  • 86
  • 158
  • Interestingly enough, my `convert_A_to_B` function equivalent is nearly identical to yours. It reads in data in 4 kB chunks. I changed the read size from 4 to 8 kB but this had no effect on my test results. – Alex Reynolds Jun 18 '14 at 23:43
  • @AlexReynolds: I should clarify that 8-16KB is only the optimum when both dd processes are set to use 4KB. I've added a comparison table above when using a range of sizes. – Alex I Jun 19 '14 at 01:36
  • @AlexReynolds: Could you please compare your code to plain shell pipes using the method I outline in the last part of the answer? It may be that you are already running at the same speed as shell pipes. If you would like a method faster than pipes, I can put something together (mmap ought to be much faster, for example). But my theory is that you are already at near shell pipe speeds, with minimal overhead due to doing things in a subprocess. – Alex I Jun 19 '14 at 01:55
  • ... that is, minimal overhead due to doing things in a subprocess launched from python, as opposed to separate processes and python interpreter both launched in a shell. – Alex I Jun 19 '14 at 07:10
  • @AlexReynolds: You mention in another comment that your test runs in 68 or 41 minutes. How much data does it transfer? (Pipes can do a few terabytes, shared memory a few tens of terabytes, in that amount of time) Could you post a small, complete runnable example (including the external programs, and the convert functions)? It is hard to come up with any way to speed it up without being able to actually run the test. – Alex I Jun 19 '14 at 08:14
  • The test input file I used in that test is ~750 MB in size. I would prefer not to post the full code at this time, mainly because I'm embarrassed by how slowly this runs and I don't want to publicize the code I've written until I have had a chance to fix it or rewrite it. I do appreciate the help you have provided so far, though. – Alex Reynolds Jun 19 '14 at 08:44
  • @AlexReynolds: Per my benchmarks, 750MB whould take several seconds to transfer over pipes on typical PC hardware in a naive implementation, or less than a second if implemented carefully. This leads me to think that the slowness you experience is not due to pipes or IPC or python's subprocess.Popen() at all, and is probably due to some particular detail of what your code does (which is not included in the code you posted - since that does run fast :). I would really like to have a shot at solving your problem, but I think you have not provided sufficient information to solve it. – Alex I Jun 20 '14 at 03:58
  • @AlexReynolds: You don't need to be embarrassed: it is probably some really obscure detail that is responsible for this, I think your code is overall solid. Just post a complete example (with the application-specific details removed), and if anyone makes it faster give them the bounty, problem solved :) – Alex I Jun 20 '14 at 04:01
  • Here is a link to the script I wrote. Threads and intermediate functions are not as clearly named as my example code in this question, but the general process of three chained binaries is identical. I'm linking to the line of code where the processes and threads are instantiated: https://github.com/bedops/bedops/blob/master/applications/bed/conversion/src/bam2starch.py#L902 – Alex Reynolds Jun 20 '14 at 07:28
  • @AlexReynolds: I see what is going on with your code. I added a new section at the top with explanation. I haven't quite gotten working faster example yet, maybe later today. – Alex I Jun 22 '14 at 13:00
  • @AlexReynolds: Looks like ~70% of the total cpu time for all processes combined is spent inside `convertSAMToBED()` (I tried some small-ish bam files, this is measured by having a non-threaded script that does just that part of the work: it is a reliable result, does not depend on any profiler issues :) This is called in between a read and a write, I think that is the problem. – Alex I Jun 22 '14 at 13:27
1

Your calls to subprocess.Popen() implicitly specify the default value of bufsize, 0, which forces unbuffered I/O. Try adding a reasonable buffer size (4K, 16K, even 1M) and see if it makes any difference.

Oleg
  • 1,037
  • 7
  • 13
  • By adding `bufsize=-1`, I was able to get the process time on a sample input from 68 minutes down to 41 minutes. It's an improvement, definitely — so you are the front-runner, so far, and anyone who uses `subprocess.Popen()` should know about the `bufsize` parameter — but the best solution (one we don't own, where my codebase is concerned, and written in C++) does this work in 4.5 minutes, so I still have some ways to go. I may start investigating `popen()` and `pthreads` in C. – Alex Reynolds Jun 18 '14 at 23:41
0

Since you talked about popen() and pthreads in comments, I guess you are under a POSIX system (maybe Linux). So did you try to use subprocess32 instead of the standard subprocess library.

Its use is strongly encouraged by the documentation and may lead to some improvment.

PS: I believe mixing forks (subprocess) and threads is a bad idea.

PS2: Why python produceA.py | A | python produceB.py | B | python produceC.py | C does not fit your needs ? Or its equivalent using subprocess ?

Lithy
  • 817
  • 12
  • 23
  • I'm after a one-script approach. – Alex Reynolds Jun 22 '14 at 02:21
  • @Lithy: "mixing forks (subprocess) and threads is a bad idea" - but the very first thing that the fork does is call exec (or rather execve) which forgets all the information that would have been protected by mutexes anyway. No, mixing forks+exec and threads is just fine. – Alex I Jun 22 '14 at 13:35
0

This scenario is particularly well suited for a pipeline, where parallelism is implicitly managed by the OS. Since you are after a one-script solution, here you are:

#! /usr/bin/python2

import sys
import subprocess
import pipes

# Define these as needed

def produceA(input, output):
    output.write(input.read())

def produceB(input, output):
    output.write(input.read())

def produceC(input, output):
    output.write(input.read())

# Magic starts here

COMMAND = "{me} prepare_A | A - | {me} A_to_B | B - | {me} B_to_C | C -"


def bootstrap(input, output):
    """Prepares and runs the pipeline."""
    me = "./{}".format(pipes.quote(__file__))
    subprocess.call(
        COMMAND.format(me=me), 
        stdin=input, stdout=output, shell=True, bufsize=-1
    )


if __name__ == '__main__':
    ACTIONS = {
        "prepare_A": produceA,
           "A_to_B": produceB,
           "B_to_C": produceC
    }

    action = ACTIONS[sys.argv[1]] if len(sys.argv) > 1 else bootstrap

    action(sys.stdin, sys.stdout)

This script will setup a pipeline or run of one of the produce functions, depending on the specified command.

Make it executable and run it without arguments:

./A_to_C.py < A.txt > C.txt

Note: it seems like you are using Python 2.6, so this solution is for Python 2.x, although it should run fine in Python 3.x, except that the quote function has been moved to shlex since Python 3.3

Stefano Sanfilippo
  • 32,265
  • 7
  • 79
  • 80
  • "multithreading does not give any perceivable advantage (every process/thread is waiting for some input from its predecessor)" - say what? every step in the pipeline can run simultaneously. In a pipeline A->B->C, C can work on unit 1 (the earliest to enter) while B works on 2 and A works on 3. Next, C works on 2, B works on 3, etc ... That triples the throughput on a machine with enough cores, if the steps take about the same amount of time. How is that not an advantage? – Alex I Jun 22 '14 at 13:32
  • No. Unless you have either buffers or async writing routines in place, if B is busy calculating (i.e. not reading), A will block until it can write, so until B reads. Anyway, that was not the main point, so I removed it. – Stefano Sanfilippo Jun 22 '14 at 13:57