TL;DR: A sequential classifier is fine, since Python's built-in map() is also sequential. Besides, multiprocessing.Pool.map may be overkill, since it introduces inter-process communication overhead. Use it only if you're sure the data is extremely long.
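To make that comparison concrete, here is a minimal sketch of the two kinds of map (square and nums are placeholders, not part of the question):

from multiprocessing import Pool

def square(x):
    return x * x

nums = list(range(8))
# Built-in map() applies square() one item at a time in the current process.
seq_result = list(map(square, nums))

if __name__ == "__main__":
    # Pool.map() pickles chunks of nums, ships them to worker processes,
    # and pickles the results back -- that round trip is the IPC overhead.
    with Pool(4) as p:
        par_result = p.map(square, nums)
    assert seq_result == par_result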
Here is my profiling code:
sequential: a plain, single-pass sequential classifier.
multi: a process-based map.
from funcy import print_durations
from collections import defaultdict
from multiprocessing import Pool
DATA = [
    {"name": "Ram", "vendor": 1},
    {"name": "Shaam", "vendor": 2},
    {"name": "Mohan", "vendor": 1},
    {"name": "Sohan", "vendor": 3},
    {"name": "Aman", "vendor": 2},
]
def sequential(data):
    # one pass over the records, grouping them by vendor
    ans = defaultdict(list)
    for d in data:
        ans[d["vendor"]].append(d)
    return dict(ans)
def multi(data, worker=10):
    def k_fold(myList, N):
        # split myList into N chunks of approximately equal length
        # Ref: https://stackoverflow.com/questions/2130016/splitting-a-list-into-n-parts-of-approximately-equal-length
        return [myList[(i*len(myList))//N:((i+1)*len(myList))//N] for i in range(N)]
    def merge_all(dicts):
        # merge the per-chunk groupings back into a single dict of lists
        ans = defaultdict(list)
        for d in dicts:
            for k, v in d.items():
                ans[k].extend(v)
        return ans
    chunks = k_fold(data, worker)
    with Pool(worker) as P:
        # each worker runs sequential() on its own chunk
        ans = P.map(sequential, chunks)
    return dict(merge_all(ans))
def timing(data_size=1000):
    print("timing for data size {:.0E}".format(data_size))
    # DATA holds 5 records, so the actual list length is 5 * data_size
    data = DATA * data_size
    @print_durations()
    def timing_seq():
        return sequential(data)
    @print_durations()
    def timing_multi():
        return multi(data)
    s = timing_seq()
    m = timing_multi()
    # both approaches must produce the same grouping
    assert s == m

if __name__ == "__main__":  # guard so spawned workers don't re-run the benchmark
    for i in range(3, 8):
        timing(10**i)
Output on my laptop, with Python 3.10.2:
$ python test.py
timing for data size 1E+03
475.65 mks in timing_seq()
36.63 ms in timing_multi()
timing for data size 1E+04
4.52 ms in timing_seq()
22.28 ms in timing_multi()
timing for data size 1E+05
46.35 ms in timing_seq()
84.26 ms in timing_multi()
timing for data size 1E+06
459.60 ms in timing_seq()
510.74 ms in timing_multi()
timing for data size 1E+07
4.34 s in timing_seq()
3.00 s in timing_multi()
Sadly, multi only performs better once the data reaches the 10^7 level. But it does scale if you give it more workers, like this:
# ...
# Same script as above, with the final timing loop replaced by:
data_size = 10**7
print("timing for data size {:.0E}".format(data_size))
for i in range(1, 6):
    worker = 2 ** i
    print("worker = {}".format(worker))
    data = DATA * data_size
    @print_durations()
    def timing_multi():
        multi(data, worker)
    timing_multi()
The output shows that the performance scales, but too many workers introduce overhead that cancels out the speedup ;)
timing for data size 1E+07
worker = 2
3.89 s in timing_multi()
worker = 4
2.91 s in timing_multi()
worker = 8
2.47 s in timing_multi()
worker = 16
2.52 s in timing_multi()
worker = 32
3.43 s in timing_multi()
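If you don't want to tune the worker count by hand, one option is to cap it at the CPU count; a rough sketch (pick_workers is a hypothetical helper, not part of the script above):

import os

def pick_workers(requested):
    # never ask for more worker processes than there are CPUs
    cpus = os.cpu_count() or 1
    return max(1, min(requested, cpus))

# e.g. multi(data, worker=pick_workers(32)) uses at most os.cpu_count() workers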
Ref: Is there a simple process-based parallel map for python?