
I have the following code that writes the md5sums to a logfile:

import subprocess

for file in files_output:
    p = subprocess.Popen(['md5sum', file], stdout=logfile)
p.wait()
  1. Will these be written in parallel? i.e. if md5sum takes a long time for one of the files, will another one be started before waiting for a previous one to complete?

  2. If the answer to the above is yes, can I assume the order of the md5sums written to logfile may differ based upon how long md5sum takes for each file? (some files can be huge, some small)

imagineerThat

3 Answers

  1. Yes, these md5sum processes will be started in parallel.
  2. Yes, the order of the md5sum writes will be unpredictable. In general, it is considered bad practice to share a single resource, such as a file, among many processes this way.

Also, calling p.wait() after the for loop waits only for the last md5sum process to finish; the rest of them might still be running.
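
To wait for all of them instead, one option is to keep every Popen handle and wait on each; a minimal sketch (note that the write order in logfile is still unpredictable):

procs = [subprocess.Popen(['md5sum', f], stdout=logfile) for f in files_output]
for p in procs:
    p.wait()  # block until this particular md5sum has exited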

But you can modify this code slightly to keep the benefits of parallel processing while getting predictable, synchronized output: have each md5sum write into its own temporary file, then collect everything back into one file once all processes are done.

import subprocess
import tempfile

processes = []
for file in files_output:
    f = tempfile.TemporaryFile()  # one scratch file per md5sum process
    p = subprocess.Popen(['md5sum', file], stdout=f)
    processes.append((p, f))

# collect the results in the original order of files_output
for p, f in processes:
    p.wait()   # make sure this md5sum has finished
    f.seek(0)  # rewind the temporary file before reading it back
    logfile.write(f.read())
    f.close()
dkz
  • So I guess the ordering here is preserved because processes[] keeps track of it? I.e. processes.append((p, f)) executes before md5sum finishes, in the order of files_output. – imagineerThat May 09 '13 at 00:46
  • 2
    Yes, `processes[]` will preserve the original order of `files_output[]` and ensure every md5sum process is finished. But if you're concerned about OS resources, you should consider a thread pool with a task queue and a synchronous md5sum run in each thread via `subprocess.check_output()`, as @Alfe proposed. – dkz May 09 '13 at 01:20

All subprocesses run in parallel. (To avoid this, one has to wait explicitly for their completion.) They can even write into the log file at the same time, thus garbling the output. To avoid this, you should let each process write into a different log file and collect all outputs when all processes are finished. A thread pool with a task queue is one way to do that:

import threading
import Queue
import subprocess

q = Queue.Queue()
result = {}  # used to store the results: fileName -> checksum
for fileName in fileNames:
  q.put(fileName)

def worker():
  while True:
    fileName = q.get()
    if fileName is None:  # sentinel: no more work for this thread
      return
    # run md5sum synchronously in this thread; the subprocesses
    # still execute in parallel across the worker threads
    output = subprocess.check_output(['md5sum', fileName])
    result[fileName] = output.split()[0]  # store just the checksum

threads = [ threading.Thread(target=worker) for _i in range(20) ]
for thread in threads:
  thread.start()
  q.put(None)  # one sentinel marker for each thread
for thread in threads:
  thread.join()  # wait until every worker has finished

After the threads have been joined, the results are stored in result.
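
A short sketch of emitting them in the original order afterwards (as also noted in the comments below), assuming the logfile from the question:

for fileName in fileNames:
  logfile.write('%s  %s\n' % (result[fileName], fileName))  # md5sum-style lines, in list order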

Alfe
  • Thank you. However, I have 1000's of md5sums. I'd rather not open a separate file for each. – imagineerThat May 08 '13 at 22:21
  • 4
    No, you shouldn't. Create a `Queue.Queue` and a thread pool of some dozens of threads, let each thread read an element from the queue and start a subprocess for this element, wait for the completion of this subprocess, get the result (the md5 checksum), store the result in a mapping. If the queue is empty, the threads should terminate. – Alfe May 08 '13 at 22:27
  • New to Python still. Do I need to use Queue.Queue to write to a mapping simultaneously? If not, what does Queue.Queue do for me? – imagineerThat May 08 '13 at 22:36
  • Python dicts are thread-safe (in fact, all Python data structures are, which in turn is also a performance problem Python sometimes suffers from; see Global Interpreter Lock for details). No Queue.Queue is necessary for writing into one. – Alfe May 08 '13 at 22:40
  • `Queue.Queue` is a thread-safe queue which you can use to pass a list of values from one thread to a list of other threads. It ensures that each value is passed to exactly one thread etc. – Alfe May 08 '13 at 22:42
  • I tried this implementation and the md5's return in reverse order of the fileNames list. I can simply reverse the fileNames list, but I'd like to know what is going on. – imagineerThat May 15 '13 at 22:58
  • 2
    I don't know without having a look at your code directly. The code currently states this: put all tasks (in original order) in the queue and tell twenty workers to each do this: take a task from the queue and process it, and continue until you get an EOF (None) from the queue. Because the workers work in parallel, this of course can mean that the worker who is last to get its first task (the twentieth task) can be the first to finish its task. This will then change the order in which the results arrive. But this depends on the time the tasks need. – Alfe May 16 '13 at 09:05
  • How can I modify this to preserve the order of the fileNames list regardless of individual processing times? – imagineerThat May 16 '13 at 18:57
  • 1
    The results in this code are stored in the result mapping, so after all results have been collected you can iterate through your original list (which is in the order you want) and pull the matching results out of the result dictionary. – Alfe May 17 '13 at 10:01

A simple way to collect output from parallel md5sum subprocesses is to use a thread pool and write to the file from the main process:

from multiprocessing.dummy import Pool # use threads
from subprocess import check_output

def md5sum(filename):
    try:
        return check_output(["md5sum", filename]), None
    except Exception as e:
        return None, e

if __name__ == "__main__":
    p = Pool(number_of_processes) # specify number of concurrent processes
    with open("md5sums.txt", "wb") as logfile:
        for output, error in p.imap(md5sum, filenames): # provide filenames
            if error is None:
                logfile.write(output)
  • the output from md5sum is small so you can store it in memory
  • imap preserves order
  • number_of_processes may differ from the number of files or CPU cores (larger values don't mean faster: it depends on the relative performance of IO (disks) and CPU)

You can try to pass several files at once to the md5sum subprocesses.
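
A minimal sketch of that idea, reusing check_output and filenames from above (md5sum accepts multiple filename arguments and prints one line per file; the chunk size of 10 is arbitrary):

def chunks(seq, n):
    # yield successive n-sized slices of seq
    for i in range(0, len(seq), n):
        yield seq[i:i + n]

def md5sum_batch(batch):
    try:
        return check_output(["md5sum"] + list(batch)), None
    except Exception as e:
        return None, e

Each call can then be driven by the same pool, e.g. p.imap(md5sum_batch, chunks(filenames, 10)); every successful output now contains several md5sum lines, and imap still preserves the batch order.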

You don't need external subprocess in this case; you can calculate md5 in Python:

import hashlib
from functools import partial

def md5sum(filename, chunksize=2**15, bufsize=-1):
    m = hashlib.md5()
    with open(filename, 'rb', bufsize) as f:
        for chunk in iter(partial(f.read, chunksize), b''):
            m.update(chunk)
    return m.hexdigest()

To use multiple processes instead of threads (allowing the pure-Python md5sum() to run in parallel on multiple CPUs), just drop .dummy from the import in the above code.
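
A minimal sketch of that variant, reusing filenames and the pure-Python md5sum() above (the __main__ guard matters on platforms that spawn worker processes, e.g. Windows):

from multiprocessing import Pool  # real processes; sidesteps the GIL for pure-Python hashing

if __name__ == "__main__":
    p = Pool()  # defaults to the number of CPU cores
    with open("md5sums.txt", "w") as logfile:
        for filename, digest in zip(filenames, p.imap(md5sum, filenames)):
            logfile.write("%s  %s\n" % (digest, filename))  # mimic md5sum's output format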

jfs
  • Sorry, still learning here. I don't understand why Queues aren't used here. If multiple processes are writing to logfile, won't there be problems? If I'm mistaken, how is there any synchronization? – imagineerThat May 15 '13 at 21:53
  • Looks like `Pool` supports asynchronous calls. Does this mean it maintains the order of md5's written (in the order of `filenames`)? Unlike simply starting x number of threads? – imagineerThat May 15 '13 at 21:59
  • 1
    `Pool` provides a higher-level interface. It uses `Queue`s itself internally. The `logfile` file is accessed only from the main thread (only the `md5sum()` function is executed in child threads). `imap()` returns results in order (as I explicitly mentioned already). – jfs May 16 '13 at 01:03
  • Can you point me to something that can assist me with learning these topics? I've tried Googling around but haven't found anything comprehensive and introductory enough on multiprocessing. – imagineerThat May 16 '13 at 01:44
  • It is a vast topic (try looking up concurrent/parallel/distributed programming/computing). What particular aspects are you interested in? – jfs May 16 '13 at 06:53
  • The examples on this page differ a lot in which parallel constructs are used. E.g. you're using Pool and imap. Another answer had Processes and Queues. There could have been an answer with threads, or something else. I wish there was a guide that went over the differences between these, especially for someone new to multiprocessing. – imagineerThat May 17 '13 at 18:56
  • BTW, thanks for your answer. With hindsight, it looks like the most concise approach. – imagineerThat May 17 '13 at 18:58