I want to change the number of workers in a pool while it is in use. My current idea is:

while True:
    current_connection_number = get_connection_number()
    forced_break = False
    with mp.Pool(current_connection_number) as p:
        for data in p.imap_unordered(fun, some_infinite_generator):
            yield data
            if current_connection_number != get_connection_number():
                forced_break = True
                break
    if not forced_break:
        break

The problem is that this just terminates the workers, so the last items that were taken from some_infinite_generator but not yet processed are lost. Is there a standard way of doing this?

Edit: I tried printing inside some_infinite_generator, and it turns out that p.imap_unordered requests 1565 items with just 2 pool workers before anything is processed. How do I limit the number of items requested from the generator? If I use the code above and change the number of connections after just 2 items, I will lose 1563 items.

userqwerty1

1 Answer

The problem is that the Pool consumes the generator internally in a separate thread, and you have no way to control that logic.

What you can do is feed Pool.imap_unordered one portion of the generator at a time and let that portion be fully consumed before rescaling according to the available connections.
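To make the read-ahead visible, here is a minimal sketch. It assumes `multiprocessing.pool.ThreadPool` as a stand-in for `mp.Pool` so that the demo needs no picklable functions; both share the same `imap_unordered` machinery, but the exact number of items read ahead by a process pool depends on buffering and may differ. The name `demo_eager_consumption` and its parameters are hypothetical.

```python
import threading
from multiprocessing.pool import ThreadPool


def demo_eager_consumption(n_items=50, workers=2):
    """Measure how far the pool reads ahead before any task has finished."""
    pulled = []                    # items taken from the source so far
    finished = []                  # items whose task has completed
    exhausted = threading.Event()  # set once the source has been fully read
    release = threading.Event()    # tasks block until this is set

    def source():
        for i in range(n_items):
            pulled.append(i)
            yield i
        exhausted.set()

    def fun(x):
        release.wait()             # hold every task until the snapshot is taken
        finished.append(x)
        return x

    with ThreadPool(workers) as pool:
        results = pool.imap_unordered(fun, source())
        # The pool's internal feeder thread drains the source on its own.
        exhausted.wait(timeout=10)
        snapshot = (len(pulled), len(finished))
        release.set()              # now let the workers run
        drained = sorted(results)
    return snapshot, drained


if __name__ == "__main__":
    snapshot, _ = demo_eager_consumption()
    print("pulled, finished:", snapshot)
```

In this sketch the whole source is read while no task has completed, which is the behaviour that makes breaking out of the loop lose items.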

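import itertools
import multiprocessing as mp

CHUNKSIZE = 100

def grouper(n, iterable):
    """Yield successive tuples of up to n items from iterable."""
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk

def scaled_results():
    # fun, get_connection_number and some_infinite_generator come from the question.
    chunks = grouper(CHUNKSIZE, some_infinite_generator)
    while True:
        current_connection_number = get_connection_number()
        with mp.Pool(current_connection_number) as p:
            # Keep this pool for as long as the connection count is unchanged.
            while current_connection_number == get_connection_number():
                # Hand the pool one finite chunk at a time so it cannot read ahead.
                chunk = next(chunks, None)
                if chunk is None:
                    return  # generator exhausted
                for data in p.imap_unordered(fun, chunk):
                    yield data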

It's slightly less optimal, since rescaling happens once per chunk rather than once per item, but with a bit of fine-tuning of the CHUNKSIZE value you can easily get it right.

The grouper recipe.
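As a self-contained variant of the same idea, the sketch below takes the worker function, the item source and the worker-count callback as arguments. The name `rescaling_imap` and the `pool_factory` parameter are hypothetical and not part of `multiprocessing`; `pool_factory` is only assumed to return a context manager with an `imap_unordered` method, which both `mp.Pool` and `ThreadPool` do.

```python
import itertools
import multiprocessing as mp
from multiprocessing.pool import ThreadPool


def rescaling_imap(fun, iterable, get_worker_count, chunksize=100,
                   pool_factory=mp.Pool):
    """Yield fun(item) for every item, resizing the pool between chunks."""
    it = iter(iterable)
    chunk = list(itertools.islice(it, chunksize))
    while chunk:
        workers = get_worker_count()
        with pool_factory(workers) as pool:
            # Reuse this pool while the requested size stays the same.
            while chunk and workers == get_worker_count():
                # Only this finite chunk is handed to the pool, so at most
                # `chunksize` items are pulled from the source in advance.
                yield from pool.imap_unordered(fun, chunk)
                # The next chunk is read only after the previous one is done,
                # so a pool change never drops pending items.
                chunk = list(itertools.islice(it, chunksize))


if __name__ == "__main__":
    # ThreadPool keeps the demo self-contained; mp.Pool has the same interface
    # but needs a picklable, module-level worker function.
    results = rescaling_imap(abs, range(-10, 0), lambda: 2,
                             chunksize=4, pool_factory=ThreadPool)
    print(sorted(results))
```

A smaller `chunksize` reacts faster to a change in the worker count, at the cost of workers idling at the end of every chunk.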

noxdafox