I have time series data per category and I want to cluster them. To find the optimal number of clusters, I need to compute the sum of squared distances of each data point from its centroid for a range of candidate K values. Since this is slow, I want to parallelize it with a Pandas UDF, grouping by the number of clusters K. However, it is not being parallelized. The data has more than 200k categories, it is not skewed, and I also tried repartitioning it, which made no difference. The Spark UI shows one stage with only a single task that runs forever. I have used Pandas UDFs for parallelization many times before and they worked fine, so I am not sure what is happening now. Could the issue be with the TimeSeriesKMeans function? I would appreciate any suggestions.

Repartitioning has been suggested here. However, in my case (1) the data is not small, and (2) the default number of partitions is already more than 30; I increased it further with repartition anyway, but it brought no improvement. As shown in the screenshot below, only 2 worker nodes out of a maximum of 45 are being used.
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from tslearn.clustering import TimeSeriesKMeans

# Read data
stream = spark.read.format('delta').load('/mnt/datadrop-test/fw_fish/stream_bronze/clustering/cvr_cpa_clicks_4hourly_smoothed')

# Collect each category's time series into an array
stream_array = stream.groupBy(['advertiser_id', 'campaign_name', 'keyword_text', 'keyword_id'])\
                     .agg(F.collect_list('cvr_cpa_clicks').alias('cvr_cpa_clicks'))
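One assumption worth stating: the UDF below reshapes each group into a 2-D matrix, which only works if every category has the same number of time points. A quick check for that (illustrative only, not part of the original job):

# Sanity check: count the distinct series lengths; anything other than a single row
# means the reshape inside the UDF will fail or mix series of different lengths
stream_array.select(F.size('cvr_cpa_clicks').alias('series_length'))\
            .groupBy('series_length').count()\
            .show()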
# Create a DataFrame with the candidate values of K (number of clusters) to try
n_clusters = spark.range(5, 500, 5).withColumnRenamed("id","n_cluster")
# Cross-join each category's series with every candidate K and repartition by K
cross_joined1 = stream_array.crossJoin(F.broadcast(n_clusters)).repartition('n_cluster')
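To double-check whether repartitioning by n_cluster actually spreads the ~100 groups across partitions (hash partitioning on a small key set can leave many partitions empty), here is a rough diagnostic I can run; it is only for inspection, not part of the job itself:

# Rough diagnostic: number of partitions and how many rows land in each one
print(cross_joined1.rdd.getNumPartitions())
cross_joined1.groupBy(F.spark_partition_id().alias('partition_id'))\
             .count()\
             .orderBy('partition_id')\
             .show(100)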
def get_sum_of_squared_distances(df):
    # All rows in this group share the same value of K
    k = df.n_cluster.values[0]
    # Stack the per-category arrays into a 2-D matrix of shape (n_categories, n_timepoints)
    df_array = df.cvr_cpa_clicks.values
    nrows = df_array.shape[0]
    df_array = np.concatenate(df_array).reshape(nrows, -1)
    km = TimeSeriesKMeans(n_clusters=k,
                          max_iter=500,
                          n_init=50,
                          metric="euclidean",
                          n_jobs=-1,
                          verbose=False,
                          max_iter_barycenter=500,
                          random_state=0,
                          init='k-means++')
    km = km.fit(df_array)
    # Return a single-row DataFrame with K and its inertia (sum of squared distances)
    result_df = pd.DataFrame({'n_clusters': [k], 'sum_of_squared_distances': [km.inertia_]})
    return result_df
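To rule out the possibility that a single TimeSeriesKMeans fit is simply slow, the function can also be timed locally on a small synthetic group of roughly the right shape (the toy data below is made up; the real series come from stream_array):

import time

# Toy group: 100 made-up series of length 24, all tagged with K = 5
toy = pd.DataFrame({
    'n_cluster': [5] * 100,
    'cvr_cpa_clicks': [list(np.random.rand(24)) for _ in range(100)],
})
start = time.time()
print(get_sum_of_squared_distances(toy))
print(f'single fit took {time.time() - start:.1f} s')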
# Get the sum of squared distances for each K (number of clusters)
sum_of_squared_distances = cross_joined1.groupBy('n_cluster')\
                                        .applyInPandas(get_sum_of_squared_distances,
                                                       schema="n_clusters long, sum_of_squared_distances double")
display(sum_of_squared_distances)
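For context, once the job completes the result table is small, so the plan is to collect it and draw the usual elbow curve; a rough sketch, assuming matplotlib is available on the driver:

import matplotlib.pyplot as plt

# Elbow curve: inertia against the number of clusters
elbow = sum_of_squared_distances.orderBy('n_clusters').toPandas()
plt.plot(elbow['n_clusters'], elbow['sum_of_squared_distances'], marker='o')
plt.xlabel('Number of clusters (K)')
plt.ylabel('Sum of squared distances')
plt.show()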