I have a generator that yields strings; how can I use it with this code?

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

Instead of the array passed above, I want to use a generator, since there are too many values to fit in an array and I don't need to store them.

Additional question: in this case, should advancing to the next iteration be done inside the function?

1 Answer

This is essentially the same question as the one posed here.

The gist is that multiprocessing's Pool.map will convert any iterable without a __len__ method into a list before dispatching any work.

There is an open issue to add support for generators but for now, you're SOL.

If your array is too big to fit into memory, consider reading it in chunks, processing each chunk, and dumping the results to disk chunk by chunk. Without more context, I can't really provide a more concrete solution.
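
For illustration, here is a minimal sketch of that chunked approach, assuming the input comes from a generator and the worker is called my_function (the name process_chunks and the chunk_size value are placeholders, not something from the question):

import itertools
from multiprocessing.dummy import Pool as ThreadPool

def my_function(x):
    # placeholder worker; substitute your real function
    return x

def process_chunks(generator, chunk_size=10000):
    with ThreadPool(4) as pool:
        while True:
            # pull at most chunk_size items off the generator at a time
            chunk = list(itertools.islice(generator, chunk_size))
            if not chunk:
                break
            # pool.map only ever sees a bounded list, never the whole generator
            for result in pool.map(my_function, chunk):
                yield result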

UPDATE:

Thanks for posting your code. My first question: is it absolutely necessary to use multiprocessing? Depending on what my_function does, you may see no benefit from a ThreadPool, since CPython is limited by the GIL and a CPU-bound worker function won't run any faster across threads. In that case, a process pool (plain multiprocessing.Pool) would be a better fit. Otherwise, you are probably better off just running results = map(my_function, generator).

Of course, if you don't have the memory to load the input data, it is unlikely you will have the memory to store the results.
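
If memory for the results is the main concern, here is a minimal sketch of the single-threaded version, streaming each result straight to disk so that neither the inputs nor the results are ever held in memory all at once (the filename results.txt is illustrative; my_function and generator are assumed from above):

# map() returns a lazy iterator in Python 3, so nothing is materialized
with open('results.txt', 'w') as fp:
    for result in map(my_function, generator):
        fp.write(str(result) + '\n')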

Secondly, you can improve your generator by using itertools.

Try:

import itertools
import string

# The codes described in the comments run aaaaaa ... 999999 with repeated
# characters allowed, so itertools.product is the right tool here
# (permutations would never repeat a character and ignores digits)
chars = string.ascii_lowercase + string.digits
cod = (''.join(p) for p in itertools.product(chars, repeat=6))

def my_function(x):
    # placeholder worker; substitute your real function
    return x

def dump_results_to_disk(results, outfile):
    with open(outfile, 'w') as fp:
        for result in results:
            fp.write(str(result) + '\n')

def process_in_chunks(generator, chunk_size=50):
    accumulator = []
    chunk_number = 1
    for item in generator:
        accumulator.append(item)
        if len(accumulator) == chunk_size:
            results = list(map(my_function, accumulator))
            dump_results_to_disk(results, "results" + str(chunk_number) + '.txt')
            chunk_number += 1
            accumulator = []

    # flush whatever is left in the final, partial chunk
    if accumulator:
        results = list(map(my_function, accumulator))
        dump_results_to_disk(results, "results" + str(chunk_number) + '.txt')

process_in_chunks(cod)

Obviously, change my_function() to whatever your worker function is, and you may want to do something other than dump to disk. You can scale chunk_size to however many entries fit in memory at once. If you don't have the disk space or the memory for the results, then there's really no way for you to process the data in aggregate.

  • The problem with a large array is that I have to generate six-character codes consisting of digits and letters, something like: aaaaaa, aaaaab, aaaaac, ... zzzzzz ... 999999. Lists/arrays simply don't have enough memory to store all of that. Writing everything to a file in advance makes the file too large, ~14 GB. I also can't regenerate these codes each time; it takes too much time (I use nested for loops). So I chose generators, since they let me produce the codes as needed, without the memory and time cost. – Bilbab Jun 03 '21 at 02:48
  • You can still use generators, but have the generator return chunked results. Can you share the code you currently have, specifically the generator you're using? I can propose a solution in an edit. Nested for() loops are probably not the most efficient way to generate the codes you want, but without seeing what you're currently doing I can't propose any improvements. – myz540 Jun 03 '21 at 02:54
  • Forgot to mention: in Python 3, the built-in `map()` actually returns a lazy iterator, so you could, in theory, `map()` the `my_function` over the full generator and get back an iterator of results, then just process the results in chunks. That would probably be a little more elegant than what I posted, but it wouldn't allow for drop-in usage of `multiprocessing`; see the sketch below. – myz540 Jun 03 '21 at 03:28
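
For completeness, a rough sketch of the idea from that last comment: map() the worker over the full generator and chunk the lazy result iterator with itertools.islice, reusing dump_results_to_disk from the answer above (the chunk size and filenames are illustrative):

import itertools

def dump_in_chunks(generator, chunk_size=50):
    # map() is lazy in Python 3, so no intermediate list is built
    results_iter = map(my_function, generator)
    chunk_number = 1
    while True:
        chunk = list(itertools.islice(results_iter, chunk_size))
        if not chunk:
            break
        dump_results_to_disk(chunk, 'results{}.txt'.format(chunk_number))
        chunk_number += 1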