
I am passing the keys and values of a dictionary to a pool for parallel processing:

import multiprocessing as mp

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": data_preprocess.dataset_1,
        "Dataset_2": data_preprocess.dataset_2,}
    pool = mp.Pool(8)
    # pass each (name, dataset) pair to main
    pool.starmap(main, DATASETS.items())
    pool.close()
# As I am not collecting any results and the output is saved to a CSV file
# directly from the main function, I did not use pool.join()

The main function:

from pathlib import Path

import pandas as pd
from sklearn.linear_model import Lasso, LinearRegression

def main(dataset_name, generate_dataset):
    REGRESSORS = {
        "LinReg": LinearRegression(),
        "Lasso": Lasso(),}
    ROOT = Path(__file__).resolve().parent
    dfs = []
    for reg_name, regressor in REGRESSORS.items():
        df = function_calling(
            generate_dataset=generate_dataset,
            regressor=regressor,
            reg_name=reg_name,)
        print(df)
        dfs.append(df)
    # combine the per-regressor results and save one CSV per dataset
    df = pd.concat(dfs, axis=0, ignore_index=True)
    filename = dataset_name + "_result.csv"
    outfile = ROOT / filename
    df.to_csv(outfile)

I am getting the error AssertionError: daemonic processes are not allowed to have children. Could you tell me why I am getting this error, and how I can resolve it?

Opps_0
  • This is not a [minimal, reproducible example](https://stackoverflow.com/help/minimal-reproducible-example) and you also appear to have indentation problems. But is it possible that `function_calling` is creating processes of its own? – Booboo Jul 08 '21 at 19:17
  • @Booboo Yes, you are right. Inside `function_calling` I have used `process_map` and `repeat`. Would you mind having a look at this question of mine? (https://stackoverflow.com/questions/68305077/how-to-run-a-script-in-parallel-for-different-datasets?noredirect=1#comment120719470_68305077) – Opps_0 Jul 08 '21 at 19:20
  • @Booboo One more thing: it is not only `function_calling`; I have another loop that also creates processes. Is there no simple solution to run the whole program separately for each dataset and save the results? – Opps_0 Jul 08 '21 at 19:22
  • You can look at https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic as a possible solution for creating your multiprocessing pool in such a way that its processes are not daemon processes (a sketch of that approach follows these comments). You should then explicitly call `pool.terminate()`. Alternatively, if you know you don't have significantly more tasks to submit (key/value pairs) than CPU cores, you could just create individual Process instances instead of using a pool. I could demonstrate this. – Booboo Jul 08 '21 at 19:30
  • I would be grateful if you could demonstrate. Thank you. – Opps_0 Jul 08 '21 at 19:33
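For reference, here is a minimal sketch of the non-daemonic pool idea mentioned in the comment above, adapted from the linked question. The names `NoDaemonProcess`, `NoDaemonContext`, and `NestablePool` are illustrative, not from this thread, and `main` is only a stand-in for the question's function:

import multiprocessing
import multiprocessing.pool

def main(dataset_name, generate_dataset):
    # stand-in for the question's main(); the real one may spawn its own processes
    print(dataset_name, generate_dataset, flush=True)

class NoDaemonProcess(multiprocessing.Process):
    # Always report daemon=False so the pool's workers are allowed to have children.
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# Subclass multiprocessing.pool.Pool (a real class), not multiprocessing.Pool
# (which is only a factory function).
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs["context"] = NoDaemonContext()
        super().__init__(*args, **kwargs)

if __name__ == "__main__":
    DATASETS = {"Dataset_1": 1, "Dataset_2": 2}
    pool = NestablePool(8)
    pool.starmap(main, DATASETS.items())  # blocks until every task has finished
    # the workers are no longer daemonic, so shut them down explicitly
    pool.terminate()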

1 Answer


To just create your own Process instances:

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,}
    processes = [mp.Process(target=main, args=(k, v)) for k, v in DATASETS.items()]
    for process in processes:
        process.start()
    # wait for termination:
    for process in processes:
        process.join()

Prints:

Dataset_1 1
Dataset_2 2

The issue is this: suppose you have 8 CPU cores and DATASETS had 100 key/value pairs. You would be creating 100 processes. Assuming these processes are CPU-intensive, you could not expect more than 8 of them to really be doing anything productive at any one time, yet you would have incurred the CPU and storage overhead of creating all of them. But as long as the number of processes you will be creating is not excessively greater than the number of CPU cores you have, and your function main does not need to return a value back to your main process, this approach should be OK.

There is also a way of implementing your own multiprocessing pool with these Process instances and a Queue instance, but that's a bit more complicated:

import multiprocessing as mp

def main(dataset_name, generate_dataset):
    print(dataset_name, generate_dataset, flush=True)
    ... # etc.

def worker(queue):
    while True:
        arg = queue.get()
        if arg is None:
            # signal to terminate
            break
        # unpack
        dataset_name, generate_dataset = arg
        main(dataset_name, generate_dataset)

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,}
    queue = mp.Queue()
    items = list(DATASETS.items())
    for k, v in items:
        # put the arguments on the queue
        queue.put((k, v))
    # number of processors we will be using:
    n_processors = min(mp.cpu_count(), len(items))
    for _ in range(n_processors):
        # special value telling a worker there is no more work: one per worker process
        queue.put(None)
    processes = [mp.Process(target=worker, args=(queue,)) for _ in range(n_processors)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
Booboo
  • I am very grateful to you. This worked; I spent the whole day on it and kept getting the error! However, I still have 6 datasets (so 6 key/value pairs). I am also assuming I can run this code on the cluster, right? – Opps_0 Jul 08 '21 at 20:09
  • Out of curiosity, `dataset_1` and `dataset_2` are running in parallel, right? But I am getting the output for `dataset_1` and then `dataset_2`. It seems sequential. – Opps_0 Jul 08 '21 at 20:27
  • Assuming you are running the first code version, the two processes are running in parallel if you have at least two cores available to you (unfortunately, I have no experience with cluster computing). What do you mean by "sequentially"? You can print out start and stop times in `main` to see what is going on (see the sketch after these comments). How many cores do you have allocated? – Booboo Jul 08 '21 at 20:53
  • Yes, I am running the first version of the answer. On my personal machine I have 32 cores. – Opps_0 Jul 08 '21 at 21:24
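A minimal sketch of the timing prints suggested in the comment above; the surrounding structure mirrors the first code version of the answer, and the actual work inside `main` is elided:

import multiprocessing as mp
import os
import time

def main(dataset_name, generate_dataset):
    # print start/end timestamps and the worker PID to confirm the runs overlap
    start = time.time()
    print(f"{dataset_name} started at {start:.2f} (pid {os.getpid()})", flush=True)
    ... # the real work goes here
    print(f"{dataset_name} finished after {time.time() - start:.2f}s", flush=True)

if __name__ == "__main__":
    DATASETS = {
        "Dataset_1": 1,
        "Dataset_2": 2,}
    processes = [mp.Process(target=main, args=(k, v)) for k, v in DATASETS.items()]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

If the start times overlap (and the PIDs differ), the processes really are running concurrently; the output can still appear grouped by dataset simply because each process emits all of its lines as it works through its own tasks.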