I have a problem similar to the one mentioned in Python - How to parallel consume and operate on files in a directory.
Problem: I have 100k+ files in a directory. In my case, process_file() takes a text file, does some processing, and dumps an XML file.
Unlike in the above thread, I want to run pool.map in batches of files.
Reason for running in batches: processing each file takes about a minute on average, so processing the entire list will take several days. But as files get processed, I want to start using the processed files in another program. For that, I want to ensure that the first 100 files are ready, then the next 100, and so on.
I have done the following:

- Sort the files in the directory. inputFileArr is the list of files.
- Run the program in batches:
    for i in range(int(math.ceil(len(inputFileArr) * 1.0 / batch_size))):
        start_index = i * batch_size
        end_index = (i + 1) * batch_size
        print("Batch #{0}: {1}".format(i, inputFileArr[start_index:end_index]))
        p = Pool(n_process)
        p.map(process_file, inputFileArr[start_index:end_index])
        print("Batch #{0} completed".format(i))
The Python documentation of pool.map mentions:
It blocks until the result is ready.
I assumed this means that batch #(i+1) will start only after batch #i has finished.
But that doesn't seem to be the case. The timestamps of the generated XML files show that the batch ordering is not maintained: some files of a batch are processed before files of the previous batch. To be sure, I printed the filenames of each batch.
process_file() calls a Python script using subprocess.Popen():

    subprocess.Popen(command)

where command contains something like python script.py input_args. That Python script in turn calls a Java program, again using subprocess.Popen().
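One detail worth spelling out: Popen returns as soon as the child process is *started*, not when it finishes. A minimal sketch contrasting the two behaviours (run_child is a hypothetical helper, not my actual code):

```python
import subprocess

def run_child(command, wait=True):
    # Popen returns immediately after spawning the child.
    proc = subprocess.Popen(command)
    if wait:
        # wait() blocks until the child exits, so the caller only
        # returns once the work is actually done.
        return proc.wait()
    # Fire-and-forget: the child may still be running here.
    return proc
```

If process_file() launches script.py without calling wait() on the Popen object, each map task finishes almost immediately, even though the child is still writing its XML file.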
Here's the code inside the Python script that my code calls:

    m_process = subprocess.Popen(command, stdout=subprocess.PIPE)
    while m_process.poll() is None:
        stdout = str(m_process.stdout.readline())
        if 'ERROR' in stdout:
            m_process.terminate()
            error = stdout.rstrip()
    output = str(output_file.read())
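As an aside, the poll()/readline() loop above can miss output written just before the child exits, since poll() may report the exit code while lines are still buffered. A variant using communicate(), which reads all of stdout and waits for the child to exit (a sketch assuming the same command variable; run_and_check is a hypothetical name):

```python
import subprocess

def run_and_check(command):
    # communicate() drains stdout completely and waits for exit,
    # avoiding the race between poll() and buffered output.
    proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    stdout, _ = proc.communicate()
    for line in stdout.splitlines():
        if b'ERROR' in line:
            # Return the first error line, analogous to the
            # original loop's error = stdout.rstrip()
            return line.rstrip()
    return None
```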
What should I do to ensure that my program processes the batches in sequence?
Environment: Python 2.7