
This is my first post to Stack Overflow. I'll try to include all the necessary information, but please let me know if there's more info I can provide to clarify my question.

I'm trying to multithread a costly function for an astrophysical code in Python using pool.map. The function takes a list of objects as input. The basic code structure is like this:

There's a class of Stars with physical properties:

class Stars:
    def __init__(self,mass,metals,positions,age):
        self.mass = mass
        self.metals = metals
        self.positions = positions
        self.age = age
    def info(self):
        return(self.mass,self.metals,self.positions,self.age)

and there's a list of these objects:

stars_list = []
for i in range(nstars):
    stars_list.append(Stars(mass[i],metals[i],positions[i],age[i]))

(where mass, metals, positions and age are known from another script).

There's a costly function that I run with these star objects that returns a spectrum for each one:

def newstars_gen(stars_list):
   ....
   return stellar_nu,stellar_fnu

where stellar_nu and stellar_fnu are numpy arrays.

What I would like to do is break the list of star objects (stars_list) up into chunks, and then run newstars_gen on these chunks on multiple threads to gain a speedup. So, to do this, I split the list up into three sublists, and then try to run my function through pool.map:

p = Pool(processes = 3)
nchunks = 3
chunk_start_indices = []
chunk_start_indices.append(0) #the start index is 0

delta_chunk_indices = nstars / nchunks

for n in range(1,nchunks):
    chunk_start_indices.append(chunk_start_indices[n-1]+delta_chunk_indices)

for n in range(nchunks):
    stars_list_chunk = stars_list[chunk_start_indices[n]:chunk_start_indices[n]+delta_chunk_indices]
    #if we're on the last chunk, we might not have the full list included, so need to make sure that we have that here
    if n == nchunks-1: 
        stars_list_chunk = stars_list[chunk_start_indices[n]:-1]


    chunk_sol = p.map(newstars_gen,stars_list_chunk)

But when I do this, I get the following error:

File "/Users/[username]/python2.7/multiprocessing/pool.py", line 250, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/[username]/python2.7/multiprocessing/pool.py", line 554, in get
    raise self._value
AttributeError: Stars instance has no attribute '__getitem__'

So, I'm confused as to what sort of attribute I should include with the Stars class. I've tried reading about this online and am not sure how to define an appropriate __getitem__ for this class. I'm quite new to object oriented programming (and python in general).

Any help is much appreciated!

nop.bot

2 Answers


So, it looks like there are a couple of things here that could be cleaned up or made more Pythonic. However, the key problem is that you are using multiprocessing.Pool.map incorrectly for what you have. Your newstars_gen function expects a list, but p.map is going to break up the list you give it into chunks and hand it one Stars object at a time. You should probably rewrite newstars_gen to operate on a single star, and then throw away all but the first and last lines of your last code block. If the calculations in newstars_gen aren't independent between stars (e.g., the mass of one impacts the calculation for another), you will have to do a more dramatic refactoring.
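
As a minimal sketch of what that looks like (assuming stars_list is built exactly as in your question; the body of newstars_gen here is just a placeholder standing in for your real spectrum calculation):

import numpy as np
from multiprocessing import Pool

def newstars_gen(star):
    #the real calculation for one Stars object would use star.mass,
    #star.metals, star.positions and star.age; these zeros are placeholders
    stellar_nu = np.zeros(10)
    stellar_fnu = np.zeros(10)
    return stellar_nu, stellar_fnu

if __name__ == '__main__':
    p = Pool(processes=3)
    results = p.map(newstars_gen, stars_list)   #Pool does the chunking for you
    p.close()
    p.join()

Each element of results is then the (stellar_nu, stellar_fnu) pair for the corresponding star, in the same order as stars_list.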

It also looks like it would behoove you to learn about list comprehensions. Be aware that the other built-in structures (e.g., set, dict) have equivalents, and also look into generator expressions.
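
For example, the loop that builds your star list (assuming mass, metals, positions and age are the sequences from your other script) collapses to a single line:

stars_list = [Stars(mass[i], metals[i], positions[i], age[i]) for i in range(nstars)]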

desfido
  • Thanks, both for the advice and the solution to the problem - your solution worked for my problem! Thanks also for the recommendation of tools to pick up for my coding! – nop.bot Apr 17 '14 at 00:28

I've written a function for distributing the processing of an iterable (like your list of Stars objects) among multiple processors, which I'm pretty sure will work well for you.

from multiprocessing import Process, cpu_count, Lock
from sys import stdout 
from time import clock

def run_multicore_function(iterable, function, func_args = [], max_processes = 0):
    #directly pass in a function that is going to be looped over, and fork those
    #loops onto independent processors. Any arguments the function needs must be provided as a list.
    if max_processes == 0:
        cpus = cpu_count()
        if cpus > 7:
            max_processes = cpus - 3
        elif cpus > 3:
            max_processes = cpus - 2
        elif cpus > 1:
            max_processes = cpus - 1
        else:
            max_processes = 1

    running_processes = 0
    child_list = []
    start_time = round(clock())
    elapsed = 0
    counter = 0
    print "Running function %s() on %s cores" % (function.__name__,max_processes)
    #fire up the multi-core!!
    stdout.write("\tJob 0 of %s" % len(iterable),)
    stdout.flush()
    for next_iter in iterable:
        if type(iterable) is dict:
            next_iter = iterable[next_iter]
        while 1:     #Only fork a new process when there is a free processor.
            if running_processes < max_processes:
                #Start new process
                stdout.write("\r\tJob %s of %s (%i sec)" % (counter,len(iterable),elapsed))
                stdout.flush()
                if len(func_args) == 0:
                    p = Process(target=function, args=(next_iter,))
                else:
                    p = Process(target=function, args=(next_iter,func_args))
                p.start()
                child_list.append(p)
                running_processes += 1
                counter += 1
                break
            else:
                #processor wait loop
                while 1:
                    for next in range(len(child_list)):
                        if child_list[next].is_alive():
                            continue
                        else:
                            child_list.pop(next)
                            running_processes -= 1
                            break
                    if (start_time + elapsed) < round(clock()):
                        elapsed = round(clock()) - start_time
                        stdout.write("\r\tJob %s of %s (%i sec)" % (counter,len(iterable),elapsed))
                        stdout.flush()

                    if running_processes < max_processes:
                        break

    #wait for remaining processes to complete --> this is the same code as the processor wait loop above
    while len(child_list) > 0:
        for next in range(len(child_list)):
            if child_list[next].is_alive():
                continue
            else:
                child_list.pop(next)
                running_processes -= 1
                break  #need to break out of the for-loop, because the child_list index is changed by pop 
        if (start_time + elapsed) < round(clock()):
            elapsed = round(clock()) - start_time
            stdout.write("\r\tRunning job %s of %s (%i sec)" % (counter,len(iterable),elapsed),)
            stdout.flush()

    print " --> DONE\n"
    return  

As a usage example, let's use your stars_list and send the result of newstars_gen to a shared file. Start by setting up your iterable, the output file, and a file lock:

stars_list = []
for i in range(nstars):
    stars_list.append(Stars(mass[i],metals[i],positions[i],age[i]))

outfile = "some/where/output.txt"
file_lock = Lock()

Define your costly function like so:

def newstars_gen(stars_list_item, args):   #args = [outfile, file_lock]
    outfile, file_lock = args

    ....

    with file_lock:
        with open(outfile, "a") as handle:
            handle.write("%s\t%s\n" % (stellar_nu, stellar_fnu))

Now send your list of stars into run_multicore_function():

run_multicore_function(stars_list, newstars_gen, [outfile, file_lock])

After all of your items have been calculated, you can go back into the output file to grab the data and carry on. Instead of writing to a file, you can also share state with multiprocessing.Value or multiprocessing.Array, but I've run into the occasional issue with data getting lost when my list is large and the function I'm calling is fairly fast. Maybe someone else out there can see why that's happening.
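
For what it's worth, here is a rough sketch of that shared-memory idea, assuming each star only needs to store a single number; newstars_gen_shared is just an illustrative name, and nstars and stars_list are taken from your question, so this is not working code for your actual calculation:

from multiprocessing import Array

#one shared double per star; 'd' is the C double typecode
shared_fnu = Array('d', nstars)

def newstars_gen_shared(item, args):
    #item is an (index, star) pair so each process knows where to write
    index, star = item
    shared = args[0]
    #...costly calculation for this star goes here...
    shared[index] = 0.0    #placeholder for the real scalar result

#run_multicore_function(list(enumerate(stars_list)), newstars_gen_shared, [shared_fnu])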

Hopefully this all makes sense!
Good luck,
-Steve

Steve Bond