4

I am trying to make a very simple item recommender system using # of times items bought together,

so first I created a item2item dictionary of Counter like

# people purchased A with B 4 times, A with C 3 times.
item2item = {'A': {'B': 4, 'C': 3}, 'B': {'A': 4, 'C': 2}, 'C':{'A': 3, 'B': 2}}
# recommend user who purchased A and C
samples_list = [['A', 'C'], ...]    

So for, samples = ['A', 'C'], I recommend maximum of item2item['A'] + item2item['C'].

However, merging is heavy for large matrix so I tried to use multi-processing as below

from operator import add
from functools import reduce
from concurrent.futures import ProcessPoolExecutor
from collections import Counter

with ProcessPoolExecutor(max_workers=10) as pool:
    for samples in samples_list:
        # w/o PoolExecutor
        # combined = reduce(add, [item2item[s] for s in samples], Counter())
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        combined = future.result()

However, this didn't speed up the process at all.

I suspect that Counter in reduce function is not shared as answered in Python multiprocessing and a shared counter, and https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes.

Any help is appreciated.

Sean
  • 489
  • 1
  • 8
  • 29

1 Answers1

6

The call combined = future.result() blocks until the result is completed so you are not submitting a subsequent request to the pool until the previous request completes. In other words, you never have more than one subprocess running. At the very least you should change your code to:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(sample_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future) # save it
    results = [f.result() for f in the_futures()] # all the results

Another way:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(sample_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future) # save it
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures): # not necessarily the order of submission
        result = future.result() # do something with this

Also, if you do not specify max_workers to the ProcessPoolExecutor constructor, it defaults to the number of processors on your machine. There is nothing to be gained by specifying a value greater than the number of processors that you actually have.

Update

If you want to process the results as soon as they are completed and need a way to tie a result back to the original request, you can store the futures as keys in a dictionary where the corresponding values represent the requests' arguments. In this case:

with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = {}
    for samples in tqdm(sample_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures[future] = samples # map future to request
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures): # not necessarily the order of submission
        samples = the_futures[future] # the request
        result = future.result() # the result
Booboo
  • 38,656
  • 3
  • 37
  • 60