6

The following code works, but it is very slow due to passing the large data sets. In the actual implementation, the speed it takes to create the process and send the data is almost the same as calculation time, so by the time the second process is created, the first process is almost finished with the calculation, making parallezation? pointless.

The code is the same as in this question Multiprocessing has cutoff at 992 integers being joined as result with the suggested change working and implemented below. However, I ran into the common problem as others with I assume, pickling large data taking a long time.

I see answers using the multiprocessing.array to pass a shared memory array. I have an array of ~4000 indexes, but each index has a dictionary with 200 key/value pairs. The data is just read by each process, some calculation is done, and then an matrix (4000x3) (with no dicts) is returned.

Answers like this Is shared readonly data copied to different processes for Python multiprocessing? use map. Is it possible to maintain the below system and implement shared memory? Is there an efficient way to send the data to each process with an array of dicts, such as wrapping the dict in some manager and then putting that inside of the multiprocessing.array ?

import multiprocessing

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,200):
            data[str(i)] = i

    CalcManager(total,start=0,end=3000)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = []
    temp = []
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    result_q.put(results)
    return

if __name__== '__main__':   
    main()

Solved

by just putting the list of dictionaries into a manager, the problem was solved.

manager=Manager()
d=manager.list(myData)

It seems that the manager holding the list also manages the dict contained by that list. The startup time is a bit slow, so it seems data is still being copied, but its done once at the beginning and then inside of the process the data is sliced.

import multiprocessing
import multiprocessing.sharedctypes as mt
from multiprocessing import Process, Lock, Manager
from ctypes import Structure, c_double

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,100):
            data[str(i)] = i

    CalcManager(total,start=0,end=500)

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(d,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    #print 'started process'
    results = []
    temp = []
    data = data[start:end]
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    print len(data)        
    result_q.put(results)
    return

if __name__ == '__main__':
    main()
Community
  • 1
  • 1
user-2147482637
  • 2,115
  • 8
  • 35
  • 56
  • Your code seems to indicate that for a range of input, you produce just one output. Is this what you want? – Hai Vu Sep 02 '14 at 15:02
  • To use shared memory, you'd have to turn your array of dicts into a `ctypes` object, and then use [`multiprocessing.sharedctypes`](https://docs.python.org/2.7/library/multiprocessing.html#module-multiprocessing.sharedctypes). I'm not sure if that's actually feasible for your use case. – dano Sep 02 '14 at 17:03
  • @HaiVu it returns an array of floats for each process. The results=range(0,(992)) is just sample list. – user-2147482637 Sep 02 '14 at 23:43
  • Let me rephrase my question: for each row of input, do you return one row of output? Your code indicate that for many rows of input, you return just one row of output. – Hai Vu Sep 03 '14 at 00:10
  • @HaiVu No, it returns a 22x3 matrix, and later I will need to return a 22x9 matrix with it, so it would be an array containing two matrices. – user-2147482637 Sep 03 '14 at 00:25
  • So, in your example, you have 50 inputs in your data, but I only see 3 matrices returned. did you mean to return 50 matrices? – Hai Vu Sep 03 '14 at 00:40
  • @HaiVu the 3 matrices returned would be because there are 3 processes. Each process returns a 22x3 matrix ie 22 sets of (x,y,z) values. The values are calculated by the input data which uses dicts to define names of places and values of them. Does that clarify it? – user-2147482637 Sep 03 '14 at 00:52
  • @dano Is it possible to use the dict manager explained here http://stackoverflow.com/questions/6832554/python-multiprocessing-how-do-i-share-a-dict-among-multiple-processes and put that inside of a sharedctype, or would that not solve the pickling/unpickling data transfer problem in terms of speed? – user-2147482637 Sep 03 '14 at 03:04
  • 1
    @user1938107 No, you can only put `ctypes` objects into a `multiprocessing.sharedctype` object. No `multiprocessing.Manager.dict` objects allowed. You *could* create a shared `list` using a `Manager`, and have shared `dict` instances inside of it, but I'm not sure if that will really give you much performance benefit. – dano Sep 03 '14 at 03:21
  • @dano That actually worked by just using the manager on the list, and not doing anything with the dict inside. There is some initial startup cost but in the end it is minimum 2-3 times faster. Could you put that as an answer and any insight you have to the updated working code. – user-2147482637 Sep 03 '14 at 07:41

2 Answers2

2

Looking at your question, I assume the following:

  • For each item in myData, you want to return an output (a matrix of some sort)
  • You created a JoinableQueue (tasks) probably for holding the input, but not sure how to use it

The Code

import logging
import multiprocessing


def create_logger(logger_name):
    ''' Create a logger that log to the console '''
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # create console handler and set appropriate level
    ch = logging.StreamHandler()
    formatter = logging.Formatter("%(processName)s %(funcName)s() %(levelname)s: %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

def main():
    global logger
    logger = create_logger(__name__)
    logger.info('Main started')
    data = []
    for i in range(0,100):
        data.append({str(i):i})

    CalcManager(data,start=0,end=50)
    logger.info('Main ended')

def CalcManager(myData,start,end):
    logger.info('CalcManager started')
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Add tasks
    for i in range(start, end):
        tasks.put(myData[i])

    # Create processes to do work
    nprocs = 3
    for i in range(nprocs):
        logger.info('starting processes')
        p = multiprocessing.Process(target=worker,args=(tasks,results))
        p.daemon = True
        p.start()

    # Wait for tasks completion, i.e. tasks queue is empty
    try:
        tasks.join()
    except KeyboardInterrupt:
        logger.info('Cancel tasks')

    # Print out the results
    print 'RESULTS'
    while not results.empty():
        result = results.get()
        print result

    logger.info('CalManager ended')

def worker(tasks, results):
    while True:
        try:
            task = tasks.get()  # one row of input
            task['done'] = True # simular work being done
            results.put(task)   # Save the result to the output queue
        finally:
            # JoinableQueue: for every get(), we need a task_done()
            tasks.task_done()


if __name__== '__main__':   
    main()

Discussion

  • For multiple process situation, I recommend using the logging module as it offer a few advantages:
    • It is thread- and process- safe; meaning you won't have situation where the output of one processes mingle together
    • You can configure logging to show the process name, function name--very handy for debugging
  • CalcManager is essentially a task manager which does the following
    1. Creates three processes
    2. Populate the input queue, tasks
    3. Waits for the task completion
    4. Prints out the result
  • Note that when creating processes, I mark them as daemon, meaning they will killed when the main program exits. You don't have to worry about killing them
  • worker is where the work is done
    • Each of them runs forever (while True loop)
    • Each time through the loop, they will get one unit of input, do some processing, then put the result in the output
    • After a task is done, it calls task_done() so that the main process knows when all jobs are done. I put task_done in the finally clause to ensure it will run even if an error occurred during processing
Hai Vu
  • 37,849
  • 11
  • 66
  • 93
  • Thank you for the answer, i guess you hadnt seen my updates which is why you kept asking the questions about returns. Would it change this strategy at all? – user-2147482637 Sep 03 '14 at 01:10
  • also for large data this seems to not work. It looks as if it has the same problem as my original code when the task and join are switched and the data is stuck. ...not that I really understand that well what is happening. – user-2147482637 Sep 03 '14 at 01:22
2

You may see some improvement by using a multiprocessing.Manager to store your list in a manager server, and having each child process access items from the dict by pulling them from that one shared list, rather than copying slices to each child process:

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    nprocs = 3 
    result_q = multiprocessing.Queue()
    procs = []

    interval = (end-start)/nprocs 
    new_start = start

    for i in range(nprocs):
        new_end = new_start + interval
        if new_end > end:
            new_end = end 
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'        

    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

This copies your entire data list to a Manager process prior to creating any of your workers. The Manager returns a Proxy object that allows shared access to the list. You then just pass the Proxy to the workers, which means their startup time will be greatly reduced, since there's no longer any need to copy slices of the data list. The downside here is that accessing the list will be slower in the children, since the access needs to go to the manager process via IPC. Whether or not this will really help performance is very dependent on exactly what work you're doing on the list in your work processes, but its worth a try, since it requires very few code changes.

dano
  • 91,354
  • 19
  • 222
  • 219