I have a model.predict() method and 65536 rows of data, which takes about 7 seconds to run. I wanted to speed this up using the joblib.parallel_backend tooling, following this example.
This is my code:
import numpy as np
from joblib import load, parallel_backend
from time import perf_counter as time  # time.clock was removed in Python 3.8
from urllib.request import urlopen
NN_model=load(urlopen("http://clima-dods.ictp.it/Users/tompkins/CRM/nnet_3var.jl"))
npt=65536
t=np.random.uniform(low=-1,high=1,size=npt)
u=np.random.uniform(low=-1,high=1,size=npt)
q=np.random.uniform(low=-1,high=1,size=npt)
X=np.column_stack((u,t,q))
t0=time()
out1=NN_model.predict(X)
t1=time()
print("serial",t1-t0)
with parallel_backend('threading', n_jobs=-1):
    out2=NN_model.predict(X)
t2=time()
print("parallel",t2-t1)
And these are my timings:
serial 6.481805
parallel 6.389198
I know from past experience that very small tasks are not sped up by shared-memory parallelism because of the overhead, as the answer posted here also notes, but that is not the case here: the job takes 7 seconds, which should far exceed any overhead. In fact, I traced the load on the machine and it appears to be running serially.
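As a sanity check on that, one can ask the OS which cores the process is actually allowed to run on (a minimal sketch, assuming Linux, where os.sched_getaffinity is available):
import os

# The set of CPU cores this process may be scheduled on; a single-element
# set would mean an inherited affinity mask is forcing serial execution.
print("allowed cores:", os.sched_getaffinity(0))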
What am I doing wrong with the joblib specification? How can I use threading on my desktop to parallelize this task with joblib (or an alternative)?
Edit 1
From the post below, I wondered whether joblib tries to parallelize the model itself, rather than dividing the rows of data into ncore batches and distributing one to each core. I therefore decided that I might need to do this division manually, farming out the data "chunks" to each core. I have now tried using Parallel and delayed instead, chunking the data as per this post:
from joblib import Parallel, delayed

ncore = 8
nchunk = int(npt / ncore)

parallel = Parallel(n_jobs=ncore)
results = parallel(delayed(NN_model.predict)(X[i*nchunk:(i+1)*nchunk, :])
                   for i in range(ncore))
This now runs ncore instances on my machine, but they all run at 1/ncore efficiency (as if something were gating them?) and the wall-clock time is still not improved...
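Since the model and the data do not have to be pickled for threads, I also wondered about a thread-based version of the same chunking (a minimal sketch using joblib's prefer="threads" option; it can only help if the numpy/BLAS calls inside predict release the GIL):
from joblib import Parallel, delayed

# Thread-based chunked prediction: no pickling of NN_model or X;
# useful only if predict() spends its time in GIL-releasing BLAS calls.
results = Parallel(n_jobs=ncore, prefer="threads")(
    delayed(NN_model.predict)(X[i*nchunk:(i+1)*nchunk, :])
    for i in range(ncore)
)
out = np.vstack(results).flatten()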
Edit 2
As an alternative, I have now also tried dividing the dataset manually using the multiprocessing package:
import os
import multiprocessing

def predict_chunk(Xchunk):
    results = NN_model.predict(Xchunk)
    return results

pool = multiprocessing.Pool(processes=ncore)
# widen the affinity mask to cores 0..ncore-1
os.system('taskset -cp 0-%d %s' % (ncore - 1, os.getpid()))
stats = pool.starmap(predict_chunk, ([X[i*nchunk:(i+1)*nchunk, :]] for i in range(ncore)))
res = np.vstack(stats).flatten()
pool.close()
pool.join()
Apart from the overhead of dividing up the input data and restacking the results, the problem should be embarrassingly parallel. Then I recalled earlier posts and wondered whether the slow performance was due to the task-affinity issue upon importing numpy reported here, which is why I added the os.system command. That doesn't seem to help, though: I still see each of the 8 cores at around 12% CPU load, and the overall timing is now slightly slower than the serial solution because of the aforementioned overhead.
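For reference, the same pinning can be done from Python without shelling out to taskset (a sketch using os.sched_setaffinity, which is Linux-only; note it has to run before the pool is created for the forked workers to inherit the widened mask):
import os

# Clear any inherited affinity mask so this process (and children forked
# afterwards) may use cores 0..ncore-1.
os.sched_setaffinity(0, range(ncore))
print("allowed cores:", os.sched_getaffinity(0))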
Edit 3
I've now tried to use ray instead:
import ray

@ray.remote
def predict_chunk(Xchunk, start, end):
    results = NN_model.predict(Xchunk[start:end, :])
    return results

ray.init(num_cpus=ncore)
data_id = ray.put(X)
stats = ray.get([predict_chunk.remote(data_id, i*nchunk, (i+1)*nchunk) for i in range(ncore)])
res = np.vstack(stats).flatten()
Again, this creates 8 subprocesses, but they are all running on a single CPU, so the parallel version is slower than the serial one.
I'm almost certain this is related to the affinity issue referred to above, but the solutions don't seem to be working.
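One way to test that directly is to query the affinity mask from inside the workers themselves (a sketch with ray; the same idea works with a multiprocessing pool):
import os
import ray

@ray.remote
def worker_affinity():
    # Cores this worker may run on; a single-element set in every worker
    # would confirm that the children inherited a restricted mask.
    return os.sched_getaffinity(0)

print(ray.get([worker_affinity.remote() for _ in range(ncore)]))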
This is a summary of the architecture:
Linux hp6g4-clima-5.xxxx.it 4.15.0-124-generic #127-Ubuntu SMP Fri Nov 6 10:54:43 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux