
I'm trying to use a cluster of computers to run millions of small simulations. To do this I set up two "servers" on my main computer: one that feeds input parameters into a networked queue, and one that collects the results.

This is the code that puts the simulation parameters into the queue:

"""This script reads start parameters and calls on run_sim to run the
simulations"""
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


class MultiComputers(Process):
    def __init__(self, sim_name, queue):
        self.sim_name = sim_name
        self.queue = queue
        super(MultiComputers, self).__init__()

    def get_sim_obj(self):
        """Returns a list of lists from a database query (body elided)."""

    def handle_queue(self):
        self.sim_nr = 0
        sims = self.get_sim_obj()
        self.total = len(sims)
        while len(sims) > 0:
            # Only refill while fewer than ~100 jobs are pending
            # (qsize() is approximate, but good enough for throttling).
            if self.queue.qsize() < 100:
                self.queue.put(sims[0])
                self.sim_nr += 1
                print(self.sim_nr, round(self.sim_nr / self.total * 100, 2), self.queue.qsize())
                del sims[0]

    def run(self):
        self.handle_queue()

if __name__ == '__main__':
    freeze_support()
    queue = Queue()
    w = MultiComputers('seed_1_hundred', queue)
    w.start()
    QueueManager.register('get_queue', callable=lambda: queue)
    m = QueueManager(address=('', 8001), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()
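For context, the cluster nodes connect to this manager with a second `BaseManager` subclass that registers `get_queue` without a callable. A minimal sketch of both sides (the job tuple is made up, and both ends run in one process here only so the snippet is self-contained; on the cluster the client part runs on each node with the server's hostname):

```python
import threading
from multiprocessing import Queue
from multiprocessing.managers import BaseManager


class ServerManager(BaseManager):
    pass


class ClientManager(BaseManager):
    pass


# --- server side (as in the script above) ---
local_queue = Queue()
ServerManager.register('get_queue', callable=lambda: local_queue)
mgr = ServerManager(address=('127.0.0.1', 0), authkey=b'abracadabra')  # port 0 = any free port
server = mgr.get_server()
threading.Thread(target=server.serve_forever, daemon=True).start()

# --- client side (what each cluster node would run) ---
ClientManager.register('get_queue')   # no callable: this end gets a proxy
client = ClientManager(address=server.address, authkey=b'abracadabra')
client.connect()
remote_queue = client.get_queue()

local_queue.put(('seed_1_hundred', 42))   # the server enqueues a job...
job = remote_queue.get()                  # ...and the client receives it over the network
print(job)                                # ('seed_1_hundred', 42)
```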

And this script is run to collect the results of the simulations:

__author__ = 'axa'
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager
import time


class QueueManager(BaseManager):
    pass


class SaveFromMultiComp(Process):
    def __init__(self, sim_name, queue):
        self.sim_name = sim_name
        self.queue = queue
        super(SaveFromMultiComp, self).__init__()

    def run(self):
        from queue import Empty  # raised by Queue.get(timeout=...) when nothing arrives
        res_got = 0
        with open('sim_type1_' + self.sim_name, 'a') as f_1, \
                open('sim_type2_' + self.sim_name, 'a') as f_2:
            while True:
                try:
                    # Block on get() instead of polling qsize() in a busy loop.
                    res = self.queue.get(timeout=0.5)
                except Empty:
                    continue
                res_got += 1
                if res[0] == 1:
                    f_1.write(str(res[1]) + '\n')
                elif res[0] == 2:
                    f_2.write(str(res[1]) + '\n')
                print(res_got)


if __name__ == '__main__':
    freeze_support()
    queue = Queue()
    w = SaveFromMultiComp('seed_1_hundred', queue)
    w.start()
    m = QueueManager(address=('', 8002), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()

These scripts work as expected for the first ~700-800 simulations; after that I get the following error in the terminal running the result-receiving script:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python35\lib\multiprocessing\managers.py", line 177, in accepter
    t.start()
  File "C:\Python35\lib\threading.py", line 844, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

Can anyone give some insight into where and how the threads are spawned? Is a new thread spawned every time I call queue.get(), or how does it work? And I would be very glad if someone knows what I can do to avoid this failure. (I'm running the scripts with Python 3.5, 32-bit.)

Jeremy Friesner
axel_ande
  • Why have you chosen not to use a queue instance returned from a call to `queue = manager.Queue()` within a `with multiprocessing.Manager() as manager:` block? – Booboo Aug 25 '20 at 10:09
  • You may give a try using pool: https://stackoverflow.com/a/62396445/3890560 – fghoussen Aug 30 '20 at 16:38

2 Answers


All signs point to your system running out of the resources it needs to launch a thread (probably memory, but you could also be leaking threads or other resources). You could use OS monitoring tools (top on Linux, Resource Monitor on Windows) to watch the thread count and memory usage and track this down, but I would recommend simply switching to an easier, more efficient programming pattern.

While not a perfect comparison, you are essentially seeing the C10K problem: blocking threads that wait for results do not scale well and are prone to errors like this. The solution is to use async I/O patterns (one blocking thread that dispatches work to other workers), which is straightforward to do with web servers.

A framework like Python's aiohttp should be a good fit for what you want. You just need a handler that receives the ID of the remote job and its result; the framework should take care of the scaling for you.

So in your case you can keep your launching code, but after it starts the process on the remote machine, kill the thread. Then have the remote code send an HTTP message to your server with 1) its ID and 2) its result. Add a little extra code so it retries if it does not get a 200 'OK' status code, and you should be in much better shape.
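To make the idea concrete, here is a minimal sketch using only the standard library's http.server (an aiohttp handler has the same shape; the endpoint path and the JSON payload fields are assumptions, not from the question):

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

results = []   # a real collector would append to the result files instead


class ResultHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the JSON body the worker sends: its ID and its result.
        body = self.rfile.read(int(self.headers['Content-Length']))
        results.append(json.loads(body))
        self.send_response(200)          # workers retry until they see 200 OK
        self.end_headers()

    def log_message(self, *args):        # keep the demo quiet
        pass


server = HTTPServer(('127.0.0.1', 0), ResultHandler)   # port 0 = any free port
threading.Thread(target=server.serve_forever, daemon=True).start()

# What a remote worker would do when its simulation finishes:
payload = json.dumps({'id': 17, 'type': 1, 'result': 0.42}).encode()
req = urllib.request.Request('http://127.0.0.1:%d/result' % server.server_port,
                             data=payload, method='POST')
with urllib.request.urlopen(req) as resp:
    assert resp.status == 200
print(results)                           # [{'id': 17, 'type': 1, 'result': 0.42}]
```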

Liam Kelly

I think you have too many threads running for your system. I would first check your system resources and then rethink the program. Try limiting your threads and use as few as possible.