117

I am trying to solve a big numerical problem which involves lots of subproblems, and I'm using Python's multiprocessing module (specifically Pool.map) to split up different independent subproblems onto different cores. Each subproblem involves computing lots of sub-subproblems, and I'm trying to effectively memoize these results by storing them to a file if they have not been computed by any process yet, and otherwise skipping the computation and just reading the results from the file.
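
Schematically, each worker process does something like the following for every sub-subproblem (the function and path names here are just placeholders for my real code):

import os
import pickle

def get_or_compute(key):
    path = 'results/%s.pkl' % key      # placeholder naming scheme
    if os.path.exists(path):           # has any process computed this yet?
        with open(path, 'rb') as f:
            return pickle.load(f)
    result = solve_subsubproblem(key)  # placeholder for the real computation
    # two processes that both saw "no file" above can both end up here
    with open(path, 'wb') as f:
        pickle.dump(result, f)
    return result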

I'm having concurrency issues with the files: different processes sometimes check to see if a sub-subproblem has been computed yet (by looking for the file where the results would be stored), see that it hasn't, run the computation, then try to write the results to the same file at the same time. How do I avoid writing collisions like this?

Big Dogg
  • 2,564
  • 5
  • 21
  • 22
  • 7
    Check out an example from the documentation of using [`multiprocessing.Lock`](http://docs.python.org/2/library/multiprocessing.html#synchronization-between-processes) to synchronize multiple processes. – John Vinyard Nov 19 '12 at 01:22
  • 19
    You could have only a single process writing results, with a Queue as input that could be fed by the other worker processes. I believe it would be safe to have all the worker processes read-only. – GP89 Nov 19 '12 at 01:27
  • 1
    I should have mentioned that, to make things more complicated, I'm running multiple different big main problems at the same time on a cluster, with each one writing results for sub-subproblems to the same networked file system. Thus I can get collisions from processes running on entirely separate machines (so I don't think solutions using things like multiprocessing.Lock will work). – Big Dogg Nov 19 '12 at 01:45
  • Is the problem you're having with file write collisions, or is it just that you don't want to duplicate work in situations where one worker starts solving a sub-subproblem while another is already working on it? The latter is a bit more difficult to solve (more synchronization is required). – Blckknght Nov 19 '12 at 01:58
  • Well, originally I was having file write collisions, but I found that checking for the file's existence immediately before writing (instead of relying on the check I do before I start computing the sub-subproblem) took care of that. Now it's more the latter; I'd like to avoid duplicate work if possible (and can imagine others in the same situation). – Big Dogg Nov 19 '12 at 02:15
  • 3
    If your networked file system supports file locking, you can use the OS-specific file-create call to exclusively create the file and hold an exclusive lock on it until the results are ready, then close it. Any process that failed to "win" the create race would try to open it and re-try (with a delay) until they were able to open it; then they can read the results (a sketch of this idea follows the comments). – JimP Nov 19 '12 at 02:57
  • Ah, thanks JimP! That sounds like exactly what I need. I'll look into it. – Big Dogg Nov 19 '12 at 03:06
  • 13
    You're essentially programming a database server here. Have you considered using an existing one? – georg Nov 19 '12 at 09:06
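
A minimal sketch of JimP's suggestion (untested; get_or_compute, compute, and the results path are placeholders, and it assumes os.open with O_CREAT | O_EXCL is atomic on the networked file system in question; this simplified variant polls for a non-empty result file instead of holding a lock):

import errno
import os
import pickle
import time

def get_or_compute(key, compute):
    path = 'results/%s.pkl' % key  # placeholder naming scheme
    try:
        # only one process, across all machines, wins the right to create
        # the file and therefore to run the computation for this key
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
        # somebody else claimed this key; poll until their result is readable
        while True:
            with open(path, 'rb') as f:
                data = f.read()
            if data:
                return pickle.loads(data)
            time.sleep(1)
    result = compute(key)
    # for very large results, write to a temporary file and os.rename() it
    # over `path` so readers never see a partially written pickle
    with os.fdopen(fd, 'wb') as f:
        pickle.dump(result, f)
    return result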

4 Answers

176

@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read-only access. This will eliminate collisions. Here is an example that uses apply_async, but it will work with map too:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'  # adjust this path for your platform

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.perf_counter()
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.perf_counter() - start
    # workers only ever read the shared file
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
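
For completeness, here is a rough, untested sketch of how the same listener pattern could be driven with pool.map instead of apply_async, reusing the worker and listener functions above (the Manager queue proxy is picklable, so it can be bound to the worker with functools.partial):

import functools

def main_with_map():
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # start the listener first so it is ready to consume messages
    watcher = pool.apply_async(listener, (q,))

    # map blocks until every worker has finished; the queue proxy is
    # baked into the worker function, so map only has to pass `arg`
    pool.map(functools.partial(worker, q=q), range(80))

    # all workers are done, so stop the listener and shut down the pool
    q.put('kill')
    pool.close()
    pool.join()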
Steven C. Howell
  • 16,902
  • 15
  • 72
  • 97
MikeHunter
  • 4,144
  • 1
  • 19
  • 14
  • 2
    Hey Mike, thanks for the answer. I think this would work for the question as I phrased it, but I'm not so sure if it will solve the full problem as outlined in the comments to the question, specifically how I have several main programs running across several machines on a networked filesystem, all of which might have processes that will try to write to the same file. (FWIW, I got around my personal problem in a hacky way a while ago but am commenting in case others have similar issues.) – Big Dogg Nov 25 '12 at 05:14
  • 6
    I really would like to upvote this many times. This has been helpful so many times for me. Once more today. – Eduardo Feb 19 '14 at 10:44
  • 1
    Thanks Mike - I'd been struggling with how to use MP Queues. Your example makes it very clear and straightforward. – Anurag Mar 24 '14 at 14:50
  • 14
    I had to add a `pool.join()` below `pool.close()`. Otherwise my workers would finish before the listener and the process would just stop. – herrherr May 20 '14 at 13:49
  • Many thanks for this! Note that I had to include herrherr's suggestion, lest it may cause a hard-to-detect flaw in at least my scenario. – Joel Sjöstrand Feb 23 '15 at 13:16
  • 3
    What about when the consumer is greatly outnumbered and causes memory issues? How would you implement multiple consumers all writing to the same file? – ccdpowell Mar 01 '16 at 18:56
  • 27
    why `mp.cpu_count() + 2` when setting number of processes? – JenkinsY Jan 02 '18 at 09:31
  • After adopting this code, my program exits before the listener finishes its work, how could I fix that? – zyxue Jul 28 '18 at 03:32
  • This works great, except that it puts my outputs in a random order to disk, instead of doing it in the order I push data through. I'm using map rather than async for the worker threads. Unsure how to solve that issue. – Will Aug 08 '18 at 18:05
  • 1
    Tested on Linux: I needed to change `f = open(fn, 'wb')` to `f = open(fn, 'w')` to store the result; otherwise the output file will be blank even though the code runs like a charm. – Jia Gao Feb 20 '19 at 14:14
  • [Here](https://gist.github.com/StevenCHowell/c40c8879b71ee9979231a40d6fda1cbe) is an expanded version of this example. – Steven C. Howell Aug 16 '19 at 21:06
  • this freezes on `pool.join()`, copied this exactly.. any idea why? – Rafael Apr 10 '20 at 17:11
  • 1
    @JenkinsY `mp.cpu_count()+2` is just common practice or "rule of thumb" to ensure that the pool will be saturated. – pedrosaurio Oct 12 '20 at 13:49
3

It looks to me like you need to use a Manager to temporarily save your results to a list and then write the results from the list to a file. Also, use starmap to pass both the object you want to process and the managed list. The first step is to build the list of parameters to be passed to starmap, which includes the managed list.

from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd

def worker(row, param):
    # do something here, then append the result to the managed list
    # (append DataFrames so they can be concatenated with pandas later)
    x = pd.DataFrame({'param': [param], 'result': [param**2]})
    row.append(x)

if __name__ == '__main__':
    pool_parameter = list(range(10))  # placeholder list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build the list of parameters to send to starmap
        params = []
        for param in pool_parameter:
            params.append([row, param])

        with Pool() as p:
            p.starmap(worker, params)

From this point you need to decide how you are going to handle the list. If you have tons of RAM and a huge data set, feel free to concatenate it using pandas. Then you can save the file very easily as a csv or a pickle:

        df = pd.concat(row, ignore_index=True)

        df.to_pickle('data.pickle')
        df.to_csv('data.csv')
ggorlen
  • 44,755
  • 7
  • 76
  • 106
fizix137
  • 110
  • 6
  • 6
    Can I get some feedback on why this was down-voted? I see that the accepted answer is way better. I just want to learn. – fizix137 Jun 17 '20 at 20:22
  • 1
    What is params here? I cannot see it being initialised anywhere. Also, would mgr.list([]) be an empty list? You are appending the tuple of row and param to params; param contains the object to be processed, but what does row contain? – akshit bhatia Apr 11 '21 at 16:23
  • 4
    It might be downvoted since in your code all the process outputs are stored in memory, this doesn't solve the issue. OP asks about writing each process output to a file while processing. The main problem to solve here is to avoid collision, e.g. multiple processes trying to access the file at the same time. – Kerem T Apr 15 '21 at 04:45
1

In response to the comments saying that this is being run on a cluster, a simple option, which doesn't rely on inter-process communication, is to lock the memoization file using fcntl from the Python standard library.

This works on macOS and I expect it will work on most Unix systems, though it will need to be tested on your particular networked storage implementation:

safe.py

import fcntl
import time

def myprint(*args):
    print(time.ctime(), *args)


def main():
    with open("safe.txt", "r+") as file:

        myprint("locking")

        # this will block (unless LOCK_EX | LOCK_NB is used)
        fcntl.lockf(file, fcntl.LOCK_EX)

        lines = file.readlines()

        # make race conditions more likely
        time.sleep(1)
        
        # "1" or one more than the the previous entry
        newval = int(lines[-1])+1 if lines else 1

        print(newval)

        file.write(str(newval) + "\n")
        file.flush()

        myprint("unlocking")

        fcntl.lockf(file, fcntl.LOCK_UN)


if __name__ == '__main__':
    main()

You can check that it works locally, by running this in a terminal:

touch safe.txt  # this needs to already exist

for x in 1 2 3 4 5
do
  python safe.py &
done

cat safe.txt  # should have 1-5 inside

If you combine this with multiprocessing, each process probably needs its own file descriptor (so run open() separately in each process).
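
For instance, here is a rough sketch (an assumption about how this could be wired up, not tested on networked storage) in which each worker in a multiprocessing.Pool opens safe.txt and takes the lock itself before appending a line:

import fcntl
import multiprocessing as mp

def append_result(value):
    # each worker opens its own descriptor and takes the lock itself
    with open("safe.txt", "a") as f:
        fcntl.lockf(f, fcntl.LOCK_EX)
        f.write(str(value) + "\n")
        f.flush()
        fcntl.lockf(f, fcntl.LOCK_UN)

if __name__ == "__main__":
    with mp.Pool(4) as pool:
        pool.map(append_result, range(20))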

FiddleStix
  • 3,016
  • 20
  • 21
0

I thought I'd post my solution to a somewhat simpler problem as well, since this page comes up whenever I search for my problem.

I based this loosely on @MikeHunter's solution above. The reason I needed something slightly different is that the arrays I want to write at the end of each process are fairly large, which means that putting them into the queue, getting them from the queue, and writing them using a different process involves a lot of pickling and unpickling of extremely large arrays. This does not handle the problem of checking many sub-problems and sub-subproblems as requested by the OP, but it does handle the "title" of the question!

So what do I do?

I pass a lock that all processes have access to, and I write to the file between Lock.acquire() and Lock.release() calls. That way none of the processes can write while any of the other ones is writing. All of this is to handle writing to HDF5 files serially without requiring an MPI build of h5py.


from multiprocessing import Process, Lock
import h5py
import numpy as np
from time import sleep, time


def func(i, l, filename, subfilename):

    # Reading from the subfile
    with h5py.File(subfilename, 'r') as ds:
        array = ds['array'][:]

    sleeptime = np.random.rand(1)*4 + 1
    sleep(sleeptime[0])

    # Print array loaded to compare to output in the summary file
    print(i, f'{sleeptime[0]:.3f}', array)

    # Lock out any other process from writing to the summary file
    l.acquire()

    with h5py.File(filename, 'r+') as ds:
        ds['array'][i, :] = array

    # Release the lock
    l.release()


if __name__ == '__main__':

    N = 10
    Nsample = 5

    subfilenames = [f'sub_{i:>02d}.h5' for i in range(N)]

    for i in range(N):
        with h5py.File(subfilenames[i], 'w') as ds:
            disp = ds.create_dataset(
                'array', data=np.random.randint(0, 5, size=(5,)), dtype='f')

    filename = 'test.h5'

    with h5py.File(filename, 'w') as ds:
        disp = ds.create_dataset('array', (N, Nsample), dtype='f')

    # Create a lock that is communicated to the workers
    l = Lock()

    # Start the timer
    t0 = time()

    # Distribute the work to the workers
    processes = []

    print(" T  sleeptime     array", flush=True)
    print("-----------------------", flush=True)

    for i in range(N):
        p = Process(target=func, args=(
            i, l, filename, subfilenames[i]))
        p.start()
        processes.append(p)

    # Wait for the workers to finish
    for p in processes:
        p.join()

    # Print time taken
    print(f'Total time taken: {time()-t0:.2f} s')


If you save the script as hello.py, you can run it and sort the output like so:

python hello.py | sort

Which should generate something like this:

 T  sleeptime     array
-----------------------
0 4.336 [4. 1. 1. 0. 2.]
1 2.363 [2. 1. 1. 1. 3.]
2 2.741 [1. 2. 2. 4. 3.]
3 1.078 [1. 4. 4. 3. 0.]
4 1.327 [4. 4. 4. 4. 1.]
5 4.174 [1. 3. 1. 0. 4.]
6 2.095 [4. 1. 0. 3. 0.]
7 1.091 [3. 4. 4. 0. 4.]
8 1.601 [4. 3. 3. 1. 4.]
9 4.550 [3. 3. 3. 4. 0.]
Total time taken: 4.94 s

Check against the written HDF5 file:

h5dump test.h5

which should result in something like this:

HDF5 "test.h5" {
GROUP "/" {
   DATASET "array" {
      DATATYPE  H5T_IEEE_F32LE
      DATASPACE  SIMPLE { ( 10, 5 ) / ( 10, 5 ) }
      DATA {
      (0,0): 4, 1, 1, 0, 2,
      (1,0): 2, 1, 1, 1, 3,
      (2,0): 1, 2, 2, 4, 3,
      (3,0): 1, 4, 4, 3, 0,
      (4,0): 4, 4, 4, 4, 1,
      (5,0): 1, 3, 1, 0, 4,
      (6,0): 4, 1, 0, 3, 0,
      (7,0): 3, 4, 4, 0, 4,
      (8,0): 4, 3, 3, 1, 4,
      (9,0): 3, 3, 3, 4, 0
      }
   }
}
}

Note on the update

I was first using a queue for my use case, but I realized that a simple multiprocessing.Lock would do the trick. There is no need for a complicated Queue.put/Queue.get wrapper.


Note there are better ways of doing this using mpi4py, but I needed the user not to worry about MPI.

lsawade
  • 11
  • 4