
I am working with a DataFrame in PySpark. One column contains integer values representing times:

data.select('TIME').show(4)
+------------------+
|TIME              |
+------------------+
|               925|
|              2205|
|              2205|
|              2205|
+------------------+

I would like to round these times to the nearest 15-minute step, in order to obtain:

+------------------+
|TIME_15_MIN_STEP  |
+------------------+
|               930|
|              2200|
|              2200|
|              2200|
+------------------+

Does anyone know how to do it?

Thank you very much!!

Valentina
  • I guess that for 925 the algorithm should check whether 925 is closer to 930 or to 915, which are the closest multiples of 15. But I don't understand why 2205 has to become 2200. 2205 is already a multiple of 15; what is the logic behind this rounding? – titiro89 May 15 '18 at 08:38
  • Because they represent times (2205 = 22:05 = 10:05 pm). – Valentina May 15 '18 at 08:50
  • Are there specific reasons for representing time as an integer (925) instead of a string ("09:25")? It is handy to represent time as an integer if it is a timestamp (e.g. milliseconds from epoch), or as a string if it is an hour or a date (and many operations can be performed in both situations). – titiro89 May 15 '18 at 09:06
  • 1
    Go with UDF. 1st step transform this time to normal `datetime` and then use it https://stackoverflow.com/questions/32723150/rounding-up-to-nearest-30-minutes-in-python?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa – vvg May 15 '18 at 09:14
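
A minimal sketch of that suggested UDF approach, assuming the HHMM integer format from the question (the function name and dummy date are illustrative, not from this thread):

import datetime

import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType

def round_hhmm_to_15(hhmm):
    # parse the HHMM integer into a datetime on a dummy date
    t = datetime.datetime(2018, 1, 1, hhmm // 100, hhmm % 100)
    # seconds since midnight, rounded to the nearest 900 s (15 minutes)
    secs = t.hour * 3600 + t.minute * 60
    rounded = (secs + 450) // 900 * 900
    t = t.replace(hour=0, minute=0) + datetime.timedelta(seconds=rounded)
    # back to an HHMM integer (23:55 wraps to 0000)
    return t.hour * 100 + t.minute

round_udf = f.udf(round_hhmm_to_15, IntegerType())
data = data.withColumn("TIME_15_MIN_STEP", round_udf("TIME"))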

2 Answers


One way to do this without using a udf is to first convert your integer column into a dummy timestamp, and then do pretty much the same thing as outlined in my answer to a similar question. Finally convert the result back into an integer of the desired format.

More thorough example data

I created an example with some more variability to demonstrate that this method is working properly.

# sqlCtx is a SQLContext/HiveContext; on Spark 2+, spark.createDataFrame(...) works the same
data = sqlCtx.createDataFrame([(925,), (2205,), (2210,), (2242,), (2255,)], ["TIME"])
data.show()
#+----+
#|TIME|
#+----+
#| 925|
#|2205|
#|2210|
#|2242|
#|2255|
#+----+

Convert the integer column to a dummy timestamp

To convert the integer hour-minute column into a timestamp, we first use pyspark.sql.functions.format_string() to add leading zeros to the time where appropriate. Next, we concatenate a dummy date (I used "2018-01-01") with the converted time and append ":00" at the end (for seconds).

data = data.withColumn("time_string", f.format_string("%04d", f.col("TIME")))\
    .withColumn(
        "time_string",
        f.concat_ws(
            ":",
            f.array(
                [
                    f.substring(
                        "time_string",
                        1,
                        2
                    ),
                    f.substring(
                        "time_string",
                        3,
                        2
                    ),
                    f.lit("00")
                ]
            )
        )
    )\
    .withColumn("time_string", f.concat(f.lit("2018-01-01 "), f.col("time_string")))

data.show()
#+----+-------------------+
#|TIME|        time_string|
#+----+-------------------+
#| 925|2018-01-01 09:25:00|
#|2205|2018-01-01 22:05:00|
#|2210|2018-01-01 22:10:00|
#|2242|2018-01-01 22:42:00|
#|2255|2018-01-01 22:55:00|
#+----+-------------------+

Compute how many minutes to offset the timestamp

Use pyspark.sql.functions.minute() to get the minute from the dummy timestamp. We divide by 15, round, and then multiply by 15 to get the "new" minute. (This logic is explained in more detail in the linked answer.)

data = data.withColumn("minute", f.minute("time_string"))\
    .withColumn("new_minute", f.round(f.col("minute")/15)*15)\
    .withColumn("minute_add", f.col("new_minute") - f.col("minute"))\

data.show()
#+----+-------------------+------+----------+----------+
#|TIME|        time_string|minute|new_minute|minute_add|
#+----+-------------------+------+----------+----------+
#| 925|2018-01-01 09:25:00|    25|      30.0|       5.0|
#|2205|2018-01-01 22:05:00|     5|       0.0|      -5.0|
#|2210|2018-01-01 22:10:00|    10|      15.0|       5.0|
#|2242|2018-01-01 22:42:00|    42|      45.0|       3.0|
#|2255|2018-01-01 22:55:00|    55|      60.0|       5.0|
#+----+-------------------+------+----------+----------+

Add offset in seconds, convert back to integer

Multiply the minute_add column by 60 to get the offset in seconds. Add this to the Unix timestamp of time_string to get the "new" time, and finally reformat the hour and minute back into an HHMM integer.

data = data.withColumn(
        "new_time",
        f.from_unixtime(f.unix_timestamp("time_string") + f.col("minute_add")*60)
    )\
    .withColumn(
        "NEW_TIME",
        f.format_string("%02d%02d", f.hour("new_time"), f.minute("new_time")).cast("int")
    )
data.select("TIME", "NEW_TIME").show()
#+----+--------+
#|TIME|NEW_TIME|
#+----+--------+
#| 925|     930|
#|2205|    2200|
#|2210|    2215|
#|2242|    2245|
#|2255|    2300|
#+----+--------+
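
For comparison, the same rounding can be done with plain column arithmetic, skipping the dummy timestamp entirely. This is only a sketch under the same assumptions (HHMM integers; the output column name is taken from the question), not part of the original answer:

import pyspark.sql.functions as f

# minutes since midnight
minutes = (f.col("TIME") / 100).cast("int") * 60 + f.col("TIME") % 100
# round to the nearest multiple of 15, wrapping past midnight (2355 -> 0000)
rounded = (f.round(minutes / 15) * 15).cast("int") % (24 * 60)

data = data.withColumn(
    "TIME_15_MIN_STEP",
    (rounded / 60).cast("int") * 100 + rounded % 60
)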
pault
  • You are a scholar and a gentleman. This is such a great answer, although the one below it looks good too. – rjurney Sep 26 '20 at 23:36

A nicer way to group by 15-minute intervals is to use pyspark.sql.functions.window on a timestamp column. Note that this assigns each row to the start of its window, i.e. it truncates down rather than rounding to the nearest step:

import pyspark.sql.functions as F

# groupBy returns GroupedData, so an aggregation (here count) is needed
# before withColumn can be applied
df = df \
    .groupBy(F.window("timestamp", "15 minutes")) \
    .count() \
    .withColumn("timestamp", F.col("window.start"))

See the documentation for pyspark.sql.functions.window.
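
As a rough end-to-end sketch on the question's data, reusing the time_string column built in the first answer (the column names here are illustrative):

import pyspark.sql.functions as F

df = data.withColumn("timestamp", F.col("time_string").cast("timestamp"))
df.groupBy(F.window("timestamp", "15 minutes")) \
    .count() \
    .select(F.date_format("window.start", "HHmm").alias("TIME_15_MIN"), "count") \
    .show()
# 09:25 falls in the [09:15, 09:30) window, so it maps to 0915, not 0930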

LN_P