Today, I was asking myself the same question. The multiprocessing module offers a ThreadPool, which spawns a few threads for you and thus runs the jobs in parallel. First define the functions, then create the pool, and then map it over the range you want to iterate over.
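A minimal sketch of that pattern, independent of Spark, with a dummy function standing in for the real work:

from multiprocessing.pool import ThreadPool

def square(x):
    # stand-in for the real per-item work
    return x * x

tpool = ThreadPool(processes=4)
print(tpool.map(square, range(10)))
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]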
In my case, I was calculating the WSSSE numbers for different numbers of centers (hyperparameter tuning) to get a "good" k-means clustering, just as outlined in the Spark MLlib documentation. Without further explanation, here are some cells from my IPython worksheet:
from pyspark.mllib.clustering import KMeans
import numpy as np
c_points is an RDD of 12-dimensional arrays:
>>> c_points.cache()
>>> c_points.take(3)
[array([ 1, -1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
array([-2, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
array([ 7, -1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])]
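(c_points was built earlier in the worksheet; as a rough sketch, such an RDD could be constructed like this, assuming a SparkContext sc and NumPy feature vectors — the values below are just the ones shown above:)

import numpy as np

vectors = [np.array([1, -1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
           np.array([-2, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0])]
c_points = sc.parallelize(vectors)
c_points.cache()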
In the following, for each i I compute the WSSSE value and return it together with i as a tuple:
def error(point, clusters):
    # distance of a point to its closest cluster center
    center = clusters.centers[clusters.predict(point)]
    return np.linalg.norm(point - center)

def calc_wssse(i):
    # train a k-means model with i centers and sum up the per-point errors
    clusters = KMeans.train(c_points, i, maxIterations=20,
                            runs=20, initializationMode="random")
    WSSSE = c_points \
        .map(lambda point: error(point, clusters)) \
        .reduce(lambda x, y: x + y)
    return (i, WSSSE)
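Called on its own, calc_wssse runs one Spark job for a single number of centers, which makes for an easy spot check before parallelizing (the actual WSSSE value depends on your data):

>>> calc_wssse(2)
(2, ...)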
Here comes the interesting part:
from multiprocessing.pool import ThreadPool

# four worker threads, all submitting jobs to the same SparkContext
tpool = ThreadPool(processes=4)
Run it:
wssse_points = tpool.map(calc_wssse, range(1, 30))
wssse_points
gives:
[(1, 195318509740785.66),
(2, 77539612257334.33),
(3, 78254073754531.1),
...
]
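Since ThreadPool.map preserves the input order, the tuples already come back sorted by the number of centers. From there you could, for example, look at the relative drop in WSSSE per additional center to pick a reasonable k — a small pure-Python sketch, no Spark involved:

# relative improvement gained by each additional center
improvements = [
    (k2, (w1 - w2) / w1)
    for (k1, w1), (k2, w2) in zip(wssse_points, wssse_points[1:])
]
# e.g. stop at the smallest k after which the improvement levels off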