0

I'm following this tutorial and try to pick/select top n events, let's say 10 events/rows after assigning the predicted clusters to main df and merge them and report it in the form of the spark data frame. Let's say we I have main dataframe df1 contains the 3 features as below:

+-----+------------+----------+----------+
|   id|           x|         y|         z|
+-----+------------+----------+----------+
| row0|  -6.0776997|-2.9096103|-1.5181729|
| row1|  -1.0122601|  7.322841|-5.4424076|
| row2|   -8.297007| 6.3228936| 1.1672047|
| row3|  -3.5071216|  4.784812|-5.4449472|
| row4|   -5.122823|-3.3220499|-0.5069805|
| row5|  -2.4764006|  8.255791|  4.409478|
| row6|   7.3153954| -5.079449| -7.291215|
| row7|  -2.0167463|  9.303454|  7.095179|
| row8|  -0.2338185| -4.892681| 2.1228876|
| row9|    6.565442| -6.855994|-6.7983212|
|row10|  -5.6902847|-6.4827404|-0.9246967|
|row11|-0.017986143| 2.7632365| -8.814824|
|row12|  -6.9042625|-6.1491723|-3.5354295|
|row13|  -10.389865|  9.537853|  0.674591|
|row14|   3.9688683|-6.0467844| -5.462389|
|row15|   -7.337052|-3.7689247| -5.261122|
|row16|   -8.991589|  8.738728|  3.864116|
|row17| -0.18098584|  5.482743| -4.900118|
|row18|   3.3193955|-6.3573766| -6.978025|
|row19|  -2.0266335|-3.4171724|0.48218703|
+-----+------------+----------+----------+

now I have information out of the clustering algorithm in the form of the datafarame df2 as below:

print("==========================Short report==================================== ")

n_clusters = model.summary.k
#n_clusters
print("Number of predicted clusters: " + str(n_clusters))

cluster_Sizes = model.summary.clusterSizes
#cluster_Sizes 

col = ['size']
df2 = pd.DataFrame(cluster_Sizes, columns=col).sort_values(by=['size'], ascending=True)  #sorting
cluster_Sizes = df2["size"].unique()
print("Size of predicted clusters: " + str(cluster_Sizes))
clusterSizes

#==========================Short report==================================== 
#Number of predicted clusters: 10
#Size of predicted clusters: [ 486  496  504  529  985  998  999 1003 2000]

+-----+----------+
|     |prediction|
+-----+----------+
|    2|       486|
|    6|       496|
|    0|       504|
|    8|       529|
|    5|       985|
|    9|       998|
|    7|       999|
|    3|      1003|
|    1|      2000|
|    4|      2000|
+-----+----------+

So here, the index column is predicted cluster labels. I could assign the predicted cluster labels into the main dataframe but not cluster size as below:

+-----+----------+------------+----------+----------+
|   id|prediction|           x|         y|         z|
+-----+----------+------------+----------+----------+
| row0|         9|  -6.0776997|-2.9096103|-1.5181729|
| row1|         4|  -1.0122601|  7.322841|-5.4424076|
| row2|         1|   -8.297007| 6.3228936| 1.1672047|
| row3|         4|  -3.5071216|  4.784812|-5.4449472|
| row4|         3|   -5.122823|-3.3220499|-0.5069805|
| row5|         1|  -2.4764006|  8.255791|  4.409478|
| row6|         5|   7.3153954| -5.079449| -7.291215|
| row7|         1|  -2.0167463|  9.303454|  7.095179|
| row8|         7|  -0.2338185| -4.892681| 2.1228876|
| row9|         5|    6.565442| -6.855994|-6.7983212|
|row10|         3|  -5.6902847|-6.4827404|-0.9246967|
|row11|         4|-0.017986143| 2.7632365| -8.814824|
|row12|         9|  -6.9042625|-6.1491723|-3.5354295|
|row13|         1|  -10.389865|  9.537853|  0.674591|
|row14|         2|   3.9688683|-6.0467844| -5.462389|
|row15|         9|   -7.337052|-3.7689247| -5.261122|
|row16|         1|   -8.991589|  8.738728|  3.864116|
|row17|         4| -0.18098584|  5.482743| -4.900118|
|row18|         2|   3.3193955|-6.3573766| -6.978025|
|row19|         7|  -2.0266335|-3.4171724|0.48218703|
+-----+----------+------------+----------+----------+

Now, want to include\report top n rows of each cluster in the form of the dataframe via the following function. What I have tried till now is using (multi-)conditional filtering:

print("==========================Short report==================================== ")

n_clusters = model.summary.k
#n_clusters
print("Number of predicted clusters: " + str(n_clusters))

cluster_Sizes = model.summary.clusterSizes
#cluster_Sizes 

col = ['size']
clusterSizes = pd.DataFrame(cluster_Sizes, columns=col).sort_values(by=['size'], ascending=True)  #sorting
cluster_Sizes = clusterSizes["size"].unique()
print("Size of predicted clusters: " + str(cluster_Sizes))
clusterSizes

from pyspark.sql.functions import max, min

def cls_report(df):
  x1=df.select([min("x")])        # will return max value of each column
  x2=df.select([max("y")])
  x3=df.select([max("z")])
  return x1,x2,x3

#pick top out clusters with minimum instances
km_1st_cls = clusterSizes.values[0]
km_2nd_cls = clusterSizes.values[1]
km_3rd_cls = clusterSizes.values[2]
print(km_1st_cls)
print(km_2nd_cls)
print(km_3rd_cls)


#F1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[0]
F1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[0].select("min(x)").rdd.flatMap(list).collect()[0]
F2 = cls_report(pddf_pred.filter(f"prediction == {km_2nd_cls}"))[0].select("min(x)").rdd.flatMap(list).collect()[0]
F3 = cls_report(pddf_pred.filter(f"prediction == {km_3rd_cls}"))[0].select("min(x)").rdd.flatMap(list).collect()[0]

L1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[1].select("max(y)").rdd.flatMap(list).collect()[0]
L2 = cls_report(pddf_pred.filter(f"prediction == {km_2nd_cls}"))[1].select("max(y)").rdd.flatMap(list).collect()[0]
L3 = cls_report(pddf_pred.filter(f"prediction == {km_3rd_cls}"))[1].select("max(y)").rdd.flatMap(list).collect()[0]

T1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[2].select("max(z)").rdd.flatMap(list).collect()[0]
T2 = cls_report(pddf_pred.filter(f"prediction == {km_2nd_cls}"))[2].select("max(z)").rdd.flatMap(list).collect()[0]
T3 = cls_report(pddf_pred.filter(f"prediction == {km_3rd_cls}"))[2].select("max(z)").rdd.flatMap(list).collect()[0]

print(F1)
print(F2)
print(F3)

print(L1)
print(L2)
print(L3)

print(T1)
print(T2)
print(T3)


df_anomaly_1st_cls = pddf_pred.filter(f"(prediction == {km_1st_cls})") \
                                .filter(f"y == {L1}") \
                                .filter(f"z == {T1}") \
                                .filter(f"x == {F1}") 

display(df_anomaly_1st_cls)

I know that in KM algorithm in SciKit-learn we could use based on this post:

clusters=KMeans(n_clusters=5)
df[clusters.labels_==0] 

But we don't have access to such labels_ in spark for the quick hack for this task. Is there any elegant way (maybe define the function) to call it so that we can reflect the results of any clustering algorithm on the main dataframe for better reasoning?

Note: I'm not interested in hacking it by converting spark frame into Pandas datafarme using .toPandas()

update: I might need a function to automate filtering based on multi-conditions gets input dataframe and number of top cluster based on their instances and number of events/rows, in output in returns filtered/selected stacked results in which mock of the expected dataframe is following:

def filtering(df, top_out_cluster=2, top_events/rows=2):

#                            +-----+----------+------------+----------+----------+
#                            |   id|prediction|           x|         y|         z|
#                            +-----+----------+------------+----------+----------+
#1st top out cluster         | row4|         3|   -5.122823|-3.3220499|-0.5069805|
#conditions F1, L1, T1       |row10|         3|  -5.6902847|-6.4827404|-0.9246967|
    
#2nd top out cluster         | row8|         7|  -0.2338185| -4.892681| 2.1228876|
#conditions F1, L1, T1       |row19|         7|  -2.0266335|-3.4171724|0.48218703|

#3rd top out cluster         |row18|         2|   3.3193955|-6.3573766| -6.978025|
#conditions F1, L1, T1       |row14|         2|   3.9688683|-6.0467844| -5.462389|
                            

#1st top out cluster         | row6|         5|   7.3153954| -5.079449| -7.291215|
#conditions F2, L2, T2       | row9|         5|    6.565442| -6.855994|-6.7983212|

#2nd top out cluster         |row12|         9|  -6.9042625|-6.1491723|-3.5354295|
#conditions F2, L2, T2       | row0|         9|  -6.0776997|-2.9096103|-1.5181729|

#1st top out cluster         | row1|         4|  -1.0122601|  7.322841|-5.4424076|
#conditions F3, L3, T3       |row11|         4|-0.017986143| 2.7632365| -8.814824|
                            
#2nd top out cluster         |row13|         1|  -10.389865|  9.537853|  0.674591|
#conditions F3, L3, T3       | row5|         1|  -2.4764006|  8.255791|  4.409478|
#                            +-----+----------+------------+----------+----------+
Mario
  • 1,631
  • 2
  • 21
  • 51
  • It is not clear what you're trying to achieve and what your expected output is. Furthermore, what is `df_km_pred` and where have you defined it? Am I missing something? Also, where are `Freq`, `Length` and `Token_number` created? – Ric S Sep 07 '21 at 07:53
  • Also, what do you mean by "top" events? Why is a certain record "top"? Sorry for all these questions, I'm trying to comprehend what you are trying to achieve – Ric S Sep 07 '21 at 08:43
  • I updated the post, but I provided the [colab notebook](https://colab.research.google.com/drive/1DMBMlICT-iq5_i5Oz-NC5WS4eBPRdgrB?usp=sharing) kindly for quick debugging. Long story short, after assigning the clustering algorithm results on the main dataframe, I want to select the top n rows=events of each cluster and stack them and report in the form of the dataframe. – Mario Sep 07 '21 at 10:38
  • Thank you for the additional explanation. I still don't get what you mean by "top": what is the rationale that identify the "top" rows for each identified cluster? – Ric S Sep 07 '21 at 10:51
  • I want to report the results of predicted clusters that indicate outliers/anomalies in the main dataframe. `df.head()` this provides top events and rows or `df.sample()` provides random events or let's say incidents in data! Looking for an elegant way to automate this job! – Mario Sep 07 '21 at 10:58
  • What is `pred_Kmeans_PCA`? Also, outliers/anomalies are indicated with F1/F2/F3/L1/...? I don't understand what those metrics are. Also, why are you comparing to sklearn since it doesn't have a way to detect anomalies? – Ric S Sep 07 '21 at 12:30
  • I updated the rest part to make it much clear `pred_Kmeans_PCA` which I replaced by `prediction` in new update of post is the column which shows *predicted cluster label* after KMeans clustering algorithm on vector of PCA algorithm. F1/F2/F3/L1/... are exact maximum values I extract from final dataframe means `pddf_pred` for highly variance features columns `x` or `Freq`, `y` or `Length`, `z` or `Token_number`. Let's say F1, F2, F3 are minimum value of column named `x` or `Freq` for filtering in 1st, 2nd & 3rd clusters respectively. L1, L2, L3 are maximum values of top 3 clusters, etc – Mario Sep 07 '21 at 12:51
  • are you expecting to get a dataframe show min(x), max(y), max(z) for each cluster in pddf_pred dataframe?. If not kindly share a mock of the expected dataframe. – hprakash Sep 08 '21 at 06:16
  • @hprakash Please see the update at the end of the post. It would be great if you offer your solution in [colab notebook](https://colab.research.google.com/drive/1DMBMlICT-iq5_i5Oz-NC5WS4eBPRdgrB?usp=sharing) for quick debugging and test. – Mario Sep 08 '21 at 08:32
  • check the notebook, let me know – hprakash Sep 08 '21 at 12:36
  • @hprakash I updated the notebook plz check it out. – Mario Sep 08 '21 at 14:07
  • @Mario, check cell 52, 53, 54 of the notebook – hprakash Sep 08 '21 at 19:24
  • 1
    Thanks for your solution. It looks fine and It would be great if you form your solution in the form of function as I mentioned in the update of the post at the end of the question. Please form your final answer here so that I can accept it. – Mario Sep 08 '21 at 20:25

1 Answers1

2

as per your requirements discussed over colab, following is the code.

from pyspark.sql.window import Window
from pyspark.sql import functions as f
from pyspark.sql.functions import row_number
winspec = Window.partitionBy("prediction").orderBy("prediction")

def get_top_n_clusters(model, top_out_cluster: int):
  n_clusters = model.summary.k
  cluster_size = model.summary.clusterSizes
  
  if top_out_cluster > n_clusters:
    raise ValueError(f"n value cannot be greater than cluster_size")

  col = ['size']
  cluster_size = pd.DataFrame(cluster_size, columns=col).sort_values(by=['size'], ascending=True)  #sorting
  return list(cluster_size.head(top_out_cluster).index)
  

def filtering(df, labels: list, top_records: int):
  winspec = Window.partitionBy("prediction").orderBy("prediction")
  return (
      df.filter(f.col("prediction").isin(labels))
        .withColumn("rowNum", f.row_number().over(winspec))
        .withColumn("minX", min(f.col("x")).over(winspec))
        .withColumn("maxY", max(f.col("y")).over(winspec))
        .withColumn("maxZ", max(f.col("z")).over(winspec))
        .filter(f.col("rowNum")<=top_records)
        .selectExpr("id", "prediction", "minX as x", "maxY as y", "maxZ as z")
  )
cluster_labels = get_top_n_clusters(model, top_out_cluster=4)
fdf = filtering(df_pred, labels=cluster_labels, top_records=1)
fdf.show()

+-----+----------+----------+----------+-----------+
|   id|prediction|         x|         y|          z|
+-----+----------+----------+----------+-----------+
|row15|         7|-10.505684|-1.6820424|-0.32242402|
| row4|         9| -9.426199| 0.5639291|  3.5664654|
| row0|         2| -8.317323|-1.3278837|-0.33906546|
|row14|         4|-0.9338185|-3.6411285| -3.7280529|
+-----+----------+----------+----------+-----------+

Mario
  • 1,631
  • 2
  • 21
  • 51
hprakash
  • 452
  • 2
  • 10
  • There is no iteration done in my solution. If u can share the detailed exception, i can try to fix it – hprakash Sep 10 '21 at 04:04