
I'm currently setting up an automated simulation pipeline for OpenFOAM (a CFD library) using the PyFoam library in Python to create a large database for machine learning purposes. The database will contain around 500k distinct simulations. To run this pipeline on multiple machines, I'm using multiprocessing.Pool.starmap_async(args), which starts a new simulation as soon as a previous one has completed.

However, since some of the simulations might (or will) crash, I want to generate a text file listing all cases that have crashed.

I've already found this thread, which uses multiprocessing.Manager().Queue() and adds a listener, but I failed to get it running with starmap_async(). For testing, I'm trying to write the case name of every simulation that has completed, but currently only one entry is written to the text file instead of all of them (the simulations all complete successfully).

So my question is: how can I write a message to a file for each simulation that has completed?

The current code layout looks roughly like this. Only the important snippet is included, as the remaining code can't be run without OpenFOAM and the additional custom scripts that were created for the automation.

Any help is highly appreciated! :)

from PyFoam.Execution.BasicRunner import BasicRunner
from PyFoam.Execution.ParallelExecution import LAMMachine

import numpy as np
import multiprocessing
import itertools
import psutil

# Defining global variables
manager = multiprocessing.Manager()
queue = manager.Queue()

def runCase(airfoil, angle, velocity):
    # define simulation name
    newCase = str(airfoil) + "_" + str(angle) + "_" + str(velocity)


    '''
        A lot of pre-processing commands to prepare the simulation
        which have been removed from the snippet, such as generating the geometry, creating the mesh, etc.
    '''


    # run simulation
    machine = LAMMachine(nr=4) # set number of cores for parallel execution
    simulation = BasicRunner(argv=[solver, "-case", case.name], silent=True, lam=machine, logname="solver")
    simulation.start() # start simulation

    # check if simulation has completed
    if simulation.runOK():  
        # write message into queue
        queue.put(newCase)
    if not simulation.runOK():
        print("Simulation did not run successfully")
        
def listener(queue):
    fname = 'errors.txt'
    msg = queue.get()

    while True:
        with open(fname, 'w') as f:
            if msg == 'complete':
                break
            f.write(str(msg) + '\n')
    
def main():
    # Create parameter list
    angles = np.arange(-5, 0, 1)
    machs = np.array([0.15])
    nacas = ['0012']
    paramlist = list(itertools.product(nacas, angles, np.round(machs, 9)))

    # create number of processes and keep 2 cores idle for other processes
    nCores = psutil.cpu_count(logical=False) - 2
    nProc = 4
    nProcs = int(nCores / nProc)

    with multiprocessing.Pool(processes=nProcs) as pool:
        pool.apply_async(listener, (queue,)) # start the listener
        pool.starmap_async(runCase, paramlist).get() # run parallel simulations

    queue.put('complete') 

    pool.close()
    pool.join()


if __name__ == '__main__':
        main()
montju
  • For starters, it doesn't look like you have any `async` code. So you should use `apply` and `starmap` instead of the `async` versions. I'm not sure what the `get()` method on `starmap_async` is doing. – craigb Oct 23 '22 at 00:08
  • There are issues with `listener()` too: it only reads the queue once, so that should be inside the loop. Also, there's a race condition: "complete" is sent immediately, before the `runCase` messages get sent, so you need a better way to finish up; eg: just count the number of messages and return when it is equal to `len(paramList)`, but you'll need to write an error message to the queue so every `runCase()` sends exactly one message, even in the error case. – craigb Oct 23 '22 at 00:18
  • what is your os? – Charchit Agarwal Oct 23 '22 at 07:42
  • @craigb calling the async versions of starmap and apply and immediately calling `.get()` on the result is equivalent to using the non-async versions. There doesn't have to be actual async code for them to be used. Also, the listener is already in a loop; it will only break if the msg put is "complete". Not sure what race condition you are referring to here either, since the line `queue.put("complete")` will only execute after all simulations have finished (because of the inlined `.get()` call). – Charchit Agarwal Oct 23 '22 at 07:49
  • @CharchitAgarwal the code will run on ubuntu and scientific linux servers. The code is currently tested on Ubuntu 22.04.1 LTS – montju Oct 23 '22 at 10:42
  • @craigb the `get()` call on `starmap_async` is there so that the simulation folders get created for each case. For each `runCase` I clone a template directory and run the CFD code in the clone, then move the new directory off the cluster onto a separate disk to avoid the cluster's storage limits. Without `get()` I didn't see the folders being generated right away; as far as I understand, they were held temporarily in local storage or RAM, and only `get()` would start writing them to the target directory. – montju Oct 23 '22 at 10:45
  • @CharchitAgarwal That's what I also thought when writing the code: the listener will only stop after all simulations have run through, because only then is `queue.put('complete')` executed. – montju Oct 23 '22 at 10:52
  • @montju try creating the manager and queue inside `main`, and explicitly pass the queue to `runCase` as an argument and see if that works – Charchit Agarwal Oct 23 '22 at 11:02
  • also, assign the result of the `apply_async` call to a variable and, after the simulations have completed (after `queue.put`), call `.get()` on it to check whether the listener completed successfully or raised an error. In the latter case the error won't be raised without the `.get()` call – Charchit Agarwal Oct 23 '22 at 15:59

1 Answer


First, when your with multiprocessing.Pool(processes=nProcs) as pool: block exits, there is an implicit call to pool.terminate(), which kills all pool processes and, with them, any running or queued tasks. There is no point in calling queue.put('complete') after that, since nobody is listening anymore.

Second, your "listener" task gets only a single message from the queue. If it is "complete", the task terminates immediately. If it is anything else, the task loops forever, reopening the output file in 'w' mode and writing that same message each time, which is why the file only ever contains one entry. This cannot be right, can it? Did you forget an additional call to queue.get() in your loop?
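If the listener is kept, a minimal sketch of one that reads the queue on every iteration might look like the following. The names `fake_run_case`, `run_all` and `SENTINEL` are hypothetical stand-ins (the real `runCase` needs OpenFOAM), and the queue is passed to the tasks explicitly rather than read from a global, as suggested in the comments on the question.

```python
import multiprocessing

SENTINEL = "complete"  # hypothetical end-of-work marker


def fake_run_case(q, name, ok):
    """Stand-in for runCase: report a failed case through the queue."""
    if not ok:
        q.put(name)


def listener(q, fname):
    """Write every queued message to fname until the sentinel arrives."""
    count = 0
    with open(fname, "w") as f:      # open once, not once per iteration
        while True:
            msg = q.get()            # fetch a new message on every iteration
            if msg == SENTINEL:
                break
            f.write(str(msg) + "\n")
            f.flush()
            count += 1
    return count


def run_all(pool, q, cases, fname):
    """Run all cases on pool while one worker logs failures to fname."""
    writer = pool.apply_async(listener, (q, fname))
    pool.starmap(fake_run_case, [(q, name, ok) for name, ok in cases])
    q.put(SENTINEL)       # sent while the pool is still alive
    return writer.get()   # waits for the listener and re-raises its errors


if __name__ == "__main__":
    cases = [("0012_-5_0.15", True), ("0012_-4_0.15", False)]
    with multiprocessing.Manager() as manager:
        # The listener occupies one worker for the whole run, so use >= 2.
        with multiprocessing.Pool(processes=3) as pool:
            print(run_all(pool, manager.Queue(), cases, "errors.txt"))
```

Note that the sentinel is sent and the listener's result is collected inside the `with` block, before the pool is terminated.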

Third, I do not quite follow your computation for nProcs. Why the division by 4? If you had 5 physical processors, nProcs would be computed as 0 (5 - 2 = 3, and int(3 / 4) is 0). Do you mean something like:

nProcs = psutil.cpu_count(logical=False) // 4
if nProcs == 0:
    nProcs = 1
elif nProcs > 1:
    nProcs -= 1 # Leave a core free
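If the division by 4 is there because each simulation itself uses 4 cores, the same idea can be written as a small pure function. This is only a sketch: `pool_size` and its parameters are hypothetical names, and the core count would come from `psutil.cpu_count(logical=False)` in the real script.

```python
def pool_size(physical_cores, cores_per_sim=4, reserved=2):
    """Number of simulations to run at once, never less than one."""
    usable = max(physical_cores - reserved, 0)
    return max(usable // cores_per_sim, 1)
```

With these defaults, a 5-core machine gives a pool of 1 instead of 0, and a 16-core machine gives (16 - 2) // 4 = 3.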

Fourth, why do you need a separate "listener" task? Have your runCase task return a message describing how it completed, and let the main process handle it. In the code below, multiprocessing.pool.Pool.imap is used so that the main process can handle each result as soon as its task completes:

from PyFoam.Execution.BasicRunner import BasicRunner
from PyFoam.Execution.ParallelExecution import LAMMachine

import numpy as np
import multiprocessing
import itertools
import psutil

def runCase(tpl):
    # Unpack tuple:
    airfoil, angle, velocity = tpl

    # define simulation name
    newCase = str(airfoil) + "_" + str(angle) + "_" + str(velocity)

    ... # Code omitted for brevity

    # check if simulation has completed
    if simulation.runOK():
        return '' # No error
    # Simulation did not run successfully:
    return f"Simulation {newCase} did not run successfully"

def main():
    # Create parameter list
    angles = np.arange(-5, 0, 1)
    machs = np.array([0.15])
    nacas = ['0012']
    # There is no reason to convert this into a list; it
    # can be lazily computed:
    paramlist = itertools.product(nacas, angles, np.round(machs, 9))

    # create number of processes and keep 1 core idle for main process

    nCores = psutil.cpu_count(logical=False) - 1
    nProc = 4
    nProcs = max(1, nCores // nProc)  # a pool needs at least one process

    with multiprocessing.Pool(processes=nProcs) as pool:
        with open('errors.txt', 'w') as f:
            # Process message results as soon as the task ends.
            # Use method imap_unordered if you do not care about the order
            # of the messages in the output.
            # imap passes a single argument to runCase; each element of
            # paramlist is already an (airfoil, angle, velocity) tuple:
            for msg in pool.imap(runCase, paramlist):
                if msg != '': # Error completion
                    print(msg)
                    print(msg, file=f)
    pool.join() # Not really necessary here


if __name__ == '__main__':
    main()
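Since the code above cannot be run without OpenFOAM, here is a self-contained sketch of the same pattern using `imap_unordered`, which hands results back in completion order. `fake_run_case` and its failure rule are invented stand-ins for the real `runCase`, and `log_failures` is a hypothetical helper name.

```python
import itertools
import multiprocessing


def fake_run_case(tpl):
    """Stand-in for runCase; returns an error message, or '' on success."""
    airfoil, angle, velocity = tpl
    case = f"{airfoil}_{angle}_{velocity}"
    # Hypothetical failure rule so the example has something to log.
    if angle == -3:
        return f"Simulation {case} did not run successfully"
    return ""


def log_failures(pool, params, fname):
    """Write one line per failed case to fname; return the failure count."""
    failed = 0
    with open(fname, "w") as f:
        for msg in pool.imap_unordered(fake_run_case, params):
            if msg:
                # flush so entries survive if the main process dies later
                print(msg, file=f, flush=True)
                failed += 1
    return failed


if __name__ == "__main__":
    params = itertools.product(["0012"], range(-5, 0), [0.15])
    with multiprocessing.Pool(processes=2) as pool:
        print(log_failures(pool, params, "errors.txt"))
```

Flushing after each line is a design choice for very long runs: with roughly 500k cases, it keeps the log usable even if the run is interrupted.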
Booboo
  • Hi Booboo. Thanks for your feedback - really appreciate it! Yes I noticed that later that `multiprocessing.Pool` will call a terminate process when finished so that extra put statement in the listener was useless. – montju Oct 25 '22 at 22:27
  • Regarding the separate "listener" - yes totally agree that I don't need that here; didn't have much experience with `multiprocessing` before so had some misunderstandings which now cleared up. Thank you for your input. – montju Oct 25 '22 at 22:32
  • For clarification: is there a particular reason why you went for `multiprocessing.imap` rather than `multiprocessing.starmap_async`? When reading the docs, they point out that `imap` is faster and more efficient than `map`; does this also carry over to the comparison with `starmap_async`? – montju Oct 25 '22 at 22:41
  • @montju `imap` has several advantages and one disadvantage (which can be overcome). When you use `map` or `starmap` (async or otherwise), an iterable whose length cannot be readily determined (for example, a generator) must first be converted into a list. Likewise, `map` and `starmap` return lists. So `imap` will be more memory efficient if you have very large inputs and/or results. (more...) – Booboo Oct 25 '22 at 22:48
  • @montju For example, if I had a large file where each line contained a value representing a task to be submitted, then using `map` I have to read the entire file into memory. Using `imap` I can read the file line by line in a generator that yields each value one by one. Of course, you can do the same thing if you use method `apply_async`. The disadvantage of `imap` is that for very large inputs you have to specify a *chunksize* argument manually, whereas with `map` a suitable default is computed from the length of the iterable (that is why `map` needs to get the length). (more...) – Booboo Oct 25 '22 at 22:51
  • @montju I used `imap`, however, primarily so that the processing the main process needs to do with returned results can overlap the generation of these results by the pool. – Booboo Oct 25 '22 at 22:52
  • I see. thanks for taking time to clarify this! Really appreciated. – montju Oct 26 '22 at 10:03
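The point made in the comments above about feeding `imap` from a generator and setting the chunksize by hand can be sketched as follows. This is an illustration under assumptions: the case file, `read_cases`, `fake_run_case` and `failed_cases` are all hypothetical, and the failure rule is invented.

```python
import multiprocessing


def read_cases(path):
    """Yield one case name per non-empty line, without building a list."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                yield line


def fake_run_case(case):
    """Stand-in for a simulation; returns (case, succeeded)."""
    return case, not case.endswith("_bad")  # hypothetical failure rule


def failed_cases(pool, path, chunksize=1):
    """Return the names of failed cases, in input order."""
    # imap's default chunksize is 1; larger values batch the submissions.
    results = pool.imap(fake_run_case, read_cases(path), chunksize=chunksize)
    return [case for case, ok in results if not ok]


if __name__ == "__main__":
    with open("cases.txt", "w") as f:  # hypothetical input file
        f.write("0012_-5_0.15\n0012_-4_0.15_bad\n")
    with multiprocessing.Pool(processes=2) as pool:
        print(failed_cases(pool, "cases.txt"))
```

A larger chunksize mainly pays off when there are many short tasks. For simulations that each run for a long time, the default of 1 is probably fine and keeps the load evenly spread across workers.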