I am using the multiprocessing module in Python 3.7. My code is not working as expected (see this question here). Someone suggested setting maxtasksperchild, which I set to 1. Then, while reading the documentation, I figured it would be best to set chunksize to 1 as well. This is the relevant part of the code:

# Parallel Entropy Calculation
# ============================
node_combinations = [(i, j) for i in g.nodes for j in g.nodes]
pool = Pool(maxtasksperchild=1)
start = datetime.datetime.now()
logging.info("Start time: %s", start)
print("Start time: ", start)
results = pool.starmap(g._log_probability_path_ij, node_combinations, chunksize=1)
end = datetime.datetime.now()
print("End time: ", end)
print("Run time: ", end - start)
logging.info("End time: %s", end)
logging.info("Total run time: %s", end - start)
pool.close()
pool.join()

This backfired enormously. Setting only maxtasksperchild or only chunksize got the job done in the expected time (for a smaller dataset that I am using to test the code). With both set, the job just wouldn't finish, and after a few seconds nothing was really running (I checked with htop to see if the cores were working).

Questions

  1. Do maxtasksperchild and chunksize conflict when setting them together?

  2. Do they do the same thing? maxtasksperchild at the Pool() level and chunksize at the Pool methods level?

======================================================

EDIT

I understand that debugging may be impossible from the code extract presented, so please find the full code below. The modules graph and graphfile are just small libraries I wrote, available on GitHub. If you wish to run the code, you can use any of the files in the data/ directory of that GitHub repository. Short tests are better run using F2, but F1 and F3 are the ones causing trouble on the HPC.

import graphfile
import graph
from multiprocessing.pool import Pool
import datetime
import logging


def remove_i_and_f(edges):
    new_edges = dict()
    for k,v in edges.items():
        if 'i' in k:
            continue
        elif 'f' in k:
            key = (k[0],k[0])
            new_edges[key] = v
        else:
            new_edges[k] = v
    return new_edges



if __name__ == "__main__":
    import sys

    # Read data
    # =========
    graph_to_study = sys.argv[1]
    full_path = "/ComplexNetworkEntropy/"
    file = graphfile.GraphFile(full_path + "data/" + graph_to_study + ".txt")
    edges = file.read_edges_from_file()

    # logging
    # =======
    d = datetime.date.today().strftime("%Y_%m_%d")
    log_filename = full_path + "results/" + d + "_probabilities_log_" + graph_to_study + ".log"
    logging.basicConfig(filename=log_filename, level=logging.INFO, format='%(asctime)s === %(message)s')
    logging.info("Graph to study: %s", graph_to_study)
    logging.info("Date: %s", d)

    # Process data
    # ==============
    edges = remove_i_and_f(edges)
    g = graph.Graph(edges)

    # Parallel Entropy Calculation
    # ============================
    node_combinations = [(i, j) for i in g.nodes for j in g.nodes]
    pool = Pool(maxtasksperchild=1)
    start = datetime.datetime.now()
    logging.info("Start time: %s", start)
    print("Start time: ", start)
    results = pool.starmap(g._log_probability_path_ij, node_combinations, chunksize=1)
    end = datetime.datetime.now()
    print("End time: ", end)
    print("Run time: ", end - start)
    logging.info("End time: %s", end)
    logging.info("Total run time: %s", end - start)
    pool.close()
    pool.join()
YamiOmar88

1 Answer

maxtasksperchild ensures a worker process is restarted after a certain number of tasks. In other words, it kills the process after it has completed maxtasksperchild tasks and starts a fresh one in its place. It is provided to contain resource leaks caused by poor implementations in long-running services.
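
To see the restart behaviour for yourself, here is a minimal sketch. The helper names `worker_pid` and `distinct_workers` are made up for illustration, and it assumes that counting distinct PIDs is a fair proxy for counting worker processes. With a single worker and no limit, one process should serve every item; with maxtasksperchild=1, the pool should go through several processes.

```python
import os
from multiprocessing import Pool


def worker_pid(_):
    """Return the PID of the worker process that handled this item."""
    return os.getpid()


def distinct_workers(maxtasksperchild, n_items=8):
    """Count how many distinct worker PIDs served n_items tasks.

    Hypothetical helper: one worker slot and chunksize=1, so every
    item is submitted as its own task.
    """
    with Pool(processes=1, maxtasksperchild=maxtasksperchild) as pool:
        pids = pool.map(worker_pid, range(n_items), chunksize=1)
    return len(set(pids))


if __name__ == "__main__":
    print("no limit:          ", distinct_workers(None), "worker process(es)")
    print("maxtasksperchild=1:", distinct_workers(1), "worker process(es)")
```

Every extra process in the second line is a worker that was torn down and replaced, which is the cost you pay for the containment.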

chunksize splits a given collection/iterator into groups (chunks) and submits each chunk as a single task. The whole chunk is shipped over the internal pipe at once, which reduces inter-process communication (IPC) overhead. The elements of a chunk are still processed one by one. chunksize is useful if you have a large collection of small elements and the IPC overhead is significant relative to the processing of the elements themselves. One side effect is that a whole chunk is always processed by the same process.
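
The side effect can be checked with a small sketch like the one below. The names `tag_with_pid` and `pids_per_chunk` are hypothetical; the sketch relies on `Pool.map` returning results in input order and cutting the input into consecutive chunks of `chunksize` elements.

```python
import os
from multiprocessing import Pool


def tag_with_pid(item):
    """Return the item together with the PID of the worker that processed it."""
    return item, os.getpid()


def pids_per_chunk(chunksize, n_items=12, processes=3):
    """Map over n_items and return, for each chunk, the set of worker PIDs seen."""
    with Pool(processes=processes) as pool:
        results = pool.map(tag_with_pid, range(n_items), chunksize=chunksize)
    pids = [pid for _, pid in results]  # results come back in input order
    # Regroup the PIDs using the same consecutive slicing as the chunks.
    return [set(pids[i:i + chunksize]) for i in range(0, n_items, chunksize)]


if __name__ == "__main__":
    for chunk_index, pids in enumerate(pids_per_chunk(chunksize=4)):
        print("chunk", chunk_index, "was handled by PID(s)", pids)
```

Each chunk should report exactly one PID, although the same worker may well pick up more than one chunk.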

Setting both parameters to 1 dramatically increases process rotation and IPC, both of which are quite resource-heavy, especially on machines with a high number of cores.
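
As a rough illustration of how the two settings interact, the sketch below counts the worker processes used for the same input under maxtasksperchild=1 with two different chunk sizes. It assumes that, in CPython, each chunk counts as one task towards maxtasksperchild; the helper name `workers_used` is made up. It only demonstrates the process rotation and does not try to reproduce the stall described in the question.

```python
import os
from multiprocessing import Pool


def worker_pid(_):
    """Return the PID of the worker process that handled this item."""
    return os.getpid()


def workers_used(chunksize, n_items=12):
    """Count distinct worker PIDs when every worker may run only one task."""
    with Pool(processes=2, maxtasksperchild=1) as pool:
        pids = pool.map(worker_pid, range(n_items), chunksize=chunksize)
    return len(set(pids))


if __name__ == "__main__":
    # 12 items in chunks of 6: two tasks, so at most two worker processes.
    print("chunksize=6:", workers_used(6), "worker process(es)")
    # 12 items in chunks of 1: twelve tasks, each one retiring its worker.
    print("chunksize=1:", workers_used(1), "worker process(es)")
```

With chunksize=1 the pool has to create and destroy roughly one process per element, so for an input as large as every pair of nodes in a graph the rotation cost alone can dominate the run time.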

noxdafox
  • Thank you. I understand what `maxtasksperchild` and `chunksize` do. What I don't understand is why, at a certain point, if these were set to 1, the program is not progressing. Nothing happens. No core is running anything. No error is raised. – YamiOmar88 Feb 12 '20 at 12:15
  • Please read SO [recommendations](https://stackoverflow.com/help/how-to-ask) on how to ask questions. Your question is asking about the meaning and interaction of the two parameters. You are neither asking the right question nor providing enough meaningful information to help debug the root cause. – noxdafox Feb 12 '20 at 12:25
  • What is `g._log_probability_path_ij` doing? What modules are employed within that function? Does the documentation of such module mention anything about concurrency? Have you tried to run the logic sequentially? What OS are you using? – noxdafox Feb 12 '20 at 12:27
  • Thank you for your comment. I have edited my question to make all code available. – YamiOmar88 Feb 13 '20 at 08:46