I switched from using open to gzip.open in some Python code using multiprocessing, and files that weren't empty in the former version of the code now end up empty. Is this a known issue? I'm unable to find information about similar problems on the internet.

My code is inspired by this solution, except that I use imap_unordered to set the workers to work instead of apply_async in a loop.

I'll try to make a minimal working example and add it to this post if necessary, but in case the problem is well known, here is a verbal description of the situation:
I have a working Python program in which computations happen in a multiprocessing.Pool of workers, driven by the pool's imap_unordered method.

These computations have to write data to some common files. This is achieved by communicating through a multiprocessing.Manager.Queue. The worker functions take this queue as an argument and send information to it using the queue's put method.

A "writer" function takes the queue and a bunch of file paths as arguments, and uses the paths to open the files in "w" mode. Based on the information received through the queue's get method, things are written to one of the files.

The "writer" function and its list of arguments are passed to the pool's apply_async method.
All this seems to work correctly, and I obtain files with things written inside.
Now I want to write this data in compressed form using gzip. I simply used gzip.open instead of open, and opened the files in "wb" mode. Apart from this, and the fact that I added a ".gz" suffix to my file paths, everything is the same.

The program runs with no error messages, but I end up with empty files.

Is the gzip module not usable with multiprocessing?
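For reference, gzip.open itself behaves as expected in a single process; here is a minimal sanity check I ran outside multiprocessing (the path is just for illustration):

```python
import gzip

# Write bytes to a gzip file and read them back, all in one process.
# Note: a file opened with gzip.open in "wb" mode accepts bytes, not str.
with gzip.open("/tmp/gzip_sanity.gz", "wb") as f:
    f.write(b"hello\n")

with gzip.open("/tmp/gzip_sanity.gz", "rb") as f:
    assert f.read() == b"hello\n"
```

So whatever is going wrong seems specific to the multiprocessing setup, not to gzip itself.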
Edit: Code example
#!/usr/bin/env python3
from multiprocessing import Manager, Pool, cpu_count
import time
from gzip import open as gzopen


def writer(queue, path1, path2):
    with gzopen(path1, "wb") as f1, gzopen(path2, "wb") as f2:
        while True:
            (where, what) = queue.get()
            print("I'm a writer. I have to write:\n%s to %s" % (what, where))
            if where == "out1":
                f1.write(what)
            elif where == "out2":
                f2.write(what)
            else:
                print("flushing files")
                f1.flush()
                f2.flush()
                break


def do_divmod(num_and_queue):
    (num, queue) = num_and_queue
    q, r = divmod(num, 2)
    time.sleep(1)
    if r:
        queue.put(("out2", "q: %d\n" % q))
    else:
        queue.put(("out1", "q: %d\n" % q))
    time.sleep(1)
    return (num, q, r)


def main():
    with Manager() as mgr, Pool(processes=cpu_count() - 2) as pool:
        write_queue = mgr.Queue()
        pool.apply_async(writer, (write_queue, "/tmp/out1.txt.gz", "/tmp/out2.txt.gz"))
        for (n, q, r) in pool.imap_unordered(
                do_divmod,
                ((number, write_queue) for number in range(25))):
            print("%d %% 2 = %d" % (n, r))
            print("%d / 2 = %d" % (n, q))
        write_queue.put(("", ""))


if __name__ == "__main__":
    main()
Running the above code results in empty /tmp/out1.txt.gz and /tmp/out2.txt.gz.

I have to say that I'm having trouble getting the non-gzip version to work as well: in both cases, the print("I'm a writer. I have to write:\n%s to %s" % (what, where)) seems to be executed only once:
$ ./test_multiprocessing.py
I'm a writer. I have to write:
q: 0
to out2
1 % 2 = 1
1 / 2 = 0
10 % 2 = 0
10 / 2 = 5
3 % 2 = 1
3 / 2 = 1
6 % 2 = 0
6 / 2 = 3
7 % 2 = 1
7 / 2 = 3
4 % 2 = 0
4 / 2 = 2
5 % 2 = 1
5 / 2 = 2
0 % 2 = 0
0 / 2 = 0
11 % 2 = 1
11 / 2 = 5
8 % 2 = 0
8 / 2 = 4
12 % 2 = 0
12 / 2 = 6
9 % 2 = 1
9 / 2 = 4
2 % 2 = 0
2 / 2 = 1
13 % 2 = 1
13 / 2 = 6
14 % 2 = 0
14 / 2 = 7
15 % 2 = 1
15 / 2 = 7
16 % 2 = 0
16 / 2 = 8
17 % 2 = 1
17 / 2 = 8
18 % 2 = 0
18 / 2 = 9
19 % 2 = 1
19 / 2 = 9
20 % 2 = 0
20 / 2 = 10
21 % 2 = 1
21 / 2 = 10
22 % 2 = 0
22 / 2 = 11
23 % 2 = 1
23 / 2 = 11
24 % 2 = 0
24 / 2 = 12
But at least, when the non-gzip version says it is writing something to a file, there is something in the file.
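One thing I suspect I should check: as far as I understand, an exception raised inside a function submitted via apply_async is not printed anywhere by default; it is only re-raised when get() is called on the returned AsyncResult. So if my writer crashes after its first queue.get(), I would see exactly one print and then silence. A self-contained sketch of this behavior (not my actual code, just simulating a crashing worker):

```python
from multiprocessing import Pool


def crashing_worker():
    # Simulate a writer that dies on its first write, e.g. by
    # passing str to a file handle opened in binary mode.
    raise TypeError("a bytes-like object is required, not 'str'")


if __name__ == "__main__":
    with Pool(1) as pool:
        result = pool.apply_async(crashing_worker)
        try:
            # Without this get() call, the TypeError would go unnoticed.
            result.get(timeout=10)
        except TypeError as e:
            print("worker raised:", e)
```

Maybe keeping the AsyncResult from my apply_async(writer, ...) call and calling get() on it would reveal what is killing the writer.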