I'm doing a lot of calculations and writing the results to a file. I'm trying to parallelise the calculations using multiprocessing.

The problem is that all the workers write to a single output file. I'm quite new to multiprocessing and wondering how I could make it work.

A simplified version of the code is given below:

from multiprocessing import Pool

fout_=open('test'+'.txt','w')

def f(x):
    fout_.write(str(x) + "\n")


if __name__ == '__main__':
    p = Pool(5)
    p.map(f, [1, 2, 3])

The result I want would be a file with:

1 2 3

However now I get an empty file. Any suggestions? I greatly appreciate any help :)!

Tim D
  • Either you need a lock when you access the file or use one file for each worker and summarise it later serially – vumaasha Mar 18 '18 at 11:11
  • Hi, I had tried your code without any modifications, and I am getting a file `test.txt` with `1, 2, 3` written to it. I am not able to reproduce your error – nj2237 Mar 18 '18 at 11:32
  • I'm using Python 3.6, if that should matter – nj2237 Mar 18 '18 at 11:34
  • nj2237, very strange. I have the problem both in python 2.7 as 3.6 (only having an empty file). – Tim D Mar 18 '18 at 11:40
  • with 'w' flag every worker will overwrite the file... Try to change it to 'a' flag. Also, you either need to make every worker to write to unique file and combine them later, or multiprocess calculations and then write to a single file. Third option would be to write to some database with concurrent access. – Denis Rasulev Mar 18 '18 at 12:46

2 Answers

multiprocessing.Pool spawns worker processes, and writing to a common file from each process without a lock can cause data loss. Since you are trying to parallelise the calculation, use the pool for the computation only and write the results from the parent process.

Below is a solution that does the computation in parallel and then writes the results to the file. Hope it helps:

from multiprocessing import Pool

# library for time
import datetime

# function for your calculations; the loop only makes it time-consuming
def calc(x):
    x = x**2
    total = 0
    for i in range(0, 1000000):
        total += i
    return x

# function to write to the txt file; it takes the open file and a list of items
def f(fout, res):
    for x in res:
        fout.write(str(x) + "\n")

if __name__ == '__main__':
    arr = [1, 2, 3, 4, 5, 6, 7]
    # open the output file in text mode, in the parent process only
    with open('test.txt', 'w') as fout:
        qs = datetime.datetime.now()
        p = Pool(5)
        res = p.map(calc, arr)
        # write the calculated list to the file
        f(fout, res)
        qe = datetime.datetime.now()
        print((qe - qs).total_seconds() * 1000)
        # to compare the improvement using multiprocessing, iterative solution
        qs = datetime.datetime.now()
        for item in arr:
            x = calc(item)
            fout.write(str(x) + "\n")
        qe = datetime.datetime.now()
        print((qe - qs).total_seconds() * 1000)
        p.close()
        p.join()
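
If the workers really must write to the shared file themselves, the lock mentioned at the top of this answer could look roughly like the sketch below. It is not part of the solution above: the names init_worker, calc_and_write and locked_output.txt are made up for illustration, and the order of the lines in the file is not guaranteed.

```python
import multiprocessing as mp

OUT_PATH = "locked_output.txt"  # hypothetical output file name

_lock = None  # set inside each worker by init_worker


def init_worker(lock):
    # Store the shared lock in a module-level name inside each worker.
    global _lock
    _lock = lock


def calc_and_write(x, path=OUT_PATH):
    result = x ** 2
    # Only one process at a time may append to the file.
    with _lock:
        with open(path, "a") as fout:
            fout.write(str(result) + "\n")
    return result


if __name__ == "__main__":
    open(OUT_PATH, "w").close()  # start from an empty file
    lock = mp.Lock()
    # The lock is handed to every worker once, when the pool starts.
    with mp.Pool(3, initializer=init_worker, initargs=(lock,)) as pool:
        pool.map(calc_and_write, [1, 2, 3, 4, 5, 6, 7])
```

Each worker reopens the file in append mode for every write, which is simple but slow if there are many small writes; collecting the results in the parent, as above, is usually the easier option.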
vsri293
  • Thank you for your explanation vsri293! It was very clarifying; however, I took the solution of user atru because of the extensive explanation. Hope you understand! – Tim D Mar 18 '18 at 13:45
  • Thanks @TimD, atru's solution is good but atru's solution assumes that number of pool processes and numbers for which you need to do calculation can be same. What if you can only pool 5 processes and you have to do calculations for 100 numbers(more than 5)? You have to write extra code to zip the numbers to output filename. – vsri293 Mar 18 '18 at 18:24

You shouldn't let all the workers/processes write to a single file. They can all read from one file (which may cause slowdowns due to workers waiting for one of them to finish reading), but writing to the same file will cause conflicts and potentially corruption.

As mentioned in the comments, write to separate files instead and then combine them into one from a single process. This small program illustrates it, based on the program in your post:

from multiprocessing import Pool

def f(args):
    ''' Perform computation and write
    to separate file for each '''
    x = args[0]
    fname = args[1]
    with open(fname, 'w') as fout:
        fout.write(str(x) + "\n")

def fcombine(orig, dest):
    ''' Combine files with names in 
    orig into one file named dest '''
    with open(dest, 'w') as fout:
        for o in orig:
            with open(o, 'r') as fin:
                for line in fin:
                    fout.write(line)

if __name__ == '__main__':
    # Each sublist is a combination
    # of arguments - number and temporary output
    # file name
    x = range(1,4)
    names = ['temp_' + str(y) + '.txt' for y in x]
    args = list(zip(x,names))

    p = Pool(3)
    p.map(f, args)

    p.close()
    p.join()

    fcombine(names, 'final.txt')

It runs f for each argument combination, which in this case is a value of x and a temporary file name. The arguments are packed into a list of tuples because Pool.map passes only a single argument to the function. There are other ways around this, especially on newer Python versions.
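
One of those other ways, on Python 3.3 and later, is Pool.starmap, which unpacks each tuple into separate arguments. A minimal sketch, assuming the same numbers and temporary file names as above; write_value is a hypothetical stand-in for f:

```python
from multiprocessing import Pool


def write_value(x, fname):
    # Same job as f above, but with two ordinary parameters.
    with open(fname, "w") as fout:
        fout.write(str(x) + "\n")


if __name__ == "__main__":
    x = range(1, 4)
    names = ["temp_" + str(y) + ".txt" for y in x]
    with Pool(3) as p:
        # starmap unpacks each (number, file name) tuple into the two arguments.
        p.starmap(write_value, zip(x, names))
```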

For each argument combination it creates a separate file to which it writes the output. In practice your output will be longer; you can simply call the function that computes it from within f. Also, there is no need to use Pool(5) for 3 arguments (though I assume that only three workers were active anyway).

The reasons for calling close() and join() are explained well in this post. It turns out (see the comment on the linked post) that map is blocking, so here you don't need them for the original reason (waiting until all workers finish before writing the combined output file from a single process). I would still use them in case other parallel features are added later.

In the last step, fcombine gathers and copies all the temporary files into one. It's a bit too nested; if you decide, for instance, to remove the temporary files after copying, you may want to move the body under with open(dest, 'w') or the for loop underneath into a separate function, for readability and functionality.
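
A possible shape for that refactoring, as a sketch only: the helper name append_and_remove is made up, and shutil.copyfileobj is swapped in for the line-by-line loop.

```python
import os
import shutil


def append_and_remove(src, fout):
    # Copy one temporary file into the already-open destination, then delete it.
    with open(src, "r") as fin:
        shutil.copyfileobj(fin, fout)
    os.remove(src)


def fcombine(orig, dest):
    # Combine the files named in orig, in order, into one file named dest.
    with open(dest, "w") as fout:
        for o in orig:
            append_and_remove(o, fout)
```

It is called the same way as before, fcombine(names, 'final.txt'), but the temporary files are gone afterwards.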

atru
  • Thanks a bunch. Also, clarifying the need for the .close() and .join() calls made me understand the code better. Have a good one. – Tim D Mar 18 '18 at 13:46
  • Glad it helped. I used this approach the other day ;) Will add it later, have to go now literally. – atru Mar 18 '18 at 14:00
  • Edited, actually - it turns out that you don't need them with the map. But like said in the edit, I would still leave it there. Reasons as explained in the cited code are exception messages, but also a habit since parallel features are not always non-blocking. On the other hand, in C/C++ with MPI I'd never use a barrier if I don't have to, I guess I need to make up my mind when it comes to coding habits ;) – atru Mar 18 '18 at 17:30