First, I'd like to thank the StackOverflow community for the tremendous help it provided me over the years, without me having to ask a single question. I could not find anything that I can relate to my problem, though it is probably due to my lack of understanding of the subject, rather than the absence of a response on the website. My apologies in advance if this is a duplicate.

I am relatively new to multiprocessing; some time ago I succeeded in using multiprocessing.Pool in a very simple way, where I didn't need any feedback between the child processes. Now I am facing a much more complicated problem, and I am lost in the multiprocessing documentation. I hence ask for your help, your kindness and your patience.

I am trying to build a parallel tempering Monte Carlo algorithm around a class.

The basic class very roughly goes as follows:

import numpy as np

class monte_carlo:

    def __init__(self):
        self.x=np.ones((1000,3))
        self.E=np.mean(self.x)
        self.Elist=[]

    def simulation(self,temperature):
        self.T=temperature
        for i in range(3000):
            self.MC_step()
            if i%10==0:
                self.Elist.append(self.E)
        return

    def MC_step(self):
        x=self.x.copy()
        k = np.random.randint(1000)
        x[k] = (x[k] + np.random.uniform(-1,1,3))
        temp_E=np.mean(x)  # energy of the trial configuration x, not of the current self.x
        if np.random.random()<np.exp((self.E-temp_E)/self.T):
            self.E=temp_E
            self.x=x
        return

Obviously, I simplified a great deal (the actual class is 500 lines long!) and built fake functions for simplicity: __init__ takes a bunch of parameters as arguments, there are many more measurement lists besides self.Elist, and many arrays derived from self.x that I use to compute them. The key point is that each instance of the class holds a lot of information that I want to keep in memory and that I don't want to copy over and over again, to avoid a dramatic slowdown. Otherwise I would just use multiprocessing.Pool.

Now, the parallelization I want to do, in pseudo-code:

def proba(dE,pT):
    return np.exp(-dE/pT)  
          
Tlist=[1.1,1.2,1.3]
N=len(Tlist)
G=[]
for _ in range(N):
    G.append(monte_carlo())

for _ in range(5):

    for i in range(N): # this loop should run in parallel processes
        G[i].simulation(Tlist[i])
    
    for i in range(N//2): 
        dE=G[i].E-G[i+1].E
        pT=G[i].T + G[i+1].T
        p=proba(dE,pT) # (proba is a function, giving a probability depending on dE)
        if np.random.random() < p: 
             T_temp = G[i].T
             G[i].T = G[i+1].T
             G[i+1].T = T_temp

Summary: I want to run several instances of my Monte Carlo class in parallel child processes, each with a different value of the parameter T, then periodically pause everything to exchange the T's, and resume the child processes/class instances from where they paused. Throughout, each class instance/child process should stay independent of the others, keep its current state with all internal variables while paused, and make as few copies as possible. This last point is critical: the arrays inside the class are quite big (some are 1000x1000), so copying them quickly becomes time-costly.
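
For comparison, the exchange step in parallel tempering is often written with the textbook replica-exchange acceptance rule, min(1, exp[(1/T_i - 1/T_j)(E_i - E_j)]) in units where k_B = 1, rather than with a placeholder such as `proba`. A minimal sketch of that rule, where `swap_probability` and `maybe_swap` are hypothetical helper names and plain floats stand in for the attributes of the class instances:

```python
import math
import random


def swap_probability(E_i, E_j, T_i, T_j):
    """Acceptance probability for exchanging replicas i and j (k_B = 1)."""
    delta = (1.0 / T_i - 1.0 / T_j) * (E_i - E_j)
    # delta >= 0 means the exchange is always accepted.
    return 1.0 if delta >= 0 else math.exp(delta)


def maybe_swap(E_i, E_j, T_i, T_j, rng=random):
    """Return the (possibly exchanged) temperatures of replicas i and j."""
    if rng.random() < swap_probability(E_i, E_j, T_i, T_j):
        return T_j, T_i
    return T_i, T_j
```

With this form, a colder replica that currently has the higher energy is always exchanged with its hotter neighbour, and the opposite exchange is accepted only with an exponentially small probability.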

Thanks in advance, and sorry if I am not clear...

Edit: I am using a remote machine with many (64) CPUs, running Debian GNU/Linux 10 (buster).

Edit2: I made a mistake in my original post: in the end, the temperatures must be exchanged between the class-instances, and not inside the global Tlist.

Edit3: Charchit's answer works perfectly for the test code, on both my personal machine and the remote machine I usually run my code on. I have therefore marked it as the accepted answer. However, I want to report that when I insert the actual, more complicated code in place of the oversimplified monte_carlo class, the remote machine gives me some strange errors:

Unable to init server: Could not connect: Connection refused

(CMC_temper_all.py:55509): Gtk-WARNING **: ##:##:##:###: Locale not supported by C library.
    Using the fallback 'C' locale.
Unable to init server: Could not connect: Connection refused

(CMC_temper_all.py:55509): Gdk-CRITICAL **: ##:##:##:###: 

gdk_cursor_new_for_display: assertion 'GDK_IS_DISPLAY (display)' failed

(CMC_temper_all.py:55509): Gdk-CRITICAL **: ##:##:##:###: gdk_cursor_new_for_display: assertion 'GDK_IS_DISPLAY (display)' failed

The "##:##:##:###" are (or seem to be) IP addresses. Without the call to set_start_method('spawn') this error shows only once, at the very beginning, whereas with it the error seems to show at every call to result.get()... The strangest thing is that the code otherwise seems to work fine: it does not crash, it produces the data files I ask for, etc...

I think this deserves a new question, but I put it here nonetheless in case someone has a quick answer. If not, I will resort to adding, one by one, the variables, methods, etc. that are present in my actual code but not in the test example, to try to find the origin of the bug. My best guess for now is that the memory required by each child process with the actual code is too large for the remote machine to accept, due to some restrictions set by the admin.

Aves
  • Can you expand on "do as few copies as possible"?. I am unsure what you mean by that. Do you want that after the child processes have "paused" you want something like a quick resume (after altering variables within them), which saves you the need to create new child processes? – Charchit Agarwal Jun 27 '22 at 13:20
  • That's exactly right. I mention copies because it seems to be the solution people usually propose for this kind of problem, in particular with a pool: run a pool, retrieve all the variables produced by each instance, do the "communication between instances" task I want, then declare the class instances again, inserting the previously retrieved arrays. But the loop 'for _ in range(5)' will have to be run several thousand times, so copying the huge arrays (that I need to keep in memory for my class to function) twice per iteration will add up to an important time cost. And it is not elegant... – Aves Jun 27 '22 at 13:31
  • Is the number of instances of `monte_carlo` you need to create also very large? – Charchit Agarwal Jun 27 '22 at 13:43
  • Well, I'd like it to be between 20 and 30; 10 at worst. To be specific, I have access to a workstation with 64 CPUs. – Aves Jun 27 '22 at 13:46
  • Are you ok with using libraries outside the builtins? – Charchit Agarwal Jun 27 '22 at 13:52
  • I'm not sure about this... I do not have admin permissions on this workstation. But if you tell me what library you're thinking about, I can immediately check if the admin installed it already (which is not impossible). – Aves Jun 27 '22 at 13:59
  • [multiprocess](https://pypi.org/project/multiprocess/), a fork of multiprocessing. Without this your idea would be needlessly complicated and inefficient – Charchit Agarwal Jun 27 '22 at 14:00
  • Well, this library isn't installed on the workstation... I will try to convince the admin. Meanwhile, without this library, I suppose you would suggest resorting to copying each time 'monte_carlo' has finished its job? – Aves Jun 27 '22 at 14:15
  • Without this library, you most likely won't be able to pickle instances of `monte_carlo`. If you can't pickle them, you won't be able to send them to different processes and parallelize work (unless you are ok with **creating** the instances in another process as well, and using queues to pass and change values inside; not very efficient). – Charchit Agarwal Jun 27 '22 at 14:22
  • Why is that last solution not very efficient? – Aves Jun 27 '22 at 14:31
  • Because you will have to work with queues if you need to change any value in the instances. So if you want to access any value, first you would want a thread running inside the created processes that continuously polls the queue to check for new commands. Then you put a command in the queue to get a particular value from inside the main process, and then wait for a response. The thread inside the child process will receive that and put the value inside the queue. Compare all that to simply getting the value using `G[i].E` (for example), see how much better and less complicated that is? – Charchit Agarwal Jun 27 '22 at 15:11
  • Multiprocessing has shared memory options as well but it is much more restrictive in the data it can hold – Charchit Agarwal Jun 27 '22 at 15:12
  • Well, thanks anyway for your help! As I see it, the solution I have left (if my admin refuses to install the multiprocess fork) is to create each instance in the main program, run each one in parallel processes (which creates temporary ghost class instances), then copy the outcomes into the global instances, change the T's, and loop again... But this amounts to copying, at each iteration, a great number of huge arrays (self.x and others, 2*20 times)... Exactly what I wanted to avoid, but I don't have much choice, do I? Did I understand correctly? – Aves Jun 27 '22 at 15:57
  • I think using comments to explain this is getting messy, I'll try and write an answer – Charchit Agarwal Jun 27 '22 at 16:42
  • Good news (for me and maybe others who might be interested in the answer): I just learnt that I am able to install libraries in my personal session on the workstation! I am hence open to using multiprocess, the fork of multiprocessing. Sorry for the late realization... – Aves Jun 27 '22 at 18:58
  • Which OS are you using? I'll post an answer that will work with both multiprocess and multiprocessing – Charchit Agarwal Jun 28 '22 at 11:38
  • The remote machine I am using runs Debian GNU/Linux 10 (buster). I have edited my post accordingly. Thanks in advance – Aves Jun 28 '22 at 12:16

1 Answer

What you are looking for is sharing state between processes. As per the documentation, you can either create shared memory, which is restrictive about the data it can store and is not thread-safe but offers better speed and performance, or you can use server processes through managers. The latter is what we are going to use, since you want to share whole objects of user-defined data types. Keep in mind that using managers will affect the speed of your code, depending on the complexity of the arguments that you pass to, and the values you receive from, the managed objects.
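
For contrast, here is a minimal sketch of the shared-memory route using `multiprocessing.shared_memory` (Python 3.8+), assuming the data is a plain float64 NumPy array. `demo_shared_array` is a hypothetical helper, and the second handle is attached in the same process only to keep the example short; another process would attach by name in the same way.

```python
import numpy as np
from multiprocessing import shared_memory


def demo_shared_array(shape=(1000, 3)):
    """Write through one handle to a shared block and read through another."""
    nbytes = int(np.prod(shape)) * np.dtype(np.float64).itemsize
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    try:
        a = np.ndarray(shape, dtype=np.float64, buffer=shm.buf)
        a[:] = 1.0
        # Attach a second handle by name, as a worker process would.
        other = shared_memory.SharedMemory(name=shm.name)
        try:
            b = np.ndarray(shape, dtype=np.float64, buffer=other.buf)
            b[0, 0] = 42.0            # no copy: b and a view the same bytes
            seen = float(a[0, 0])
            del b                     # drop the view before closing the handle
        finally:
            other.close()
        del a
    finally:
        shm.close()
        shm.unlink()                  # free the block once nobody needs it
    return seen
```

This shows the restriction mentioned above: the block holds raw bytes of a fixed size, so it suits large numeric arrays but not arbitrary Python objects with lists and other attributes.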

Managers, proxies and pickling

As mentioned, managers create server processes to store objects, and allow access to them through proxies. I have answered a question with better details on how they work, and how to create a suitable proxy, here. We are going to use the same proxy defined in the linked answer, with some variations. Namely, I have replaced the factory functions inside __getattr__ with something that can be pickled using pickle. This means that you can run instance methods of managed objects created with this proxy without resorting to multiprocess. The result is this modified proxy:

from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np


class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    picklable and its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result
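
To illustrate why a small module-level class such as `A` helps, the sketch below compares pickling a bound method of such a class with pickling a locally defined wrapper function. `MethodCall`, `fake_callmethod`, `make_closure` and `can_pickle` are hypothetical names; `fake_callmethod` only stands in for the proxy's `_callmethod`, and the closure is an assumption about what a factory-function approach would look like.

```python
import pickle


class MethodCall:
    """Same idea as class A: store a method name and a callable."""

    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


def fake_callmethod(name, args=(), kwargs=None):
    """Stand-in for a proxy's _callmethod; just echoes what it received."""
    return (name, args, kwargs or {})


def make_closure(name, method):
    """Factory returning a local function, which pickle cannot look up by name."""
    def wrapper(*args, **kwargs):
        return method(name, args, kwargs)
    return wrapper


def can_pickle(obj):
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


bound = MethodCall("simulation", fake_callmethod).get
closure = make_closure("simulation", fake_callmethod)
print(can_pickle(bound), can_pickle(closure))
```

The bound method survives a pickle round trip because pickle can find the class and the function by their module-level names, which is what `Pool.apply_async` needs in order to send the call to a worker.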

Solution

Now we only need to make sure that when we create objects of monte_carlo, we do so using managers and the above proxy. For that, we add a class method called create. All monte_carlo objects should be created with this method. With that, the final code looks like this:

from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np


class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    picklable and its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result


class monte_carlo:

    def __init__(self):
        self.x = np.ones((1000, 3))
        self.E = np.mean(self.x)
        self.Elist = []
        self.T = None

    def simulation(self, temperature):
        self.T = temperature
        for i in range(3000):
            self.MC_step()
            if i % 10 == 0:
                self.Elist.append(self.E)
        return

    def MC_step(self):
        x = self.x.copy()
        k = np.random.randint(1000)
        x[k] = (x[k] + np.random.uniform(-1, 1, 3))
        temp_E = np.mean(x)  # energy of the trial configuration x, not of the current self.x
        if np.random.random() < np.exp((self.E - temp_E) / self.T):
            self.E = temp_E
            self.x = x
        return

    @classmethod
    def create(cls, *args, **kwargs):
        # Register class
        class_str = cls.__name__
        BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
        # Start a manager process
        manager = BaseManager()
        manager.start()

        # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
        inst = getattr(manager, class_str)(*args, **kwargs)
        return inst


def proba(dE,pT):
    return np.exp(-dE/pT)


if __name__ == "__main__":
    Tlist = [1.1, 1.2, 1.3]
    N = len(Tlist)
    G = []

    # Create our managed instances
    for _ in range(N):
        G.append(monte_carlo.create())

    for _ in range(5):

        #  Run simulations in the manager server
        results = []
        with Pool(8) as pool:

            for i in range(N):  # this loop should run in parallel processes
                results.append(pool.apply_async(G[i].simulation, (Tlist[i], )))

            # Wait for the simulations to complete
            for result in results:
                result.get()

        for i in range(N // 2):
            dE = G[i].E - G[i + 1].E
            pT = G[i].T + G[i + 1].T
            p = proba(dE, pT)  # (proba is a function, giving a probability depending on dE)
            if np.random.random() < p:
                # Swap the two temperatures in place
                Tlist[i], Tlist[i + 1] = Tlist[i + 1], Tlist[i]

    print(Tlist)

This meets the criteria you wanted. It does not create any copies of the instances; rather, the arguments to the simulation method call are serialized inside the pool and sent to the manager server, where the object is actually stored. The method is executed there, and the results (if any) are serialized and returned to the main process. All of this using only the built-in multiprocessing module!
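
To see that a managed object really lives in the manager's server process and that method calls are forwarded to it, here is a minimal sketch. `Counter`, `DemoManager` and `run_demo` are hypothetical names, not part of the code above, and the fork start method is used only to keep the sketch short (Unix only).

```python
import multiprocessing as mp
import os
from multiprocessing.managers import BaseManager


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def owner_pid(self):
        # Runs wherever the real object lives.
        return os.getpid()


class DemoManager(BaseManager):
    pass


DemoManager.register("Counter", Counter)


def run_demo():
    ctx = mp.get_context("fork")  # assumption: a Unix system where fork is available
    with DemoManager(ctx=ctx) as manager:
        counter = manager.Counter()      # a proxy; the object is in the server
        counter.increment()
        second = counter.increment()     # state persists between calls
        lives_elsewhere = counter.owner_pid() != os.getpid()
        return second, lives_elsewhere


if __name__ == "__main__":
    print(run_demo())
```

Only the method name, its arguments and its return value cross the process boundary; the object's attributes stay in the server process between calls.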

Output

[1.2, 1.1, 1.3]

Edit

Since you are using Linux, I encourage you to use multiprocessing.set_start_method inside the if __name__ ... clause to set the start method to "spawn". Doing this will ensure that the child processes do not have access to variables defined inside the clause.
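
A minimal sketch of selecting the start method. It uses `multiprocessing.get_context`, which returns a context object instead of changing the global setting the way `set_start_method` does; `pick_context` is a hypothetical helper.

```python
import multiprocessing as mp


def pick_context(preferred="spawn"):
    """Return a context for `preferred` if available, else the platform default."""
    if preferred in mp.get_all_start_methods():
        return mp.get_context(preferred)
    return mp.get_context()


if __name__ == "__main__":
    ctx = pick_context("spawn")
    print(ctx.get_start_method())
    # A pool built from this context starts its workers with that method,
    # for example: with ctx.Pool(3) as pool: ...
```

Calling `multiprocessing.set_start_method("spawn")` once, at the top of the `if __name__ == "__main__":` block, has the same effect globally; calling it a second time raises a RuntimeError unless `force=True` is passed.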

Charchit Agarwal
  • Incredible! I have yet to try it on the larger version of the code, but it seems to be exactly what I was looking for. However, I realize now that the dummy code in my question had an important mistake: the three lines where the temperatures are exchanged. In fact, they need to be exchanged inside the class instances! It should be clearer from my edit. I think it does not change the efficiency of your solution, except maybe for a slight slowdown due to the exchange of a few floats between processes... – Aves Jun 28 '22 at 13:21
  • Is there a particular reason why you set the number of pool workers to 8? It seems to me that there are only 3 child processes here – Aves Jun 28 '22 at 13:36
  • (Sorry for multiple comments) Also, I am curious about the importance of setting in `__init__` the parameter `self.T=None`, when it will anyway be declared later in the method `simulation`; is it simply a matter of good practice, to avoid the call by mistake of an undeclared variable (which should not happen if one uses the class in the intended manner, but still), or is there a deeper reason? – Aves Jun 28 '22 at 13:44
  • 1. The edit in your code should not really change anything, just correct it in this answer. 2. I set it to 8 from habit; it would be more apt to set it to 3 for this example. But if you have more than 8 tasks that need to be run, then set it to the number of cores your PC has. 3. It's just good practice to define all instance attributes in the `__init__` function, to help others looking at your code understand which variables the class uses. – Charchit Agarwal Jun 28 '22 at 14:55
  • Hi again! The above code works just fine, both on my personal machine and on the remote server I usually use. Of course, the gain in performance relative to not multiprocessing can only be seen when the number of calls to 'MC_step' is large enough; nothing surprising here. I have hence marked this as the accepted answer. However, I report that, when I replace the simplified class by my actual, much more complicated class, on the remote server (but not on my personal machine), I get some strange errors. This is too long for a comment, so I have edited my question to include this. – Aves Jun 29 '22 at 11:25
  • @Aves seems like a separate problem regarding a different section of your code. It's better to ask a new question imo. – Charchit Agarwal Jun 29 '22 at 14:40