
I used this and this to run 2 function calls in parallel, but the times are barely improving. This is my code:

Sequential:

from nltk import pos_tag

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

df1['pos'] = df1['txt'].apply(posify)  # ~15 seconds
df2['pos'] = df2['txt'].apply(posify)  # ~15 seconds
# Total Time: 30 seconds

Parallel:

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def posify_parallel(ser, key_name, shared_dict):
    shared_dict[key_name] = ser.apply(posify)

manager = multiprocessing.Manager()
return_dict = manager.dict()
p1 = multiprocessing.Process(target=posify_parallel, args=(df1['txt'], 'df1', return_dict))
p1.start()
p2 = multiprocessing.Process(target=posify_parallel, args=(df2['txt'], 'df2', return_dict))
p2.start()
p1.join()
p2.join()
df1['pos'] = return_dict['df1']
df2['pos'] = return_dict['df2']
# Total Time: 27 seconds

I would expect the total time to be about 15 seconds, but I'm getting 27 seconds.
If it makes any difference, I have an i7 2.6GHz CPU with 6 cores (12 logical).

Is it possible to achieve something around 15 seconds? Does this have something to do with the pos_tag function itself?


EDIT:

I ended up just doing the following and now it's 15 seconds:

from multiprocessing import Pool, cpu_count

with Pool(cpu_count()) as pool:
    df1['pos'] = pool.map(posify, df1['txt'])
    df2['pos'] = pool.map(posify, df2['txt'])

I think this way the two map calls run sequentially, but each one runs in parallel internally across the pool's processes. As long as it's 15 seconds, that's fine with me.
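
A variation on the same idea (just a sketch, not something I timed) submits both DataFrames before collecting either result, using map_async:

from multiprocessing import Pool, cpu_count

with Pool(cpu_count()) as pool:
    # Each map_async call still splits its Series into chunks across the pool's
    # workers; submitting both before calling get() lets the two DataFrames overlap.
    res1 = pool.map_async(posify, df1['txt'])
    res2 = pool.map_async(posify, df2['txt'])
    df1['pos'] = res1.get()
    df2['pos'] = res2.get()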

Alaa M.

1 Answer


The more usual way of passing data back from processes is via a multiprocessing.Queue instance. Not knowing the particular details of your dataframe data and the results of your processing, I cannot quantify how much performance will be improved by switching from a managed dictionary, but the use of a queue should be more performant.

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def posify_parallel(ser, which_df, q):
    # Pass back the results along with which dataframe the results are for:
    q.put((which_df, ser.apply(posify)))

q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=posify_parallel, args=(df1['txt'], 1, q))
p1.start()
p2 = multiprocessing.Process(target=posify_parallel, args=(df2['txt'], 2, q))
p2.start()
# Get the results:
for _ in range(2):
    # Must do the gets before joining the processes!
    which_df, results = q.get()
    if which_df == 1:
        df1['pos'] = results
    else:
        # assert(which_df == 2)
        df2['pos'] = results
p1.join()
p2.join()

To use a multiprocessing pool:

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def posify_parallel(ser):
    return ser.apply(posify)

with multiprocessing.Pool(2) as pool:
    # Both tasks are submitted before either result is collected,
    # so the two DataFrames are processed concurrently.
    results1 = pool.apply_async(posify_parallel, args=(df1['txt'],))
    results2 = pool.apply_async(posify_parallel, args=(df2['txt'],))
    df1['pos'] = results1.get()
    df2['pos'] = results2.get()
Booboo
  • Thanks, I tried your second option and it got me 21-22 seconds. – Alaa M. Nov 11 '21 at 21:14
  • So there is always going to be overhead in moving data from one address space (i.e. process) to another that you did not have in the non-parallel version, which could be an issue that only goes away when the original data is in shared memory. I couldn't say, not being familiar with `nltk`, whether you have an additional issue. – Booboo Nov 11 '21 at 21:29
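
A minimal sketch of the shared-data point above (not part of the original answer, and POSIX-only): with the fork start method, the worker processes inherit df1 and df2 from the parent, so only the results have to be pickled back. The tag_df helper and the use of imap_unordered here are illustrative assumptions, not the answer's code.

from nltk import pos_tag
import multiprocessing

def posify(txt):
    return ' '.join([pair[1] for pair in pos_tag(txt.split())])

def tag_df(which_df):
    # df1 and df2 are assumed to already exist as module-level globals
    # before the pool is created, as in the original code.
    ser = df1['txt'] if which_df == 1 else df2['txt']
    return which_df, ser.apply(posify)

ctx = multiprocessing.get_context('fork')  # 'fork' is not available on Windows
with ctx.Pool(2) as pool:
    # Only the result Series travel back to the parent; the input Series are
    # never pickled, because the forked workers share the parent's memory.
    for which_df, results in pool.imap_unordered(tag_df, [1, 2]):
        if which_df == 1:
            df1['pos'] = results
        else:
            df2['pos'] = results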