
I am trying to use Python's ProcessPoolExecutor to calculate some FFTs in parallel; see the following code:

import concurrent.futures
import numpy as np
from scipy.fft import fft

def fuc(sig):
    C = fft(sig, axis=-1)
    return C

def main():
    P, M, K = 20, 30, 1024
    FKP = np.empty((P, M, K), dtype='cdouble')
    fkp = np.empty((P, M, K), dtype='float32')
    fkp = np.random.rand(P, M, K)
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
        results = ex.map(fuc,(fkp[p,m].reshape(1,K) for p in range(P) for m in range(M)))
    FKP = list(results)

if __name__ == '__main__':
    main()

questions:

  1. Why does the kernel keep busy, yet I do not see 4 workers in the Windows Task Manager?
  2. Do I get the parallel-computed results the right way in the line "FKP = list(results)"?
Wu-O

1 Answer


Q1 :
" Why does the kernel keep busy, yet I do not see 4 workers in the Windows Task Manager? "

A1 :
Let's solve this in code itself :

import os
import time
...
def fuc( sig ):
    print( ( "INF[{0:}]: fuc() starts   "
           + "running in process[{1:}]"
           + "-called-from-process[{2:}]"
             ).format( time.perf_counter_ns(), os.getpid(), os.getppid() )
           )
    C = fft( sig, axis = -1 )
    print( ( "INF[{0:}]: fuc() FFT done "
           + "running in process[{1:}]"
           + "-called-from-process[{2:}]"
             ).format( time.perf_counter_ns(), os.getpid(), os.getppid() )
           )
    return C

This code will self-document when, in which process, and how long the FFT-part of the plan actually computes.


Q2 :
" Do I get the parallel-computed results the right way in the line "FKP = list(results)"? "

A2 :
Yes, yet at a remarkable set of add-on overhead costs for each SER/COMMS/DES process-to-process border-crossing, where all data gets SER/DES-coded ( pickle.dumps()-like CPU/RAM costs in the [TIME]- and [SPACE]-domains, plus nonzero ipc-p2p transfer times ) :
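Those SER/DES costs can be measured directly. A minimal sketch, using the pickle module on one task payload of the same shape ( 1, K ) as the question's .map() sends out:

```python
import pickle
import time

import numpy as np

K = 1024
sig = np.random.rand(1, K)            # one task payload, as produced by the generator

t0 = time.perf_counter_ns()
blob = pickle.dumps(sig)              # SER step, paid once per task on the way out
t1 = time.perf_counter_ns()
back = pickle.loads(blob)             # DES step, paid again inside the worker process
t2 = time.perf_counter_ns()

print(f"payload {len(blob)} B, SER {t1 - t0} ns, DES {t2 - t1} ns")
```

Multiply these per-task figures by the 600 tasks (and again for the returned complex results, which are twice as large) to estimate the total ipc-p2p overhead.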

def Pinf():
    print( ( "NEW[{0:}]: ProcessPoolExecutor process-pool has "
           + "started process[{1:}]"
           + "-called-from-process[{2:}]"
             ).format( time.perf_counter_ns(), os.getpid(), os.getppid() )
           )

def main():
    ...
    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    print( ( "INF[{0:}]: context-manager"
           + 30*"_" + " entry point"
             ).format( time.perf_counter_ns() )
           )
    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    with concurrent.futures.ProcessPoolExecutor( max_workers = 4,
                                                 initializer = Pinf
                                                 ) as ex:
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        print( ( "INF[{0:}]: context-manager"
               + " is to start .map()"
                 ).format( time.perf_counter_ns() )
               )
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        results = ex.map( fuc,
                          ( fkp[p,m].reshape( 1, K )
                            for p   in range( P )
                            for   m in range( M )
                            )
                          )
        ...
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        print( ( "INF[{0:}]: context-manager"
               + " .map() returned / __main__ has received all <_results_>"
                 ).format( time.perf_counter_ns() )
               )
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        pass
    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    print( ( "INF[{0:}]: context-manager"
           + 30*"_" + " exited"
             ).format( time.perf_counter_ns() )
           )
    ...
    print( type( results ) )
    ...

For the actual add-on costs of each process-pool process instantiation, see the reported ns-traces. Details are platform-specific, as the { MacOS | Linux | Windows } methods for spawning new processes differ a lot. The same holds across Python versions: recent Py3 versions handle the copying of the calling Python-interpreter process quite differently than Py2 and earlier Py3.x versions did. Some start methods copy the whole, stateful calling Python-interpreter, with a complete replica of its data, file-descriptors and the like, and thus bear even larger process-instantiation costs, due to all the RAM-allocations needed for storing the n-many replicas of the calling Python-interpreter.
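The start method can be pinned explicitly rather than left to the platform default ( spawn on Windows, and also on macOS since Python 3.8 ), via the mp_context parameter of ProcessPoolExecutor. A sketch, with an illustrative work() function not taken from the question:

```python
import concurrent.futures
import multiprocessing
import os

def work(x):
    # under "spawn", this runs in a freshly started interpreter, not a fork of the parent
    return (os.getpid(), x * x)

def main():
    ctx = multiprocessing.get_context("spawn")   # explicit, platform-independent choice
    with concurrent.futures.ProcessPoolExecutor(max_workers=4,
                                                mp_context=ctx) as ex:
        results = list(ex.map(work, range(8)))
    pids = {pid for pid, _ in results}
    print(f"{len(pids)} distinct worker PID(s)")
    return results

if __name__ == "__main__":
    main()
```

The __main__ guard is mandatory under "spawn": each worker re-imports the main module, and without the guard that re-import would try to start yet another pool.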

Given the scaling :

>>> len( [ ( p, m ) for p in range( P ) for m in range( M ) ] )
600

efficiency matters. Passing just one tuple of ( p_start, p_end, m_start, m_end ) sub-range indices to each of the 4 processes, where the FFT-processing of the respective signal-sections shall take place and return sub-lists of FFT-results thereof, avoids passing the same static data many times in small chunks and eliminates 596 of the 600 crossings of the ( CPU-, RAM- and latency-wise ) expensive SER/COMMS/DES ipc-p2p DATA-passing corridor.
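A sketch of that chunked dispatch. Since spawn-based Windows workers cannot see the parent's fkp array, this variant ships each worker its whole signal-block once ( 4 large transfers instead of 600 tiny ones ); the fft_block() helper and the 4-way split over the p-axis are illustrative, not from the question:

```python
import concurrent.futures

import numpy as np
from scipy.fft import fft

P, M, K = 20, 30, 1024

def fft_block(block):
    # one SER/COMMS/DES border-crossing carries a whole (p-chunk, M, K) block
    return fft(block, axis=-1)

def main():
    fkp = np.random.rand(P, M, K)
    bounds = [(lo, min(lo + 5, P)) for lo in range(0, P, 5)]  # 4 chunks of 5 p-planes
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
        parts = list(ex.map(fft_block, (fkp[lo:hi] for lo, hi in bounds)))
    FKP = np.concatenate(parts, axis=0)   # back to shape (P, M, K), complex dtype
    return FKP

if __name__ == "__main__":
    print(main().shape)
```

Alternatively, keeping the original one-row tasks, the stdlib's own ex.map( fuc, ..., chunksize = 150 ) batches many small payloads per crossing and already recovers much of this saving.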

For more details you may like to re-read this and this.

user3666197
    Many thanks for your detailed answers! I use Windows10, python 3.10.2. I copy above code, but it gives error (I am new in python) : Input In [10] with concurrent.futures.ProcessPoolExecutor(max_workers = 4,initializer = Pinf) as ex: ^ SyntaxError: invalid syntax – Wu-O Mar 01 '22 at 22:46