
I have a simple script in which a function does some calculation on a pandas.Series object, which I want to process in parallel. I have put the pandas.Series data into a shared memory object so that different processes can use it.

My code is given below.

from multiprocessing import shared_memory
import pandas as pd
import numpy as np
import multiprocessing

s = pd.Series(np.random.randn(50))
s = s.to_numpy()

# Create a shared memory block shm_s which can be accessed by other processes
shm_s = shared_memory.SharedMemory(create=True, size=s.nbytes)
b = np.ndarray(s.shape, dtype=s.dtype, buffer=shm_s.buf)
b[:] = s[:]

# create a dictionary to store the results, which can be accessed after the processes finish
mgr = multiprocessing.Manager()
pred_sales_all = mgr.dict()

forecast_period = 1000

# my pseudo-function to run in parallel processes
def predict_model(b,model_list_str,forecast_period,pred_sales_all):
    c = pd.Series(b)
    temp_add = model_list_str + forecast_period
    temp_series = c.add(model_list_str)
    pred_sales_all[str(temp_add)] = temp_series


# parallel processing with shared memory
if __name__ == '__main__':
    model_list = [1, 2, 3, 4]

    all_process = []
    for model_list_str in model_list:
        # set up a process to run
        process = multiprocessing.Process(target=predict_model, args=(b,model_list_str, forecast_period, pred_sales_all))
        # start the process; we join() them separately later, otherwise we would wait for each process to finish before starting the next
        process.start()
        # append the process to the list
        all_process.append(process)

    # wait for all processes to finish

    for p in all_process:
        p.join()

This code works on Ubuntu (I checked it), but when I run it on Windows I get the following error.

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
        

I also tried the solution mentioned here in the Stack Overflow question.

What is wrong with the code, and can someone solve the issue? Is my parallelization code wrong?

Danish Xavier
  • Defining methods is fine, but you really don't want to do any processing outside of `if __name__ == '__main__':` when multiprocessing. – JonSG Sep 04 '21 at 14:31
  • I am running multiprocessing inside the `if __name__=='__main__':` only. – Danish Xavier Sep 04 '21 at 14:44
  • First, `mgr = multiprocessing.Manager()` creates a process so if you are running under Windows, that statement needs to be moved to inside the `if __name__ == '__main__':` block. Also, `buffer = s` will clobber the previous assignment to `buffer`. You have other statements at global scope that should also be moved. I don't really think you are sharing anything. – Booboo Sep 04 '21 at 14:46
  • Will `s = s.to_numpy()`, `shm_s = shared_memory.SharedMemory(create=True, size=s.nbytes)`, `b = np.ndarray(s.shape, dtype=s.dtype, buffer=shm_s.buf)`, `b[:] = a[:]`, and then passing `b` in the args of the process solve my shared memory issue? – Danish Xavier Sep 04 '21 at 18:12
  • @Booboo Can you check my code? Is it correct that I am copying the data to `b` and passing `b`? Is it sharable now? I am getting the output, though. – Danish Xavier Sep 04 '21 at 18:22
  • "But when I run this in windows I am getting the following error." It looks like there are a couple of paragraphs of explanation there. In particular, there is a bit that reads: `This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module`. You seem to be asking why the error occurs; why exactly do you not find the part I quoted to be a sufficient answer to the question? – Karl Knechtel Sep 04 '21 at 18:23
  • "Also I tried the solution mentioned here in the stack-overflow question" Okay, and *what happened* when you tried that? How exactly did you apply that advice (what did your code end up as instead), what happened when you ran that code, and how is that different from what is supposed to happen? – Karl Knechtel Sep 04 '21 at 18:24
  • In Windows, python has to reimport modules to get the code into the created process memory. Everything defined at module level executes in each process. The assignments to s, shm_s, b, and etc... happen again in the created process. – tdelaney Sep 04 '21 at 18:26
  • @KarlKnechtel I have added the `if __name__=='__main__':` statement and call the multiprocessing from there. As for the former question: since I am executing the parallel processing under the `if` statement, why am I still getting the same error? – Danish Xavier Sep 04 '21 at 18:42
  • @DanishXavier You still haven't moved `mgr = multiprocessing.Manager()`. So aren't you still getting the same exception? And when you have a statement such as `shm_s = shared_memory.SharedMemory(create=True, size=s.nbytes)` at global scope, each subprocess you create will be executing that statement. You need to move all the statements that create the dataframe and shared memory to inside the `if __name__ == '__main__':` block also. – Booboo Sep 04 '21 at 21:05
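
To make tdelaney's point above concrete: under the spawn start method (the default on Windows), every child re-imports the main module, so anything at module level runs once per process. A minimal, self-contained sketch of this behaviour (the file name `spawn_demo.py` is hypothetical, not from the question):

# spawn_demo.py - minimal sketch showing that module-level code runs again in each spawned child
import multiprocessing
import os

# this line executes in the parent AND in every child process under "spawn"
print(f"module-level code running in pid {os.getpid()}")

def worker():
    print(f"worker running in pid {os.getpid()}")

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # already the default on Windows
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

Running it prints the module-level line once for the parent and once more for the child, which is why heavy setup, and anything that itself creates a process such as `multiprocessing.Manager()`, belongs inside the `if __name__ == '__main__':` block.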

1 Answer


See my comments about moving the statements at global scope to within the `if __name__ == '__main__':` block. Otherwise they will be executed by each subprocess as part of its initialization, and there is no point in that. Moreover, the statement `mgr = multiprocessing.Manager()` has to be moved because it results in the creation of a new process.

from multiprocessing import shared_memory
import pandas as pd
import numpy as np
import multiprocessing

# my pseudo-function to run in parallel processes
def predict_model(b,model_list_str,forecast_period,pred_sales_all):
    c = pd.Series(b)
    temp_add = model_list_str + forecast_period
    temp_series = c.add(model_list_str)
    pred_sales_all[str(temp_add)] = temp_series


# parallel processing with shared memory
if __name__ == '__main__':
    forecast_period = 1000
    s = pd.Series(np.random.randn(50))
    s = s.to_numpy()

    # Create a shared memory block shm_s which can be accessed by other processes
    shm_s = shared_memory.SharedMemory(create=True, size=s.nbytes)
    b = np.ndarray(s.shape, dtype=s.dtype, buffer=shm_s.buf)
    b[:] = s[:]

    # create a dictionary to store the results, which can be accessed after the processes finish
    mgr = multiprocessing.Manager()
    pred_sales_all = mgr.dict()

    model_list = [1, 2, 3, 4]

    all_process = []
    for model_list_str in model_list:
        # set up a process to run
        process = multiprocessing.Process(target=predict_model, args=(b,model_list_str, forecast_period, pred_sales_all))
        # start the process; we join() them separately later, otherwise we would wait for each process to finish before starting the next
        process.start()
        # append the process to the list
        all_process.append(process)

    # wait for all processes to finish

    for p in all_process:
        p.join()

    print(pred_sales_all)

Prints:

{'1004': 0     4.084857
1     2.871219
2     5.644114
3     2.146666
4     3.395946
5     3.362894
6     2.366361
7     3.209334
8     4.226132
9     3.158135
10    4.090616
11    5.299314
12    3.155669
13    5.602719
14    3.107825
15    1.809457
16    3.938050
17    1.144159
18    3.286502
19    4.302809
20    3.917498
21    5.055629
22    2.230594
23    3.255307
24    2.459930
25    3.591691
26    3.248188
27    3.635262
28    4.547589
29    4.883547
30    2.635874
31    5.551306
32    2.434944
33    5.358516
34    4.606322
35    5.383417
36    2.886735
37    4.267562
38    2.053871
39    3.863734
40    3.233764
41    4.089593
42    4.754793
43    4.125400
44    2.174840
45    7.207996
46    2.925736
47    4.604850
48    4.067672
49    4.397330
dtype: float64, '1001': 0     1.084857
1    -0.128781
2     2.644114
3    -0.853334
4     0.395946
5     0.362894
6    -0.633639
7     0.209334
8     1.226132
9     0.158135
10    1.090616
11    2.299314
12    0.155669
13    2.602719
14    0.107825
15   -1.190543
16    0.938050
17   -1.855841
18    0.286502
19    1.302809
20    0.917498
21    2.055629
22   -0.769406
23    0.255307
24   -0.540070
25    0.591691
26    0.248188
27    0.635262
28    1.547589
29    1.883547
30   -0.364126
31    2.551306
32   -0.565056
33    2.358516
34    1.606322
35    2.383417
36   -0.113265
37    1.267562
38   -0.946129
39    0.863734
40    0.233764
41    1.089593
42    1.754793
43    1.125400
44   -0.825160
45    4.207996
46   -0.074264
47    1.604850
48    1.067672
49    1.397330
dtype: float64, '1002': 0     2.084857
1     0.871219
2     3.644114
3     0.146666
4     1.395946
5     1.362894
6     0.366361
7     1.209334
8     2.226132
9     1.158135
10    2.090616
11    3.299314
12    1.155669
13    3.602719
14    1.107825
15   -0.190543
16    1.938050
17   -0.855841
18    1.286502
19    2.302809
20    1.917498
21    3.055629
22    0.230594
23    1.255307
24    0.459930
25    1.591691
26    1.248188
27    1.635262
28    2.547589
29    2.883547
30    0.635874
31    3.551306
32    0.434944
33    3.358516
34    2.606322
35    3.383417
36    0.886735
37    2.267562
38    0.053871
39    1.863734
40    1.233764
41    2.089593
42    2.754793
43    2.125400
44    0.174840
45    5.207996
46    0.925736
47    2.604850
48    2.067672
49    2.397330
dtype: float64, '1003': 0     3.084857
1     1.871219
2     4.644114
3     1.146666
4     2.395946
5     2.362894
6     1.366361
7     2.209334
8     3.226132
9     2.158135
10    3.090616
11    4.299314
12    2.155669
13    4.602719
14    2.107825
15    0.809457
16    2.938050
17    0.144159
18    2.286502
19    3.302809
20    2.917498
21    4.055629
22    1.230594
23    2.255307
24    1.459930
25    2.591691
26    2.248188
27    2.635262
28    3.547589
29    3.883547
30    1.635874
31    4.551306
32    1.434944
33    4.358516
34    3.606322
35    4.383417
36    1.886735
37    3.267562
38    1.053871
39    2.863734
40    2.233764
41    3.089593
42    3.754793
43    3.125400
44    1.174840
45    6.207996
46    1.925736
47    3.604850
48    3.067672
49    3.397330
dtype: float64}
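
One follow-up on the sharing itself, echoing the comment thread above ("I don't really think you are sharing anything"): because Windows uses spawn, passing `b` directly in `args` means each child receives a pickled copy of the array rather than a view of the shared memory block. A minimal sketch of a worker that attaches to the block by name instead (the extra `shm_name`, `shape` and `dtype` parameters are assumptions for illustration, not part of the code above):

from multiprocessing import shared_memory
import numpy as np
import pandas as pd

def predict_model(shm_name, shape, dtype, model_list_str, forecast_period, pred_sales_all):
    # attach to the shared memory block the parent created
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    try:
        b = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)  # a view, not a copy
        c = pd.Series(b)
        pred_sales_all[str(model_list_str + forecast_period)] = c.add(model_list_str)
    finally:
        existing_shm.close()  # detach from the block; do not unlink() here

The parent would then pass `shm_s.name`, `b.shape` and `b.dtype` in `args` instead of `b`, and call `shm_s.close()` and `shm_s.unlink()` once all processes have been joined.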
Booboo
  • Apologies for the intrusion; can you help me with a [variation of your previous solution](https://stackoverflow.com/questions/69055834/python-indexerror-list-index-out-of-range-after-modifying-code), please? –  Sep 05 '21 at 11:05