Given the input data frame below, I want to select, for each month, the time slot (StartTime) where Diff_from_50 is lowest.
If several rows tie on that value, the tie should be broken by AvgWindSpeed,
keeping whichever row has the lowest wind speed.
What would be the best way to do this in Scala? I've been working with the following code, but when I group by Month
I lose my other columns. I'm also not sure how to compare the temperature differences and then pick the row with the lowest wind speed when there is a tie.
Any suggestions or tips would be appreciated.
Current Code:
val oshdata = osh
  .select(
    col("TemperatureF"),
    col("Wind SpeedMPH"),
    concat(
      format_string("%02d", col("Month")), lit("/"),
      format_string("%02d", col("Day")), lit("/"),
      col("Year"), lit(" "), col("TimeCST")
    ).as("Date"))
  .withColumn("TemperatureF", when(col("TemperatureF").equalTo(-9999), null).otherwise(col("TemperatureF")))
  .withColumn("Wind SpeedMPH", when(col("Wind SpeedMPH").equalTo(-9999), null).otherwise(col("Wind SpeedMPH")))
  .withColumn("WindSpeed", when($"Wind SpeedMPH" === "Calm", 0).otherwise($"Wind SpeedMPH"))
val ts = to_timestamp($"Date", "MM/dd/yyyy hh:mm a")
val Oshmydata = oshdata.withColumn("ts", ts)
val OshgroupByWindow = Oshmydata
  .groupBy(window(col("ts"), "1 hour"))
  .agg(avg("TemperatureF").as("avgTemp"), avg("WindSpeed").as("AvgWindSpeed"))
  .select("window.start", "window.end", "avgTemp", "AvgWindSpeed")
val Oshdaily = OshgroupByWindow
  .withColumn("_tmp", split($"start", " "))
  .select(
    $"_tmp".getItem(0).as("Date"),
    date_format($"_tmp".getItem(1), "hh:mm:ss a").as("startTime"),
    $"end", $"avgTemp", $"AvgWindSpeed")
  .withColumn("_tmp2", split($"end", " "))
  .select(
    $"Date", $"StartTime",
    date_format($"_tmp2".getItem(1), "hh:mm:ss a").as("EndTime"),
    $"avgTemp", $"AvgWindSpeed")
  .withColumn("Diff_From_50", abs($"avgTemp" - 50))
val OshfinalData = Oshdaily
  .select(col("*"), month(col("Date")).as("Month"))
  .orderBy($"Month", $"StartTime")
OshfinalData.createOrReplaceTempView("oshView")
val testing = OshfinalData
  .select(col("*"))
  .groupBy($"Month", $"StartTime")
  .agg(avg($"avgTemp").as("avgTemp"), avg($"AvgWindSpeed").as("AvgWindSpeed"))
val withDiff = testing.withColumn("Diff_from_50", abs($"avgTemp" - 50))
withDiff.select(col("*")).groupBy($"Month").agg(min("Diff_from_50")).show()
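The final groupBy on Month returns only Month and min(Diff_from_50), because an aggregation keeps just the grouping keys and the aggregated values. One possible way to carry the other columns along, sketched here under assumptions, is to take the minimum of a struct whose fields are listed in priority order: Spark compares structs field by field, so the lowest Diff_from_50 wins first and the lowest AvgWindSpeed decides among ties. The local SparkSession, the hand-copied sample rows, and the name bestByStruct are assumptions made for illustration; withDiff stands in for the data frame of the same name above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{min, struct}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("min-struct-sketch").getOrCreate()
import spark.implicits._

// Assumption: a few rows copied by hand from the input data frame in the question.
val withDiff = Seq(
  (1, "01:00:00 PM", 23.70729813664597, 10.294075601374567, 26.29270186335403),
  (1, "02:00:00 PM", 23.78028142954523, 10.131929492774708, 26.21971857045477),
  (1, "03:00:00 PM", 23.78028142954523, 11.131929492774708, 26.21971857045477),
  (2, "01:00:00 PM", 25.602093162953263, 10.756156072520753, 24.397906837046737),
  (2, "02:00:00 PM", 25.602093162953263, 11.756156072520753, 24.397906837046737)
).toDF("Month", "StartTime", "avgTemp", "AvgWindSpeed", "Diff_from_50")

// The struct puts the comparison keys first: Diff_from_50, then AvgWindSpeed.
// The remaining fields only ride along so they survive the aggregation.
val bestByStruct = withDiff
  .groupBy($"Month")
  .agg(min(struct($"Diff_from_50", $"AvgWindSpeed", $"StartTime", $"avgTemp")).as("m"))
  .select($"Month", $"m.StartTime", $"m.avgTemp", $"m.AvgWindSpeed", $"m.Diff_from_50")
  .orderBy($"Month")

bestByStruct.show(false)
```

The field order inside the struct is what encodes the tie-break, so swapping Diff_from_50 and AvgWindSpeed would change which row is chosen.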
Input Data Frame:
+-----+-----------+------------------+------------------+-------------------+
|Month|  StartTime|           avgTemp|      AvgWindSpeed|       Diff_from_50|
+-----+-----------+------------------+------------------+-------------------+
|    1|01:00:00 AM|17.375469072164957| 8.336983230663929|  32.62453092783504|
|    1|01:00:00 PM| 23.70729813664597|10.294075601374567|  26.29270186335403|
|    1|02:00:00 AM| 17.17661058638331| 8.332715559474817| 32.823389413616695|
|    1|02:00:00 PM| 23.78028142954523|10.131929492774708|  26.21971857045477|
|    1|03:00:00 AM|16.979751170960192| 8.305847424684158|  33.02024882903981|
|    1|03:00:00 PM| 23.78028142954523|11.131929492774708|  26.21971857045477|
|    2|01:00:00 AM| 18.19221536796537| 8.104439935064937|  31.80778463203463|
|    2|01:00:00 PM|25.602093162953263|10.756156072520753| 24.397906837046737|
|    2|02:00:00 AM|  17.7650265755505| 8.142266514806375|   32.2349734244495|
|    2|02:00:00 PM|25.602093162953263|11.756156072520753| 24.397906837046737|
+-----+-----------+------------------+------------------+-------------------+
Expected output:
+-----+-----------+------------------+------------------+-------------------+
|Month|  StartTime|           avgTemp|      AvgWindSpeed|       Diff_from_50|
+-----+-----------+------------------+------------------+-------------------+
|    1|02:00:00 PM| 23.78028142954523|10.131929492774708|  26.21971857045477|
|    2|01:00:00 PM|25.602093162953263|10.756156072520753| 24.397906837046737|
+-----+-----------+------------------+------------------+-------------------+
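A minimal sketch of one way to get this output, assuming a window function is acceptable: rank the rows within each Month by Diff_from_50 and then by AvgWindSpeed, and keep only the first row of each partition. The local SparkSession, the hand-copied sample rows, and the names byMonth, rn and bestByWindow are assumptions for illustration and are not part of the code above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("row-number-sketch").getOrCreate()
import spark.implicits._

// Assumption: a few rows copied by hand from the input data frame in the question.
val withDiff = Seq(
  (1, "01:00:00 PM", 23.70729813664597, 10.294075601374567, 26.29270186335403),
  (1, "02:00:00 PM", 23.78028142954523, 10.131929492774708, 26.21971857045477),
  (1, "03:00:00 PM", 23.78028142954523, 11.131929492774708, 26.21971857045477),
  (2, "01:00:00 PM", 25.602093162953263, 10.756156072520753, 24.397906837046737),
  (2, "02:00:00 PM", 25.602093162953263, 11.756156072520753, 24.397906837046737)
).toDF("Month", "StartTime", "avgTemp", "AvgWindSpeed", "Diff_from_50")

// One partition per month, sorted by the primary key and then the tie-breaker.
val byMonth = Window
  .partitionBy($"Month")
  .orderBy($"Diff_from_50".asc, $"AvgWindSpeed".asc)

// row_number() assigns 1 to the first row of each partition; keep only that row.
val bestByWindow = withDiff
  .withColumn("rn", row_number().over(byMonth))
  .where($"rn" === 1)
  .drop("rn")
  .orderBy($"Month")

bestByWindow.show(false)
```

If two rows in a month tie on both Diff_from_50 and AvgWindSpeed, row_number() keeps one of them arbitrarily; using rank() in its place would keep every tied row.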