0

I have a file and I want to process it in parallel using Python's multiprocessing module. My current code is:

from multiprocessing import Pool, cpu_count

class rand:
    def __init__(self):
        self.rando = "world"

def do_work2(obj, line):
    return line + obj.rando

if __name__ == "__main__":

    num_workers = cpu_count() - 2
    pool = Pool(num_workers)
    ran = rand()
    with open("sample.txt") as f:
        # chunk the work into batches of 4 lines at a time
        results = pool.starmap(do_work2, zip(ran,f), 4)

    print(results)

I expect to see all the lines in my file with "world" concatenated at the end. However, when I run this code I get:

TypeError: 'rand' object is not iterable

I get why it is happening, but I am wondering whether there is a way to send a class object to a function, and then use that object inside the function, all while multiprocessing.

Can someone help me please?

R_Moose
  • 103
  • 9
  • You could do something like `pool.starmap(do_work2, zip(itertools.repeat(ran), f), 4)`, although that's starting to look like it should be rethought. Will `obj` being passed to `do_work2` always be the same per a given call to `starmap`? If so, I'd just `functools.partial` `do_work2` with `ran` pre-supplied as the first argument, then just use `pool.map` instead of `starmap`. – Carcigenicate Oct 22 '20 at 22:07
  • Can you put this in code? I don't see what you mean. Sounds like partial will work. – R_Moose Oct 23 '20 at 15:21
  • Give me like half an hour. I'm just getting some cleaning done now. – Carcigenicate Oct 23 '20 at 15:38

3 Answers

1

Yes, you can pass a class object to a multiprocessing function, but zip requires that each of its arguments be iterable. Something like this may be a little more intuitive:

args = [(ran, line) for line in f]
results = pool.starmap(do_work2, args)

See https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap

Keep in mind that a copy of the object is made and sent to each process.
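A quick way to see this for yourself: mutate the object inside a worker and check the parent's copy afterwards. This is a sketch with a toy `Rand` class and a hypothetical `mutate` worker standing in for the question's code:

```python
from multiprocessing import Pool

class Rand:  # stand-in for the question's class
    def __init__(self):
        self.rando = "world"

def mutate(obj):
    # each worker receives a pickled copy, so this change stays local to the worker
    obj.rando = "changed"
    return obj.rando

if __name__ == "__main__":
    ran = Rand()
    with Pool(2) as pool:
        results = pool.map(mutate, [ran, ran])
    print(results)    # the workers each saw (and changed) their own copy
    print(ran.rando)  # the parent's object is untouched: still "world"
```

If the workers need to share mutable state, that calls for a `multiprocessing.Manager` or similar, which is a different design than what this question asks for.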

Michael
  • 2,344
  • 6
  • 12
1

As Michael notes, the error comes about because zip expects each of its arguments to be iterable, while your rand class is not. While Chems Eddine's fix works, it needlessly takes up memory and doesn't account for how large the file is. I'd prefer this way:

from itertools import repeat

pool.starmap(do_work2, zip(repeat(ran), f), 4)

repeat produces ran objects indefinitely (until you stop asking for them). This means it will yield as many rans as f has lines, without building a separate list in memory first, and without needing to know how many lines f has.
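To see what zip(repeat(...), f) actually produces, here is a small sketch using an in-memory list of lines in place of the file handle (the lines themselves are made up):

```python
from itertools import repeat

# stand-in for the open file handle: any iterable of lines behaves the same
lines = ["hello \n", "goodbye \n"]

# zip stops as soon as the shorter iterable (lines) is exhausted,
# so the infinite repeat() never runs away
pairs = list(zip(repeat("world"), lines))
print(pairs)  # [('world', 'hello \n'), ('world', 'goodbye \n')]
```

Each pair is exactly the (obj, line) argument tuple that starmap unpacks into do_work2.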

I'd just scrap using pool.starmap and use normal pool.map though. You can wrap your function in another function, and supply ran as the first argument. The quick-and-dirty way would be a lambda:

pool.map(lambda line: do_work2(ran, line), f, 4)

but note that lambdas can't be pickled, so this fails with multiprocessing's default Pool.

The more correct way is to use partial, supplying ran positionally so that pool.map can pass each line as the second argument:

from functools import partial

pool.map(partial(do_work2, ran), f, 4)

See here for why you may want to prefer partial to a plain lambda.
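Putting it together, a minimal runnable sketch of the partial route. It uses an in-memory list as a stand-in for the question's open("sample.txt") handle, so the file isn't needed to try it:

```python
from functools import partial
from multiprocessing import Pool

class rand:
    def __init__(self):
        self.rando = "world"

def do_work2(obj, line):
    return line + obj.rando

if __name__ == "__main__":
    ran = rand()
    # pre-fill the first positional argument; pool.map then supplies each line
    worker = partial(do_work2, ran)
    lines = ["hello ", "goodbye "]  # stand-in for the lines of sample.txt
    with Pool(2) as pool:
        results = pool.map(worker, lines, 4)
    print(results)  # ['hello world', 'goodbye world']
```

partial objects pickle cleanly as long as the wrapped function is defined at module level, which is why this works where the lambda version does not.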

Carcigenicate
  • 43,494
  • 9
  • 68
  • 117
0

Try this:


ran = rand()
with open("sample.txt") as f:
    # file objects have no len(), so read the lines first
    lines = f.readlines()
    # repeat the ran, once per line
    ran_lines = [ran for _ in lines]
    # chunk the work into batches of 4 lines at a time
    results = pool.starmap(do_work2, zip(ran_lines, lines), 4)
Chems Eddine
  • 153
  • 6