I am new to parallelization and ran into a very odd bug while running a function that loads the same npy file on several cores.

My code is of the form:

import os
import multiprocessing
from pathlib import Path

import numpy as np  # was missing from the original snippet
from joblib import Parallel, delayed

num_cores = multiprocessing.cpu_count()

mydir = Path('path/of/your/choice')
myfile = mydir / 'myArray.npy'
os.chdir(mydir)

# Save a dummy array once, before any parallel work starts
myarray = np.zeros(12345)
np.save(myfile, myarray)

def foo(myfile, x):
    # Load myArray from disk and work with it
    arr = np.load(myfile)
    return arr + x

if __name__ == '__main__':
    foo_results = Parallel(n_jobs=num_cores, backend="threading")(
        delayed(foo)(myfile, i) for i in range(10))

In my case, the script runs fine about 40% of the way through, then raises:

--> 17 arr=np.load(mydir/'myArray.npy')
ValueError: cannot reshape array of size 0 into shape (12345,)

What blows my mind is that if I enter %pdb debug mode and run arr=np.load(mydir/'myArray.npy') by hand, it works! So I assume the issue stems from all the parallel workers running foo trying to load the same numpy array at the same time (in debug mode, all the workers are paused and only the code I execute actually runs).

This very minimal example actually runs without error, presumably because the function is so simple that joblib handles it gracefully, but my real code is too long and complicated to post here. First of all, has anyone encountered a similar issue before? If no one can identify the problem, I will post my whole script.

Thanks for your help!

-------------------- EDIT ------------------

Given that there doesn't seem to be an easy answer based on the toy code I posted, here are the full error logs. I played around with the backends following @psarka's recommendation, and for some reason the following error arises with the default loky backend (again, the code runs without problems when it is not parallelized):

/media/maxime/ut_data/Dropbox/NeuroPyxels/npyx/corr.py in ccg_stack(dp, U_src, U_trg, cbin, cwin, normalize, all_to_all, name, sav, again, periods)
    541 
    542             ccg_results=Parallel(n_jobs=num_cores)(\
--> 543                 delayed(ccg)(*ccg_inputs[i]) for i in tqdm(range(len(ccg_inputs)), desc=f'Computing ccgs over {num_cores} cores'))
    544             for ((i1, u1, i2, u2), CCG) in zip(ccg_ids,ccg_results):
    545                 if i1==i2:

~/miniconda3/envs/npyx/lib/python3.7/site-packages/joblib-1.0.1-py3.7.egg/joblib/parallel.py in __call__(self, iterable)
   1052 
   1053             with self._backend.retrieval_context():
-> 1054                 self.retrieve()
   1055             # Make sure that we get a last message telling us we are done
   1056             elapsed_time = time.time() - self._start_time

~/miniconda3/envs/npyx/lib/python3.7/site-packages/joblib-1.0.1-py3.7.egg/joblib/parallel.py in retrieve(self)
    931             try:
    932                 if getattr(self._backend, 'supports_timeout', False):
--> 933                     self._output.extend(job.get(timeout=self.timeout))
    934                 else:
    935                     self._output.extend(job.get())

~/miniconda3/envs/npyx/lib/python3.7/site-packages/joblib-1.0.1-py3.7.egg/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    540         AsyncResults.get from multiprocessing."""
    541         try:
--> 542             return future.result(timeout=timeout)
    543         except CfTimeoutError as e:
    544             raise TimeoutError from e

~/miniconda3/envs/npyx/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    426                 raise CancelledError()
    427             elif self._state == FINISHED:
--> 428                 return self.__get_result()
    429 
    430             self._condition.wait(timeout)

~/miniconda3/envs/npyx/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

ValueError: Cannot load file containing pickled data when allow_pickle=False

The same error arises with the threading backend (the one originally used in my question), whose traceback is more informative. Again, running train = np.load(Path(dprm,fn)) by hand in debug mode works:

/media/maxime/ut_data/Dropbox/NeuroPyxels/npyx/corr.py in ccg_stack(dp, U_src, U_trg, cbin, cwin, normalize, all_to_all, name, sav, again, periods)
    541 
    542             ccg_results=Parallel(n_jobs=num_cores, backend='threading')(\
--> 543                 delayed(ccg)(*ccg_inputs[i]) for i in tqdm(range(len(ccg_inputs)), desc=f'Computing ccgs over {num_cores} cores'))
    544             for ((i1, u1, i2, u2), CCG) in zip(ccg_ids,ccg_results):
    545                 if i1==i2:

~/miniconda3/envs/npyx/lib/python3.7/site-packages/joblib-1.0.1-py3.7.egg/joblib/parallel.py in __call__(self, iterable)
   1052 
   1053             with self._backend.retrieval_context():
-> 1054                 self.retrieve()
   1055             # Make sure that we get a last message telling us we are done
   1056             elapsed_time = time.time() - self._start_time

~/miniconda3/envs/npyx/lib/python3.7/site-packages/joblib-1.0.1-py3.7.egg/joblib/parallel.py in retrieve(self)
    931             try:
    932                 if getattr(self._backend, 'supports_timeout', False):
--> 933                     self._output.extend(job.get(timeout=self.timeout))
    934                 else:
    935                     self._output.extend(job.get())

~/miniconda3/envs/npyx/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

~/miniconda3/envs/npyx/lib/python3.7/multiprocessing/pool.py in worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    119         job, i, func, args, kwds = task
    120         try:
--> 121             result = (True, func(*args, **kwds))
    122         except Exception as e:
    123             if wrap_exception and func is not _helper_reraises_exception:

~/miniconda3/envs/npyx/lib/python3.7/site-packages/joblib-1.0.1-py3.7.egg/joblib/_parallel_backends.py in __call__(self, *args, **kwargs)
    593     def __call__(self, *args, **kwargs):
    594         try:
--> 595             return self.func(*args, **kwargs)
    596         except KeyboardInterrupt as e:
    597             # We capture the KeyboardInterrupt and reraise it as

~/miniconda3/envs/npyx/lib/python3.7/site-packages/joblib-1.0.1-py3.7.egg/joblib/parallel.py in __call__(self)
    261         with parallel_backend(self._backend, n_jobs=self._n_jobs):
    262             return [func(*args, **kwargs)
--> 263                     for func, args, kwargs in self.items]
    264 
    265     def __reduce__(self):

~/miniconda3/envs/npyx/lib/python3.7/site-packages/joblib-1.0.1-py3.7.egg/joblib/parallel.py in <listcomp>(.0)
    261         with parallel_backend(self._backend, n_jobs=self._n_jobs):
    262             return [func(*args, **kwargs)
--> 263                     for func, args, kwargs in self.items]
    264 
    265     def __reduce__(self):

/media/maxime/ut_data/Dropbox/NeuroPyxels/npyx/corr.py in ccg(dp, U, bin_size, win_size, fs, normalize, ret, sav, verbose, periods, again, trains)
    258         if verbose: print("File {} not found in routines memory.".format(fn))
    259         crosscorrelograms = crosscorrelate_cyrille(dp, bin_size, win_size, sortedU, fs, True,
--> 260                                                    periods=periods, verbose=verbose, trains=trains)
    261         crosscorrelograms = np.asarray(crosscorrelograms, dtype='float64')
    262         if crosscorrelograms.shape[0]<len(U): # no spikes were found in this period

/media/maxime/ut_data/Dropbox/NeuroPyxels/npyx/corr.py in crosscorrelate_cyrille(dp, bin_size, win_size, U, fs, symmetrize, periods, verbose, trains)
     88     U=list(U)
     89 
---> 90     spike_times, spike_clusters = make_phy_like_spikeClustersTimes(dp, U, periods=periods, verbose=verbose, trains=trains)
     91 
     92     return crosscorr_cyrille(spike_times, spike_clusters, win_size, bin_size, fs, symmetrize)

/media/maxime/ut_data/Dropbox/NeuroPyxels/npyx/corr.py in make_phy_like_spikeClustersTimes(dp, U, periods, verbose, trains)
     46         for iu, u in enumerate(U):
     47             # Even lists of strings can be dealt with as integers by being replaced by their indices
---> 48             trains_dic[iu]=trn(dp, u, sav=True, periods=periods, verbose=verbose) # trains in samples
     49     else:
     50         assert len(trains)>1

/media/maxime/ut_data/Dropbox/NeuroPyxels/npyx/spk_t.py in trn(dp, unit, sav, verbose, periods, again, enforced_rp)
    106     if op.exists(Path(dprm,fn)) and not again:
    107         if verbose: print("File {} found in routines memory.".format(fn))
--> 108         train = np.load(Path(dprm,fn))
    109 
    110     # if not, compute it

~/miniconda3/envs/npyx/lib/python3.7/site-packages/numpy-1.21.0rc2-py3.7-linux-x86_64.egg/numpy/lib/npyio.py in load(file, mmap_mode, allow_pickle, fix_imports, encoding)
    443             # Try a pickle
    444             if not allow_pickle:
--> 445                 raise ValueError("Cannot load file containing pickled data "
    446                                  "when allow_pickle=False")
    447             try:

ValueError: Cannot load file containing pickled data when allow_pickle=False

The original error, ValueError: cannot reshape array of size 0 into shape (12345,), doesn't show up anymore for some reason.
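
Both messages are at least consistent with np.load seeing an incomplete file: the pickle error is raised when the first bytes of the file are not the NPY magic string, and the reshape error typically appears when the header is readable but the data after it are missing. This is a hypothesis, not a confirmed diagnosis. A minimal sketch for checking it, assuming two hypothetical helpers, describe_npy and load_checked (neither is part of NumPy or npyx), that report the file's size and leading bytes when a load fails:

```python
import tempfile
from pathlib import Path

import numpy as np

# Leading bytes of every .npy file, as defined by the NPY format.
NPY_MAGIC = b"\x93NUMPY"


def describe_npy(path):
    """Hypothetical helper: report the file size and whether the file
    starts with the NPY magic bytes."""
    path = Path(path)
    size = path.stat().st_size
    with open(path, "rb") as f:
        head = f.read(len(NPY_MAGIC))
    return {"size_bytes": size, "has_npy_magic": head == NPY_MAGIC}


def load_checked(path):
    """Hypothetical wrapper around np.load that attaches the on-disk
    state of the file to the error if the load fails."""
    try:
        return np.load(path)
    except (ValueError, EOFError) as err:
        raise RuntimeError(f"{err} | file state: {describe_npy(path)}") from err


# Toy usage: a complete file versus an empty one (stand-in for a file
# that another worker has created but not finished writing).
tmpdir = Path(tempfile.mkdtemp())
good = tmpdir / "good.npy"
np.save(good, np.zeros(12345))
empty = tmpdir / "empty.npy"
empty.touch()

good_info = describe_npy(good)
empty_info = describe_npy(empty)
arr = load_checked(good)
```

Swapping np.load for load_checked inside trn would show whether the file is empty or truncated at the moment the exception fires.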

Maxime Beau
  • could you load it just once and then duplicate it from the loaded one? -> load it before you create all the parallel threads – Mahrkeenerh Oct 07 '21 at 08:52
  • Not really, as in practice 'foo' is nested inside two other functions and hundreds of different 'myfile' values are actually passed to foo – Maxime Beau Oct 07 '21 at 08:57
  • you could use a MUTEX to only allow one of the threads to load the file. – Mahrkeenerh Oct 07 '21 at 08:58
  • I do not believe that multiple processes reading the same file could be an issue. Sounds very very unlikely. – psarka Oct 07 '21 at 09:59
  • Wait, what, `backend="threading"`? These are not separate processes then, so ignore my above comment. I don't know how joblib deals with threads, but I would not use them. Go for multiprocessing if possible. – psarka Oct 07 '21 at 10:02
  • Does your actual code also `np.save()` the data before you try to `np.load()` it? – Nils Werner Oct 13 '21 at 14:54
  • Thanks for your help - following @psarka's advice, I got rid of the `backend='threading'` argument. Now this error doesn't show up anymore, but I get another one stating that I cannot load a file with pickled data with allow_pickle=False (and I am sure I save simple numpy arrays without pickled data!). Again, this error doesn't occur when trying to load the array in debug mode. – Maxime Beau Oct 18 '21 at 10:50
  • @NilsWerner nope, no np.save in the actual loop – Maxime Beau Oct 18 '21 at 11:13
  • Make a new test case for the pickle load and post a new question. And look around, maybe there are some related answers, like this one: https://stackoverflow.com/questions/55890813/how-to-fix-object-arrays-cannot-be-loaded-when-allow-pickle-false-for-imdb-loa/56062555 – psarka Oct 18 '21 at 11:27
  • I didn't find the solution online, but will definitely pose a more focused question. Thanks a lot! – Maxime Beau Oct 18 '21 at 13:11
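
The MUTEX suggestion from the comments could be sketched as follows for the toy example. This is a minimal sketch under stated assumptions: load_lock and load_locked are hypothetical names, the array is written to a temporary directory instead of mydir, and a threading.Lock only serializes threads within one process, so it applies to backend="threading" and not to the default loky backend.

```python
import tempfile
import threading
from pathlib import Path

import numpy as np
from joblib import Parallel, delayed

# Hypothetical module-level lock shared by all worker threads.
load_lock = threading.Lock()


def load_locked(path):
    """Hypothetical wrapper: only one thread at a time may read the file."""
    with load_lock:
        return np.load(path)


def foo(path, x):
    # Same toy function as in the question, but loading through the lock.
    arr = load_locked(path)
    return arr + x


# Write the toy array once, before any worker starts.
tmpdir = Path(tempfile.mkdtemp())
myfile = tmpdir / "myArray.npy"
np.save(myfile, np.zeros(12345))

# Threads share load_lock, so the loads run one after another.
foo_results = Parallel(n_jobs=4, backend="threading")(
    delayed(foo)(myfile, i) for i in range(10))
```

If some worker also writes the file (for instance a load-or-compute-and-save cache), the write would need to hold the same lock for this to help.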

0 Answers