
I have a function `function` that I want to call 10 times with multiprocessing, in 2 rounds of 5 processes (one per CPU).

Therefore I need a way to synchronize the processes as described in the code below.

Is this possible without using a multiprocessing pool? When I use a pool I get strange errors (for example "UnboundLocalError: local variable 'fd' referenced before assignment", although I don't have a variable of that name anywhere), and the processes also seem to terminate randomly.

If possible I would like to do this without a pool. Thanks!

number_of_cpus = 5
number_of_iterations = 2

# An array for the processes.
processing_jobs = []

# Start 5 processes 2 times.
for iteration in range(0, number_of_iterations):

    # TODO SYNCHRONIZE HERE

    # Start 5 processes at a time.
    for cpu_number in range(0, number_of_cpus):

        # Calculate an offset for the current function call.
        file_offset = iteration * cpu_number * number_of_files_per_process

        p = multiprocessing.Process(target=function, args=(file_offset,))
        processing_jobs.append(p)
        p.start()

    # TODO SYNCHRONIZE HERE

This is an (anonymized) traceback of the errors I get when I run the code in a pool:

Process Process-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "python_code_3.py", line 88, in function_x
    xyz = python_code_1.function_y(args)
  File "/python_code_1.py", line 254, in __init__
    self.WK =  file.WK(filename)
  File "/python_code_2.py", line 1754, in __init__
    self.__parse__(name, data, fast_load)
  File "/python_code_2.py", line 1810, in __parse__
    fd.close()
UnboundLocalError: local variable 'fd' referenced before assignment

Most of the processes crash like that, but not all of them, and more of them seem to crash when I increase the number of processes. I also thought this might be due to memory limitations...

Ohumeronen
  • Why don't you want to use a `Pool`? – dano Aug 18 '14 at 17:45
  • I get errors that refer to lines of code in the multiprocessing library itself and to the line where I call the pool. That gives me no hint how to debug this. No errors occur in serial processing mode. – Ohumeronen Aug 18 '14 at 17:47
  • This task is really best suited to a `Pool`. If you could get it working without errors, would you be willing to use one? – dano Aug 18 '14 at 17:48
  • Sure, but first I want to see if I can fix this some other way ;-) Do you have a hint what the problem with the pool could be? I call it like this: results = [bool for bool in pool.imap(function, pool_args)] – Ohumeronen Aug 18 '14 at 17:49
  • It'd be helpful if you provided the complete traceback. The code runs fine for me on Linux. What platform are you using? – dano Aug 18 '14 at 17:50
  • I'll see if I can get the traceback... – Ohumeronen Aug 18 '14 at 17:55
  • Also, what do you mean by "synchronize"? Do you just mean wait for the first set of 5 processes to complete prior to starting the second set? Do you need to return anything from `function` back to the parent? – dano Aug 18 '14 at 17:58
  • I use Linux (Ubuntu) too. What's the best way to post the traceback? Just in a comment? – Ohumeronen Aug 18 '14 at 18:01
  • Edit it into your question. – dano Aug 18 '14 at 18:01
  • I want the 5 processes to finish their job without returning anything, but I do want to write to a file. Then I want to start the next 5 processes. – Ohumeronen Aug 18 '14 at 18:05
  • That traceback is in your code. It looks like maybe the `function` you're executing can't be safely run simultaneously by multiple processes. – dano Aug 18 '14 at 18:14
  • I had the very same idea just now. Thanks! I guess it depends on the data I process and it only occurs in some chunks of the data. Thanks again! – Ohumeronen Aug 18 '14 at 18:18

2 Answers


A Pool can be very easy to use. Here's a full example:

source

import multiprocessing

def calc(num):
    return num * 2

if __name__ == '__main__':          # required for Windows
    pool = multiprocessing.Pool()   # one worker process per CPU by default
    for output in pool.map(calc, [1, 2, 3]):
        print 'output:', output

output

output: 2
output: 4
output: 6
johntellsall
  • You should probably use an `if __name__ == "__main__":` guard so that this will work on Windows. – dano Aug 18 '14 at 17:59
  • @shavenwarthog is on the right track. You, @user2177047, need to come up with a better way to share the resource that is being opened in `function`. The best method would be to open the resource in your parent process, farm the work out to your spawned processes and have them report back to the parent (which then writes to the file). Alternatively, this SO post may give you some ideas: http://stackoverflow.com/questions/659865/python-multiprocessing-sharing-a-large-read-only-object-between-processes – Paul Seeb Aug 18 '14 at 18:21
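A rough sketch of the approach Paul Seeb describes in the last comment, assuming the workers hand their results back to the parent through a `multiprocessing.Queue` and only the parent ever touches the output file (the `worker` function, `result_queue`, the offsets and `output.txt` are illustrative names, not from the original code):

import multiprocessing

def worker(file_offset, result_queue):
    # Do the per-chunk work here; only the *result* is sent back to the parent.
    result = "processed chunk starting at offset %d" % file_offset
    result_queue.put(result)

if __name__ == "__main__":
    result_queue = multiprocessing.Queue()
    offsets = [0, 10, 20, 30, 40]

    jobs = [multiprocessing.Process(target=worker, args=(offset, result_queue))
            for offset in offsets]
    for p in jobs:
        p.start()

    # The parent is the only process that ever opens the output file.
    with open("output.txt", "w") as out:
        for _ in jobs:  # one result per worker
            out.write(result_queue.get() + "\n")

    for p in jobs:
        p.join()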

Here's how you can do the synchronization you're looking for without using a pool:

import multiprocessing

def function(arg):
    print("got arg %s" % arg)

if __name__ == "__main__":
    number_of_cpus = 5
    number_of_iterations = 2
    number_of_files_per_process = 10  # placeholder; use your real chunk size

    # A list for the processes.
    processing_jobs = []

    # Start 5 processes 2 times.
    for iteration in range(number_of_iterations):

        # Start 5 processes at a time.
        for cpu_number in range(number_of_cpus):

            # Give each of the 10 calls its own distinct offset.
            file_offset = (iteration * number_of_cpus + cpu_number) * number_of_files_per_process

            p = multiprocessing.Process(target=function, args=(file_offset,))
            processing_jobs.append(p)
            p.start()

        # Wait for all processes in this batch to finish.
        for proc in processing_jobs:
            proc.join()

        # Empty the active job list.
        del processing_jobs[:]

        # Write file here
        print("Writing")

Here it is with a Pool:

import multiprocessing

def function(arg):
    print("got arg %s" % arg)

if __name__ == "__main__":
    number_of_cpus = 5
    number_of_iterations = 2
    number_of_files_per_process = 10  # placeholder; use your real chunk size

    pool = multiprocessing.Pool(number_of_cpus)
    for iteration in range(number_of_iterations):
        # Give each of the 10 calls its own distinct offset.
        file_offsets = [(iteration * number_of_cpus + cpu_number) * number_of_files_per_process
                        for cpu_number in range(number_of_cpus)]
        pool.map(function, file_offsets)  # blocks until all 5 calls have finished
        print("Writing")
        # Write file here

As you can see, the Pool solution is nicer.

This doesn't solve your traceback problem, though. It's hard to say how to fix that without knowing what's actually causing it. You may need to use a `multiprocessing.Lock` to synchronize access to the resource.
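
For example, if it turns out that the file parsing inside `function` simply can't run in several processes at once, a `multiprocessing.Lock` could be handed to every process and held around the unsafe part. A minimal sketch, assuming the unsafe work is wrapped in a hypothetical `do_unsafe_file_work` helper:

import multiprocessing
import time

def do_unsafe_file_work(file_offset):
    # Placeholder for whatever part of the real function crashes when
    # several processes run it at the same time.
    time.sleep(0.1)
    print("finished offset %s" % file_offset)

def function(file_offset, lock):
    # Only one process at a time can hold the lock, so the unsafe part
    # is never executed concurrently.
    with lock:
        do_unsafe_file_work(file_offset)

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    jobs = [multiprocessing.Process(target=function, args=(offset, lock))
            for offset in (0, 10, 20, 30, 40)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()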

dano
  • Thanks. I will look at both the traceback and your solution. One question: don't I have to re-create `processing_jobs` after deleting it in the outer for-loop? – Ohumeronen Aug 18 '14 at 18:32
  • @user2177047 We're just deleting all the *contents* of the `processing_jobs` list, not the object itself. So you can just start appending to it again on the next iteration. – dano Aug 18 '14 at 18:36
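
A tiny illustration of that last point, using made-up names, showing that `del some_list[:]` only empties the existing list object:

processing_jobs = ["job-1", "job-2"]
alias = processing_jobs            # a second name for the same list object

del processing_jobs[:]             # empties the list in place; the object itself survives

print(alias)                       # [] -- the alias still sees the same, now empty, list
processing_jobs.append("job-3")    # appending works again on the next iteration
print(alias)                       # ['job-3']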