
I have a long list of users (about 200,000) and a corresponding data frame df with their attributes. Now I'd like to write a for loop to measure the pair-wise similarity of the users. The code is the following:

df2record = pd.DataFrame(columns=['u1', 'u2', 'sim'])
for u1 in reversed(user_list):
    for u2 in reversed(list(range(1, u1))):
        sim = measure_sim(df[u1], df[u2])
        if sim < 0.6:
            continue
        else:
            df2record = df2record.append(pd.Series([u1, u2, sim], index=['u1', 'u2', 'sim']), ignore_index=True)

Now I want to run this for loop with multiprocessing, and I have read some tutorials, but I still have no idea how to handle it properly. It seems that I should first set a reasonable number of processes, like 6, and then feed each loop iteration into one process. But the problem is: how can I know that the task in a certain process has been done, so that a new iteration can begin? Could you help me with this? Thank you in advance!

user5779223

2 Answers


You can use `multiprocessing.Pool`, which provides a `map` method that maps a function over a given iterable using a pool of worker processes. Here's some example code:

import multiprocessing

import pandas as pd

def pairGen():
    for u1 in reversed(user_list):
        for u2 in reversed(list(range(1, u1))):
            yield (u1, u2)

def processFun(pair):
    u1, u2 = pair
    sim = measure_sim(df[u1], df[u2])
    if sim < 0.6:
        return None
    else:
        return pd.Series([u1, u2, sim], index=['u1', 'u2', 'sim'])

def main():
    with multiprocessing.Pool(processes=6) as pool:
        vals = pool.map(processFun, pairGen())

    df2record = pd.DataFrame(columns=['u1', 'u2', 'sim'])
    for v in vals:
        if v is not None:
            df2record = df2record.append(v, ignore_index=True)
Marqin
  • Thanks for your answer! But I got such an error: `TypeError: processFun() missing 1 required positional argument: 'pair'` Do you know how I can map the output properly? Thanks! – user5779223 Oct 03 '16 at 13:23
  • Strange, [this](https://gist.github.com/Marqin/292fd8b1cbbf5fbd24ba4a6935747b15) is working for me on Python 3.4.2 and 3.5.2. What version of Python do you use? – Marqin Oct 03 '16 at 13:56
  • My version is python 3.5 – user5779223 Oct 03 '16 at 15:24
  • Thanks! Sorry that I made a stupid mistake. I fixed it. But the kernel is dead. But it is not dead when with only one process, though it is quite slow. Do you have any idea about it? Thanks! – user5779223 Oct 03 '16 at 15:56
  • It's dead because your python processes are occupying your **whole** CPU. If you want your PC to stay responsive, it is best to use N-1 processes, where N is the number of your CPU cores. It's passed as the `processes` argument to the `multiprocessing.Pool` constructor. – Marqin Oct 03 '16 at 16:12
  • Hi Marqin, your answer is really helpful. Given that my computer has only 2 cores, I `ssh` to my school's server and run the code. But unluckily the Python version there is `2.7`, and I got such an error: `TypeError: unsupported operand type(s) for +: 'set' and 'set'` when I changed the code `with multiprocessing.Pool(processes=6) as pool:` to `with closing(Pool(processes=6)) as pool:`. Could you please take a look? Thanks a lot! – user5779223 Oct 04 '16 at 10:17
  • Hmm, maybe the `with` is causing problems? Try just: `pool = Pool(processes=6) vals = pool.map(processFun, pairGen())` Also, what is that `closing()`? – Marqin Oct 04 '16 at 10:44
  • I referenced this [answer](http://stackoverflow.com/questions/25968518/python-multiprocessing-lib-error-attributeerror-exit) – user5779223 Oct 04 '16 at 15:19
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/124943/discussion-between-marqin-and-user5779223). – Marqin Oct 04 '16 at 22:30

First of all, I would not recommend using multiprocessing on such small data, especially when you are working with a data frame, because a data frame has a lot of functionality of its own that can help you in many ways. You just need to write a proper loop.

Use: multiprocessing.Pool

Just pass the list of users as the iterable to `pool.map()`; you only need to build your iterator with a little tweak.

from multiprocessing import Pool

with Pool(processes=6) as pool:
    results = pool.map(function, iterator)
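To make the vectorized route concrete: if the similarity measure is, say, cosine similarity over numeric attributes, the whole pairwise computation can be expressed in NumPy with no Python-level loop. This is only a sketch (cosine is a stand-in for whatever `measure_sim` computes, and for the full 200,000 users the dense similarity matrix would not fit in memory, so it would have to be computed block by block):

```python
import numpy as np
import pandas as pd

def pairwise_cosine(df):
    """Cosine similarity between all rows of a numeric DataFrame,
    computed in one shot with NumPy instead of a Python loop."""
    X = df.to_numpy(dtype=float)
    # row norms; an all-zero row would produce NaN similarities
    norms = np.linalg.norm(X, axis=1, keepdims=True)
    sims = (X @ X.T) / (norms * norms.T)
    return pd.DataFrame(sims, index=df.index, columns=df.index)

def similar_pairs(df, threshold=0.6):
    # keep each unordered pair (i < j) whose similarity clears the threshold
    sims = pairwise_cosine(df).to_numpy()
    i, j = np.triu_indices(len(sims), k=1)
    keep = sims[i, j] >= threshold
    return pd.DataFrame({'u1': df.index[i][keep],
                         'u2': df.index[j][keep],
                         'sim': sims[i, j][keep]})
```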