
A few days back I answered a question on SO regarding reading a tar file in parallel.

This was the gist of the question:

import bz2
import tarfile
from multiprocessing import Pool

tr = tarfile.open('data.tar')

def clean_file(tar_file_entry):
    if '.bz2' not in str(tar_file_entry):
        return
    with tr.extractfile(tar_file_entry) as bz2_file:
        with bz2.open(bz2_file, "rt") as bzinput:
            # Reading the bz2 file (processing logic elided in the question)
            ...


def process_serial():
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i}/{len(members)}')


def process_parallel():
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_serial() # No error
    process_parallel() # Error


if __name__ == '__main__':
    main()

We were able to make the error disappear by just opening the tar file inside the child process rather than in the parent, as mentioned in the answer.
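
For reference, here is a minimal sketch of what that fix can look like, using a Pool initializer so that each worker process opens its own handle to the archive. The `init_worker` helper is a name introduced here just for illustration, the `'data.tar'` path is taken from the question, and the per-member processing is elided exactly as in the original code:

import bz2
import tarfile
from multiprocessing import Pool

tr = None  # will hold a per-process tarfile handle


def init_worker(tar_path):
    # Runs once in every child process: open a private handle to the archive
    # there instead of relying on one inherited from the parent.
    global tr
    tr = tarfile.open(tar_path)


def clean_file(tar_file_entry):
    if '.bz2' not in str(tar_file_entry):
        return
    with tr.extractfile(tar_file_entry) as bz2_file:
        with bz2.open(bz2_file, "rt") as bzinput:
            ...  # per-member processing elided, as in the question


def process_parallel(tar_path='data.tar'):
    # The member list can be built in the parent and passed to the workers,
    # as in the question's code.
    with tarfile.open(tar_path) as index:
        members = index.getmembers()
    with Pool(initializer=init_worker, initargs=(tar_path,)) as pool:
        return pool.map(clean_file, members)


if __name__ == '__main__':
    process_parallel()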

I am not able to understand why this worked.

Even if we open the tarfile in the parent process, the child process will get a new copy. So why does explicitly opening the tarfile in the child process make any difference?

Does this mean that in the first case, the child processes were somehow mutating the common tarfile object and causing memory corruption due to concurrent writes?

Anmol Singh Jaggi
  • `open` creates a file handle that is bound to the process. On UNIX-like systems it is simply a number. That number does not mean the same thing to another process. – Klaus D. Apr 17 '21 at 05:37
  • You can find an interesting post on the subject [here](https://stackoverflow.com/questions/909064/portable-way-to-pass-file-descriptor-between-different-processes) – XtianP Apr 17 '21 at 06:06
  • When I responded to your original question, I posted code that showed how you can initialize each process in the pool to open the tarfile like you are attempting to do above so that it is only opened once by each process in the pool rather than for each member that is being extracted. Did you ever try running the code? – Booboo Apr 18 '21 at 12:59
  • @Booboo I am not the one who asked that question; I am the one who answered it. I tried your answer and it worked fine. Actually, your answer and mine are fundamentally the same. – Anmol Singh Jaggi Apr 18 '21 at 16:02
  • @AnmolSinghJaggi My apologies. In my answer to that question I had conjectured that moving `tr = tarfile.open('data.tar')` *above* function `clean_file` *might* work but I couldn't be sure. So you actually tried this presumably with your own tar file (you wouldn't have had access to the OP's **data.tar** file). This code certainly works on my Windows platform (except I did a `bz2_file.read()` instead of `bz2.open(bz2_file, "rt")`) because my members are not compressed. – Booboo Apr 18 '21 at 16:35
  • Why would I not have access to OP's data.tar file? The link is there in the question. – Anmol Singh Jaggi Apr 18 '21 at 16:40
  • @AnmolSinghJaggi I seem to have missed that. It occurs to me that just as an OP is supposed to specify what language is being used when asking a question tagged with `regex`, an OP should specify what platform is being used when posting a question tagged with `multiprocessing`. My previous comment applied to platforms that use `spawn`, such as Windows. In my answer to the original question, I had also recommended that the OP use `spawn`. – Booboo Apr 18 '21 at 18:07

1 Answer


FWIW, the claim in the comments about `open` and file handle numbers is actually incorrect on UNIX-like systems.

If multiprocessing uses fork() (which it does under Linux and similar systems, although I read there were issues with forking on macOS), file handles and everything else are happily copied to child processes. By "happily" I mean it's complicated in many edge cases, such as forking while threads are running, but it still works fine for file handles.

The following works fine for me:

import multiprocessing

this = open(__file__, 'r')  # opened in the parent; a forked child inherits this file object


def read_file():
    print(len(this.read()))


def main():
    process = multiprocessing.Process(target=read_file)
    process.start()
    process.join()


if __name__ == '__main__':
    main()

The problem is likely that tarfile keeps internal state and/or buffering while reading, and you can also simply run into conflicts by trying to seek to and read different parts of the same archive simultaneously. I.e., I'm speculating that using a thread pool without any synchronization would likely run into exactly the same issues in this case.

Edit: to clarify, extracting a file from a tar archive is likely (I haven't checked the exact details) done as follows: (1) seek to the offset of the encapsulated part (file), (2) read a chunk of the encapsulated file and write the chunk to the destination file (or pipe, or whatever), (3) repeat (2) until the whole file is extracted.

Attempting to do this in a non-synchronized way from parallel processes that share the same file handle will likely result in these steps getting mixed up: e.g., starting to process file #2 will seek away from file #1 while we are still in the middle of reading file #1, and so on.
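
To make the interleaving concrete, here is a rough, purely illustrative sketch of that seek-and-copy pattern (not the actual tarfile implementation; the function name, parameters, and chunk size are made up):

def copy_member(archive, offset, size, dest, chunk_size=16 * 1024):
    # (1) seek to where this member's data starts inside the archive
    archive.seek(offset)
    remaining = size
    while remaining > 0:
        # (2) read the next chunk and write it to the destination
        data = archive.read(min(chunk_size, remaining))
        if not data:
            break
        dest.write(data)
        remaining -= len(data)
    # (3) the loop repeats until the whole member has been copied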

Edit 2, answering the comment below: the memory representation is indeed forked afresh for child processes, that's true; but resources managed on the kernel side (such as file handles with their offsets, and kernel buffers) are shared.

To illustrate:

import multiprocessing

this = open(__file__, 'rb')  # one file object created in the parent; forked children share its kernel-side file offset


def read_file(worker):
    print(worker, this.read(80))


def main():
    processes = []

    for number in (1, 2):
        processes.append(
            multiprocessing.Process(target=read_file, args=(number,)))

    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()

Running this on Linux I get:

$ python3.8 test.py 
1 b"import multiprocessing\n\nthis = open(__file__, 'rb')\n\n\ndef read_file(worker):\n   "
2 b''

If seeking and reading were independent, both processes would print an identical result, but they don't. Since this is a small file and Python opts to buffer a small amount of data (8 KiB), the first process reads to EOF, leaving the second process with no data to read (unless, of course, it seeks back).

Vytas
  • But the memory representation of the tarfile object should have been copied afresh to every child process; so how will one seek interfere with another? Do you mean the actual tar file on disk? In that case, doesn't OSX (or any modern OS) guarantee concurrent read capability for a single file by multiple processes? In fact, that's why the second program worked without errors! – Anmol Singh Jaggi Apr 17 '21 at 17:35
  • @AnmolSinghJaggi see the updated answer; I mean the actual source tarfile on disk that is shared among the workers competing for read and seek operations. When answering, I did assume Unix Python that chooses to fork(), my answer may be invalid on macOS: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods – Vytas Apr 17 '21 at 18:34
  • You are right. When we use the fork start method, file offsets are somehow shared among processes. However, under spawn, we get two completely distinct file handles. If you write `multiprocessing.set_start_method('spawn')` at the top of the program, you'll see different output. – Anmol Singh Jaggi Apr 18 '21 at 15:54
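
To see that last point in action, here is a minimal variation of the second example from the answer with the start method forced to spawn (a sketch; it assumes a platform where the spawn start method is available). Each child starts a fresh interpreter and re-imports the module, so open(__file__, 'rb') runs again in every child, each worker gets its own file object with an independent offset, and both workers print the same first 80 bytes:

import multiprocessing

this = open(__file__, 'rb')


def read_file(worker):
    print(worker, this.read(80))


def main():
    # Force 'spawn': children do not inherit the parent's open file object;
    # instead each child re-imports this module and opens the file itself.
    multiprocessing.set_start_method('spawn')

    processes = [
        multiprocessing.Process(target=read_file, args=(number,))
        for number in (1, 2)
    ]

    for process in processes:
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()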