
I have a data frame in PySpark with a column of timestamp data type. I want to add 2 hours to the timestamp in each row of that column without creating any new columns.

For example, this is some sample data:

df

id  testing_time            test_name

1   2017-03-12 03:19:58     Raising
2   2017-03-12 03:21:30     sleeping
3   2017-03-12 03:29:40     walking
4   2017-03-12 03:31:23     talking
5   2017-03-12 04:19:47     eating  
6   2017-03-12 04:33:51     working

I want to end up with something like this:

df1

id  testing_time            test_name

1   2017-03-12 05:19:58     Raising
2   2017-03-12 05:21:30     sleeping
3   2017-03-12 05:29:40     walking
4   2017-03-12 05:31:23     talking
5   2017-03-12 06:19:47     eating  
6   2017-03-12 06:33:51     working 

How can I do that?

– User12345

3 Answers


One approach that doesn't require explicit casting is to use a Spark interval literal, which arguably reads better:

import pyspark.sql.functions as F

df = df.withColumn('testing_time', df.testing_time + F.expr('INTERVAL 2 HOURS'))
df.show()
+---+-------------------+---------+
| id|       testing_time|test_name|
+---+-------------------+---------+
|  1|2017-03-12 05:19:58|  Raising|
|  2|2017-03-12 05:21:30| sleeping|
|  3|2017-03-12 05:29:40|  walking|
|  4|2017-03-12 05:31:23|  talking|
|  5|2017-03-12 06:19:47|   eating|
|  6|2017-03-12 06:33:51|  working|
+---+-------------------+---------+

Or, in full:

import pyspark.sql.functions as F
from datetime import datetime

data = [
  (1, datetime(2017, 3, 12, 3, 19, 58), 'Raising'),
  (2, datetime(2017, 3, 12, 3, 21, 30), 'sleeping'),
  (3, datetime(2017, 3, 12, 3, 29, 40), 'walking'),
  (4, datetime(2017, 3, 12, 3, 31, 23), 'talking'),
  (5, datetime(2017, 3, 12, 4, 19, 47), 'eating'),
  (6, datetime(2017, 3, 12, 4, 33, 51), 'working'),
]

df = sqlContext.createDataFrame(data, ['id', 'testing_time', 'test_name'])
df = df.withColumn('testing_time', df.testing_time + F.expr('INTERVAL 2 HOURS'))
df.show()
+---+-------------------+---------+
| id|       testing_time|test_name|
+---+-------------------+---------+
|  1|2017-03-12 05:19:58|  Raising|
|  2|2017-03-12 05:21:30| sleeping|
|  3|2017-03-12 05:29:40|  walking|
|  4|2017-03-12 05:31:23|  talking|
|  5|2017-03-12 06:19:47|   eating|
|  6|2017-03-12 06:33:51|  working|
+---+-------------------+---------+
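
If you prefer the newer SparkSession entry point over sqlContext, and want the 2-hour offset kept in a variable, here is a minimal sketch along the same lines (hours_to_add and the spark session are my own names, not part of the original answer):

import pyspark.sql.functions as F
from datetime import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
  (1, datetime(2017, 3, 12, 3, 19, 58), 'Raising'),
  (2, datetime(2017, 3, 12, 3, 21, 30), 'sleeping'),
]
df = spark.createDataFrame(data, ['id', 'testing_time', 'test_name'])

hours_to_add = 2  # hypothetical parameter, not in the original answer
# Same interval-literal trick, with the literal built from a formatted string.
df = df.withColumn('testing_time',
                   F.col('testing_time') + F.expr(f'INTERVAL {hours_to_add} HOURS'))
df.show()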
– eddies

You can convert the testing_time column to bigint (seconds since the epoch) with the unix_timestamp function, add 2 hours (7200 s), and then cast the result back to timestamp:

import pyspark.sql.functions as F

df.withColumn("testing_time", (F.unix_timestamp("testing_time") + 7200).cast('timestamp')).show()
+---+-------------------+---------+
| id|       testing_time|test_name|
+---+-------------------+---------+
|  1|2017-03-12 05:19:58|  Raising|
|  2|2017-03-12 05:21:30| sleeping|
|  3|2017-03-12 05:29:40|  walking|
|  4|2017-03-12 05:31:23|  talking|
|  5|2017-03-12 06:19:47|   eating|
|  6|2017-03-12 06:33:51|  working|
+---+-------------------+---------+
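
The same idea with the offset held in a Python variable (a small sketch; n_hours is a hypothetical name, not from the original answer):

import pyspark.sql.functions as F

n_hours = 2  # hypothetical parameter
# unix_timestamp gives seconds since the epoch; shift and cast back to timestamp.
df = df.withColumn("testing_time",
                   (F.unix_timestamp("testing_time") + n_hours * 3600).cast("timestamp"))
df.show()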
– Psidom
  • Given that this is of course Pyspark, and thus Python running on top of Java, wouldn't this be slower than using the "INTERVAL" expression above? – Marco Jan 15 '20 at 10:49

Based on @Psidom's answer:

In my case the testing_time column has an inconsistent time format: F.unix_timestamp("testing_time", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") works for records with millisecond precision but returns null for records with only seconds granularity. I handled it this way:

import pyspark.sql.functions as F

df.withColumn("testing_time", 
    (F.unix_timestamp(F.col("testing_time").cast("timestamp")) + 7200).cast('timestamp'))

This way, whatever time format the testing_time field comes in, it is handled by the cast function provided by PySpark.
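
A minimal self-contained sketch of this approach (the mixed-format sample strings and the spark session are my own assumptions, not from the original answer):

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One record with millisecond granularity, one with plain seconds.
data = [(1, '2017-03-12T03:19:58.123Z'), (2, '2017-03-12 03:21:30')]
df = spark.createDataFrame(data, ['id', 'testing_time'])

# cast('timestamp') parses both string formats; unix_timestamp then shifts by 2 hours.
df = df.withColumn('testing_time',
                   (F.unix_timestamp(F.col('testing_time').cast('timestamp')) + 7200)
                   .cast('timestamp'))
df.show(truncate=False)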

– Vzzarr