I would like to pass messages out from my function running in a process pool while the function is still running.
My application uses asyncio and multiprocessing queues to receive and distribute messages to a worker pool using asyncio.run_in_executor()
. I manually created the pool so I could provide an initializer.
The problem I have is that I would like the functions that are running in the executor pool to be able to send messages out to the asyncio loop. This is how I started my new application process:
self._application = Application(self.outgoing_queue, self.incoming_queue, application_cores, log_level=logging.INFO)
self._application_process = mp.Process(target=self._application.run)
self._application_process.start()
the queues are from:
self.outgoing_queue = mp.Queue()
self.incoming_queue = mp.Queue()
I can't use my asyncio queue, or multiprocessing queue since those can't be passed to the process by this method:
async def run_operation():
kwargs = {
'out_queue': self._work_pool_queue
}
func = functools.partial(attribute, *args, **kwargs)
result = await self._loop.run_in_executor(self._work_pool, func)
result_msg = common.messages.MessageResult(result, msg.reply_id, msg.cpu_cost)
await self.outgoing_send(result_msg)
asyncio.create_task(run_operation())
My self._work_pool
is created with:
self._work_pool = concurrent.futures.ProcessPoolExecutor(max_workers=self._cores, initializer=_work_pool_init)
since the following traceback results:
Task exception was never retrieved
future: <Task finished coro=<Application.message_router.<locals>.run_operation() done, defined at c:\users\brian\gitlab\rf-applications\rfapplications\common\application.py:95> exception=RuntimeError('Queue objects should only be shared between processes through inheritance')>
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "c:\Program Files\Python37\lib\multiprocessing\queues.py", line 236, in _feed
obj = _ForkingPickler.dumps(obj)
File "c:\Program Files\Python37\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "c:\Program Files\Python37\lib\multiprocessing\queues.py", line 58, in __getstate__
context.assert_spawning(self)
File "c:\Program Files\Python37\lib\multiprocessing\context.py", line 356, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
"""
I was looking at using a Manager().Queue() (https://docs.python.org/3/library/multiprocessing.html#managers) since those can be sent to the process pool (Python multiprocessing Pool Queues communication). However, these queues seem to open up the possibility of remote connections, which I would like to avoid (I use secure websockets to communicate between remote machines right so far).