My input is a Spark DataFrame:

EventTime,Signal
0,-65
10,-63
20,-71
40,-65
50,-62
80,-81
90,-84
100,-81
...
85460,-71
85480,-66
85490,-89
85500,-80

I would like to get the mean of Signal for each 900-second interval of EventTime, so that the output looks like this:

EventTime, MeanSignal
0, mean 
900, mean 
1800, mean
...
85500, mean

My problem is that EventTime does not advance in regular steps, so I can't simply split the DataFrame into equal-length parts...

  • Please read [How to make good reproducible Apache Spark Dataframe examples](https://stackoverflow.com/q/48427185) and [edit] your question accordingly. – zero323 Oct 16 '18 at 10:22
  • Possible duplicate of [How to group by time interval in Spark SQL](https://stackoverflow.com/questions/37632238/how-to-group-by-time-interval-in-spark-sql) – zero323 Oct 16 '18 at 10:26
  • Also [PySpark Numeric Window Group By](https://stackoverflow.com/q/48467215) might be useful – zero323 Oct 16 '18 at 10:29
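
For reference, the duplicates linked above rely on Spark's built-in `window` function after casting the numeric seconds to a timestamp. A minimal sketch of that approach (`data_input` and the output column names are assumed, not from the question):

    import org.apache.spark.sql.functions.{col, mean, window}

    // Casting a numeric column to timestamp interprets it as seconds since
    // the epoch, which lets window() bucket it into 900-second intervals
    val means = data_input
      .withColumn("ts", col("EventTime").cast("timestamp"))
      .groupBy(window(col("ts"), "900 seconds"))
      .agg(mean("Signal").as("MeanSignal"))
      .select(col("window.start").cast("long").as("EventTime"), col("MeanSignal"))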

2 Answers


You can add a new column computed as EventTime/900 (integer division) and group by that column. Something like this:

import org.apache.spark.sql.functions.{col, lit, mean}
import spark.implicits._  // assuming a SparkSession named spark, for toDF

// Sample data from the question
val map = Map(0 -> -65, 10 -> -63, 20 -> -71, 40 -> -65, 50 -> -62, 80 -> -81, 90 -> -84, 100 -> -81, 85460 -> -71, 85480 -> -66, 85490 -> -89, 85500 -> -80)

val df = map.toSeq.toDF("EventTime", "Signal")
  // Integer division assigns each row to a 900-second bucket
  .withColumn("EventTimeGrp", (col("EventTime")/lit(900)).cast("int"))
  .groupBy("EventTimeGrp").agg(mean("Signal").as("MeanSignal"))
  // Recover the start of each interval as the bucket label
  .withColumn("EventTime", col("EventTimeGrp")*lit(900))
  .drop("EventTimeGrp")

The result looks like this; EventTime 0 represents the values between 0 and 899, and so on.

    +------------------+---------+
    |        MeanSignal|EventTime|
    +------------------+---------+
    |-75.33333333333333|    84600|
    |             -71.5|        0|
    |             -80.0|    85500|
    +------------------+---------+
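
To match the ordering requested in the question (EventTime ascending), a small follow-up on the `df` above:

    // Sort buckets chronologically before displaying
    df.orderBy("EventTime").show()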

– Apurba Pandey

OK, here's my solution, thanks to the other posts: I create a Bucket column from the integer division of EventTime by 900 to form categories, then I group by Bucket and take the mean.

    import org.apache.spark.sql.functions.{col, mean, udf}

    // Integer division by 900 maps each EventTime to a bucket index
    def toBucket(input: Int): Int = input / 900

    // Label each bucket with the end of its interval (bucket 0 -> 900)
    def getTime(input: Int): Int = (input + 1) * 900

    val toBucketUDF = udf(toBucket _)
    val getTimeUDF = udf(getTime _)

    val df = data_input.withColumn("Bucket", toBucketUDF(col("EventTime")))

    val finalDF = df.groupBy("Bucket")
      .agg(mean("RSSI"))  // RSSI is the Signal column in my real data
      .withColumnRenamed("avg(RSSI)", "RSSI")
      .orderBy("Bucket")
      .withColumn("EventTime", getTimeUDF(col("Bucket")))
      .drop("Bucket")
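
The UDFs aren't strictly necessary here; the same bucketing can be expressed with built-in functions, which Spark can optimize better. A minimal sketch under the same assumptions (`data_input` with an RSSI column):

    import org.apache.spark.sql.functions.{col, floor, mean}

    // Same result without UDFs: floor division buckets, built-in mean
    val finalDF = data_input
      .withColumn("Bucket", floor(col("EventTime") / 900))
      .groupBy("Bucket")
      .agg(mean("RSSI").as("RSSI"))
      .orderBy("Bucket")
      .withColumn("EventTime", (col("Bucket") + 1) * 900)
      .drop("Bucket")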
– SimbaPK