
I would like to pass messages out from my function running in a process pool while the function is still running.

My application uses asyncio and multiprocessing queues to receive and distribute messages to a worker pool using loop.run_in_executor(). I created the pool manually so that I could provide an initializer.

The problem is that I would like the functions running in the executor pool to be able to send messages out to the asyncio loop. This is how I start my application process:

self._application = Application(self.outgoing_queue, self.incoming_queue, application_cores, log_level=logging.INFO)
self._application_process = mp.Process(target=self._application.run)
self._application_process.start()

The queues are created with:

self.outgoing_queue = mp.Queue()
self.incoming_queue = mp.Queue()
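
Passing the queues to mp.Process this way works, because they are transferred while the child process is being spawned (the "inheritance" path). For context, a blocking mp.Queue can still be drained from the asyncio loop by running its get() on a thread executor, roughly like this (a simplified sketch; pump_incoming and handle_message are illustrative names):

import asyncio

async def pump_incoming(loop, incoming_queue, handle_message):
    # mp.Queue.get() blocks, so run it on the default thread pool
    # executor to keep the event loop responsive
    while True:
        msg = await loop.run_in_executor(None, incoming_queue.get)
        await handle_message(msg)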

I can't use my asyncio queue or my multiprocessing queue for the pool workers, since neither can be passed to them as a task argument:

async def run_operation():
    # msg, attribute and args come from the enclosing message_router code
    kwargs = {
        'out_queue': self._work_pool_queue
    }
    func = functools.partial(attribute, *args, **kwargs)

    # pickling out_queue to send it to the pool raises the error below
    result = await self._loop.run_in_executor(self._work_pool, func)
    result_msg = common.messages.MessageResult(result, msg.reply_id, msg.cpu_cost)
    await self.outgoing_send(result_msg)

asyncio.create_task(run_operation())

My self._work_pool is created with:

self._work_pool = concurrent.futures.ProcessPoolExecutor(max_workers=self._cores, initializer=_work_pool_init)

Running this results in the following traceback:

Task exception was never retrieved
future: <Task finished coro=<Application.message_router.<locals>.run_operation() done, defined at c:\users\brian\gitlab\rf-applications\rfapplications\common\application.py:95> exception=RuntimeError('Queue objects should only be shared between processes through inheritance')>
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "c:\Program Files\Python37\lib\multiprocessing\queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "c:\Program Files\Python37\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "c:\Program Files\Python37\lib\multiprocessing\queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "c:\Program Files\Python37\lib\multiprocessing\context.py", line 356, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
"""

I was also looking at using a Manager().Queue() (https://docs.python.org/3/library/multiprocessing.html#managers), since those can be sent to the process pool (see Python multiprocessing Pool Queues communication). However, these queues seem to open up the possibility of remote connections, which I would like to avoid (so far I use secure websockets to communicate between remote machines).
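
As far as I can tell, the manager started by mp.Manager() listens only locally (a named pipe on Windows, a Unix socket elsewhere) and uses a randomly generated authkey, so the exposure may be smaller than I feared. A minimal sketch of how the proxy queue could be passed per task (names are illustrative):

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def work(out_queue, x):
    out_queue.put('progress from %s' % x)  # proxies pickle fine per task
    return x * x

if __name__ == '__main__':
    manager = mp.Manager()
    q = manager.Queue()
    with ProcessPoolExecutor(max_workers=2) as pool:
        future = pool.submit(work, q, 7)
        print(future.result())  # 49
    print(q.get())              # 'progress from 7'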

Brian
  • Hi @Brian. Have you considered using Airflow for your process? It solves the main problems with processing architecture and it's really easy to work with. Take a look: https://airflow.apache.org/docs/stable/start.html – Michel Guimarães Apr 19 '20 at 15:54
  • I'll take a look at it, thanks. I'm still on the steep part of the multiprocessing learning curve! – Brian Apr 19 '20 at 16:02
  • @MichelGuimarães that has nothing to do with the question and doesn't help in any way. OP, can you post the full traceback and the code? – gold_cy Apr 19 '20 at 16:16
  • I added more detail as requested. I can provide more specifics if you let me know. – Brian Apr 19 '20 at 16:24
