I have a problem similar to the one mentioned in "Python - How to parallel consume and operate on files in a directory".

Problem: I have 100k+ files in a directory. In my case, process_file() takes a text file, does some processing, and writes an XML file.

Unlike in the thread above, I want to run pool.map on batches of files.

Reason for running in batches: processing each file takes a minute on average, so processing the entire list will take several days. But I want to start using the processed files in another program while the rest are still being processed. For that, I want to ensure that, say, the first 100 files are ready, then the next 100, and so on.

I have done the following:

  1. Sort the files in the directory. inputFileArr is the list of files.
  2. Run the program in batches:

    from multiprocessing import Pool
    import math

    for i in range(int(math.ceil(len(inputFileArr) * 1.0 / batch_size))):
        start_index = i * batch_size
        end_index = (i + 1) * batch_size
        print("Batch #{0}: {1}".format(i, inputFileArr[start_index:end_index]))

        p = Pool(n_process)
        p.map(process_file, inputFileArr[start_index:end_index])
        print("Batch #{0} completed".format(i))
    
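
(A variant of the same loop, not changing the behavior in question, that also shuts each pool down before moving to the next batch, so worker processes don't accumulate across iterations:)

    for i in range(int(math.ceil(len(inputFileArr) * 1.0 / batch_size))):
        batch = inputFileArr[i * batch_size:(i + 1) * batch_size]
        p = Pool(n_process)
        p.map(process_file, batch)
        p.close()  # no more tasks will be submitted to this pool
        p.join()   # wait for the worker processes to exit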

The Python documentation of pool.map mentions:

It blocks until the result is ready.

I assumed this means that batch #(i+1) will start only after the processing of batch #i is over.

But that doesn't seem to be the case. When I look at the timestamps of the generated XML files, the batch ordering is not maintained: some files of a batch are processed before files of the previous batch. To be sure, I printed the filenames of each batch.

process_file()

  1. This calls a Python script using subprocess.Popen().

    subprocess.Popen(command)

    command contains something like python script.py input_args

  2. And that Python script calls a Java program, again using subprocess.Popen().
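
For illustration, a minimal sketch of what process_file() looks like in my case (the script name and arguments are placeholders):

    import subprocess

    def process_file(input_file):
        # placeholder command; the real script name and arguments differ
        command = ["python", "script.py", input_file]
        subprocess.Popen(command)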

Here's the code inside the Python script that my code calls:

    m_process = subprocess.Popen(command, stdout=subprocess.PIPE)
    while m_process.poll() is None:
        stdout = str(m_process.stdout.readline())
        if 'ERROR' in stdout:
            m_process.terminate()
            error = stdout.rstrip()
    output = str(output_file.read())

What should I do to ensure that my program processes the batches in sequence?

Environment: Python 2.7

Kaushik Acharya

2 Answers


EDIT: Old answer below, new answer at the top

It's a bit inefficient to wait for the first 100 files to finish and only then start the next ones: while the last files of a batch are still running, some workers sit idle even though they could already be processing files from the next batch.

Nevertheless, if you really want the processing to continue to the next 100 only after the first 100 are done, just call map on a batch of 100 files at a time:

files = sorted(...)
for i in range(0, len(files), 100):
    pool.map(process_file, files[i:i+100])

Depending on how many workers you have, I suggest increasing the batch size beyond 100 to reduce the time during which workers sit idle (as detailed above).
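
A possible alternative (my addition, not part of the original answer): Pool.imap yields results in input order while the workers keep pulling later tasks, so you can tell when the first 100, 200, ... files are done without ever idling the pool. A minimal sketch, assuming the same n_process and sorted file list as above:

from multiprocessing import Pool

pool = Pool(n_process)
files = sorted(...)  # the sorted input list, as above

done = 0
# imap yields one result per input, in input order, as soon as each is
# ready; meanwhile the workers keep processing files further down the list.
for _ in pool.imap(process_file, files):
    done += 1
    if done % 100 == 0:
        print("First {0} files are ready".format(done))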


Assuming you just want groups of 100 consecutive files, but not necessarily from the beginning, you can try the following.

Following that math, you can divide the files into groups of 100 and process each group in a separate worker (so the parallelization is across groups, but once a group is done, you know that 100 consecutive files have been processed).

files = sorted(...)
file_groups = [[files[i + j] for j in range(min(100, len(files) - i))]
               for i in range(0, len(files), 100)]

def process_batch(batch):
    group_index, group_files = batch
    for f in group_files:
        process_file(f)
    print('Group %d is done' % group_index)

# `pool` is assumed to have been created earlier, e.g. pool = Pool(n_process)
pool.map(process_batch, enumerate(file_groups))

Barak Itkin
  • I would want to have the groups done from beginning i.e. 1st process initial 100 files. Once that is complete then process next 100 files and so on. – Kaushik Acharya Jan 12 '19 at 15:33
  • @KaushikAcharya - See modified answer. This is essentially similar to your code and should work, if it doesn't then you have a different problem – Barak Itkin Jan 12 '19 at 15:41
  • You are creating the pool object inside the for loop as I have done? Updated my question with the code I am currently using. – Kaushik Acharya Jan 12 '19 at 15:44
  • I'm not creating my pool inside the loop (as can be seen in the code, I'm not even creating it - I assume it was already created before that). Will look at the provided code in a second – Barak Itkin Jan 12 '19 at 15:51
  • Your code seems to be OK up to indentation. Consider using `pool.apply` and not `pool.map` if you don't need the return value from the processing function (i.e. if each call to `process_file` writes the output to disk). If you still have issues with file timestamps, make sure you are closing the files explicitly instead of having them closed by the garbage collector – Barak Itkin Jan 12 '19 at 15:55
  • **process_file** does not return any value. Inside this function I execute a python script using subprocess.Popen. This python script internally executes a java program which writes the output in xml file. "It blocks until the result is ready." I hope my understanding of this statement for **pool.map** is correct? – Kaushik Acharya Jan 12 '19 at 17:29
  • **java** program prints the input arguments. Input text filename is one of the argument. In these print statements also, I can see that some files of batch #2 coming in between the files of batch #1. – Kaushik Acharya Jan 12 '19 at 17:41
  • Are you using wait on the `Popen` call? – Barak Itkin Jan 12 '19 at 20:10
  • 1. I am calling the python script with subprocess.Popen() from my process_file() 2. I have updated my question with details of process_file(). – Kaushik Acharya Jan 13 '19 at 08:39

Resolved the issue by replacing subprocess.Popen(command) with subprocess.call(command).

Thanks to @Barak Itkin for the help and for pointing out the use of wait. I followed the solution (using subprocess.call) given in "Python popen command. Wait until the command is finished".

I've mentioned the solution here in case any other user faces a similar issue.
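
For completeness, a minimal sketch of the fix (the command is a placeholder, as before). subprocess.Popen returns as soon as the child process is spawned, so process_file was returning before the script had actually finished, and pool.map's blocking no longer corresponded to the files being written. subprocess.call blocks until the child exits:

import subprocess

def process_file(input_file):
    command = ["python", "script.py", input_file]  # placeholder command
    # call() blocks until the child process exits, so pool.map() now
    # really waits for each file to be fully processed before returning.
    subprocess.call(command)
    # Equivalent alternative: subprocess.Popen(command).wait()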

Kaushik Acharya