
I'm coding a ZeroMQ-based server which needs to instantiate some workers (defined by their own scripts), with which I communicate through multiprocessing.Queue instances.

Basically, I have:
- one main class handling all the communication with the other environments, which contains a lot of things, including in particular:
- a list of workers, each of them getting its instructions through a queue.

The fun part is: I need several processes to be able to communicate with these workers in parallel (to implement a "safe stop" function, for instance).
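
To give an idea, here is a rough, simplified sketch of the kind of layout I mean (illustrative only, the names are made up): a worker loop reading instructions from a multiprocessing.Queue, and any other process holding that queue being able to push a "safe stop" sentinel into it.

from multiprocessing import Process, Queue

STOP = "STOP"                                   # "safe stop" sentinel

def worker(inbox):
    # each worker gets its instructions through its own queue
    while True:
        instruction = inbox.get()
        if instruction == STOP:
            break
        print("worker handling:", instruction)

if __name__ == "__main__":
    q = Queue()
    w = Process(target=worker, args=(q,))
    w.start()
    q.put({"task": "do something"})             # the main class feeds the worker ...
    q.put(STOP)                                 # ... and any process holding q can stop it
    w.join()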

Everything works perfectly fine on Linux, but I get a lot of issues on Windows. In particular, I'm getting this error, which seems to come from multiprocessing/spawn.py:

no default __reduce__ due to non-trivial __cinit__

I reproduced the error with this minimal working example:

from multiprocessing import Process
import numpy as np
import zmq
import time

class myClass():
    def __init__(self):
        self.context = zmq.Context()                       # the zmq.Context lives on the instance
        #many zmq stuff
    def foo(self, bar):
        print( bar )
    def run(self):
        while True :
            time.sleep(1)
            a = np.array([1,2,3])
            dico = {"a":a}
            Process(target=self.foo, args=(dico,)).start() # crashes here on Windows

if __name__ == "__main__":
    b = myClass()
    b.run()

I've looked it up, and somehow it seems that I need to redefine the context each time I call a "run", which can't be done because I need to send a lot of data through these queues at high speed.

If anyone has a clue on what to do...


1 Answer


If anyone has a clue on what to do...

First, welcome to the domain of Zen-of-Zero

In case one has never worked with ZeroMQ, one may enjoy first looking at "ZeroMQ Principles in less than Five Seconds" before diving into further details.


If the problem appears on Windows only ( not on Linux-class O/S-es ), one would feel a temptation to propose the most straightforward step, not to ...

Yet, I will resist such a temptation and will start with root-cause isolation steps:

Windows-class O/S-es use a different form of process instantiation in multiprocessing ( the spawn method, instead of fork ).
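
As a hedged side-note, one may emulate the Windows behaviour even on a Linux-class O/S by forcing the spawn start-method - the snippet below is just a minimal sketch of such a cross-check, not a part of the O/P code:

import multiprocessing as mp

def child( msg ):
    print( "child got:", msg )                  # runs inside a freshly spawned interpreter

if __name__ == "__main__":
    mp.set_start_method( "spawn" )              # force the Windows-like instantiation method
    p = mp.Process( target = child, args = ( "hello", ) )
    p.start()                                   # target & args must be picklable from here on
    p.join()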

The Best Next Step: evaluate your whole code-execution eco-system:

You may use the testing template placed below as a base for your further class-related experiments.

The ZeroMQ part is bothering me. Given the Context()-instance gets created during .__init__(), the multiprocessing-spawned Process() shall on Win-class O/S-es hand the work over to a freshly started python-interpreter-process ( the Windows-mandatory spawn-method, having none of the non-Windows efficiency of fork or forkserver, starts a blank interpreter and has to pickle & ship over whatever the child needs to run - here the bound method self.foo drags the whole instance along with it, the Context()-instance included... )
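
A quick way to confirm or refute this suspicion, with zero multiprocessing involved, is to try to pickle the bound method directly - the sketch below ( mine, not the O/P code ) is expected to reproduce the very same error message:

import pickle
import zmq

class myClass():
    def __init__( self ):
        self.context = zmq.Context()            # the Cython-side, non-picklable resource

    def foo( self, bar ):
        print( bar )

b = myClass()
pickle.dumps( b.foo )                           # expected to raise:
                                                # TypeError: no default __reduce__ due to non-trivial __cinit__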

Even if Process-instances remain semi-persistent ( and may remain such for potential future re-uses ) throughout the life-time of the __main__, the Context()-instance remains in the state of the pre-spawn .__init__()-call and would end up trying to command "shared" Context()-thread replicas ( hidden inside each of the process copies ). So far no collision, yet the #many zmq stuff may cause the problem, as ZeroMQ .Socket()-instances are not thread-safe ( as is often warned in the API documentation ) and are also state-full entities, so "replicated" full copies may easily wreak havoc.
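
If that is indeed the culprit, one common remedy ( a sketch under my assumptions only, knowing nothing about the #many zmq stuff ) is to keep the Context()-instance away from whatever gets handed over to Process(): use a plain module-level target, pass only picklable data, and let each child build its own Context() if and when it needs one:

from multiprocessing import Process
import numpy as np
import zmq
import time

def foo( bar ):                                 # module-level target: picklable by reference
    # if the child needs ZeroMQ, it builds its own Context() here,
    # inside the already-spawned process, never inherited from the parent
    print( bar )

class myClass():
    def __init__( self ):
        self.context = zmq.Context()            # stays in the parent process only

    def run( self ):
        while True:
            time.sleep( 1 )
            dico = { "a": np.array( [1, 2, 3] ) }               # plain, picklable payload
            Process( target = foo, args = ( dico, ) ).start()   # nothing un-picklable crosses over

if __name__ == "__main__":
    myClass().run()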

ToDo: try to document and isolate the problem-evolution -

best by POSACK-reporting (almost) each line executed, getting down to the very last line executed & reported before the crash ( the post-mortem Traceback presented above is too ambiguous to decide - __cinit__ may relate to zillions of places where it could actually have failed )

this template may help doing that:

import multiprocessing as mp                                            # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect                                      # 

def test_function( i = -1 ):
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                                                callerframerecord = inspect.stack()[1] # 1 represents line at caller
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )
    pass;                                                               _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i ),
           "invoked from {0:}()-LINE[{1:_>4d}]".format(                 _CALLER_.function, _CALLER_.lineno )
            )
    time.sleep( 10 )
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )                 # 1 represents line at caller
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i )
            )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
    print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
    #------mp.Pool()-----------------------------------------------------
    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
    #---.map()--------------------------------------?
    #---.map(                                       ?
    pool.map( test_function, [i for i in range(4) ] )
    #---.map(                                       ?
    #---.map()--------------------------------------?
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
    pool.close()
    #---.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
    pool.join()
    #---.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic()          ) )
    print( "EXECUTED on {0:}".format(              platform.version()        ) )
    print( "USING: python-{0:}:".format(           platform.python_version() ) )

might look something like this on a Linux-class O/S:

(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d

EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:
  • Thanks for your answer! It does crash precisely whilst launching Process(...).start(). The function is not even called; the sole fact of starting the process causes the crash. Maybe I'm not using the right tool. Basically I need to have many processes able to put scripts in the queues of many processes. Maybe I need to start all over again with another lib? ZeroMQ is only used at a higher level from now on. – jpaparone Nov 15 '19 at 10:38
  • @jpaparone It was my initial impression that ZeroMQ is not the root-cause of the issue. The SER/DES-Pickle phase is my suspect to investigate further - you may read more insights about the Windows troubles with the spawn()-method of instantiating sub-processes here: https://stackoverflow.com/a/58869967/3666197 – user3666197 Nov 15 '19 at 12:01