
I am trying to parallelize the function train() so that the different clients run in parallel and the output is returned as an array of [weight, loss] objects:

    import multiprocessing as mp

    pool = mp.Pool(mp.cpu_count())
    # submit one training job per client, then collect the results
    a = [pool.apply_async(train, (model[i], remote_torch[i], train_loader_ptr[i], epoch, args, train_data_length[i], clients[i], wglob)) for i in range(cnum)]
    output = [out.get() for out in a]

However, it throws the following error when I run it, and it gets stuck at the output array:

Starting Training 
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In [24], line 11
      9 pool = mp.Pool(mp.cpu_count())
     10 a = [pool.apply_async(train, (model[i], remote_torch[i], train_loader_ptr[i], epoch, args, train_data_length[i], clients[i], wglob)) for i in range(cnum)]
---> 11 output = [out.get() for out in a]
     12 for i in range(cnum):
     13     for wlocal, loss in output:

Cell In [24], line 11, in <listcomp>(.0)
      9 pool = mp.Pool(mp.cpu_count())
     10 a = [pool.apply_async(train, (model[i], remote_torch[i], train_loader_ptr[i], epoch, args, train_data_length[i], clients[i], wglob)) for i in range(cnum)]
---> 11 output = [out.get() for out in a]
     12 for i in range(cnum):
     13     for wlocal, loss in output:

File ~\anaconda\envs\env\lib\multiprocessing\pool.py:771, in ApplyResult.get(self, timeout)
    769     return self._value
    770 else:
--> 771     raise self._value

File ~\anaconda\envs\env\lib\multiprocessing\pool.py:537, in Pool._handle_tasks(taskqueue, put, outqueue, pool, cache)
    535     break
    536 try:
--> 537     put(task)
    538 except Exception as e:
    539     job, idx = task[:2]

File ~\anaconda\envs\env\lib\multiprocessing\connection.py:206, in _ConnectionBase.send(self, obj)
    204 self._check_closed()
    205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))

File ~\anaconda\envs\env\lib\multiprocessing\reduction.py:51, in ForkingPickler.dumps(cls, obj, protocol)
     48 @classmethod
     49 def dumps(cls, obj, protocol=None):
     50     buf = io.BytesIO()
---> 51     cls(buf, protocol).dump(obj)
     52     return buf.getbuffer()

File stringsource:2, in zmq.backend.cython.socket.Socket.__reduce_cython__()

TypeError: no default __reduce__ due to non-trivial __cinit__

What is happening? Does the output get the result too early, and is that what causes this problem?

  • The multiprocessing library normally uses serialisation via pickle for interprocess communication. It's common to see errors involving `__reduce__` if an object isn't [picklable](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled). See e.g. [this question](https://stackoverflow.com/q/52600240/984421). – ekhumoro Nov 05 '22 at 13:07
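
    As the comment notes, every argument handed to apply_async has to be picklable, because the pool ships each task to a worker process over a pipe. A minimal diagnostic sketch, reusing the variable names from the snippet above only for illustration (not the asker's actual code), that tries to pickle each argument and reports which one fails (e.g. a zmq-backed remote object):

        import pickle

        arg_names = ("model", "remote_torch", "train_loader_ptr", "epoch",
                     "args", "train_data_length", "clients", "wglob")

        for i in range(cnum):
            task_args = (model[i], remote_torch[i], train_loader_ptr[i], epoch,
                         args, train_data_length[i], clients[i], wglob)
            for name, value in zip(arg_names, task_args):
                try:
                    # this is what the pool does internally before sending the task
                    pickle.dumps(value)
                except Exception as exc:
                    print(f"client {i}: argument '{name}' is not picklable: {exc!r}")

    Any argument flagged by this check would have to be replaced with something picklable (for example, an identifier that the worker uses to reconstruct the object) before pool-based parallelism can work.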

0 Answers