
I've searched probably 10 threads on multiprocessing, but nothing seems to fit my use case perfectly. Here is a general idea of what I want to parallelize.

class foo():
    def boo(self):
        filename = 'path to the data file'
        with reader(filename) as fileReader:
            for id, feature in fileReader:
                self.boo2(id, feature)

    def boo2(self, id, feature):
        # process feature, then save the output to a folder
        ...

Here I want to parallelize the calls to boo2(). fileReader is an iterator (a SequentialMatrixReader from pykaldi) with tens of thousands of (id, feature) rows, where id is a string and each feature is a matrix (hundreds of rows by tens of columns). boo2 computes a smaller matrix and saves the result to a folder based on id. The calls to boo2 are independent of one another, so I want to run them in parallel.

From my understanding, I can't use multiprocessing.Pool since boo2 is a class method, and I can't pull it out of the class due to its complexity.

I don't know how to use multiprocessing.Process, since the number of cores is much smaller than the number of rows in the iterator, and I am unsure how to queue new calls to boo2 once I've called start() and join() on the processes. (I've tried splitting fileReader into n batches and starting one Process per batch, but I'd much prefer to queue the calls in a single queue rather than in multiple batches.)

I've also looked into the pathos module, since it doesn't have problems with class methods. However, from the sample use cases, the closest fit to my need is:

pathos.threading.ThreadPool().imap(boo2, [feature for feature in fileReader])

But because of how large fileReader is, I am unable to fit [feature for feature in fileReader] in memory.

Any and all help is appreciated. Thank you.

kkawabat

1 Answer


You won't be able to use multiprocessing directly because of the class members; you need a separate function for that. You're right about that.

Regarding threads, I'd suggest not using a simple comprehension like [feature for feature in fileReader]. Instead, read the features from fileReader in batches sized to the CPU threads you have available: run a thread per feature in the batch, wait for them all to complete, then read the next batch, and so on.

Something like:

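BATCH_SIZE = 8  # example value; e.g. the number of CPU threads available

def make_next_batch(fileReader):
    """Yield lists of at most BATCH_SIZE features from fileReader."""
    batch = []
    for feature in fileReader:
        if len(batch) == BATCH_SIZE:
            yield batch
            batch = []
        batch.append(feature)
    if batch:  # yield the final, possibly smaller, batch
        yield batch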

This way you only have to keep BATCH_SIZE features in memory at any one time.
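To show how the batches might be consumed, here is a minimal sketch using the standard library's concurrent.futures rather than pathos. The names process_in_batches and batch_size are made up for illustration, the batching generator is repeated so the snippet runs on its own, and func stands in for boo2. It assumes each item is an (id, feature) pair and that func returns something small (or None), since the results are collected in a list.

```python
from concurrent.futures import ThreadPoolExecutor


def make_next_batch(items, batch_size):
    """Yield lists of at most batch_size items from any iterable."""
    batch = []
    for item in items:
        if len(batch) == batch_size:
            yield batch
            batch = []
        batch.append(item)
    if batch:
        yield batch


def process_in_batches(func, items, batch_size=4):
    """Run func(*item) in threads, one batch at a time (hypothetical helper)."""
    results = []
    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        for batch in make_next_batch(items, batch_size):
            # extend() drains the map iterator, so the whole batch finishes
            # before the next batch is read from the underlying iterator.
            results.extend(executor.map(lambda pair: func(*pair), batch))
    return results
```

With the class from the question, a call could look like process_in_batches(self.boo2, fileReader, batch_size=8). Threads only give a real speedup if the work inside boo2 releases the GIL (for example NumPy operations or file I/O), which is an assumption here.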

lenik
  • Thank you for the reply @lenik. However, with this implementation each batch is bottlenecked by its slowest process. I was hoping to have each core process items asynchronously if possible. – kkawabat Oct 23 '19 at 04:34
  • Then you have to do what a thread pool's `map()` does, but manually, without loading all the data into memory. It's not that complicated: you have a bunch of threads, you check whether any of them have finished, and you spin up new ones with the next `feature` that comes from the generator. Maybe this will help: https://stackoverflow.com/questions/3329361/python-something-like-map-that-works-on-threads – lenik Oct 23 '19 at 05:14
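The manual approach described in the last comment can be sketched with the standard library's concurrent.futures. This is one possible way to do it, not what pathos does internally. The helper name bounded_map and the max_workers parameter are made up for illustration, and func stands in for boo2. Each item is assumed to be an (id, feature) pair.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def bounded_map(func, items, max_workers=4):
    """Call func(*item) for every item, with at most max_workers in flight.

    Items are pulled from the iterator only when a worker slot frees up,
    so the whole input never has to sit in memory. Returns the number of
    completed calls. (Hypothetical helper, not part of any library.)
    """
    completed = 0
    pending = set()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for item in items:
            if len(pending) >= max_workers:
                # Block until at least one task finishes, then reuse its slot.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    future.result()  # re-raises any exception from the worker
                    completed += 1
            pending.add(executor.submit(func, *item))
        # Drain the tasks that are still running.
        for future in pending:
            future.result()
            completed += 1
    return completed
```

With the class from the question, the call would be something like bounded_map(self.boo2, fileReader, max_workers=8). A slow item holds up only its own worker, so the other workers keep pulling new rows. ProcessPoolExecutor exposes the same submit() interface, but then func and every item must be picklable.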