
I have the following function that randomly shuffles the values of one column of the dataframe and uses a fitted RandomForestClassifier to predict on the whole dataframe, including the shuffled column, to get an accuracy score.

I would like to run this function concurrently for each column of the dataframe, as the dataframe is pretty large: 500k rows and 1k columns. The key is to only randomly shuffle one column at a time.

However, I am struggling to understand why ProcessPoolExecutor is much slower than ThreadPoolExecutor. I thought ThreadPoolExecutor was only supposed to be faster for I/O tasks, and in this case nothing reads from or writes to any files.

Or have I done something wrong here? Is there a more efficient or better way to optimize this code so it runs concurrently and finishes faster?

import concurrent.futures

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold

def randomShuffle(colname, X, y, fit, labels=None):
    # labels is accepted for compatibility with the call below; accuracy_score does not need it
    out = {'col_name': colname}
    X_ = X.copy(deep=True)
    np.random.shuffle(X_[colname].values)  # permute a single column of the copy
    pred = fit.predict(X_)
    out['acc_scr'] = accuracy_score(y, pred)
    return out

def runConcurrent(classifier, X, y):
    skf = KFold(n_splits=5, shuffle=False)
    acc_scr0 = pd.Series(dtype=float)
    acc_scr1 = pd.DataFrame(columns=X.columns)
    # split data into training and validation
    for i, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        X_train, y_train = X.iloc[train_idx, :], y.iloc[train_idx]
        X_val, y_val = X.iloc[val_idx, :], y.iloc[val_idx]

        fit = classifier.fit(X=X_train, y=y_train)
        # baseline accuracy score on the unshuffled validation set
        pred = fit.predict(X_val)
        acc_scr0.loc[i] = accuracy_score(y_val, pred)

        # with concurrent.futures.ProcessPoolExecutor() as executor:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = [executor.submit(randomShuffle, colname=j, X=X_val, y=y_val,
                                       fit=fit, labels=classifier.classes_)
                       for j in X.columns]
            for res in concurrent.futures.as_completed(results):
                out = res.result()
                acc_scr1.loc[i, out['col_name']] = out['acc_scr']
    return acc_scr0, acc_scr1
    [this has been asked once or twice before](https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python#:~:text=The%20difference%20is%20that%20threads,while%20processes%20have%20separate%20memory.) – sglmr Jun 26 '23 at 03:13
  • I don't think my function is sharing any objects between threads. And is my function doing any I/O work? – user1769197 Jun 26 '23 at 03:39
  • Processes can be slower than threads if the amount of data to pickle is large relative to the amount of computation. – Nick ODell Jun 26 '23 at 05:15

2 Answers


It is hard to tell without testing, since the speed of multiprocessing depends on a lot of things: first the communication overhead, so if you need to send a lot of data around it is slow, but the number of tasks created also matters.

Creating a task has quite some overhead, and it has to be seen in relation to how long the called method takes to return. If a method only takes a fraction of a second to finish and you call it thousands of times, the overhead of creating a task is significant. If, on the other hand, the function takes multiple seconds to return, the overhead is negligible.
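To get a feeling for that overhead, here is a small self-contained timing sketch (the function and numbers are made up for illustration, not taken from the question):

import time
from concurrent.futures import ProcessPoolExecutor

def tiny(x):
    return x * x

if __name__ == '__main__':
    items = list(range(20_000))
    with ProcessPoolExecutor() as executor:
        t0 = time.perf_counter()
        list(executor.map(tiny, items))                  # default chunksize=1: one task per item
        t1 = time.perf_counter()
        list(executor.map(tiny, items, chunksize=1000))  # 20 tasks instead of 20,000
        t2 = time.perf_counter()
    print(f"chunksize=1: {t1 - t0:.2f}s  chunksize=1000: {t2 - t1:.2f}s")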

I can't really tell how fast randomShuffle is, but what you can do is use the map function with a chunksize and see if that speeds anything up:

from concurrent.futures import ProcessPoolExecutor
from functools import partial

...

with ProcessPoolExecutor() as executor:
    # _max_workers is a private attribute, but it is handy here;
    # max(1, ...) guards against a chunksize of 0 for short iterables
    chunksize = max(1, len(X.columns) // (executor._max_workers * 4))
    randomShuffleWrapper = partial(randomShuffle, X=X_val, y=y_val, fit=fit,
                                   labels=classifier.classes_)
    results = list(executor.map(randomShuffleWrapper, X.columns, chunksize=chunksize))

The only thing that changes between calls to randomShuffle is the colname, so create a partial function to fix all the other parameters; the new function then takes only the column name as its first argument. Now we also have to set an appropriate chunksize.

This is a bit of a hyperparameter, and there really is no generally good value; you may need to try different ones to find the best. Setting it creates chunks of your iterable and wraps your function so that one task calculates the outputs for all entries of a chunk.

So if you have 1000 entries and a chunksize of 100, only 10 tasks are created, and every task calculates 100 entries. This leads to way less overhead from creating and finishing tasks.

As a starting point, I would use the heuristic that multiprocessing.pool.Pool applies when chunksize isn't given, which is what the snippet above computes: divide the number of items by four times the number of workers. ProcessPoolExecutor.map() sets chunksize to 1 by default, which basically ends up in what you are already doing: creating a task for every element.
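For reference, this is roughly what CPython's multiprocessing/pool.py does when Pool.map is called without a chunksize (abridged from Pool._map_async; self._pool is the list of worker processes):

# Abridged from multiprocessing/pool.py, Pool._map_async:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
    chunksize += 1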

I don't know how big the things you are passing to the function are, namely X=X_val, y=y_val, fit=fit, labels=classifier.classes_. If they are big, there will be a lot of communication overhead, since for every task they will be serialized, sent over, and deserialized. So check whether they are big and whether they have to be: you normally want to send only what is absolutely necessary, and the same goes for the return value of the function; it should also be as small as possible.
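One way to cut that per-task overhead (a sketch, not from the original answer; the helper names _init_worker and _shuffle_one are made up) is to ship the large objects to each worker once, via the executor's initializer, so each task only carries a column name:

from concurrent.futures import ProcessPoolExecutor

_shared = {}

def _init_worker(X_val, y_val, fit):
    # Runs once per worker process: stash the big objects in module-level state,
    # so they are pickled once per worker instead of once per task.
    _shared['X'], _shared['y'], _shared['fit'] = X_val, y_val, fit

def _shuffle_one(colname):
    # Each task now only needs the column name sent over.
    return randomShuffle(colname, _shared['X'], _shared['y'], _shared['fit'])

with ProcessPoolExecutor(initializer=_init_worker,
                         initargs=(X_val, y_val, fit)) as executor:
    results = list(executor.map(_shuffle_one, X.columns, chunksize=16))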

Two follow-up questions came up in the comments:

"This is why you propose using chunksize to chop things up. Is my understanding correct?"

"One other question: say I split the column names into 4 chunks, does it mean 4 processes will be created for these 4 chunks? And for each chunk, how is the data processed, i.e., with a for loop or with multiprocessing/multithreading?"

So maybe I can explain a bit more what the chunksize actually does, since it is quite simple and can be seen directly in the code. I am going to reference code found in Anaconda Python 3.9, python3.9/concurrent/futures/process.py.

It contains the following code for the ProcessPoolExecutor class:

class ProcessPoolExecutor(_base.Executor):
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)

_get_chunks just divides the iterables into equal parts of size chunksize, plus possibly a smaller final part if the length of the iterables is not divisible by chunksize.
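Its definition in the same file is short (quoted from CPython 3.9; itertools is imported at module level there):

def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk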

partial(_process_chunk, fn) creates a partial function of _process_chunk, which looks like this:

def _process_chunk(fn, chunk):
    return [fn(*args) for args in chunk]

So all it does is iterate over every element in a chunk and call the function, in your case randomShuffle. One task therefore does not consist of one call to your randomShuffle but of chunksize many calls. All results are collected in a list and combined later.
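A tiny illustration of what that means (square is a made-up stand-in for randomShuffle; _process_chunk is repeated from above so the snippet runs on its own):

def _process_chunk(fn, chunk):
    return [fn(*args) for args in chunk]

def square(x):                        # a made-up stand-in for randomShuffle
    return x * x

chunk = ((1,), (2,), (3,))            # one chunk of three argument tuples
print(_process_chunk(square, chunk))  # [1, 4, 9] -- three calls, one task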

The super().map() call means the map function of the parent class Executor is used:

class Executor(object):
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
        ...

As you can see, at this point submit is simply called for every element of the iterables. Here fn is the partial function created earlier, partial(_process_chunk, fn), and iterables is what _get_chunks(*iterables, chunksize=chunksize) returned: the equally sized chunks of the original iterables. So all the map function of the ProcessPoolExecutor does is wrap your function and divide your iterables into chunks for you before submit is called.

All of this is done with the goal of reducing the number of tasks created (submit calls) by having each task do more, in this case calling the given function for every element of a chunk of the iterables.

So how do tasks actually map to processes? By creating a ProcessPoolExecutor, you create a pool of processes. The number is defined by the number of cores on your system, or by the max_workers argument if you specify it.

When submit is called, a worker is assigned to the task; the worker receives all the data necessary to run the function and returns the output of the function to the main process. This data transfer is done by serializing and deserializing the data, usually with the pickle module. This is also where a lot of the overhead comes from, since transferring data between processes is slow.

So if you create a ProcessPoolExecutor with max_workers=10, you can in theory have 10 tasks executing in parallel (if you have 10 cores, of course). The abstraction of the pool and tasks exists so that you do not have to worry about which task runs where: you just submit everything that has to be done and let the ProcessPoolExecutor figure out how to best assign tasks to processes.
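A minimal sketch of that (the numbers are arbitrary; pow is just a picklable built-in to submit):

import os
from concurrent.futures import ProcessPoolExecutor

print(os.cpu_count())  # the default for max_workers

with ProcessPoolExecutor(max_workers=10) as executor:
    future = executor.submit(pow, 2, 10)  # one task, run on one of the 10 workers
    print(future.result())                # 1024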

    "This is in generally not a good thing to do if you have a lot of entries in your iterable. " the relevant factor is probably the amount of work per task rather than the number of tasks: if each task takes minutes, you can schedule them individually, the IPC will not be much of a factor. If each task takes a fraction of a second however IPC overhead will start showing up, and may overwhelm the actual runtime. – Masklinn Jun 26 '23 at 09:17
  • @Masklinn updated my answer. You are right, it always has to be seen in relation. I kind of assumed that `randomShuffle` is fast, but actually with the `fit.predict(X_)` it can be quite long, especially if there is a lot of data. – Nopileos Jun 26 '23 at 09:36
  • Thanks for the answer. The reason to switch back to `ProcessPoolExecutor` is that my function really isn't dealing with any I/O tasks; it's the overhead cost of spawning many processes (1 per column, and I have 1k columns) that is slowing the entire thing down. This is why you propose using `chunksize` to chop things up. Is my understanding correct? – user1769197 Jun 27 '23 at 02:49
  • One other question: say I split the column names into 4 chunks, does it mean 4 processes will be created for these 4 chunks? And for each chunk, how is the data processed? I.e., a for loop or multiprocess/multithread? – user1769197 Jun 27 '23 at 03:04
  • @user1769197 I tried to answer your questions in my answer, see the update. If you think my answer solved your problem please also accept it. – Nopileos Jun 27 '23 at 07:08
  • Thank you very much for the update. Yes, so basically it tries to create batches, and within each batch a for loop is used. – user1769197 Jun 27 '23 at 07:32
  • One last question: if I change `executor.map` to `executor.submit`, would it result in any reduction in performance? – user1769197 Jun 27 '23 at 08:24
  • It depends on all the different factors mentioned, but in general manually calling submit for each element of e.g. a list is slower if you have a lot of elements in the list. In general, if you can use map, use it: it does everything for you, from the whole chunking logic to calling submit, handling the future objects, and providing timeout logic if one needs it. It returns a generator which you can convert into a list, and then you have all results nicely in a list. No reason to build it yourself if there is a function that does all you need. – Nopileos Jun 27 '23 at 08:53
  • After some testing, I discovered a problem with your solution. What I originally had is that only a single column is shuffled at a time. Your solution shuffles multiple columns at once, which is not what I intend to do. – user1769197 Jul 07 '23 at 16:38
  • Nonetheless, I still greatly appreciate your explanation. – user1769197 Jul 08 '23 at 11:17

The performance difference between ProcessPoolExecutor and ThreadPoolExecutor can be attributed to the Global Interpreter Lock (GIL) in Python. The GIL allows only one thread to execute Python bytecode at a time, even on multi-core systems. This means that in CPU-bound tasks like training machine learning models, using multiple processes with ProcessPoolExecutor can actually slow down the execution due to the overhead of inter-process communication.

On the other hand, ThreadPoolExecutor can be more efficient since it utilizes multiple threads within a single process, allowing for better utilization of CPU resources. This is especially true when the tasks involve I/O operations or blocking calls.

In your case, since you are using scikit-learn's RandomForestClassifier for training the model, which is CPU-bound, using ThreadPoolExecutor is a better choice. The overhead of inter-process communication in ProcessPoolExecutor can outweigh the benefits of parallel execution.

To optimize the code further, you can consider the following suggestions:

  1. Use a smaller number of folds in KFold to reduce the number of iterations.

  2. Instead of creating a new fit object for each column, you can create it once outside the loop and pass it as an argument to the randomShuffle function.

  3. Consider using a more efficient algorithm or model that can handle large datasets more effectively, such as gradient boosting algorithms.

  • I suspect this answer is ChatGPT (i.e. it sort of looks like it, and you've posted at least one other answer that is convincingly ChatGPT: https://stackoverflow.com/a/76674730/) – DavidW Jul 14 '23 at 09:41
  • I wrote this on my own with the knowledge I have. As a newbie, I also didn't know that any kind of AI help is not accepted. I wrote things on my own based on Google and other sites and then gave the answers to ChatGPT so that it could correct and organize my poor writing and arrangement. Sorry, as it seems nonsense. I was also not aware that if someone doesn't like the explanation they downvote and criticise things. I posted 3 comments and people were so harsh. Sorry if I contaminated the environment. Strong apology. Due to downvotes, I am unable to post comments. I am sad and feeling really stupid. – Abdullah Shafi Jul 14 '23 at 17:07