
I have a tarfile containing bz2-compressed files. I want to apply the function clean_file to each of the bz2 files, and collate the results. In series, this is easy with a loop:

import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import Pool

def clean_file(member):
    if '.bz2' in str(member):

        f = tr.extractfile(member)

        with bz2.open(f, "rt") as bzinput:
            dicts = []
            for i, line in enumerate(bzinput):
                line = line.replace('"name"}', '"name":" "}')
                dat = json.loads(line)
                dicts.append(dat)

        bzinput.close()
        f.close()
        del f, bzinput

        processed = dicts[0]
        return processed

    else:
        pass


# Open tar file and get contents (members)
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)


# Apply the clean_file function in series
i=0
processed_files = []
for m in members:
    processed_files.append(clean_file(m))
    i+=1
    print('done '+str(i)+'/'+str(num_files))
    

However, I need to be able to do this in parallel. The method I'm trying uses Pool like so:

# Apply the clean_file function in parallel
if __name__ == '__main__':
   with Pool(2) as p:
      processed_files = list(p.map(clean_file, members))

But this raises an OSError:

Traceback (most recent call last):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "parse_data.py", line 19, in clean_file
    for i, line in enumerate(bzinput):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/bz2.py", line 195, in read1
    return self._buffer.read1(size)
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 68, in readinto
    data = self.read(len(byte_view))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 103, in read
    data = self._decompressor.decompress(rawblock, size)
OSError: Invalid data stream
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "parse_data.py", line 53, in <module>
    processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/site-packages/tqdm/std.py", line 1167, in __iter__
    for obj in iterable:
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 735, in next
    raise value
OSError: Invalid data stream

So I guess this way isn't properly accessing the files from within data.tar or something. How can I apply the function in parallel?

I'm guessing this will work with any tar archive containing bz2 files but here's my data to reproduce the error: https://github.com/johnf1004/reproduce_tar_error

– John F
  • Please add the entire traceback to your question (not just the `OSError`). When asking questions here, you should provide a [mre], which means getting rid of extraneous stuff like the use of `tqdm` in the sample code. – martineau Apr 11 '21 at 16:57

2 Answers


You didn't specify what platform you are running on but I suspect that it is Windows because you have ...

if __name__ == '__main__':
    main()

... which would be required for code that creates processes on platforms that use the OS spawn method for creating new processes. But that also means that when a new process is created (e.g. each of the processes in the pool you are creating), it begins by re-executing the source program from the very top. This means that the following code is executed by every pool process:

tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)

However, I don't see why this would in itself cause an error, though I can't be sure. The problem may be that this code runs after your worker function clean_file has already been called, so that tr has not yet been set. If this code preceded clean_file, it might work, but this is just a guess. Certainly, extracting the members with members = tr.getmembers() in each pool process is wasteful; each process needs to open the tar file, ideally just once (the code below does this with a pool initializer).

But what is clear is that the stacktrace you published does not match your code. You show:

Traceback (most recent call last):
  File "parse_data.py", line 53, in <module>
    processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))

Yet your code has no reference to tqdm, nor does it use imap. It becomes harder to analyze what your problem actually is when the code you post doesn't quite match the code that produced the exception.

On the off-chance you are running on a Mac, which might be using fork to create new processes: forking can be problematic when the main process has created threads (which may happen behind the scenes, perhaps in the tarfile module) before the new processes are created. For that reason, the code below explicitly ensures that spawn is used to create new processes. Anyway, the following code should work; it also introduces a few optimizations. If it doesn't, please post a new stack trace.

import json
import bz2
import tarfile
from multiprocessing import get_context

def open_tar():
    # open once for each process in the pool
    global tr
    tr = tarfile.open('data.tar')

def clean_file(member):
    f = tr.extractfile(member)

    with bz2.open(f, "rt") as bzinput:
        for line in bzinput:
            line = line.replace('"name"}', '"name":" "}')
            dat = json.loads(line)
            # since you are returning just the first occurrence:
            return dat

def main():
    with tarfile.open('data.tar') as tr:
        members = tr.getmembers()
    # just pick members where '.bz2' is in member:
    filtered_members = filter(lambda member: '.bz2' in str(member), members)
    ctx = get_context('spawn')
    # open tar file just once for each process in the pool:
    with ctx.Pool(initializer=open_tar) as pool:
        processed_files = pool.map(clean_file, filtered_members)
        print(processed_files)

# required for when processes are created using spawn:
if __name__ == '__main__':
    main()
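
If you want the tqdm progress bar that appears in your traceback, pool.imap can be combined with the same setup. Here is a minimal sketch of that, assuming tqdm is installed; it reuses open_tar and clean_file from above and replaces main (the __main__ guard stays the same):

import tqdm

def main():
    with tarfile.open('data.tar') as tr:
        # materialize the filtered list up front so the progress bar knows the total:
        members = [m for m in tr.getmembers() if '.bz2' in str(m)]
    ctx = get_context('spawn')
    with ctx.Pool(initializer=open_tar) as pool:
        # imap yields results as they complete, so tqdm can update incrementally:
        processed_files = list(tqdm.tqdm(pool.imap(clean_file, members),
                                         total=len(members)))
    print(processed_files)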
– Booboo

It seems some kind of race condition was happening: the child processes were all reading through the same tar file handle inherited from the parent. Opening the tar file separately in every child process solves the issue:

import json
import bz2
import tarfile
import logging
from multiprocessing import Pool


def clean_file(member):
    if '.bz2' not in str(member):
        return
    try:
        with tarfile.open('data.tar') as tr:
            with tr.extractfile(member) as bz2_file:
                with bz2.open(bz2_file, "rt") as bzinput:
                    dicts = []
                    for i, line in enumerate(bzinput):
                        line = line.replace('"name"}', '"name":" "}')
                        dat = json.loads(line)
                        dicts.append(dat)
                        return dicts[0]
    except Exception:
        logging.exception(f"Error while processing {member}")


def process_serial():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i + 1}/{len(members)}')


def process_parallel():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_parallel()


if __name__ == '__main__':
    main()
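
A note on the design choice: opening data.tar inside clean_file keeps each worker self-contained, but it also reopens the archive (and re-reads its index) once per member. A minimal sketch of a variation that instead opens the archive once per worker process via a Pool initializer; the names _tar, init_worker and clean_file_once are illustrative, not part of the code above:

_tar = None  # per-process tar handle, set by the Pool initializer


def init_worker():
    # runs once in every worker process when the Pool starts up
    global _tar
    _tar = tarfile.open('data.tar')


def clean_file_once(member):
    if '.bz2' not in str(member):
        return None
    with _tar.extractfile(member) as bz2_file:
        with bz2.open(bz2_file, "rt") as bzinput:
            for line in bzinput:
                line = line.replace('"name"}', '"name":" "}')
                return json.loads(line)  # first record only, as in clean_file above


def process_parallel_once():
    with tarfile.open('data.tar') as tr:
        members = tr.getmembers()
    with Pool(initializer=init_worker) as pool:
        print(pool.map(clean_file_once, members))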

EDIT:

Note that another way to solve this problem is to just use the spawn start method:

multiprocessing.set_start_method('spawn')

With spawn, each child process starts a fresh Python interpreter and does not inherit the parent's open file handles at all. Under the default "fork" start method (the default on Linux, and on macOS before Python 3.8), the file handles of parent and child refer to the same open file and therefore share the same offset, so concurrent reads interfere with one another.
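
For completeness, a minimal sketch of where that call goes: it must run inside the __main__ guard, before the Pool is created, and it can only be called once per program. This reuses main() from the code above:

import multiprocessing

if __name__ == '__main__':
    # must be set before the Pool inside process_parallel() is created:
    multiprocessing.set_start_method('spawn')
    main()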

– Anmol Singh Jaggi