
I have a DataFrame like this:

finalSondDF.show()
    +---------------+------------+----------------+
    |webService_Name|responseTime|numberOfSameTime|
    +---------------+------------+----------------+
    |    webservice1|          80|               1|
    |    webservice1|          87|               2|
    |    webservice1|         283|               1|
    |    webservice2|          77|               2|
    |    webservice2|          80|               1|
    |    webservice2|          81|               1|
    |    webservice3|          63|               3|
    |    webservice3|         145|               1|
    |    webservice4|         167|               1|
    |    webservice4|         367|               2|
    |    webservice4|         500|               1|
    +---------------+------------+----------------+  

and I want to get a result like this:

+---------------+------------+----------------+------+
|webService_Name|responseTime|numberOfSameTime|Result|
+---------------+------------+----------------+------+
|    webservice1|          80|               1|     1|
|    webservice1|          87|               2|     3|  ==> 2+1
|    webservice1|         283|               1|     4|  ==> 1+2+1
|    webservice2|          77|               2|     2|  
|    webservice2|          80|               1|     3|  ==> 2+1
|    webservice2|          81|               1|     4|  ==> 2+1+1
|    webservice3|          63|               3|     3|
|    webservice3|         145|               1|     4|  ==> 3+1
|    webservice4|         167|               1|     1|
|    webservice4|         367|               2|     3|  ==> 1+2
|    webservice4|         500|               1|     4|  ==> 1+2+1
+---------------+------------+----------------+------+  

Here, Result is the sum of numberOfSameTime over all rows of the same webService_Name whose responseTime is less than or equal to the current row's responseTime.
I can't work out the logic to do that. Can anyone help me?

SCouto
jean-marc

2 Answers


You can get this as a cumulative sum using a Window function that partitions by the webService_Name column and orders each group by increasing responseTime, as below

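import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Partition by service and order by responseTime; with an orderBy, the default
// frame runs from the start of the partition to the current row, so sum is cumulative
val windowSpec = Window.partitionBy("webService_Name").orderBy("responseTime")

// df is the input DataFrame (finalSondDF in the question)
df.withColumn("Result", sum("numberOfSameTime").over(windowSpec)).show(false)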

and you should have

+---------------+------------+----------------+------+
|webService_Name|responseTime|numberOfSameTime|Result|
+---------------+------------+----------------+------+
|webservice1    |80          |1               |1     |
|webservice1    |87          |2               |3     |
|webservice1    |283         |1               |4     |
|webservice2    |80          |1               |3     |
|webservice2    |81          |1               |4     |
|webservice2    |77          |2               |2     |
|webservice3    |145         |1               |4     |
|webservice3    |63          |3               |3     |
|webservice4    |167         |1               |1     |
|webservice4    |367         |2               |3     |
|webservice4    |500         |1               |4     |
+---------------+------------+----------------+------+

Note that responseTime has to be a numeric type for the above to work: the window orders the rows of each webService_Name by increasing responseTime, and a string column would be ordered lexicographically instead.
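The numeric-type requirement can be seen without Spark at all. The following is only a minimal sketch using plain Scala collections (the names asStrings and asNumbers are made up for illustration); it shows how the same response times sort differently as strings and as integers, which is why a string-typed responseTime would accumulate in the wrong order.

```scala
// Hypothetical sample values taken from the webservice1 rows in the question
val asStrings = Seq("80", "87", "283")
val asNumbers = asStrings.map(_.toInt)

// Strings compare character by character, so "283" sorts before "80"
println(asStrings.sorted)

// Integers compare by value, giving the increasing order the window needs
println(asNumbers.sorted)
```

If the column did arrive as a string, casting it to an integer type before building the window would be one way to get the numeric ordering.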

Ramesh Maharjan

You can use a Window function available in Spark to calculate the cumulative sum, as below.

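  // imports needed by this snippet (spark is the active SparkSession)
  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.sum
  import spark.implicits._

  //dummy data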
  val d1 = spark.sparkContext.parallelize(Seq(
    ("webservice1", 80, 1),
    ("webservice1", 87, 2),
    ("webservice1", 283, 1),
    ("webservice2", 77, 2),
    ("webservice2", 80, 1),
    ("webservice2", 81, 1),
    ("webservice3", 63, 3),
    ("webservice3", 145, 1),
    ("webservice4", 167, 1),
    ("webservice4", 367, 2),
    ("webservice4", 500, 1)
  )).toDF("webService_Name","responseTime","numberOfSameTime")

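  // window function: order by responseTime within each service (ordering by the
  // partition column itself would leave the row order, and so the sum, undefined)
  val window = Window.partitionBy("webService_Name").orderBy($"responseTime")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)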

  // create new column for Result
  d1.withColumn("Result", sum("numberOfSameTime").over(window)).show(false)

Output:

+---------------+------------+----------------+------+
|webService_Name|responseTime|numberOfSameTime|Result|
+---------------+------------+----------------+------+
|webservice4    |167         |1               |1     |
|webservice4    |367         |2               |3     |
|webservice4    |500         |1               |4     |
|webservice2    |77          |2               |2     |
|webservice2    |80          |1               |3     |
|webservice2    |81          |1               |4     |
|webservice3    |63          |3               |3     |
|webservice3    |145         |1               |4     |
|webservice1    |80          |1               |1     |
|webservice1    |87          |2               |3     |
|webservice1    |283         |1               |4     |
+---------------+------------+----------------+------+
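As a cross-check of the Result column, the same running total can be sketched with plain Scala collections, without Spark. This is only an illustration of the logic the window applies (group by service, sort by responseTime, accumulate numberOfSameTime); the Record case class and the value names are hypothetical, and the rows are a deliberately shuffled subset of the question's data.

```scala
// Hypothetical record type mirroring the three DataFrame columns
final case class Record(webServiceName: String, responseTime: Int, numberOfSameTime: Int)

// A shuffled subset of the question's rows
val records = Seq(
  Record("webservice1", 283, 1),
  Record("webservice1", 80, 1),
  Record("webservice1", 87, 2),
  Record("webservice3", 145, 1),
  Record("webservice3", 63, 3)
)

// For each service: sort by responseTime, then accumulate numberOfSameTime
val withResult: Seq[(Record, Int)] =
  records.groupBy(_.webServiceName).toSeq.sortBy(_._1).flatMap { case (_, group) =>
    val sorted = group.sortBy(_.responseTime)
    // scanLeft emits the seed 0 first, so drop it with .tail
    val running = sorted.scanLeft(0)(_ + _.numberOfSameTime).tail
    sorted.zip(running)
  }

withResult.foreach { case (r, total) =>
  println(s"${r.webServiceName} ${r.responseTime} ${r.numberOfSameTime} $total")
}
```

The groupBy step plays the role of partitionBy, sortBy the role of orderBy, and scanLeft the role of the frame that runs from the first row of the partition to the current row.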

Hope this helps!

koiralo