One way to do this without using a UDF is to first convert your integer column into a dummy timestamp, then apply essentially the same approach outlined in my answer to a similar question, and finally convert the result back into an integer of the desired format.
More thorough example data
I created an example with more variability to demonstrate that this method works properly.
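import pyspark.sql.functions as f

# assumes sqlCtx is an existing SQLContext (a SparkSession works the same way)
data = sqlCtx.createDataFrame([(925,), (2205,), (2210,), (2242,), (2255,)], ["TIME"])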
data.show()
#+----+
#|TIME|
#+----+
#| 925|
#|2205|
#|2210|
#|2242|
#|2255|
#+----+
Convert integer column to dummy timestamp
To convert the integer hour-minute column into a timestamp, we first use pyspark.sql.functions.format_string() to add leading zeros to the time when appropriate. Next we split the padded string into its hour and minute parts and join them with ":", adding "00" at the end (for seconds). Finally we prepend a dummy date (I used "2018-01-01").
data = data.withColumn("time_string", f.format_string("%04d", f.col("TIME")))\
    .withColumn(
        "time_string",
        f.concat_ws(
            ":",
            f.array(
                [
                    f.substring("time_string", 1, 2),  # hours (substring is 1-based)
                    f.substring("time_string", 3, 2),  # minutes
                    f.lit("00")                        # seconds
                ]
            )
        )
    )\
    .withColumn("time_string", f.concat(f.lit("2018-01-01 "), f.col("time_string")))
data.show()
#+----+-------------------+
#|TIME|        time_string|
#+----+-------------------+
#| 925|2018-01-01 09:25:00|
#|2205|2018-01-01 22:05:00|
#|2210|2018-01-01 22:10:00|
#|2242|2018-01-01 22:42:00|
#|2255|2018-01-01 22:55:00|
#+----+-------------------+
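To make the string manipulation easier to follow, here is a minimal plain-Python sketch of the same padding and slicing steps. It is only an illustration and does not use Spark; the function name to_dummy_timestamp and its dummy_date parameter are hypothetical names, not part of the code above.

```python
def to_dummy_timestamp(hhmm, dummy_date="2018-01-01"):
    """Plain-Python mirror of the format_string / substring / concat steps."""
    padded = "%04d" % hhmm  # 925 -> "0925"
    # Spark's substring(col, 1, 2) is 1-based; Python slices are 0-based.
    hours, minutes = padded[0:2], padded[2:4]
    return "{} {}:{}:00".format(dummy_date, hours, minutes)


examples = [to_dummy_timestamp(t) for t in (925, 2205)]
print(examples)  # ['2018-01-01 09:25:00', '2018-01-01 22:05:00']
```

The zero-padding matters for times before 10:00, where the integer has fewer than four digits.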
Compute how many minutes to offset the timestamp
Use pyspark.sql.functions.minute() to get the minute from the dummy timestamp. We divide by 15, round, and then multiply by 15 to get the "new" minute. (This logic is explained in more detail in the linked answer.)
data = data.withColumn("minute", f.minute("time_string"))\
    .withColumn("new_minute", f.round(f.col("minute")/15)*15)\
    .withColumn("minute_add", f.col("new_minute") - f.col("minute"))
data.show()
#+----+-------------------+------+----------+----------+
#|TIME|        time_string|minute|new_minute|minute_add|
#+----+-------------------+------+----------+----------+
#| 925|2018-01-01 09:25:00|    25|      30.0|       5.0|
#|2205|2018-01-01 22:05:00|     5|       0.0|      -5.0|
#|2210|2018-01-01 22:10:00|    10|      15.0|       5.0|
#|2242|2018-01-01 22:42:00|    42|      45.0|       3.0|
#|2255|2018-01-01 22:55:00|    55|      60.0|       5.0|
#+----+-------------------+------+----------+----------+
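The divide, round, multiply arithmetic can be checked by hand with a small plain-Python sketch. The names minute_offset and bucket are hypothetical. Because Spark's round() rounds halves up while Python's built-in round() rounds halves to even, the sketch uses floor(x + 0.5) instead; for whole minutes and a 15-minute bucket an exact tie never occurs, so the distinction does not affect these values.

```python
import math


def minute_offset(minute, bucket=15):
    """Minutes to add so that `minute` lands on the nearest multiple of `bucket`."""
    new_minute = math.floor(minute / bucket + 0.5) * bucket  # half-up rounding
    return new_minute - minute


offsets = {m: minute_offset(m) for m in (25, 5, 10, 42, 55)}
print(offsets)  # {25: 5, 5: -5, 10: 5, 42: 3, 55: 5}
```

Note that minute 55 gives a new minute of 60, which is why the offset is applied to the full timestamp rather than written into the minute field directly.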
Add offset in seconds, convert back to integer
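Multiply the minute_add column by 60 to get the offset in seconds. Add this to the Unix timestamp of time_string to get the "new" time, then format its hour and minute as a zero-padded string and cast the result to an integer.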
data = data.withColumn(
        "new_time",
        f.from_unixtime(f.unix_timestamp("time_string") + f.col("minute_add")*60)
    )\
    .withColumn(
        "NEW_TIME",
        f.format_string("%02d%02d", f.hour("new_time"), f.minute("new_time")).cast("int")
    )
data.select("TIME", "NEW_TIME").show()
#+----+--------+
#|TIME|NEW_TIME|
#+----+--------+
#| 925|     930|
#|2205|    2200|
#|2210|    2215|
#|2242|    2245|
#|2255|    2300|
#+----+--------+
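As a cross-check on the expected results, the whole pipeline can be mirrored in plain Python with the standard datetime module. This is a sketch for verifying values by hand, not a replacement for the Spark code; round_hhmm and bucket are hypothetical names, and the input 2355 is an extra value I added to show what happens when rounding crosses midnight.

```python
import math
from datetime import datetime, timedelta


def round_hhmm(hhmm, bucket=15):
    """Round an integer HHMM time to the nearest `bucket` minutes."""
    # Dummy date plus zero-padded time, as in the time_string column.
    ts = datetime.strptime("2018-01-01 %04d" % hhmm, "%Y-%m-%d %H%M")
    offset = math.floor(ts.minute / bucket + 0.5) * bucket - ts.minute
    # Adding the offset to the full timestamp lets the hour roll over.
    new_ts = ts + timedelta(minutes=offset)
    return int("%02d%02d" % (new_ts.hour, new_ts.minute))


results = [round_hhmm(t) for t in (925, 2205, 2210, 2242, 2255, 2355)]
print(results)  # [930, 2200, 2215, 2245, 2300, 0]
```

Since only the hour and minute are kept, a time such as 2355 rounds to 0 (midnight) and the change of date is discarded.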