
I switched from using open to gzip.open in some Python code that uses multiprocessing, and files that were not empty with the former version of the code now end up empty. Is this a known issue? I'm unable to find information about similar problems on the internet.

My code is inspired by this solution, except that I use imap_unordered to dispatch the work to the workers instead of apply_async in a loop.

I'll try to make a minimal working example and add it to this post if necessary, but in case the problem is well known, here is already a verbal description of the situation:

I have a working python program in which computations are happening in a multiprocessing.Pool of workers using the pool's imap_unordered method.

These computations have to write data to some common files. This is achieved by communicating through a multiprocessing.Manager.Queue. The worker functions take this queue as an argument and send information to it using the queue's put method.

A "writer" function takes the queue and a bunch of file paths as arguments, and uses the paths to open files in w mode. Based on the information received through the queue's get method, things are written to one of the files.

The "writer" function and its list of arguments are passed to the pool's apply_async method.

All this seems to work correctly, and I obtain files with things written inside.

Now I want to write this in a compressed form using gzip. I simply used gzip.open instead of open, and opened the files in wb mode. Apart from this and the fact that I added a ".gz" suffix to my file paths, everything is the same.

The program runs with no error messages, but I end up with empty files.

Is the gzip module not usable with multiprocessing?


Edit: Code example

#!/usr/bin/env python3

from multiprocessing import Manager, Pool, cpu_count
import time
from gzip import open as gzopen

def writer(queue, path1, path2):
    with gzopen(path1, "wb") as f1, gzopen(path2, "wb") as f2:
        while True:
            (where, what) = queue.get()
            print("I'm a writer. I have to write:\n%s to %s" % (what, where))
            if where == "out1":
                f1.write(what)
            elif where == "out2":
                f2.write(what)
            else:
                print("flushing files")
                f1.flush()
                f2.flush()
            break

def do_divmod(num_and_queue):
    (num, queue) = num_and_queue
    q, r = divmod(num, 2)
    time.sleep(1)
    if r:
        queue.put(("out2", "q: %d\n" % q))
    else:
        queue.put(("out1", "q: %d\n" % q))
    time.sleep(1)
    return (num, q, r)

def main():
    with Manager() as mgr, Pool(processes=cpu_count() - 2) as pool:
        write_queue = mgr.Queue()
        pool.apply_async(writer, (write_queue, "/tmp/out1.txt.gz", "/tmp/out2.txt.gz"))
        for (n, q, r) in pool.imap_unordered(
            do_divmod,
            ((number, write_queue) for number in range(25))):
            print("%d %% 2 = %d" % (n, r))
            print("%d / 2 = %d" % (n, q))
        write_queue.put(("", ""))

if __name__ == "__main__":
    main()

Running the above code results in empty /tmp/out1.txt.gz and /tmp/out2.txt.gz.

I have to say that I'm having trouble getting the non-gzip version to work as well: In both cases, the print("I'm a writer. I have to write:\n%s to %s" % (what, where)) seems to be executed only once:

$ ./test_multiprocessing.py 
I'm a writer. I have to write:
q: 0
 to out2
1 % 2 = 1
1 / 2 = 0
10 % 2 = 0
10 / 2 = 5
3 % 2 = 1
3 / 2 = 1
6 % 2 = 0
6 / 2 = 3
7 % 2 = 1
7 / 2 = 3
4 % 2 = 0
4 / 2 = 2
5 % 2 = 1
5 / 2 = 2
0 % 2 = 0
0 / 2 = 0
11 % 2 = 1
11 / 2 = 5
8 % 2 = 0
8 / 2 = 4
12 % 2 = 0
12 / 2 = 6
9 % 2 = 1
9 / 2 = 4
2 % 2 = 0
2 / 2 = 1
13 % 2 = 1
13 / 2 = 6
14 % 2 = 0
14 / 2 = 7
15 % 2 = 1
15 / 2 = 7
16 % 2 = 0
16 / 2 = 8
17 % 2 = 1
17 / 2 = 8
18 % 2 = 0
18 / 2 = 9
19 % 2 = 1
19 / 2 = 9
20 % 2 = 0
20 / 2 = 10
21 % 2 = 1
21 / 2 = 10
22 % 2 = 0
22 / 2 = 11
23 % 2 = 1
23 / 2 = 11
24 % 2 = 0
24 / 2 = 12

But at least, when the non-gzip version says it is writing something to a file, there is something in the file.


1 Answer


I tried some modifications based on examples found in the documentation of the multiprocessing module. It seems that I can somehow force the gzipped files to be written by calling the get method of the object returned by apply_async:

#!/usr/bin/env python3

from multiprocessing import Manager, Pool, cpu_count
import time
from gzip import open as gzopen

def writer(queue, path1, path2):
    with gzopen(path1, "wb") as f1, gzopen(path2, "wb") as f2:
        while True:
            (where, what) = queue.get()
            print("I'm a writer. I have to write:\n%s to %s" % (what, where))
            # The encode seems necessary when things are actually written
            # (not necessary with files obtained with the normal open)
            if where == "out1":
                f1.write(what.encode())
            elif where == "out2":
                f2.write(what.encode())
            else:
                print("flushing files")
                f1.flush()
                f2.flush()
            break

def do_divmod(num_and_queue):
    (num, queue) = num_and_queue
    q, r = divmod(num, 2)
    time.sleep(1)
    if r:
        queue.put(("out2", "q: %d\n" % q))
    else:
        queue.put(("out1", "q: %d\n" % q))
    time.sleep(1)
    return (num, q, r)

def main():
    with Manager() as mgr, Pool(processes=cpu_count() - 2) as pool:
        write_queue = mgr.Queue()
        # getting the "result object"
        writing = pool.apply_async(writer, (write_queue, "/tmp/out1.txt.gz", "/tmp/out2.txt.gz"))
        for (n, q, r) in pool.imap_unordered(
                do_divmod,
                ((number, write_queue) for number in range(25))):
            print("%d %% 2 = %d" % (n, r))
            print("%d / 2 = %d" % (n, q))
        write_queue.put(("", ""))
        # Magic command to force the writing of the gzipped files
        # (not necessary when files are obtained through normal open)
        writing.get(timeout=1)

if __name__ == "__main__":
    main()

I have no idea why this works, nor what the object returned by apply_async actually is. The documentation doesn't say much more than that it "returns a result object". More explanations are welcome.
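
One thing that may be relevant (though I'm not sure it fully explains what I observe): according to the multiprocessing documentation, apply_async returns an AsyncResult object, and an exception raised inside the worker function is only re-raised when get is called on that object. A minimal sketch of this behaviour, separate from the gzip code above:

#!/usr/bin/env python3

from multiprocessing import Pool

def crash():
    # An exception raised in a function launched with apply_async is stored
    # in the AsyncResult and only re-raised when its get method is called.
    raise ValueError("boom")

def main():
    with Pool(processes=1) as pool:
        result = pool.apply_async(crash)
        print("no error visible so far")
        # The ValueError from the worker only shows up here:
        result.get(timeout=5)

if __name__ == "__main__":
    main()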

Note that the above code is still bugged. It solves the initial problem of the gzipped files being empty, but not the problem of the writer apparently doing its write job only once.
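
The "only once" part seems to come from the unconditional break at the end of the while True loop, which makes the writer handle a single message and then return. Here is a sketch (not tested in this setup) of how the loop could be restructured so that it only stops when the ("", "") sentinel arrives:

from gzip import open as gzopen

def writer(queue, path1, path2):
    with gzopen(path1, "wb") as f1, gzopen(path2, "wb") as f2:
        while True:
            (where, what) = queue.get()
            if where == "out1":
                f1.write(what.encode())
            elif where == "out2":
                f2.write(what.encode())
            else:
                # Treat any other message, e.g. ("", ""), as a shutdown
                # sentinel: flush both files and leave the loop.
                print("flushing files")
                f1.flush()
                f2.flush()
                break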


Edit: Further tests

It turns out that getting the result of pool.apply_async and calling get on it is not the only way to get the gzipped files written. Changing what to what.encode() also forces the writing:

#!/usr/bin/env python3

from multiprocessing import Manager, Pool, cpu_count
import time
from gzip import open as gzopen

def writer(queue, path1, path2):
    with gzopen(path1, "wb") as f1, gzopen(path2, "wb") as f2:
        while True:
            (where, what) = queue.get()
            print("I'm a writer. I have to write:\n%s to %s" % (what, where))
            if where == "out1":
                # The encode method call seems to force the writing
                f1.write(what.encode())
                #f1.write(what)
            elif where == "out2":
                # The encode method call seems to force the writing
                f2.write(what.encode())
                #f2.write(what)
            else:
                print("flushing files")
                f1.flush()
                f2.flush()
            break

def do_divmod(num_and_queue):
    (num, queue) = num_and_queue
    q, r = divmod(num, 2)
    time.sleep(1)
    if r:
        queue.put(("out2", "q: %d\n" % q))
    else:
        queue.put(("out1", "q: %d\n" % q))
    time.sleep(1)
    return (num, q, r)

def main():
    with Manager() as mgr, Pool(processes=cpu_count() - 2) as pool:
        write_queue = mgr.Queue()
        #writing = pool.apply_async(writer, (write_queue, "/tmp/out1.txt.gz", "/tmp/out2.txt.gz"))
        pool.apply_async(writer, (write_queue, "/tmp/out1.txt.gz", "/tmp/out2.txt.gz"))
        for (n, q, r) in pool.imap_unordered(
            do_divmod,
            ((number, write_queue) for number in range(25))):
            print("%d %% 2 = %d" % (n, r))
            print("%d / 2 = %d" % (n, q))
        write_queue.put(("", ""))
        #writing.get(timeout=1)

if __name__ == "__main__":
    main()

I'm still clueless about what is happening...
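
For what it's worth, gzip.open also accepts text modes. A possible alternative, which I have not tested with the multiprocessing setup above, would be to open the files in "wt" mode, so that plain str objects can be written directly, as with the normal open in "w" mode:

from gzip import open as gzopen

# Sketch only: in text mode the gzip file object accepts str,
# so no .encode() call is needed before writing.
with gzopen("/tmp/out1.txt.gz", "wt") as f1:
    f1.write("q: 0\n")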

    `class multiprocessing.pool.AsyncResult` The class of the result returned by `Pool.apply_async()` and `Pool.map_async()`. https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult – stovfl Mar 13 '17 at 21:29
  • @stovfl Thanks for the documentation. How is it that gzipped files in the `writer` function do not get written unless `get` is called, whereas normal files do? – bli Mar 14 '17 at 08:08
  • `Compress.flush([mode])` ... pending input is processed, ... mode ..., defaulting to Z_FINISH. ...prevents compressing any more data. ... the compress() method cannot be called again; the only realistic action is to delete the object. https://docs.python.org/3.5/library/zlib.html#zlib.Compress.flush – stovfl Mar 14 '17 at 09:38