I'm following this tutorial and trying to pick/select the top n events, say 10 events/rows per cluster, after assigning the predicted clusters to the main df, then merge them and report the result in the form of a Spark dataframe.
Say I have a main dataframe df1 containing 3 features, as below:
+-----+------------+----------+----------+
| id| x| y| z|
+-----+------------+----------+----------+
| row0| -6.0776997|-2.9096103|-1.5181729|
| row1| -1.0122601| 7.322841|-5.4424076|
| row2| -8.297007| 6.3228936| 1.1672047|
| row3| -3.5071216| 4.784812|-5.4449472|
| row4| -5.122823|-3.3220499|-0.5069805|
| row5| -2.4764006| 8.255791| 4.409478|
| row6| 7.3153954| -5.079449| -7.291215|
| row7| -2.0167463| 9.303454| 7.095179|
| row8| -0.2338185| -4.892681| 2.1228876|
| row9| 6.565442| -6.855994|-6.7983212|
|row10| -5.6902847|-6.4827404|-0.9246967|
|row11|-0.017986143| 2.7632365| -8.814824|
|row12| -6.9042625|-6.1491723|-3.5354295|
|row13| -10.389865| 9.537853| 0.674591|
|row14| 3.9688683|-6.0467844| -5.462389|
|row15| -7.337052|-3.7689247| -5.261122|
|row16| -8.991589| 8.738728| 3.864116|
|row17| -0.18098584| 5.482743| -4.900118|
|row18| 3.3193955|-6.3573766| -6.978025|
|row19| -2.0266335|-3.4171724|0.48218703|
+-----+------------+----------+----------+
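For context, this is roughly the setup I'm assuming, following the tutorial's pattern (a minimal sketch only; the column names, seed and k=10 are placeholders that merely mirror the report output further below):

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# assemble the three feature columns into the single vector column Spark ML expects
assembler = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
df1_features = assembler.transform(df1)

# fit KMeans; k=10 here only matches the short report shown later
model = KMeans(k=10, seed=1, featuresCol="features", predictionCol="prediction").fit(df1_features)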
Now I have the cluster sizes out of the clustering algorithm, collected into the small pandas dataframe clusterSizes, as below:
print("==========================Short report==================================== ")
n_clusters = model.summary.k
#n_clusters
print("Number of predicted clusters: " + str(n_clusters))
cluster_Sizes = model.summary.clusterSizes
#cluster_Sizes
col = ['size']
df2 = pd.DataFrame(cluster_Sizes, columns=col).sort_values(by=['size'], ascending=True) #sorting
cluster_Sizes = df2["size"].unique()
print("Size of predicted clusters: " + str(cluster_Sizes))
clusterSizes
#==========================Short report====================================
#Number of predicted clusters: 10
#Size of predicted clusters: [ 486 496 504 529 985 998 999 1003 2000]
+-----+----+
|     |size|
+-----+----+
|    2| 486|
|    6| 496|
|    0| 504|
|    8| 529|
|    5| 985|
|    9| 998|
|    7| 999|
|    3|1003|
|    1|2000|
|    4|2000|
+-----+----+
Here the index column holds the predicted cluster labels and size is the number of rows assigned to each cluster. I could assign the predicted cluster labels to the main dataframe, but not the cluster sizes, as below:
+-----+----------+------------+----------+----------+
| id|prediction| x| y| z|
+-----+----------+------------+----------+----------+
| row0| 9| -6.0776997|-2.9096103|-1.5181729|
| row1| 4| -1.0122601| 7.322841|-5.4424076|
| row2| 1| -8.297007| 6.3228936| 1.1672047|
| row3| 4| -3.5071216| 4.784812|-5.4449472|
| row4| 3| -5.122823|-3.3220499|-0.5069805|
| row5| 1| -2.4764006| 8.255791| 4.409478|
| row6| 5| 7.3153954| -5.079449| -7.291215|
| row7| 1| -2.0167463| 9.303454| 7.095179|
| row8| 7| -0.2338185| -4.892681| 2.1228876|
| row9| 5| 6.565442| -6.855994|-6.7983212|
|row10| 3| -5.6902847|-6.4827404|-0.9246967|
|row11| 4|-0.017986143| 2.7632365| -8.814824|
|row12| 9| -6.9042625|-6.1491723|-3.5354295|
|row13| 1| -10.389865| 9.537853| 0.674591|
|row14| 2| 3.9688683|-6.0467844| -5.462389|
|row15| 9| -7.337052|-3.7689247| -5.261122|
|row16| 1| -8.991589| 8.738728| 3.864116|
|row17| 4| -0.18098584| 5.482743| -4.900118|
|row18| 2| 3.3193955|-6.3573766| -6.978025|
|row19| 7| -2.0266335|-3.4171724|0.48218703|
+-----+----------+------------+----------+----------+
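For completeness, a minimal sketch of how I assume the prediction column gets attached (pddf_pred is the name used in the code further below, df1_features is the assembled frame from the sketch above), plus a Spark-native way to get the same per-cluster sizes without toPandas():

# attach the predicted cluster label to every row of the main dataframe
pddf_pred = model.transform(df1_features).select("id", "prediction", "x", "y", "z")

# Spark-side equivalent of the pandas size table above (hypothetical name cluster_sizes_sdf)
cluster_sizes_sdf = (pddf_pred.groupBy("prediction")
                              .count()
                              .withColumnRenamed("count", "size")
                              .orderBy("size"))
cluster_sizes_sdf.show()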
Now I want to report the top n rows of each cluster in the form of a dataframe. What I have tried so far is (multi-)conditional filtering via the following function:
print("==========================Short report==================================== ")
n_clusters = model.summary.k
#n_clusters
print("Number of predicted clusters: " + str(n_clusters))
cluster_Sizes = model.summary.clusterSizes
#cluster_Sizes
col = ['size']
clusterSizes = pd.DataFrame(cluster_Sizes, columns=col).sort_values(by=['size'], ascending=True) #sorting
cluster_Sizes = clusterSizes["size"].unique()
print("Size of predicted clusters: " + str(cluster_Sizes))
clusterSizes
from pyspark.sql.functions import max, min

def cls_report(df):
    x1 = df.select(min("x"))  # single-row frame holding the minimum of x
    x2 = df.select(max("y"))  # single-row frame holding the maximum of y
    x3 = df.select(max("z"))  # single-row frame holding the maximum of z
    return x1, x2, x3
# pick the top out clusters, i.e. the cluster labels with the fewest instances
# (the labels sit in the index of the sorted clusterSizes frame)
km_1st_cls = clusterSizes.index[0]
km_2nd_cls = clusterSizes.index[1]
km_3rd_cls = clusterSizes.index[2]

print(km_1st_cls)
print(km_2nd_cls)
print(km_3rd_cls)
# pull the scalar bounds out of the single-row frames returned by cls_report
F1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[0].first()[0]  # min(x), 1st cluster
F2 = cls_report(pddf_pred.filter(f"prediction == {km_2nd_cls}"))[0].first()[0]  # min(x), 2nd cluster
F3 = cls_report(pddf_pred.filter(f"prediction == {km_3rd_cls}"))[0].first()[0]  # min(x), 3rd cluster

L1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[1].first()[0]  # max(y), 1st cluster
L2 = cls_report(pddf_pred.filter(f"prediction == {km_2nd_cls}"))[1].first()[0]  # max(y), 2nd cluster
L3 = cls_report(pddf_pred.filter(f"prediction == {km_3rd_cls}"))[1].first()[0]  # max(y), 3rd cluster

T1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[2].first()[0]  # max(z), 1st cluster
T2 = cls_report(pddf_pred.filter(f"prediction == {km_2nd_cls}"))[2].first()[0]  # max(z), 2nd cluster
T3 = cls_report(pddf_pred.filter(f"prediction == {km_3rd_cls}"))[2].first()[0]  # max(z), 3rd cluster

print(F1, F2, F3)
print(L1, L2, L3)
print(T1, T2, T3)
df_anomaly_1st_cls = pddf_pred.filter(f"(prediction == {km_1st_cls})") \
.filter(f"y == {L1}") \
.filter(f"z == {T1}") \
.filter(f"x == {F1}")
display(df_anomaly_1st_cls)
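If the same three filters are repeated for the 2nd and 3rd clusters (the frames df_anomaly_2nd_cls and df_anomaly_3rd_cls below are hypothetical, built exactly like df_anomaly_1st_cls), the per-cluster results can be stacked into one frame, which is the shape of output I'm after:

# stack the per-cluster selections; the 2nd/3rd frames are hypothetical counterparts of the 1st
df_anomaly_all = (df_anomaly_1st_cls
                  .unionByName(df_anomaly_2nd_cls)
                  .unionByName(df_anomaly_3rd_cls))
display(df_anomaly_all)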
I know that with the KMeans algorithm in scikit-learn we could do the following (based on this post):
clusters = KMeans(n_clusters=5).fit(df)
df[clusters.labels_ == 0]
But we don't have access to such a labels_ attribute in Spark to use as a quick hack for this task.
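The closest analogue I can see in Spark is simply filtering on the prediction column that model.transform() produces, e.g.:

from pyspark.sql.functions import col

# rows assigned to cluster 0 -- roughly the Spark counterpart of df[clusters.labels_ == 0]
cluster_0 = pddf_pred.filter(col("prediction") == 0)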
Is there an elegant way (maybe by defining a function) to do this, so that we can reflect the results of any clustering algorithm on the main dataframe for better reasoning?
Note: I'm not interested in hacking it by converting the Spark frame into a pandas dataframe using .toPandas().
Update: I probably need a function that automates the multi-condition filtering. It should take the input dataframe, the number of top clusters (ranked by their instance counts), and the number of events/rows per cluster, and return the filtered/selected results stacked into one frame. A mock of the expected dataframe is below, and a rough sketch of such a function follows the mock:
def filtering(df, top_out_cluster=2, top_events=2):  # top_events = number of rows kept per cluster
# +-----+----------+------------+----------+----------+
# | id|prediction| x| y| z|
# +-----+----------+------------+----------+----------+
#1st top out cluster | row4| 3| -5.122823|-3.3220499|-0.5069805|
#conditions F1, L1, T1 |row10| 3| -5.6902847|-6.4827404|-0.9246967|
#2nd top out cluster | row8| 7| -0.2338185| -4.892681| 2.1228876|
#conditions F1, L1, T1 |row19| 7| -2.0266335|-3.4171724|0.48218703|
#3rd top out cluster |row18| 2| 3.3193955|-6.3573766| -6.978025|
#conditions F1, L1, T1 |row14| 2| 3.9688683|-6.0467844| -5.462389|
#1st top out cluster | row6| 5| 7.3153954| -5.079449| -7.291215|
#conditions F2, L2, T2 | row9| 5| 6.565442| -6.855994|-6.7983212|
#2nd top out cluster |row12| 9| -6.9042625|-6.1491723|-3.5354295|
#conditions F2, L2, T2 | row0| 9| -6.0776997|-2.9096103|-1.5181729|
#1st top out cluster | row1| 4| -1.0122601| 7.322841|-5.4424076|
#conditions F3, L3, T3 |row11| 4|-0.017986143| 2.7632365| -8.814824|
#2nd top out cluster |row13| 1| -10.389865| 9.537853| 0.674591|
#conditions F3, L3, T3 | row5| 1| -2.4764006| 8.255791| 4.409478|
# +-----+----------+------------+----------+----------+
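For illustration only, here is a rough sketch of the kind of helper I have in mind, not a working solution: it computes the cluster sizes on the Spark side, keeps the top_out_cluster smallest clusters, and takes top_events rows per cluster with a window; ordering within a cluster by x ascending is just a placeholder for the real F/L/T conditions:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def filtering(df, top_out_cluster=2, top_events=2):
    # per-cluster sizes computed on the Spark side
    sizes = df.groupBy("prediction").count().withColumnRenamed("count", "size")

    # labels of the top_out_cluster clusters with the fewest rows
    smallest = [r["prediction"] for r in sizes.orderBy("size").limit(top_out_cluster).collect()]

    # rank rows inside each kept cluster; ordering by x is only a placeholder
    # for the real multi-condition (F/L/T) logic
    w = Window.partitionBy("prediction").orderBy(F.col("x").asc())
    return (df.filter(F.col("prediction").isin(smallest))
              .withColumn("rank", F.row_number().over(w))
              .filter(F.col("rank") <= top_events)
              .drop("rank"))

# e.g. filtering(pddf_pred, top_out_cluster=3, top_events=2).show()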