6

I am doing a function optimization using an evolutionary algorithm (CMAES). To run it faster I am using the multiprocessing module. The function I need to optimize takes large matrices as inputs (input_A_Opt, and input_B_Opt) in the code below.

They are several GBs of size. When I run the function without multiprocessing, it works well. When I use multiprocessing there seems to be a problem with memory. If I run it with small inputs it works well, but when I run with the full input, I get the following error:

File "<ipython-input-2-bdbae5b82d3c>", line 1, in <module>
opt.myFuncOptimization()

File "/home/joe/Desktop/optimization_folder/Python/Optimization.py", line 45, in myFuncOptimization
**f_values = pool.map_async(partial_function_to_optmize, solutions).get()**
File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get
raise self._value
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 385, in _handle_tasks
put(task)
File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))

File "/usr/lib/python3.5/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)

error: 'i' format requires -2147483648 <= number <= 2147483647

And here's a simplified version of the code (again, if I run it with the input 10 times smaller, all works fine):

import numpy as np
import cma
import multiprocessing as mp
import functools
import myFuncs
import hdf5storage



def myFuncOptimization ():

    temp = hdf5storage.loadmat('/home/joe/Desktop/optimization_folder/matlab_workspace_for_optimization')    

    input_A_Opt  = temp["input_A"]
    input_B_Opt  = temp["input_B"]

    del temp

    numCores = 20

    # Inputs
   #________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________
    P0 = np.array([            4.66666667, 2.5,    2.66666667, 4.16666667, 0.96969697,     1.95959596,     0.44088176,     0.04040404,     6.05210421,     0.58585859,     0.46464646,         8.75751503,         0.16161616,             1.24248497,         1.61616162,                 1.56312625,         5.85858586,                 0.01400841, 1.0,            2.4137931,      0.38076152, 2.5,    1.99679872      ])
    LBOpt = np.array([         0.0,        0.0,    0.0,        0.0,        0.0,            0.0,            0.0,            0.0,            0.0,            0.0,            0.0,                0.0,                0.0,                    0.0,                0.0,                        0.0,                0.0,                        0.0,        0.0,            0.0,            0.0,        0.0,    0.0,            ])
    UBOpt = np.array([         10.0,       10.0,   10.0,       10.0,       10.0,           10.0,           10.0,           10.0,           10.0,           10.0,           10.0,               10.0,               10.0,                   10.0,               10.0,                       10.0,               10.0,                       10.0,       10.0,           10.0,           10.0,       10.0,   10.0,           ])
    initialStdsOpt = np.array([2.0,        2.0,    2.0,        2.0,        2.0,            2.0,            2.0,            2.0,            2.0,            2.0,            2.0,                2.0,                2.0,                    2.0,                2.0,                        2.0,                2.0,                        2.0,        2.0,            2.0,            2.0,        2.0,    2.0,            ])
    minStdsOpt = np.array([    0.030,      0.40,   0.030,      0.40,       0.020,          0.020,          0.020,          0.020,          0.020,          0.020,          0.020,              0.020,              0.020,                  0.020,              0.020,                      0.020,              0.020,                      0.020,      0.050,          0.050,          0.020,      0.40,   0.020,          ]) 

    options = {'bounds':[LBOpt,UBOpt], 'CMA_stds':initialStdsOpt, 'minstd':minStdsOpt, 'popsize':numCores}
    es = cma.CMAEvolutionStrategy(P0, 1, options)

    pool = mp.Pool(numCores)

    partial_function_to_optmize = functools.partial(myFuncs.func1, input_A=input_A_Opt, input_B=input_B_Opt)

    while not es.stop():
        solutions = es.ask(es.popsize)            
        f_values = pool.map_async(partial_function_to_optmize, solutions).get()   
        es.tell(solutions, f_values)
        es.disp(1)
        es.logger.add()

    return es.result_pretty()

Any suggestions on how to solve this issue? am I not coding properly (new to python) or should I use other multiprocessing package like scoop?

alex_milhouse
  • 891
  • 1
  • 13
  • 31
Joe
  • 185
  • 6
  • You're using way too much memory! Look into shared memory for values that don't need to be copied (https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes). – tcooc Nov 16 '16 at 21:41
  • Related: [Use numpy array in shared memory for multiprocessing](https://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory-for-multiprocessing) – robyschek Nov 16 '16 at 22:04

1 Answers1

2

Your objects are too big to pass between processes. You're passing along more than 2147483647 bytes - that's over 2GB! The protocol isn't made for this, and the sheer overhead of serializing and deserializing such large chunks of data can be a serious performance overhead.

Reduce the size of data passed to each process. If you workflow allows it, read in the data in the separate process, and pass along only the results.

MisterMiyagi
  • 44,374
  • 10
  • 104
  • 119
  • Thanks MisterMiyagi. Yes, each matrix I pass has more than 2GB. This data is needed though. Will try your suggestion of reading the data in each process, it actually can be done. It will make increase the time needed for each iteration, as instead one reading at the beginning of the process I will need to read every time I do one evaluation (thousands of times). Probably the increased time is ~25%, kind of acceptable. – Joe Nov 17 '16 at 01:09
  • btw, do you know if there is any other parallel processing framework that can work with this amount of data (e.g. scoop)? – Joe Nov 17 '16 at 01:10
  • follow up question - can it be a problem having 20 parallel processes reading the same file at the same time? – Joe Nov 17 '16 at 04:43
  • @Joe You might be better off not having a pool of fire-and-forget processes, but individual processes each strictly working on a chunk. Then you can avoid having to re-read the data. If your data may grow even larger, you might want to look at batch processing, though. – MisterMiyagi Nov 17 '16 at 08:42
  • @Joe I/O is mainly dictated by the media you are reading from. A modern, consumer grade HDD is suitable for only two parallel readers. An SSD can go to 20 or more concurrent readers. RAID with mirroring practically scales linearly with number of devices. Distributed, parallel file systems in a cluster are the way to go if you are in the TB range or above. – MisterMiyagi Nov 17 '16 at 08:51
  • How about creating global variables for the input (read only) data before launching the pool and map it? This should eliminate the need of passing the large matrixes... – Joe Nov 17 '16 at 09:02
  • @Joe Global variables are *process global*. They are not shared across processes, only the threads of a single process. If you have global variables, each process creates its own "copy" of the variable. You can look at shared memory, but then each process has to actually *read* that, and you must manage. You're better off having the OS use the memory to cache your I/O - it knows what it does. – MisterMiyagi Nov 17 '16 at 10:49
  • Thanks for your answers. I implemented your suggestion. I found additional problems. See post here and let me know if you have any suggestion – Joe Nov 19 '16 at 02:50