I'm using a master-slave structure to implement a parallel computation. A single master process (0) loads data and distributes the relevant chunks and instructions to slave processes (1-N), which do the heavy lifting using large objects. The issue is memory usage, which I'm monitoring using resource.getrusage(resource.RUSAGE_SELF).ru_maxrss on each slave process.
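
For reference, the monitoring described above can be sketched roughly as follows. The helper name `peak_rss_bytes` and the 50 MB test allocation are illustrative assumptions, not part of the real code. Note that `ru_maxrss` is a peak value (a high-water mark), so on its own it cannot show memory being given back.

```python
import resource
import sys


def peak_rss_bytes():
    """Return this process's peak resident set size in bytes (hypothetical helper)."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    return peak if sys.platform == "darwin" else peak * 1024


before = peak_rss_bytes()
block = b"x" * (50 * 1024 * 1024)  # allocate and fill roughly 50 MB
del block                          # release it again
after = peak_rss_bytes()

# The peak never goes down, even though the block has been freed.
print(f"peak before: {before / 1e6:.0f} MB, peak after: {after / 1e6:.0f} MB")
```

Because the value is a maximum over the life of the process, a second task that briefly overlaps with the first task's data will raise it permanently.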

The first task uses about 6GB of memory, as expected, but when the slave receives the second task, usage balloons to just over 10GB, as if the previous memory weren't being collected. My understanding was that as soon as an object loses its last reference (in the code below, when the _gwb variable is reassigned), garbage collection should clean house. Why isn't this happening?

Would throwing in a del _gwb at the end of each loop help?
What about a manual call to gc.collect()?
Or do I need to spawn subprocesses as described in this answer?

I'm using mpi4py on a SLURM managed cluster.

The master process looks something like:

for jj, tt in enumerate(times):

    for ii, sim in enumerate(sims):

        search = True
        # Find a slave to give this task to
        while search:
            # Repackage HDF5 data into dictionary to work with MPI
            sim_data = ...  # load some data

            # Look for available slave process
            data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=stat)
            src = stat.Get_source()
            tag = stat.Get_tag()

            # Store Results
            if tag == TAGS.DONE:
                _store_slave_results(data, ...)
                num_done += 1
            elif tag == TAGS.READY:
                # Distribute tasks
                comm.send(sim_data, dest=src, tag=TAGS.START)
                # Stop searching, move to next task
                search = False

            cycles += 1

And the slaves:

while True:
    # Tell Master this process is ready
    comm.send(None, dest=0, tag=TAGS.READY)
    # Receive ``task`` ([number, gravPot, ndensStars])
    task = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat)
    tag = stat.Get_tag()

    if tag == TAGS.START:
        _gwb = Large_Data_Structure(task)
        data = _gwb.do_heavy_lifting(task)
        comm.send(data, dest=0, tag=TAGS.DONE)
    elif tag == TAGS.EXIT:
        break

    cycles += 1

Edit: Some other strange subtleties (in case they might be relevant):
1) only some processes show the memory growing, while others stay roughly the same;
2) the specific amount of memory in use differs between slave processes (by hundreds of MB), even though they should all be running the same code!

DilithiumMatrix

1 Answer

del _gwb should make a big difference. With _gwb = Large_Data_Structure(task), the new object is built first and only then bound to _gwb, so the old object is released only after the new one exists, and both are in memory at the same time. An explicit del releases the old object before the new one is built. You may still see a memory increase on the second iteration: Python returns the freed memory to its own heap, but nothing guarantees that the next allocation will reuse exactly the same block of memory.
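
The ordering described above can be observed with a small stand-in class. This is a minimal sketch that relies on CPython's reference counting; `Big`, `rebind` and `delete_first` are hypothetical names used only for illustration.

```python
class Big:
    """Stand-in for a large object; logs when it is created and released."""
    events = []

    def __init__(self, name):
        self.name = name
        Big.events.append(f"create {name}")

    def __del__(self):
        Big.events.append(f"free {self.name}")


def rebind():
    """Reassign the name directly, as in the question's loop."""
    Big.events.clear()
    obj = Big("first")
    obj = Big("second")  # "first" stays alive until "second" has been built
    return list(Big.events)


def delete_first():
    """Delete the old object before building the new one."""
    Big.events.clear()
    obj = Big("first")
    del obj              # "first" is released here
    obj = Big("second")
    return list(Big.events)


print(rebind())        # ['create first', 'create second', 'free first']
print(delete_first())  # ['create first', 'free first', 'create second']
```

In the first case both objects exist at once, which is where the extra peak memory comes from.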

The garbage collector only comes into play when regular reference counting isn't sufficient to free the memory, which in practice means reference cycles. Assuming do_heavy_lifting isn't doing anything funky, a manual gc.collect() won't make a difference.
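
A minimal sketch of that distinction, assuming CPython: an object without cycles is freed as soon as its last reference goes away, while an object that refers to itself waits for the cyclic collector. `Node` and the two helper functions are hypothetical names.

```python
import gc
import weakref


class Node:
    """Tiny object that can optionally hold a reference to itself."""


def freed_without_gc(make_cycle):
    """Return True if the object is gone right after `del`, with the collector off."""
    was_enabled = gc.isenabled()
    gc.disable()  # rule out an automatic collection during the check
    try:
        node = Node()
        if make_cycle:
            node.self_ref = node  # reference cycle: refcount never reaches zero
        probe = weakref.ref(node)
        del node
        return probe() is None
    finally:
        if was_enabled:
            gc.enable()


def freed_after_collect():
    """Return True if gc.collect() reclaims an object caught in a cycle."""
    node = Node()
    node.self_ref = node
    probe = weakref.ref(node)
    del node
    gc.collect()
    return probe() is None


print(freed_without_gc(make_cycle=False))  # True: reference counting is enough
print(freed_without_gc(make_cycle=True))   # False: the cycle keeps it alive
print(freed_after_collect())               # True: the collector breaks the cycle
```

If the large structure does turn out to contain cycles, gc.collect() after the del would matter; otherwise it should not.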

You mention subprocess... another option on Linux systems is os.fork. The child process gets a copy-on-write view of the parent's address space. The big object is created in the child's memory, and all of that memory is returned to the operating system when the child exits. I can't guarantee this will work, but it would be an interesting experiment.

while True:
    # Tell Master this process is ready
    comm.send(None, dest=0, tag=TAGS.READY)
    # Receive ``task`` ([number, gravPot, ndensStars])
    task = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat)
    tag = stat.Get_tag()

    if tag == TAGS.START:
        pid = os.fork()
        if pid:
            # parent waits for child
            os.waitpid(pid, 0)
        else:
            # child does work, sends results and exits
            _gwb = Large_Data_Structure(task)
            data = _gwb.do_heavy_lifting(task)
            comm.send(data, dest=0, tag=TAGS.DONE)
            os._exit(0)
    elif tag == TAGS.EXIT:
        break

    cycles += 1
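
Whether an MPI library tolerates fork() and communication from the child depends on the implementation, so that part needs checking. A variation that avoids it, sketched below without MPI, has the child pass its result back through a pipe so that only the parent talks to the master. `run_in_child` and `heavy` are hypothetical names, and the result is assumed to be picklable.

```python
import os
import pickle


def run_in_child(func, *args):
    """Run func(*args) in a forked child and return its unpickled result."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: compute, write the result to the pipe, then exit immediately.
        os.close(read_fd)
        status = 1
        try:
            with os.fdopen(write_fd, "wb") as out:
                pickle.dump(func(*args), out)
            status = 0
        finally:
            os._exit(status)  # skip the parent's cleanup handlers
    # Parent: read until the child closes its end, then reap the child.
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as src:
        payload = src.read()
    _, wait_status = os.waitpid(pid, 0)
    if wait_status != 0:
        raise RuntimeError("child process failed")
    return pickle.loads(payload)


def heavy(n):
    """Stand-in for the heavy lifting: the big temporary exists only in the child."""
    big = list(range(n))
    return sum(big)


print(run_in_child(heavy, 1_000_000))  # 499999500000
```

The parent could then forward the returned value with comm.send(data, dest=0, tag=TAGS.DONE), and its own memory footprint never includes the large temporary.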
tdelaney
  • Thanks! I'll give this a try. A couple of strange subtleties (in case they might be relevant): 1) only some processes show the memory growing, while others stay roughly the same; 2) the specific amount of memory in use is *different* on the different slave processes... even though they should all be running the same code! – DilithiumMatrix Oct 13 '16 at 20:10
  • I can't say why that's the case. Your computed data may be sensitive to the incoming parameters (e.g., `range(count)` is different depending on whether count is 1 or 10000000). Just a guess. – tdelaney Oct 13 '16 at 20:23
  • Hmm, I can't find any place that data sizes should change between processes. I tried it with `del _gwb` and there is no change to the memory behavior... The `_gwb` object does store some references to external objects, e.g. `_gwb = Large_Data_Structure(task, other_obj)` and in the constructor of `_gwb`: `(self.other = other_obj)` ... could that keep the memory from being collected? – DilithiumMatrix Oct 13 '16 at 20:27
  • `_gwb` and `data` should both be deleted. Perhaps the `comm` stuff also needs to have data released. But unless this stuff is going into other containers I don't see in your code sample, that should be enough. – tdelaney Oct 13 '16 at 20:48