I used a multiprocessing Pool hoping to get some performance benefit over my sequential approach. However, the result is just the opposite: the Pool version takes more time than the sequential one:
import multiprocessing as mp
import datetime
class A:
    """Small benchmark comparing a multiprocessing Pool with a plain loop.

    The data set is a list of three-element rows; "processing" a row means
    multiplying its first element by 10.
    """

    def __init__(self):
        # Filled by ``log_result`` as worker results arrive.
        self.result_list = []

    # parallel processing function
    def foo_pool(self, data):
        """Multiply the first element of every row in *data* by 10.

        The rows are modified in place and the same *data* object is
        returned.
        """
        for d in data:
            d[0] = d[0] * 10
        return data

    # sequential function
    def foo_seq(self, data):
        """Multiply the first element of the single row *data* by 10.

        The row is modified in place and returned.
        """
        data[0] = data[0] * 10
        return data

    def log_result(self, result):
        """Store one result (a whole processed chunk) in ``result_list``."""
        # This is called whenever foo_pool(i) returns a result.
        self.result_list.append(result)

    def apply_async_with_callback(self, n_rows=100000, chunk_size=1000,
                                  processes=8):
        """Run the Pool benchmark and print timing information.

        Args:
            n_rows: number of three-element rows to generate.
            chunk_size: number of rows handed to a worker per task.
            processes: number of worker processes in the pool.
        """
        # Data Creation
        lst = [[i, i + 1, i + 2] for i in range(n_rows)]
        print('length of data ', len(lst))
        # The context manager releases the workers even if submitting or
        # waiting raises.
        with mp.Pool(processes) as pool:
            dt_start = datetime.datetime.now()
            print('start time:', str(dt_start))
            # Multiprocessing takes 2 secs
            # NOTE: every chunk is pickled to a worker and pickled back, and
            # the bound method carries ``self`` along with each task.
            for data in self.chunks(lst, chunk_size):
                pool.apply_async(self.foo_pool, args=(data,),
                                 callback=self.log_result)
            # Sequential. It is 10x faster than pool
            # for d in lst:
            #     self.result_list.extend([self.foo_seq(d)])
            # close() + join() wait for the submitted tasks; merely leaving
            # the ``with`` block would terminate() them instead.
            pool.close()
            pool.join()
        print('output data length:', len(self.result_list))
        dt_end = datetime.datetime.now()
        print('end time:', str(dt_end))
        print('Time taken:', str(dt_end - dt_start))

    # Divide big data into chunks
    def chunks(self, data, n):
        """Yield consecutive slices of *data* holding at most *n* items.

        A chunk size below 1 is treated as 1, so a zero or negative *n*
        does not fail with a zero ``range()`` step.
        """
        n = max(1, n)
        for i in range(0, len(data), n):
            yield data[i:i + n]
# Run the benchmark only when executed as a script; multiprocessing may
# re-import this module inside worker processes.
if __name__ == '__main__':
    a = A()
    a.apply_async_with_callback()
In the Python code above, look at apply_async_with_callback(). If you un-comment the sequential code and run it, the result is about 10 times faster than the multiprocessing Pool code.
Can someone help me understand what I am doing wrong?
Edit: After applying the code provided in "Why is multiprocessed code in given code taking more time than usual sequential execution?",
the sequential version is now only 2 times faster than the parallel processing code. Updated code below:
import multiprocessing as mp
import datetime
class A:
    """Benchmark of ``Pool.map`` over pre-chunked three-element rows."""

    def __init__(self):
        # Filled by ``log_result``.
        self.result_list = []

    # parallel processing function
    def foo_pool(self, data):
        """Rescale the first element of every row in *data*.

        Each ``d[0]`` becomes ``d[0] * 10.0 + 10 * (d[0] / 100)`` (a float).
        The rows are modified in place and the same *data* object is
        returned.
        """
        for d in data:
            d[0] = d[0] * float(10) + 10 * (float(d[0]) / 100)
        return data

    def log_result(self, result):
        """Store one result in ``result_list``."""
        # This is called whenever foo_pool(i) returns a result.
        self.result_list.append(result)

    def flatten(self, ll):
        """Concatenate the items of every iterable in *ll* into one list."""
        return [item for chunk in ll for item in chunk]

    def square(self, x):
        """Return *x* multiplied by itself."""
        return x * x

    def squareChunk(self, chunk):
        """Process one chunk of rows by delegating to ``foo_pool``."""
        return self.foo_pool(chunk)

    def apply_async_with_callback(self, n_rows=1000000, chunk_size=10000,
                                  processes=2):
        """Run the ``Pool.map`` benchmark and print timing information.

        Args:
            n_rows: number of three-element rows to generate.
            chunk_size: number of rows per chunk handed to the pool.
            processes: number of worker processes in the pool.
        """
        # Data Creation
        lst = [[i, i + 1, i + 2] for i in range(n_rows)]
        print('length of data ', len(lst))
        # split original list in decent sized chunks
        chunked = self.chunks(lst, chunk_size)
        # The context manager releases the workers even if map() raises.
        with mp.Pool(processes) as pool:
            dt_start = datetime.datetime.now()
            print('start time:', str(dt_start))
            # NOTE: every chunk is pickled to a worker and the processed
            # chunk is pickled back to the parent.
            results = self.flatten(pool.map(self.squareChunk, chunked))
            pool.close()
            pool.join()
        print('output data length:', len(results))
        dt_end = datetime.datetime.now()
        print('end time:', str(dt_end))
        print('multi proc Time taken:', str(dt_end - dt_start))

    def chunks(self, l, n):
        """Return a generator of consecutive slices of *l* of at most *n* items.

        A chunk size below 1 is treated as 1.
        """
        n = max(1, n)
        return (l[i:i + n] for i in range(0, len(l), n))
# Run the benchmark only when executed as a script; multiprocessing may
# re-import this module inside worker processes.
if __name__ == '__main__':
    a = A()
    a.apply_async_with_callback()
I can see the difference between using Pool.map and Pool.apply_async: the code is faster now. Earlier it was 10 times slower than sequential; now it is 2 times slower. But it is still slower.
Is this how multiprocessing behaves? Then what is the point of using multiprocessing? Or am I still doing something wrong?