I am trying to parallelize the function train() so that the different clients run in parallel, and to collect the output as an array of [weight, loss] pairs:
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
a = [pool.apply_async(train, (model[i], remote_torch[i], train_loader_ptr[i], epoch, args, train_data_length[i], clients[i], wglob)) for i in range(cnum)]
output = [out.get() for out in a]
However, when I run it, it gets stuck at the line that builds the output array and raises the following error:
Starting Training
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In [24], line 11
9 pool = mp.Pool(mp.cpu_count())
10 a = [pool.apply_async(train, (model[i], remote_torch[i], train_loader_ptr[i], epoch, args, train_data_length[i], clients[i], wglob)) for i in range(cnum)]
---> 11 output = [out.get() for out in a]
12 for i in range(cnum):
13 for wlocal, loss in output:
Cell In [24], line 11, in <listcomp>(.0)
9 pool = mp.Pool(mp.cpu_count())
10 a = [pool.apply_async(train, (model[i], remote_torch[i], train_loader_ptr[i], epoch, args, train_data_length[i], clients[i], wglob)) for i in range(cnum)]
---> 11 output = [out.get() for out in a]
12 for i in range(cnum):
13 for wlocal, loss in output:
File ~\anaconda\envs\env\lib\multiprocessing\pool.py:771, in ApplyResult.get(self, timeout)
769 return self._value
770 else:
--> 771 raise self._value
File ~\anaconda\envs\env\lib\multiprocessing\pool.py:537, in Pool._handle_tasks(taskqueue, put, outqueue, pool, cache)
535 break
536 try:
--> 537 put(task)
538 except Exception as e:
539 job, idx = task[:2]
File ~\anaconda\envs\env\lib\multiprocessing\connection.py:206, in _ConnectionBase.send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
File ~\anaconda\envs\env\lib\multiprocessing\reduction.py:51, in ForkingPickler.dumps(cls, obj, protocol)
48 @classmethod
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
---> 51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
File stringsource:2, in zmq.backend.cython.socket.Socket.__reduce_cython__()
TypeError: no default __reduce__ due to non-trivial __cinit__
What is happening here? Is the output collecting the results too early, and is that what causes this problem?
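For reference, the last frames of the traceback show the failure inside `Pool._handle_tasks` → `put(task)` → `ForkingPickler.dumps(obj)`, i.e. while the pool serializes a task (the function plus its argument tuple) to send it to a worker, and the object that refuses to pickle is a `zmq` `Socket`. The pool stores that exception and `ApplyResult.get()` re-raises it (`raise self._value`), which would explain why the error surfaces at the `output` line rather than at `apply_async`. Below is a minimal sketch, under those assumptions, of a hypothetical helper `find_unpicklable` that tries each argument with the same `ForkingPickler` to see which one fails; a `threading.Lock` stands in for the socket, since `zmq` is not needed to reproduce the pickling refusal.

```python
import threading
from multiprocessing.reduction import ForkingPickler


def find_unpicklable(args):
    """Hypothetical helper: return (index, type name, error message) for every
    item in `args` that ForkingPickler, the pickler multiprocessing uses to
    send tasks to worker processes, refuses to serialize."""
    failures = []
    for index, value in enumerate(args):
        try:
            ForkingPickler.dumps(value)
        except Exception as exc:  # TypeError, pickle.PicklingError, AttributeError, ...
            failures.append((index, type(value).__name__, str(exc)))
    return failures


if __name__ == "__main__":
    # Hypothetical stand-in for one client's argument tuple; the lock plays
    # the role of an object (such as a zmq socket) that cannot be pickled.
    sample_args = (1, "epoch", [0.1, 0.2], threading.Lock())
    for index, type_name, message in find_unpicklable(sample_args):
        print(f"argument {index} ({type_name}): {message}")
```

Calling it on one client's tuple, e.g. `find_unpicklable((model[0], remote_torch[0], train_loader_ptr[0], epoch, args, train_data_length[0], clients[0], wglob))`, should list whichever arguments hold such a socket; which ones that are is not visible from the traceback alone.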