
I'm having trouble with `multiprocessing.Pool` in Python 3. I want to run a computation on a large array using `pool.map`. I have a 3D array on which I need to run a computation 10 times, producing 10 output files sequentially. This task is repeated 3 times, so in total I expect 3*10 = 30 output files (*.txt). I prepared the following script for a small array (a sample problem). However, when I use it on a large array, or on arrays loaded from a series of files, the code (possibly the pool) takes up memory and never saves any .txt file to the destination directory. No error message appears when I run the file with the command `mpirun python3 sample_prob_func.py`, so I don't know where the problem occurs. Can anybody suggest what is wrong with the sample script and how to keep it from getting stuck? Any help is appreciated. Thanks!

import numpy as np
import multiprocessing as mp
from scipy import signal
import matplotlib.pyplot as plt
import contextlib
import os, glob, re
import random
import cmath, math
import time
import pdb

#File Storing path
save_results_to = 'File saving path'

arr_x = [0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0, 8.49, 12.0]
arr_y = [0, 8.49, 12.0, 8.49, 0.0, -8.49, -12.0, -8.49, -0.0]
N=len(arr_x)

np.random.seed(12345)
total_rows = 5000
arr = np.reshape(np.random.rand(total_rows*N),(total_rows, N))
arr1 = np.reshape(np.random.rand(total_rows*N),(total_rows, N))
arr2 = np.reshape(np.random.rand(total_rows*N),(total_rows, N))

# Finding cross spectral density (CSD)
def my_func1(data):
    # Do something here
    return  array1


t0 = time.time()
my_data1 = my_func1(arr)
my_data2 = my_func1(arr1)
my_data3 = my_func1(arr2)

print('Time required {} seconds to execute CSD--For loop'.format(time.time()-t0))
mydata_list  = [my_data1, my_data2, my_data3]


def my_func2(data2):
    # Do something here
    return from_data2



start_freq = 100
stop_freq  = 110
freq_range= np.around(np.linspace(start_freq,stop_freq,11)/10, decimals=2)
no_of_freq = len(freq_range)

list_arr =[]

def my_func3(csd):
    list_csd=[]
    for fr_count in range(start_freq, stop_freq):
        csd_single = csd[:,:, fr_count]
        list_csd.append(csd_single)
    print('Shape of list is :', np.array(list_csd).shape)
    return list_csd

def parallel_function(BIG_list_data):
    with contextlib.closing(mp.Pool(processes=10)) as pool:
       dft= pool.map(my_func2, BIG_list_data)
       pool.close()
       pool.join()
    data_arr = np.array(dft)
    print('shape of data :', data_arr.shape)
    return data_arr

count_day = 1
count_hour =0
for count in range(3):
    count_hour +=1
    list_arr = my_func3(mydata_list[count])  # Load Numpy files
    print('Array shape is :', np.array(arr).shape)
    t0 = time.time()
    data_dft = parallel_function(list_arr)
    print('The hour number={} data is processing... '.format(count_hour))
    print('Time in parallel:', time.time() - t0)
    for i in range(no_of_freq-1): # (11-1=10)
        jj = freq_range[i]
        #print('The hour_number {} and frequency number {} data is processing... '.format(count_hour, jj))
        dft_1hr_complx = data_dft[i,:,:]
        np.savetxt(save_results_to + f'csd_Day_{count_day}_Hour_{count_hour}_f_{jj}_hz.txt',  dft_1hr_complx.view(float))
CEB
  • You could try wrapping the script in `if __name__ == '__main__':`, and if you wish, check this link: https://stackoverflow.com/questions/27065237/attributeerror-pool-object-has-no-attribute-exit – pluto Jun 18 '22 at 18:40
  • On HPC machines, processes and threads are often bound to cores by the job scheduler. If you create processes at runtime like you do and the scheduler is not aware of that, then there is a high risk that all processes on the same node are bound to the same core, resulting in code that is a bit slower than a sequential version. To test this hypothesis, you need to check where processes are executed over time on a node. Running `top` should be enough to check that, I guess. Alternatively, `hwloc-ls` may help you too. – Jérôme Richard Jun 19 '22 at 09:14
  • @JérômeRichard, thanks for your expert opinion. You are right: on a given node, all the processes were bound to the same core and ran serially, not in parallel. With `ncpus = int(os.getenv('SLURM_CPUS_PER_TASK', 1))`, my code now reads the `SLURM` setting and runs as usual. – CEB Jun 26 '22 at 10:16
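
The binding hypothesis from the comments above can also be checked from inside Python. The following is only a sketch: `allowed_cpus` and `report` are hypothetical helper names, the worker count of 4 is arbitrary, and `os.sched_getaffinity` is available on Linux only, so a fallback that assumes all CPUs is used elsewhere.

```python
import multiprocessing as mp
import os


def allowed_cpus():
    """Return the sorted CPU ids this process may run on."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: the affinity mask set by the scheduler or launcher.
        return sorted(os.sched_getaffinity(0))
    # Assumption for other platforms: no restriction, all CPUs allowed.
    return list(range(os.cpu_count() or 1))


def report(_):
    # Each task reports the PID of the worker that ran it and that
    # worker's allowed CPUs.
    return os.getpid(), allowed_cpus()


if __name__ == "__main__":
    print("parent may run on:", allowed_cpus())
    with mp.Pool(processes=4) as pool:
        for pid, cpus in pool.map(report, range(4)):
            print(f"worker {pid} may run on: {cpus}")
```

If every worker lists the same single CPU id, the pool is effectively running serially on one core.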

1 Answer


As @JérômeRichard suggested, to make your code aware of what the job scheduler allocated, you need to define the number of processors that will be used for this task. The following line can help: `ncpus = int(os.getenv('SLURM_CPUS_PER_TASK', 1))`

Put this line inside your Python script. Also, inside `parallel_function`, use `with contextlib.closing(mp.Pool(processes=ncpus)) as pool:` instead of `with contextlib.closing(mp.Pool(processes=10)) as pool:`, since `mp.Pool` takes the worker count through its `processes` argument. Thanks
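
Here is a minimal sketch of how these pieces might fit together, assuming the job runs under SLURM with `SLURM_CPUS_PER_TASK` set. The helper `worker_count`, the body of `my_func2`, and the toy input are hypothetical placeholders, not the asker's real computation. Note that `mp.Pool` accepts the worker count through its `processes` argument.

```python
import contextlib
import multiprocessing as mp
import os


def worker_count(default=1):
    """Return the CPU count SLURM granted to this task, or `default` outside SLURM."""
    return int(os.getenv("SLURM_CPUS_PER_TASK", default))


def my_func2(block):
    # Hypothetical stand-in for the real per-frequency computation.
    return sum(block)


def parallel_function(blocks, ncpus):
    # closing() calls pool.close() on exit; join() then waits for the workers.
    with contextlib.closing(mp.Pool(processes=ncpus)) as pool:
        results = pool.map(my_func2, blocks)
    pool.join()
    return results


if __name__ == "__main__":
    # The guard keeps child processes from re-running the pool setup on import.
    ncpus = worker_count()
    blocks = [[float(i)] * 5 for i in range(10)]
    out = parallel_function(blocks, ncpus)
    print("workers:", ncpus, "results:", len(out))
```

Reading the worker count from the environment keeps the pool size in line with what the scheduler actually allocated, instead of a hard-coded 10.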

pluto